Amazon Web Services ブログ

Amazon MSK Connect のご紹介 – マネージドコネクタを使用して Apache Kafka クラスターとの間でデータをストリーミングする

Apache Kafka は、リアルタイムのストリーミングデータパイプラインおよびアプリケーションを構築するためのオープンソースプラットフォームです。当社は、re:Invent 2018 において、Amazon Managed Streaming for Apache Kafka を発表しました。これは、Apache Kafka を使用してストリーミングデータを処理するアプリケーションの構築と実行を容易にする完全マネージド型のサービスです。

Apache Kafka を使用すると、IoT デバイス、データベース変更イベント、ウェブサイトのクリックストリームなどのソースからリアルタイムデータをキャプチャし、データベースや永続的ストレージなどの送信先に配信します。

Kafka Connect は、データベース、key-value ストア、検索インデックス、ファイルシステムなどの外部システムと接続するためのフレームワークを提供する Apache Kafka のオープンソースコンポーネントです。ただし、Kafka Connect クラスターを手動で実行するには、必要なインフラストラクチャを計画およびプロビジョンし、クラスターオペレーションを処理し、負荷の変化に応じてスケールする必要があります。

2021 年 9 月 16 日、Kafka Connect クラスターの管理を容易にする新機能を発表します。MSK Connect を使用すると、数回クリックするだけで Kafka Connect を使用してコネクタを設定およびデプロイできます。MSK Connect は、必要なリソースをプロビジョンし、クラスターをセットアップします。コネクタのヘルスと配信の状態を継続的にモニタリングし、基盤となるハードウェアにパッチを適用して管理し、スループットの変化に合わせてコネクタをオートスケールします。その結果、インフラストラクチャの管理ではなく、アプリケーションの構築にリソースを集中させることができます。

MSK Connect は Kafka Connect と完全に互換性があります。つまり、コードを変更せずに既存のコネクタを移行できます。MSK Connect を使用するために MSK クラスターは必要ありません。Amazon MSK、Apache Kafka、および Apache Kafka 互換クラスターをソースおよびシンクとしてサポートしています。これらのクラスターは、MSK Connect がクラスターにプライベートに接続できる限り、セルフマネージドとしたり、AWS パートナーおよびサードパーティーによるマネージドとしたりできます。

Amazon Aurora および Debezium での MSK Connect の使用
MSK Connect をテストするために、これを使用して、データベースの 1 つからデータ変更イベントをストリーミングしたいと思います。そのために、Apache Kafka の上に構築された変更データキャプチャ用のオープンソースの分散プラットフォームである Debezium を使用します。

MySQL 互換の Amazon Aurora データベースをソースとして使用し、Debezium MySQL コネクタを次のアーキテクチャ図で説明するセットアップで使用します。

アーキテクチャ図。

Debezium で Aurora データベースを使用するには、DB クラスターパラメータグループでバイナリログ記録を有効にする必要があります。Amazon Aurora MySQL クラスターのバイナリログを有効にするにはどうすればよいですか? の記事の手順に従います。

次に、MSK Connect 用のカスタムプラグインを作成する必要があります。カスタムプラグインは、1 つ以上のコネクタ、トランスフォーム、またはコンバーターの実装を含む JAR ファイルのセットです。Amazon MSK は、コネクタが実行されている接続クラスターのワーカーにプラグインをインストールします。

Debezium ウェブサイトから、最新の安定版リリース用の MySQL コネクタプラグインをダウンロードします。MSK Connect は ZIP または JAR 形式のカスタムプラグインを受け入れるため、ダウンロードしたアーカイブを ZIP 形式に変換し、JAR ファイルをメインディレクトリに保持します。

$ tar xzf debezium-connector-mysql-1.6.1.Final-plugin.tar.gz
$ cd debezium-connector-mysql
$ zip -9 ../debezium-connector-mysql-1.6.1.zip *
$ cd ..

その後、AWS Command Line Interface (CLI) を使用して、MSK Connect のために使用しているのと同じ AWS リージョンAmazon Simple Storage Service (Amazon S3) バケットにカスタムプラグインをアップロードします。

$ aws s3 cp debezium-connector-mysql-1.6.1.zip s3://my-bucket/path/

Amazon MSK コンソールには、新しい MSK Connect セクションがあります。コネクタを確認し、[Create connector] (コネクタを作成) を選択します。その後、カスタムプラグインを作成し、S3 バケットを参照して、以前にアップロードしたカスタムプラグイン ZIP ファイルを選択します。

コンソールのスクリーンショット。

プラグインの名前と説明を入力し、[Next] (次へ) を選択します。

コンソールのスクリーンショット。

カスタムプラグインの設定が完了したので、コネクタの作成を開始します。コネクタの名前と説明を入力します。

コンソールのスクリーンショット。

セルフマネージド Apache Kafka クラスターを使用するか、MSK によって管理されているクラスターを使用するかを選べます。IAM 認証を使用するように設定された MSK クラスターの 1 つを選択します。選択した MSK クラスターは、Aurora データベースと同じ仮想プライベートクラウド (VPC) にあります。接続するために、MSK クラスターと Aurora データベースは VPC 用に default セキュリティグループを使用します。わかりやすくするために、auto.create.topics.enabletrue に設定したクラスター設定を使用します。

コンソールのスクリーンショット。

[Connector configuration] (コネクタの設定) では、次の設定を使用します。

connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=<aurora-database-writer-instance-endpoint>
database.port=3306
database.user=my-database-user
database.password=my-secret-password
database.server.id=123456
database.server.name=ecommerce-server
database.include.list=ecommerce
database.history.kafka.topic=dbhistory.ecommerce
database.history.kafka.bootstrap.servers=<bootstrap servers>
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
include.schema.changes=true

これらの設定の一部は汎用であり、任意のコネクタのために指定される必要があります。以下に例を示します。

  • connector.class はコネクタの Java クラスです。
  • tasks.max は、このコネクタのために作成する必要があるタスクの最大数です。

その他の設定は Debezium MySQL コネクタに固有です。

  • database.hostname には、Aurora データベースのライターインスタンスエンドポイントが含まれています。
  • database.server.name は、データベースサーバーの論理名です。Debezium によって作成された Kafka トピックの名前に使用されています。
  • database.include.list には、指定されたサーバーによってホストされているデータベースのリストが含まれます。
  • database.history.kafka.topic は、データベーススキーマの変更を追跡するために Debezium が内部的に使用する Kafka トピックです。
  • database.history.kafka.bootstrap.servers には、MSK クラスターのブートストラップサーバーが含まれています。
  • 最後の 8 行 (database.history.consumer.* および database.history.producer.*) は、データベース履歴トピックにアクセスするために IAM 認証を有効にします。

[Connector capacity] (コネクタの容量) では、オートスケールまたはプロビジョンされた容量を選択できます。このセットアップでは、[Autoscaled] (オートスケール済み) を選択し、その他の設定はすべてデフォルトのままにします。

コンソールのスクリーンショット。

オートスケールされた容量で、次のパラメータを設定できます。

  • [MSK Connect Unit (MCU) count per worker] (ワーカーあたりの MSK Connect Unit (MCU) 数) – 各 MCU は、1 vCPU のコンピューティングと 4 GB のメモリを提供します。
  • [minimum number of workers] (ワーカーの最小数) および [maximum number of workers] (ワーカーの最大数)。
  • [Autoscaling utilization thresholds] (オートスケーリング使用率のしきい値) – オートスケーリングをトリガーする MCU 消費の上限と下限のターゲット使用率のしきい値 (%)。

コンソールのスクリーンショット。

コネクタの最小および最大 MCU、メモリ、およびネットワーク帯域幅の概要が表示されます。

コンソールのスクリーンショット。

[Worker configuration] (ワーカー設定) で、Amazon MSK によって提供されるデフォルトの設定を使用するか、独自の設定を提供できます。このセットアップでは、デフォルトのものを使用します。

[Access permissions] (アクセス許可) で、IAM ロールを作成します。信頼されたエンティティで、kafkaconnect.amazonaws.com を追加して、MSK Connect がロールを引き受けられるようにします。

ロールは、MSK Connect が MSK クラスターや他の AWS のサービスとやり取りするために使用されます。このセットアップでは、次のように追加します。

Debezium コネクタは、履歴トピックの作成に使用するレプリケーションファクターを見つけるために、クラスター設定にアクセスする必要があります。このため、許可ポリシーに kafka-cluster:DescribeClusterDynamicConfiguration アクション (同等の Apache Kafka の DESCRIBE_CONFIGS クラスター ACL) を追加します。

設定によっては、ロールにさらに許可を追加する必要がある場合があります (例えば、コネクタが S3 バケットなどの他の AWS リソースにアクセスする必要がある場合など)。その場合は、コネクタを作成する前に許可を追加する必要があります。

[Security] (セキュリティ) では、認証と転送中の暗号化の設定は、MSK クラスターから取得されます。

コンソールのスクリーンショット。

[Logs] (ログ) では、コネクタの実行に関する詳細情報を取得するために、CloudWatch Logs にログを配信することを選択します。CloudWatch Logs を使用すると、CloudWatch Logs Insights で簡単に保持を管理し、ログデータをインタラクティブに検索および分析できます。ロググループ ARN (以前 IAM ロールで使用したのと同じロググループ) を入力し、[Next] (次へ) を選択します。

コンソールのスクリーンショット。

設定を確認し、[Create connector] (コネクタを作成) を選択します。数分後には、コネクタが実行されています。

Amazon Aurora および Debezium での MSK Connect のテスト
では、先ほどセットアップしたアーキテクチャをテストしてみましょう。Amazon Elastic Compute Cloud (Amazon EC2) インスタンスを起動してデータベースを更新し、いくつかの Kafka コンシューマーを起動して稼働している Debezium を確認します。MSK クラスターと Aurora データベースの両方に接続できるようにするには、同じ VPC を使用し、デフォルトのセキュリティグループを割り当てます。また、インスタンスへの SSH アクセスを許可する別のセキュリティグループを追加します。

Apache Kafka のバイナリディストリビューションをダウンロードし、アーカイブをホームディレクトリに抽出します。

$ tar xvf kafka_2.13-2.7.1.tgz

IAM を使用して MSK クラスターで認証するには、Amazon MSK デベロッパーガイドの指示に従って、IAM アクセスコントロールのクライアントを設定します。Amazon MSK Library for IAM最新の安定版リリースをダウンロードします。

$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/1.1.0/aws-msk-iam-auth-1.1.0-all.jar

~/kafka_2.13-2.7.1/config/ ディレクトリで、IAM 認証を使用するように Kafka クライアントを設定するための client-config.properties ファイルを作成します。

# 暗号化のために TLS を設定し、authN のために SASL を設定します。
security.protocol = SASL_SSL

# 使用する SASL メカニズムを特定します。
sasl.mechanism = AWS_MSK_IAM

# SASL クライアント実装をバインドします。
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

# 抽出された認証情報に基づいて SigV4 署名の構築をカプセル化します。
#「sasl.jaas.config」によってバインドされた SASL クライアントがこのクラスを呼び出します。
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

次を実行するために Bash プロファイルに数行追加します。

  • Kafka バイナリを PATH に追加します。
  • MSK Library for IAM を CLASSPATH に追加します。
  • BOOTSTRAP_SERVERS 環境変数を作成して、MSK クラスターのブートストラップサーバーを保存します。
$ cat >> ~./bash_profile
export PATH=~/kafka_2.13-2.7.1/bin:$PATH
export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.1.0-all.jar
export BOOTSTRAP_SERVERS=<bootstrap servers>

その後、インスタンスに対する 3 つのターミナル接続を開きます。

最初のターミナル接続では、データベースサーバー (ecommerce-server) と同じ名前のトピック用の Kafka コンシューマーを起動します。このトピックは、Debezium がスキーマの変更をストリームするために使用します (例えば、新しいテーブルが作成されたとき)。

$ cd ~/kafka_2.13-2.7.1/
$ kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS \
                            --consumer.config config/client-config.properties \
                            --topic ecommerce-server --from-beginning

2 番目のターミナル接続では、データベースサーバー (ecommerce-server)、データベース (ecommerce)、およびテーブル (orders) を連結して構築された名前を持つトピック用の別の Kafka コンシューマーを起動します。このトピックは、Debezium がテーブル用にデータ変更をストリームするために使用します (例えば、新しいレコードが挿入されたとき)。

$ cd ~/kafka_2.13-2.7.1/
$ kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS \
                            --consumer.config config/client-config.properties \
                            --topic ecommerce-server.ecommerce.orders --from-beginning

3 番目のターミナル接続では、MariaDB パッケージを使用して MySQL クライアントをインストールし、Aurora データベースに接続します。

$ sudo yum install mariadb
$ mysql -h <aurora-database-writer-instance-endpoint> -u <database-user> -p

この接続から、ecommerce データベースと orders 用のテーブルを作成します。

CREATE DATABASE ecommerce;

USE ecommerce

CREATE TABLE orders (
       order_id VARCHAR(255),
       customer_id VARCHAR(255),
       item_description VARCHAR(255),
       price DECIMAL(6,2),
       order_date DATETIME DEFAULT CURRENT_TIMESTAMP
);

これらのデータベースの変更は、MSK Connect によって管理される Debezium コネクタによってキャプチャされ、MSK クラスターにストリーミングされます。最初のターミナルでは、スキーマの変更でトピックを消費すると、データベースとテーブルの作成に関する情報が表示されます。

Struct{source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202831473,db=ecommerce,server_id=1980402433,file=mysql-bin-changelog.000003,pos=9828,row=0},databaseName=ecommerce,ddl=CREATE DATABASE ecommerce,tableChanges=[]}
Struct{source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202878811,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10002,row=0},databaseName=ecommerce,ddl=CREATE TABLE orders ( order_id VARCHAR(255), customer_id VARCHAR(255), item_description VARCHAR(255), price DECIMAL(6,2), order_date DATETIME DEFAULT CURRENT_TIMESTAMP ),tableChanges=[Struct{type=CREATE,id="ecommerce"."orders",table=Struct{defaultCharsetName=latin1,primaryKeyColumnNames=[],columns=[Struct{name=order_id,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=1,optional=true,autoIncremented=false,generated=false}, Struct{name=customer_id,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=2,optional=true,autoIncremented=false,generated=false}, Struct{name=item_description,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=3,optional=true,autoIncremented=false,generated=false}, Struct{name=price,jdbcType=3,typeName=DECIMAL,typeExpression=DECIMAL,length=6,scale=2,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=order_date,jdbcType=93,typeName=DATETIME,typeExpression=DATETIME,position=5,optional=true,autoIncremented=false,generated=false}]}}]}

その後、3 番目のターミナルのデータベース接続に戻り、orders テーブルにいくつかのレコードを挿入します。

INSERT INTO orders VALUES ("123456", "123", "A super noisy mechanical keyboard", "50.00", "2021-08-16 10:11:12");
INSERT INTO orders VALUES ("123457", "123", "An extremely wide monitor", "500.00", "2021-08-16 11:12:13");
INSERT INTO orders VALUES ("123458", "123", "A too sensible microphone", "150.00", "2021-08-16 12:13:14");

2 番目のターミナルでは、orders テーブルに挿入されたレコードに関する情報が表示されます。

Struct{after=Struct{order_id=123456,customer_id=123,item_description=A super noisy mechanical keyboard,price=50.00,order_date=1629108672000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10464,row=0},op=c,ts_ms=1629202993614}
Struct{after=Struct{order_id=123457,customer_id=123,item_description=An extremely wide monitor,price=500.00,order_date=1629112333000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10793,row=0},op=c,ts_ms=1629202993621}
Struct{after=Struct{order_id=123458,customer_id=123,item_description=A too sensible microphone,price=150.00,order_date=1629115994000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=11114,row=0},op=c,ts_ms=1629202993630}

変更データキャプチャアーキテクチャは稼動しており、コネクタは MSK Connect によってフルマネージド状態です。

可用性と料金
MSK Connect は、アジアパシフィック (ムンバイ)、アジアパシフィック (ソウル)、アジアパシフィック (シンガポール)、アジアパシフィック (シドニー)、アジアパシフィック (東京)、カナダ (中部)、欧州 (フランクフルト)、欧州 (アイルランド)、欧州 (ロンドン)、欧州 (パリ)、欧州 (ストックホルム)、南米 (サンパウロ)、米国東部 (バージニア北部)、米国東部 (オハイオ)、米国西部 (北カリフォルニア)、米国西部 (オレゴン) といった AWS リージョンでご利用いただけます。詳細については、AWS リージョン別のサービス表を参照してください。

MSK Connect では、使用した分の料金をお支払いいただきます。コネクタによって使用されるリソースは、ワークロードに基づいて自動的にスケールできます。詳細については、Amazon MSK の料金のページを参照してください。

MSK Connect で Apache Kafka コネクタの管理を今すぐ簡素化しましょう。

Danilo

原文はこちらです。