Amazon Web Services ブログ
クライアントが API Gateway を使用した Apache Kafka との対話方法を管理する
そのうち、あなたは次のような疑問を抱くかも知れません。
- Apache Kafka (MSK) の Amazon Managed Streaming に IAM 認証または承認を実装するには、どうすればよいですか?
- クラスターにクォータを設定せずに、特定のシナリオに基づいて急増するトラフィックから Apache Kafka クラスターを保護する方法を教えてください。
- JSON スキーマに準拠したリクエストを検証する方法を教えてください。
- URI、クエリ文字列、ヘッダーにパラメータが含まれていることを確認する方法を教えてください。
- Amazon MSK で、エージェントまたはネイティブの Apache Kafka プロトコルを使用せずに、軽量クライアントにメッセージを取り込む方法を教えてください。
これらのタスクは、カスタムプロキシサーバーまたはゲートウェイを使用して実現できますが、これらのオプションを実装して管理するのは困難です。一方、API Gateway はこれらの機能を備えている完全マネージド型の AWS サービスです。
このブログ記事では、Amazon MSK クラスターとクライアント間のコンポーネントとして、Amazon API Gateway がこれらの質問にどう答えるかを示しています。
Amazon MSK は Apache Kafka 向けの完全マネージド型サービスで、サーバーをプロビジョニングしたり、ストレージを管理したり、Apache Zookeeper を手動で設定したりする必要なく、数回クリックするだけで Kafka クラスターを簡単にプロビジョニングできます。Apache Kafka は、リアルタイムストリーミングデータのパイプラインとアプリケーションを構築するためのオープンソースプラットフォームです。
一部のユースケースには、ネイティブの Kafka プロトコルをサポートしていない軽量 IoT デバイスからのメッセージの取り込みや、サードパーティー製 API を含む他のバックエンドサービスとストリーミングサービスの調整が含まれます。
このパターンには、次のトレードオフもあります。
- 実行と保守を行う別のサービスによるコストと複雑さです。
- HTTP リクエストを作成するには余分な処理が追加されるため、パフォーマンスのオーバーヘッドが発生します。さらに、REST プロキシはリクエストを解析し、リクエストの生成と読み込みといった両方の形式間でデータを変換する必要があります。
このアーキテクチャを実稼働環境に実装する場合、これらのポイントをビジネスユースケースと SLA ニーズと一緒に考慮する必要があります。
ソリューションの概要
ソリューションを実装するには、次の手順を実行します。
- MSK クラスター、Kafka クライアント、Kafka REST プロキシを作成する
- Kafka トピックを作成し、Kafka クライアントマシンで REST プロキシを設定する
- API Gateway 経由で REST プロキシ統合を使用して API を作成する
- Amazon MSK へのメッセージを生成および読み込んで、エンドツーエンドのプロセスをテストする
以下の図は、このソリューションのアーキテクチャを示しています。
このアーキテクチャ内で、MSK クラスターを作成し、REST プロキシと Kafka クライアントを使用して Amazon EC2 インスタンスをセットアップします。次に、Amazon API Gateway を介して REST プロキシを公開し、Postman を使用して Amazon MSK にメッセージを生成することでソリューションをテストします。
実稼働実装では、Auto Scaling グループを使用して、ロードバランサーの背後に REST プロキシを設定してください。
前提条件
開始する前に、次の前提条件を満たしている必要があります。
- AWS のサービスへのアクセスを提供する AWS アカウントです。
- AWS CLI を設定するためのアクセスキーとシークレットアクセスキーを持つ IAM ユーザー
- Amazon EC2 キーペア
MSK クラスター、Kafka クライアント、REST プロキシの作成
AWS CloudFormation は、VPC、サブネット、セキュリティグループ、Amazon MSK クラスター、Kafka クライアント、Kafka REST プロキシなど、必要なすべてのリソースをプロビジョニングします。これらのリソースを作成するには、次の手順を実行します。
-
us-east-1
またはus-west-2
で起動します。完了するまで約 15〜20 分かかります。 - AWS CloudFormation コンソールから、AmzonMSKAPIBlog を選択します。
- 出力で、MSKClusterARN、KafkaClientEC2InstancePublicDNS、MSKSecurityGroupID の詳細を取得します。
- 次のコードを入力して、
ZooKeeperConnectionString
およびクラスターに関するその他の情報 (リージョン、クラスター ARN、AWS 名前付きプロファイルを提供) を取得します。次のコード例は、このコマンドの出力の行の 1 つを示しています。
- 次のコードを入力して
BootstrapBrokerString
(リージョン、クラスター ARN、AWS 名前付きプロファイルを提供)を取得します。
次のコード例は、このコマンドの出力を示しています。
Kafka トピックの作成と Kafka REST プロキシの設定
Kafka トピックを作成し、Kafka クライアントマシンで Kafka REST プロキシを設定するには、次の手順を実行します。
- Kafka クライアント Amazon EC2 インスタンスに SSH で接続します。次のコードを参照してください。
- クライアントマシン上の Apache Kafka インストールの binフォルダ (
kafka/kafka_2.12-2.2.1/bin/
) に移動します。 - 次のコード (前の手順で
ZookeeperConnectString
に対して取得した値を提供) を入力してトピックを作成します。コマンドが成功すると、次のメッセージが表示されます。
トピック amazonmskapigwblog を作成しました
。 - Kafka REST サーバーを Amazon MSK クラスターに接続するには、ディレクトリ (
/home/ec2-user/confluent-5.3.1/etc/kafka-rest/
) 内のkafka-rest.properties
を変更して、Amazon MSK のZookeeperConnectString
およびBootstrapserversConnectString
情報を指すようにします。次のコードを参照してください。追加オプションの手順として、REST クライアントと REST プロキシ (HTTPS) 間の通信を保護するための SSL を作成できます。SSL が不要な場合は、手順 5 と 6 をスキップできます。
- サーバー証明書とクライアント証明書を生成します。詳細については、Confluent ウェブサイトの SLL キーと証明書の作成を参照してください。
- 必要なプロパティ設定を
kafka-rest.properties
設定ファイルに追加します。次のコード例を参照してください。詳細な手順については、Confluent ウェブサイトの SSL を使用した暗号化と認証を参照してください。
これで、Kafka トピックが作成され、Amazon MSK クラスターに接続するように Kafka REST プロキシが設定されました。
Kafka REST プロキシ統合を使用した API の作成
API Gateway を介して Kafka REST プロキシ統合を使用して API を作成するには、次の手順を実行します。
- API Gateway コンソールで、API を作成を選択します。
- API タイプには、REST API を選択します。
- 構築をクリックします。
- 新しい API を選択します。
-
API 名
に名前を入力します (amazonmsk-restapi など)。 - オプションの手順として、説明には、簡単な説明を入力します。
- API を作成を選択します。次の手順は、子リソースを作成することです。
- リソースで、親リソースアイテムを選択します。
- アクションで、リソースを作成を選択します。新しい子リソースペインが開きます。
- プロキシリソースとして設定を選択します。
- リソース名に
proxy
と入力します。 - リソースパスに
/{proxy+}
と入力します。 - API Gateway CORS を有効にするを選択します。
- リソースを作成を選択します。リソースを作成すると、メソッドを作成するウィンドウが開きます。
- 統合タイプで、HTTP プロキシを選択します。
- エンドポイント URL には、HTTP バックエンドリソース URL (Kafka クライアント Amazont EC2 インスタンス
PublicDNS
:http://KafkaClientEC2InstancePublicDNS:8082/{proxy}
またはhttps://KafkaClientEC2InstancePublicDNS:8085/{proxy}
など) を入力します。 - 残りのフィールドにはデフォルト設定を使用します。
- 保存を選択します。
- SSL の場合、エンドポイント URL には、HTTPS エンドポイントを使用します。作成したばかりの API では、API のプロキシリソースパス
{proxy+}
がhttp://YourKafkaClientPublicIP:8082/
のバックエンドエンドポイントのプレースホルダーになります。 - 作成した API を選択します。
- アクションで、API をデプロイを選択します。
- デプロイステージで、新しいステージを選択します。
- ステージ名に、ステージ名 (
dev
、test
、prod
など) を入力します。 - デプロイを選択します。
- API をデプロイした後、呼び出し URL を記録します。
API Gateway を介して公開された外部の Kafka REST プロキシは、https://YourAPIGWInvoleURL/dev/topics/amazonmskapigwblog
のようになります。次の手順でこの URL を使用します。
エンドツーエンドプロセスのテスト
Amazon MSK へのメッセージを生成および読み込んで、エンドツーエンドのプロセスをテストします。次の手順を完了します。
- Kafka クライアント Amazon EC2 インスタンスに SSH で接続します。次のコードを参照してください。
-
confluent-5.3.1/bin
ディレクトリに移動し、kafka-rest
サービスを開始します。次のコードを参照してください。サービスが既に開始されている場合は、次のコードを使用して停止できます。
- 別のターミナルウィンドウを開きます。
-
kafka/kafka_2.12-2.2.1/bin
ディレクトリで、Kafka コンソールコンシューマーを起動します。次のコードを参照してください。Postman を使用してメッセージを作成できるようになりました。Postman は、ウェブサービスをテストするための HTTP クライアントです。
Postman を実行しているシステムから Kafka クライアントセキュリティグループの TCP ポートを開いてください。
- ヘッダーで、値
application/vnd.kafka.json.v2+json
のキーContent-Type
を選択します。 - 本文で未加工を選択します。
- JSON を選択します。この記事では次のコードを入力します。
次のスクリーンショットは、API Gateway Kafka REST エンドポイントから Kafka コンシューマーに送信されるメッセージを示しています。
まとめ
この記事では、API Gateway を使用して Amazon MSK の REST API エンドポイントを設定することがいかに簡単かを示しました。このソリューションは、ネイティブの Kafka プロトコルやクライアントに依存することなく、IoT デバイスまたはプログラミング言語から Amazon MSK へのメッセージを生成して読み込む場合に役立ちます。
ご質問またはご提案は、コメント欄にお寄せください。
著者について
Prasad Alle は、AWS プロフェッショナルサービスのシニアビッグデータコンサルタントです。AWS のエンタープライズおよび戦略的顧客のために、スケーラブルで信頼性の高いビッグデータ、機械学習、人工知能、IoT ソリューションをリードし、構築するために尽力しています。Prasad の関心は、高度なエッジコンピューティング、エッジでの Machine Learning などのさまざまなテクノロジーに及びます。余暇には、家族との時間を楽しんでいます。
Francisco Oliveira は、AWS のシニアビッグデータソリューションアーキテクトです。オープンソーステクノロジーと AWS を使用したビッグデータソリューションの構築に力を注いでいます。余暇には、新しいスポーツに挑戦したり、旅行や国立公園に出かけたりします。