Amazon Web Services ブログ

Amazon MSK で IAM アクセスコントロールと Kafka クォータを使用したマルチテナント Apache Kafka クラスター – パート 2

本記事は 2023 年 6 月 19 日 に公開された「Multi-tenancy Apache Kafka clusters in Amazon MSK with IAM access control and Kafka quotas – Part 2」を翻訳したものです。

Kafka クォータはマルチテナント Kafka クラスターに不可欠です。アプリケーションがクラスターリソースを過剰に消費してパフォーマンスが低下するのを防ぎます。また、中央のストリーミングデータプラットフォームをマルチテナントで運用し、複数のビジネスラインのアップストリーム/ダウンストリームアプリケーションで共有できます。Kafka は 2 種類のクォータをサポートしています: ネットワーク帯域幅クォータリクエストレートクォータです。ネットワーク帯域幅クォータは、クライアントアプリケーションが各ブローカーで生成・消費できるバイトレート (バイト/秒) のしきい値を定義します。リクエストレートクォータは、各ブローカーがクライアントのリクエスト処理に費やす時間の割合を制限します。Kafka クォータは特定のユーザー、特定のクライアント ID、またはその両方に対して設定できます。

この 2 部構成シリーズのパート 1 では、Amazon Managed Streaming for Apache Kafka (Amazon MSK) クラスターで AWS Identity and Access Management (IAM) アクセスコントロールを使用しながら Kafka クォータの適用方法を説明しました。

本記事では、IAM アクセスコントロールを使用した MSK クラスターでの Kafka クォータの設定手順を、サンプルクライアントアプリケーションによるテストを交えて解説します。

ソリューションの概要

パート 1 で紹介した以下の図は、Kafka クライアントアプリケーション (ProducerApp-1ConsumerApp-1ConsumerApp-2) が書き込みおよび読み取り IAM ロールを引き受けて MSK クラスター内の Topic-B にアクセスする方法を示しています。各プロデューサーおよびコンシューマークライアントアプリケーションには、生成または消費できるデータ量 (バイト/秒) を決定するクォータがあります。ProducerApp-1 のクォータでは、ブローカーあたり最大 1024 バイト/秒の生成が許可されています。同様に、ConsumerApp-1ConsumerApp-2 のクォータでは、ブローカーあたりそれぞれ 5120 バイト/秒と 1024 バイト/秒の消費が許可されています。アーキテクチャ図のフローを簡単に説明します:

  • P1ProducerApp-1 は (ProducerApp-1-Role IAM ロールを使用して) Topic-B-Write-Role IAM ロールを引き受け、Topic-B にメッセージを送信します
  • P2Topic-B-Write-Role IAM ロールを引き受けた状態で、ProducerApp-1Topic-B へのメッセージ送信を開始します
  • C1ConsumerApp-1 (ConsumerApp-1-Role IAM ロールを使用) と ConsumerApp-2 (ConsumerApp-2-Role IAM ロールを使用) は Topic-B-Read-Role IAM ロールを引き受け、Topic-B からメッセージを読み取ります
  • C2Topic-B-Read-Role IAM ロールを引き受けた状態で、ConsumerApp-1ConsumerApp-2Topic-B からのメッセージ消費を開始します

本記事では、AWS リソースのプロビジョニングと変更に AWS Command Line Interface (AWS CLI)、AWS CloudFormation テンプレート、AWS マネジメントコンソールを使用します。作成したリソースには AWS アカウントへの課金が発生します。

手順の概要は以下のとおりです:

  1. IAM アクセスコントロールを有効にした MSK クラスターと、クライアントアプリケーション用の Amazon Elastic Compute Cloud (Amazon EC2) インスタンスをプロビジョニングします。
  2. MSK クラスターに Topic-B を作成します。
  3. クライアントアプリケーションが Topic-B にアクセスするための IAM ロールを作成します。
  4. クォータを設定せずにプロデューサーおよびコンシューマーアプリケーションを実行します。
  5. クライアントアプリケーションの生成および消費クォータを設定します。
  6. クォータ設定後にアプリケーションを再実行します。

前提条件

先に進む前に、パート 1 を読むことをお勧めします。以下が必要です:

  • 本記事ではデモアカウントと呼ぶ AWS アカウント (アカウント ID は 1111 1111 1111 と仮定)
  • デモアカウントで AWS リソースを作成、削除、変更する権限

IAM アクセスコントロールを有効にした MSK クラスターと EC2 インスタンスのプロビジョニング

デモアカウントの VPC 内に IAM アクセスコントロールを有効にした MSK クラスターをプロビジョニングします。さらに、MSK クラスターの設定変更やクライアントアプリケーションのホスト用に 4 つの EC2 インスタンスを作成します。

CloudFormation スタックのデプロイ

  1. CloudFormation テンプレートファイルとサンプルクライアントアプリケーションをダウンロードするため、GitHub リポジトリをクローンします:
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git
  1. AWS CloudFormation コンソールで、ナビゲーションペインの Stacks を選択します。
  2. Create stack を選択します。
  3. Prepare templateTemplate is ready を選択します。
  4. Template sourceUpload a template file を選択します。
  5. amazon-msk-kafka-quotas/cfn-templates ディレクトリから cfn-msk-stack-1.yaml ファイルをアップロードし、Next を選択します。
  6. Stack nameMSKStack と入力します。
  7. パラメータはデフォルトのまま Next を選択します。
  8. Configure stack options ページの最下部までスクロールし、Next を選択します。
  9. Review ページの最下部までスクロールし、I acknowledge that CloudFormation may create IAM resources チェックボックスを選択して Submit を選択します。

スタックの完了には約 30 分かかります。スタックが正常に作成されると、以下のリソースが作成されます:

  • 3 つのプライベートサブネットと 1 つのパブリックサブネットを持つ VPC
  • IAM アクセスコントロールを有効にした 3 つのブローカーを持つ MSK クラスター
  • MSK クラスターの設定変更や AWS リソースを作成・変更するための MSKAdminInstance EC2 インスタンス
  • ProducerApp-1ConsumerApp-1ConsumerApp-2 用の EC2 インスタンス (各クライアントアプリケーションに 1 つずつ)
  • アーキテクチャ図のとおり、クライアントアプリケーションをホストする各 EC2 インスタンスに個別の IAM ロール
  1. スタックの Outputs タブから MSKClusterArn の値をメモします。

MSK クラスターにトピックを作成する

MSK クラスターに Topic-B を作成するには、以下の手順を実行します:

  1. Amazon EC2 コンソールで、実行中の EC2 インスタンスの一覧に移動します。
  2. MSKAdminInstance EC2 インスタンスを選択し、Connect を選択します。
  3. Session Manager タブで Connect を選択します。
  4. ブラウザで開いた新しいタブで以下のコマンドを実行します:
sudo su - ec2-user

# Add Kafka binaries to the path
sed -i 's|HOME/bin|HOME/bin:~/kafka/bin|' .bash_profile

# Set your AWS region
aws configure set region <AWS Region>
  1. MSK クラスターのブローカー IAM エンドポイントを指す環境変数を設定します:
MSK_CLUSTER_ARN=<Use the value of MSKClusterArn that you noted earlier>
echo "export BOOTSTRAP_BROKERS_IAM=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerStringSaslIam)" >> .bash_profile
source .bash_profile
echo $BOOTSTRAP_BROKERS_IAM
  1. BOOTSTRAP_BROKERS_IAM の値をメモします。
  2. 以下の Kafka CLI コマンドを実行して、MSK クラスターに Topic-B を作成します:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--create --topic Topic-B \
--partitions 3 --replication-factor 3 \
--command-config config_iam.properties

MSK クラスターは IAM アクセスコントロールでプロビジョニングされているため、--command-config オプションは MSKStack CloudFormation スタックで作成された IAM アクセスコントロールに必要なプロパティを含む config_iam.properties を指定しています。

Kafka CLI コマンドの実行時に以下の警告が表示される場合がありますが、無視して問題ありません:

The configuration 'sasl.jaas.config' was supplied but isn't a known config. 
The configuration 'sasl.client.callback.handler.class' was supplied but isn't a known config.
  1. Topic-B が作成されたことを確認するため、すべてのトピックを一覧表示します:
kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties --list

クライアントアプリケーションが Topic-B にアクセスするための IAM ロールの作成

アーキテクチャ図に示す Topic-B-Write-RoleTopic-B-Read-Role を作成します。Topic-B-Write-RoleTopic-B への書き込み権限を持ち、ProducerApp-1 が引き受けられます。同様に、ConsumerApp-1ConsumerApp-2Topic-B-Read-Role を引き受けて Topic-B を読み取れます。Topic-B を読み取るには、ConsumerApp-1ConsumerApp-2 は次のステップの MSKStack スタック更新時に指定するコンシューマーグループに属している必要があります。

以下の手順でロールを作成します:

  1. AWS CloudFormation コンソールで、ナビゲーションペインの Stacks を選択します。
  2. MSKStack を選択し、Update を選択します。
  3. Prepare template で Replace current template を選択します。
  4. Template sourceUpload a template file を選択します。
  5. amazon-msk-kafka-quotas/cfn-templates ディレクトリから cfn-msk-stack-2.yaml ファイルをアップロードし、Next を選択します。
  6. 以下の追加スタックパラメータを入力します:
    • Topic B ARNTopic-B の ARN を入力します。

ARN は arn:aws:kafka:region:account-id:topic/msk-cluster-name/msk-cluster-uuid/Topic-B の形式にする必要があります。先ほどメモした MSK クラスター ARN からクラスター名とクラスター UUID を使用し、AWS リージョンを指定してください。詳細については、IAM access control for Amazon MSK を参照してください。

    • ConsumerApp-1 Consumer Group nameConsumerApp-1 のコンシューマーグループ ARN を入力します。

arn:aws:kafka:region:account-id:group/msk-cluster-name/msk-cluster-uuid/consumer-group-name の形式にする必要があります。

    • ConsumerApp-2 Consumer Group nameConsumerApp-2 のコンシューマーグループ ARN を入力します。

前の ARN と同様の形式を使用します。

  1. Next を選択します。
  2. Configure stack options ページの最下部までスクロールし、Next を選択します。
  3. Review ページの最下部までスクロールし、I acknowledge that CloudFormation may create IAM resources チェックボックスを選択して Update stack を選択します。

スタックの更新には約 3 分かかります。スタックが正常に更新されると、以下のリソースが作成されます:

  • Topic-B-Write-RoleTopic-B への書き込み操作の権限を持つ IAM ロール。信頼ポリシーにより ProducerApp-1-Role IAM ロールが引き受けられます。
  • Topic-B-Read-RoleTopic-B への読み取り操作の権限を持つ IAM ロール。信頼ポリシーにより ConsumerApp-1-RoleConsumerApp-2-Role IAM ロールが引き受けられます。さらに、ConsumerApp-1ConsumerApp-2Topic-B を読み取るために、スタック更新時に指定したコンシューマーグループに属している必要があります。
  1. スタックの Outputs タブから TopicBReadRoleARNTopicBWriteRoleARN の値をメモします。

クォータを設定せずにプロデューサーおよびコンシューマーアプリケーションを実行する

ここでは、クォータを設定せずに ProducerApp-1ConsumerApp-1ConsumerApp-2 を実行します。前のステップで取得した BOOTSTRAP_BROKERS_IAM の値、Topic-B-Write-Role の ARN、Topic-B-Read-Role の ARN が必要です。クライアントアプリケーションのソースコードとパッケージ版は GitHub リポジトリで入手できます。

ConsumerApp-1 アプリケーションの実行

ConsumerApp-1 アプリケーションを実行するには、以下の手順を実行します:

  1. Amazon EC2 コンソールで ConsumerApp-1 EC2 インスタンスを選択し、Connect を選択します。
  2. Session Manager タブで Connect を選択します。
  3. ブラウザで開いた新しいタブで以下のコマンドを実行します:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  1. ConsumerApp-1 アプリケーションを実行して Topic-B からのメッセージ消費を開始します:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Read-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--consumer-group <ConsumerApp-1 consumer group name> \
--role-session-name <role session name for ConsumerApp-1 to use during STS assume role call> \
--client-id <ConsumerApp-1 client.id> \
--print-consumer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

ソースコードは GitHub で参照できます。コマンドラインパラメータの詳細は以下のとおりです:

  • –bootstrap-servers – MSK クラスターのブートストラップブローカー IAM エンドポイント。
  • –assume-role-arnTopic-B-Read-Role IAM ロールの ARN。ConsumerApp-1 はこのロールを引き受けてトピックからメッセージを読み取ります。
  • –region – 使用するリージョン。
  • –topic-nameConsumerApp-1 がメッセージを読み取るトピック名。デフォルトは Topic-B です。
  • –consumer-group – スタック更新時に指定した ConsumerApp-1 のコンシューマーグループ名。
  • –role-session-nameConsumerApp-1AWS Security Token Service (AWS STS) SDK を使用して Topic-B-Read-Role を引き受けます。ConsumerApp-1assumeRole 関数の呼び出し時にこのロールセッション名を使用します。
  • –client-idConsumerApp-1 のクライアント ID。
  • –print-consumer-quota-metricsConsumerApp-1 のクライアントメトリクスをターミナルに出力するかのフラグ。
  • –cw-dimension-nameConsumerApp-1 からクライアントスロットリングメトリクスを発行する際に使用する Amazon CloudWatch ディメンション名。
  • –cw-dimension-valueConsumerApp-1 からクライアントスロットリングメトリクスを発行する際に使用する CloudWatch ディメンション値。
  • –cw-namespaceConsumerApp-1 がスロットリング監視用の CloudWatch メトリクスを発行する名前空間。
  1. 残りのパラメータに問題がなければ、以下のコマンドを使用し、--assume-role-arn--region を環境に合わせて変更してください:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBReadRole-xxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--consumer-group consumerapp-1-cg \
--role-session-name consumerapp-1-role-session \
--client-id consumerapp-1-client-id \
--print-consumer-quota-metrics Y \
--cw-dimension-name ConsumerApp \
--cw-dimension-value ConsumerApp-1 \
--cw-namespace ConsumerApps

fetch-throttle-time-avgfetch-throttle-time-max クライアントメトリクスは 0.0 と表示され、ConsumerApp-1 にスロットリングが発生していないことを示しています。ConsumerApp-1 の消費クォータはまだ設定していません。しばらく実行させておきます。

ConsumerApp-2 アプリケーションの実行

ConsumerApp-2 アプリケーションを実行するには、以下の手順を実行します:

  1. Amazon EC2 コンソールで ConsumerApp-2 EC2 インスタンスを選択し、Connect を選択します。
  2. Session Manager タブで Connect を選択します。
  3. ブラウザで開いた新しいタブで以下のコマンドを実行します:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  1. ConsumerApp-2 アプリケーションを実行して Topic-B からのメッセージ消費を開始します:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Read-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--consumer-group <ConsumerApp-2 consumer group name> \
--role-session-name <role session name for ConsumerApp-2 to use during STS assume role call> \
--client-id <ConsumerApp-2 client.id> \
--print-consumer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

コマンドラインパラメータは前述の ConsumerApp-1 と同様ですが、以下が異なります:

  • –consumer-group – スタック更新時に指定した ConsumerApp-2 のコンシューマーグループ名。
  • –role-session-nameConsumerApp-2 は AWS STS SDK を使用して Topic-B-Read-Role を引き受けます。ConsumerApp-2assumeRole 関数の呼び出し時にこのロールセッション名を使用します。
  • –client-idConsumerApp-2 のクライアント ID。
  1. 残りのパラメータに問題がなければ、以下のコマンドを使用し、--assume-role-arn--region を環境に合わせて変更してください:
java -jar kafka-consumer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBReadRole-xxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--consumer-group consumerapp-2-cg \
--role-session-name consumerapp-2-role-session \
--client-id consumerapp-2-client-id \
--print-consumer-quota-metrics Y \
--cw-dimension-name ConsumerApp \
--cw-dimension-value ConsumerApp-2 \
--cw-namespace ConsumerApps

fetch-throttle-time-avgfetch-throttle-time-max クライアントメトリクスは 0.0 と表示され、ConsumerApp-2 にスロットリングが発生していないことを示しています。ConsumerApp-2 の消費クォータはまだ設定していません。しばらく実行させておきます。

ProducerApp-1 アプリケーションの実行

ProducerApp-1 アプリケーションを実行するには、以下の手順を実行します:

  1. Amazon EC2 コンソールで ProducerApp-1 EC2 インスタンスを選択し、Connect を選択します。
  2. Session Manager タブで Connect を選択します。
  3. ブラウザで開いた新しいタブで以下のコマンドを実行します:
sudo su - ec2-user

# Set your AWS region
aws configure set region <aws region>

# Set BOOTSTRAP_BROKERS_IAM variable to MSK cluster's IAM endpoint
BOOTSTRAP_BROKERS_IAM=<Use the value of BOOTSTRAP_BROKERS_IAM that you noted earlier> 

echo "export BOOTSTRAP_BROKERS_IAM=$(echo $BOOTSTRAP_BROKERS_IAM)" >> .bash_profile

# Clone GitHub repository containing source code for client applications
git clone https://github.com/aws-samples/amazon-msk-kafka-quotas.git

cd amazon-msk-kafka-quotas/uber-jars/
  1. ProducerApp-1 アプリケーションを実行して Topic-B へのメッセージ送信を開始します:
java -jar kafka-producer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn <Topic-B-Write-Role-ARN> \
--topic-name <Topic-Name> \
--region <AWS Region> \
--num-messages <Number of events> \
--role-session-name <role session name for ProducerApp-1 to use during STS assume role call> \
--client-id <ProducerApp-1 client.id> \
--producer-type <Producer Type, options are sync or async> \
--print-producer-quota-metrics Y \
--cw-dimension-name <CloudWatch Metrics Dimension Name> \
--cw-dimension-value <CloudWatch Metrics Dimension Value> \
--cw-namespace <CloudWatch Metrics Namespace>

ソースコードは GitHub で参照できます。コマンドラインパラメータの詳細は以下のとおりです:

  • –bootstrap-servers – MSK クラスターのブートストラップブローカー IAM エンドポイント。
  • –assume-role-arnTopic-B-Write-Role IAM ロールの ARN。ProducerApp-1 はこのロールを引き受けてトピックにメッセージを書き込みます。
  • –topic-nameProducerApp-1 がメッセージを送信するトピック。デフォルトは Topic-B です。
  • –region – 使用する AWS リージョン。
  • –num-messagesProducerApp-1 アプリケーションがトピックに送信するメッセージ数。
  • –role-session-nameProducerApp-1 は AWS STS SDK を使用して Topic-B-Write-Role を引き受けます。ProducerApp-1assumeRole 関数の呼び出し時にこのロールセッション名を使用します。
  • –client-idProducerApp-1 のクライアント ID。
  • –producer-typeProducerApp-1同期または非同期で実行できます。オプションは sync または async です。
  • –print-producer-quota-metricsProducerApp-1 のクライアントメトリクスをターミナルに出力するかのフラグ。
  • –cw-dimension-nameProducerApp-1 からクライアントスロットリングメトリクスを発行する際に使用する CloudWatch ディメンション名。
  • –cw-dimension-valueProducerApp-1 からクライアントスロットリングメトリクスを発行する際に使用する CloudWatch ディメンション値。
  • –cw-namespaceProducerApp-1 がスロットリング監視用の CloudWatch メトリクスを発行する名前空間。
  1. 残りのパラメータに問題がなければ、以下のコマンドを使用し、--assume-role-arn--region を環境に合わせて変更してください。同期 Kafka プロデューサーを実行するには、--producer-type sync オプションを使用します:
java -jar kafka-producer.jar --bootstrap-servers $BOOTSTRAP_BROKERS_IAM \
--assume-role-arn arn:aws:iam::111111111111:role/MSKStack-TopicBWriteRole-xxxxxxxxxxxx \
--topic-name Topic-B \
--region <AWS Region> \
--num-messages 10000000 \
--role-session-name producerapp-1-role-session \
--client-id producerapp-1-client-id \
--producer-type sync \
--print-producer-quota-metrics Y \
--cw-dimension-name ProducerApp \
--cw-dimension-value ProducerApp-1 \
--cw-namespace ProducerApps

または、--producer-type async で非同期プロデューサーも実行できます。詳細については、Asynchronous send を参照してください。

produce-throttle-time-avgproduce-throttle-time-max クライアントメトリクスは 0.0 と表示され、ProducerApp-1 にスロットリングが発生していないことを示しています。ProducerApp-1 の生成クォータはまだ設定していません。ConsumerApp-1ConsumerApp-2 がメッセージを消費でき、スロットリングされていないことを確認してください。それぞれのブラウザタブで Ctrl+C を押して、コンシューマーおよびプロデューサークライアントアプリケーションを停止します。

クライアントアプリケーションの生成および消費クォータの設定

クォータなしでの実行を確認したので、次はクォータを設定して再実行します。

前述の手順で MSKAdminInstance EC2 インスタンスの Sessions Manager ターミナルを開き、以下のコマンドを実行して MSK クラスター内のブローカーのデフォルト設定を確認します。MSK クラスターはデフォルトの Kafka クォータ設定でプロビジョニングされています。

# Describe Broker-1 default configurations
kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--entity-type brokers \
--entity-name 1 \
--all --describe > broker1_default_configurations.txt
cat broker1_default_configurations.txt | grep quota.consumer.default
cat broker1_default_configurations.txt | grep quota.producer.default

上のスクリーンショットは、Broker-1quota.consumer.defaultquota.producer.default のデフォルト値を示しています。

ProducerApp-1 のクォータ設定

以下のコマンドのプレースホルダーは、アカウントに対応する値に置き換えてください。

前述のアーキテクチャ図に従い、ProducerApp-1 の生成クォータを 1024 バイト/秒に設定します。<ProducerApp-1 Client Id><ProducerApp-1 Role Session> には、先ほど ProducerApp-1 を実行した際に使用した値 (それぞれ producerapp-1-client-idproducerapp-1-role-session) を使用してください:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'producer_byte_rate=1024' \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

以下のコマンドで ProducerApp-1 の生成クォータを確認します:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

以下のコマンドで ProducerApp-1 の生成クォータを削除できますが、次にクォータをテストするため実行しないでください

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --delete-config producer_byte_rate \
--entity-type clients --entity-name <ProducerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBWriteRole-xxxxxxxxxxx/<ProducerApp-1 Role Session>

ConsumerApp-1 のクォータ設定

以下のコマンドのプレースホルダーは、アカウントに対応する値に置き換えてください。

ConsumerApp-1 の消費クォータを 5120 バイト/秒に設定します。<ConsumerApp-1 Client Id><ConsumerApp-1 Role Session> には、先ほど ConsumerApp-1 を実行した際に使用した値 (それぞれ consumerapp-1-client-idconsumerapp-1-role-session) を使用してください:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'consumer_byte_rate=5120' \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

以下のコマンドで ConsumerApp-1 の消費クォータを確認します:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

以下のコマンドで ConsumerApp-1 の消費クォータを削除できますが、次にクォータをテストするため実行しないでください

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --delete-config consumer_byte_rate \
--entity-type clients --entity-name <ConsumerApp-1 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-1 Role Session>

ConsumerApp-2 のクォータ設定

以下のコマンドのプレースホルダーは、アカウントに対応する値に置き換えてください。

ConsumerApp-2 の消費クォータを 1024 バイト/秒に設定します。<ConsumerApp-2 Client Id><ConsumerApp-2 Role Session> には、先ほど ConsumerApp-2 を実行した際に使用した値 (それぞれ consumerapp-2-client-idconsumerapp-2-role-session) を使用してください:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--alter --add-config 'consumer_byte_rate=1024' \
--entity-type clients --entity-name <ConsumerApp-2 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-2 Role Session>

以下のコマンドで ConsumerApp-2 の消費クォータを確認します:

kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS_IAM \
--command-config config_iam.properties \
--describe \
--entity-type clients --entity-name <ConsumerApp-2 Client Id> \
--entity-type users --entity-name arn:aws:sts::<AWS Account Id>:assumed-role/MSKStack-TopicBReadRole-xxxxxxxxxxx/<ConsumerApp-2 Role Session>

ConsumerApp-1 と同様に、ConsumerApp-2 のクライアントおよびユーザー情報で同じコマンドを使い消費クォータを削除できます。

クォータ設定後にプロデューサーおよびコンシューマーアプリケーションを再実行する

クォータの効果を確認するため、アプリケーションを再実行します。

ProducerApp-1 の再実行

先ほどと同じコマンドで ProducerApp-1同期モードで再実行します。以下のスクリーンショットは、ProducerApp-1 がいずれかのブローカーでクォータに達すると、produce-throttle-time-avgproduce-throttle-time-max クライアントメトリクスの値が 0.0 を超えることを示しています。0.0 を超える値は ProducerApp-1 がスロットリングされていることを意味します。ProducerApp-1 を数秒間実行してから Ctrl+C で停止します。

ProducerApp-1非同期モード (--producer-type async) で再実行して、生成クォータの効果をテストすることもできます。同期実行と同様に、以下のスクリーンショットは、ProducerApp-1 がいずれかのブローカーでクォータに達すると、produce-throttle-time-avgproduce-throttle-time-max クライアントメトリクスの値が 0.0 を超えることを示しています。0.0 を超える値は ProducerApp-1 がスロットリングされていることを意味します。非同期 ProducerApp-1 をしばらく実行させておきます。

最終的に TimeoutException が表示されます: org.apache.kafka.common.errors.TimeoutException: Expiring xxxxx record(s) for Topic-B-2:xxxxxxx ms has passed since batch creation

非同期プロデューサーでクォータの上限を超えるレートでメッセージを送信すると、メッセージはまずクライアントアプリケーションのメモリにキューイングされます。メッセージの送信レートが受け入れレートを超え続けると、クライアントは最終的にバッファ領域を使い果たし、次の Producer.send() 呼び出しがブロックされます。タイムアウトまでにブローカーが処理しきれない場合、Producer.send() は最終的に TimeoutException をスローします。Ctrl+CProducerApp-1 を停止します。

ConsumerApp-1 の再実行

先ほどと同じコマンドで ConsumerApp-1 を再実行します。以下のスクリーンショットは、ConsumerApp-1 がクォータに達すると、fetch-throttle-time-avgfetch-throttle-time-max クライアントメトリクスの値が 0.0 を超えることを示しています。0.0 を超える値は ConsumerApp-1 がスロットリングされていることを意味します。

ConsumerApp-1 を数秒間実行してから Ctrl+C で停止します。

ConsumerApp-2 の再実行

先ほどと同じコマンドで ConsumerApp-2 を再実行します。同様に、ConsumerApp-2 がクォータに達すると、fetch-throttle-time-avgfetch-throttle-time-max クライアントメトリクスの値が 0.0 を超えます。0.0 を超える値は ConsumerApp-2 がスロットリングされていることを意味します。ConsumerApp-2 を数秒間実行してから Ctrl+C で停止します。

Amazon CloudWatch のクライアントクォータメトリクス

パート 1 で、クライアントメトリクスは Kafka クラスターに接続するクライアントが公開するメトリクスであると説明しました。CloudWatch でクライアントメトリクスを確認しましょう。

  1. CloudWatch コンソールで All metrics を選択します。
  2. Custom Namespaces で、クライアントアプリケーションの実行時に指定した名前空間を選択します。
  3. ディメンション名を選択し、すべてのアプリケーションの produce-throttle-time-maxproduce-throttle-time-avgfetch-throttle-time-maxfetch-throttle-time-avg メトリクスを選択します。

上記のメトリクスは、前のセクションでクォータを設定してテストした ProducerApp-1ConsumerApp-1ConsumerApp-2 アプリケーションのスロットリング動作を示しています。以下のスクリーンショットは、ネットワーク帯域幅クォータに基づく ProducerApp-1ConsumerApp-1ConsumerApp-2 のスロットリングを示しています。各アプリケーションはそれぞれのクライアントメトリクスを CloudWatch に送信します。ソースコードは GitHub で参照できます。

クライアント ID とロールセッション名のセキュリティ

アプリケーションのクライアント ID と認証済みユーザープリンシパルで Kafka クォータを設定する方法を説明しました。クライアントアプリケーションが IAM 認証を有効にした MSK クラスターの Kafka トピックにアクセスするために IAM ロールを引き受けると、認証済みユーザープリンシパルは以下の形式で表されます (詳細については IAM identifiers を参照):

arn:aws:sts::111111111111:assumed-role/Topic-B-Write-Role/producerapp-1-role-session

ここには、クライアントアプリケーションが AWS STS SDK で IAM ロールを引き受ける際に使用するロールセッション名 (この場合は producerapp-1-role-session) が含まれています。クライアントアプリケーションのソースコードは参照用に公開されています。クライアント ID は、アプリケーションチームがアプリケーションコードで設定する論理名の文字列 (例: producerapp-1-client-id) です。そのため、あるアプリケーションが別のアプリケーションのクライアント IDロールセッション名を取得し、同じ IAM ロールを引き受ける権限がある場合、そのアプリケーションになりすますことができます。

アーキテクチャ図のとおり、ConsumerApp-1ConsumerApp-2 はそれぞれ独自のクォータ割り当てを持つ別々のクライアントアプリケーションです。両方ともデモアカウントで同じ IAM ロール (Topic-B-Read-Role) を引き受ける権限があるため、Topic-B からメッセージを消費できます。そのため、MSK クラスターのブローカーはクライアント IDユーザー (それぞれのロールセッション名の値を含む) で区別します。ConsumerApp-2 が何らかの方法で ConsumerApp-1ロールセッション名クライアント ID を取得した場合、アプリケーションコードで ConsumerApp-1ロールセッション名クライアント ID を指定することで ConsumerApp-1 になりすますことができます。

ConsumerApp-1クライアント ID として consumerapp-1-client-idロールセッション名として consumerapp-1-role-session を使用するとします。ConsumerApp-1Topic-B-Read-Role IAM ロールを引き受けると、認証済みユーザープリンシパルは以下のようになります:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-1-role-session

同様に、ConsumerApp-2クライアント ID として consumerapp-2-client-idロールセッション名として consumerapp-2-role-session を使用します。ConsumerApp-2Topic-B-Read-Role IAM ロールを引き受けると、認証済みユーザープリンシパルは以下のようになります:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-2-role-session

ConsumerApp-2ConsumerApp-1クライアント IDロールセッション名を取得してアプリケーションコードで指定すると、MSK クラスターのブローカーは ConsumerApp-2ConsumerApp-1 として扱い、クライアント IDconsumerapp-1-client-id、認証済みユーザープリンシパルを以下のように認識します:

arn:aws:sts::<AWS Account Id>:assumed-role/Topic-B-Read-Role/consumerapp-1-role-session

その結果、ConsumerApp-2 は本来のクォータ割り当てである 1024 バイト/秒ではなく、最大 5120 バイト/秒で MSK クラスターからデータを消費できるようになります。ConsumerApp-2 が同時に実行されると、ConsumerApp-1 のスループットに悪影響を及ぼします。

拡張アーキテクチャ

アプリケーションのクライアント IDロールセッション名を保護するため、アーキテクチャに AWS Secrets ManagerAWS Key Management Service (AWS KMS) を導入できます。ガバナンスを強化するため、アプリケーションのクライアント ID とロールセッション名は Secrets Manager に暗号化されたシークレットとして保存します。暗号化されたシークレットと KMS カスタマーマネージドキー (CMK) に関連付けられた IAM リソースポリシーにより、アプリケーションは自身のクライアント ID とロールセッション名のみにアクセスして復号できます。アプリケーション間でのなりすましを防止できます。以下の図は拡張アーキテクチャを示しています。

更新されたフローには以下のステージがあります:

  • P1ProducerApp-1 は Secrets Manager から client-idrole-session-name のシークレットを取得します
  • P2ProducerApp-1 はシークレットの client-id をアプリケーションコードで CLIENT_ID_CONFIG として設定し、(ProducerApp-1-Role IAM ロールで) シークレットの role-session-name を AWS STS SDK の assumeRole 関数呼び出しに渡して Topic-B-Write-Role を引き受けます
  • P3Topic-B-Write-Role IAM ロールを引き受けた状態で、ProducerApp-1Topic-B へのメッセージ送信を開始します
  • C1ConsumerApp-1ConsumerApp-2 はそれぞれ Secrets Manager から client-idrole-session-name のシークレットを取得します
  • C2ConsumerApp-1ConsumerApp-2 はそれぞれのシークレット client-id をアプリケーションコードで CLIENT_ID_CONFIG として設定し、(それぞれ ConsumerApp-1-RoleConsumerApp-2-Role IAM ロールで) シークレットの role-session-name を AWS STS SDK の assumeRole 関数呼び出しに渡して Topic-B-Read-Role を引き受けます
  • C3Topic-B-Read-Role IAM ロールを引き受けた状態で、ConsumerApp-1ConsumerApp-2Topic-B からのメッセージ消費を開始します

アーキテクチャへの組み込み方法の詳細については、AWS Secrets ManagerAWS KMS のドキュメントを参照してください。

リソースのクリーンアップ

CloudFormation コンソールに移動し、MSKStack スタックを削除します。本記事で作成したすべてのリソースが削除されます。

まとめ

本記事では、Amazon MSK クォータの設定手順を詳しく説明し、サンプルクライアントアプリケーションでその効果を検証しました。さらに、クライアントメトリクスでスロットリングの有無を判断する方法も説明しました。また、平文のクライアント ID とロールセッション名のセキュリティリスクも指摘しました。ゼロトラストアーキテクチャの実現には、改訂版アーキテクチャ図のとおり Secrets Manager と AWS KMS の活用をお勧めします。

本記事 (改訂版アーキテクチャを含む) についてのフィードバックやご質問をお待ちしています。

著者について

Vikas Bajaj

Vikas Bajaj

Amazon Web Services の金融サービス担当シニアマネージャー、ソリューションアーキテクトです。金融サービスやデジタルネイティブビジネスで 20 年以上の経験を持ち、プロダクト設計、テクノロジーロードマップ、アプリケーションアーキテクチャについてお客様にアドバイスしています。


この記事は Kiro が翻訳を担当し、Solutions Architect の 榎本 貴之 がレビューしました。