「Redis ストリームとメッセージキュー」

Redis ストリームと信頼できるメッセージキューの構築について

このブログ記事は、オープンソースの Redis 5 の記事シリーズにおける続きの記事です。この記事では Redis 5 のオープンソース版の新機能である Redis ストリームについて扱っており、Redis ストリームに関する 2 つ目の記事となります。Redis ストリームに関する最初の記事は、こちらから読むことができます。

Redis とメッセージキュー

「通常は Redis について話をするのです」とメモリ内データベースの作成者である Salvatore Sanfilippo 氏は述べました。それから緊張感のある沈黙の後、「しかし、今回は違います」と付け加えました。

これは 3 年前の話です。

Sanfilippo は、パリで開催されたスケーラビリティに関する会議で、聴衆を前に話をしていました。Sanfilippo のプレゼンテーションは自身がリリースしたばかりのメッセージブローカー Disque についてのものでした。Sanfilippo は、メッセージキューに対する Redis の使用方法が気に入らなかったため、メッセージキューの改善に取り組まなければなりませんでした。Disque は、メッセージとバックグラウンドジョブを処理するために、Redis にはない機能と保証を提供しました。残念なことに、Disque は十分な支持を得られず、最終的に廃止されました。

その後、Redis は大きく改善されました。Redis にはモジュールの概念が組み込まれており (実際に Disque はある時点で Redis モジュールの形で再登場するかもしれません)、現在ではメッセージブローカーの重要な構成要素であるストリームデータ型が組み込まれています。

旧バージョンの Redis で構築できるものをおさらいした後、ストリームによって実現できるものをみていきましょう。

メッセージキュー

メッセージキューとは、概念的にはリストです。プロデューサーは要素を一方からプッシュし、コンシューマは反対側から要素を読み込みます。複数のプロデューサーとコンシューマは、同じキューでやりとりができます。Redis では、基本的なメッセージキューは LPUSH (「左側からのプッシュ」の意味) と RPOP (「右側からのポップ」 の意味) のコマンドで容易に実装できます。最良のシナリオ (happy path) では、コンシューマがアイテムをポップして処理し、処理が完了するとカスタマーは次のアイテムを消費して処理を行う準備ができた状態になります。若干改善するとすれば、読み取りに際してブロックコマンドを使用することが挙げられます。つまり、RPOP の代わりに BRPOP を使用できます。リストが空の場合、コンシューマはブロック状態になり、要素の到着を待ちます。タイムアウトが経過すると、コンシューマは再試行できます。今までのところ、こうした単純な実装において問題はありません。しかし、問題はこのような happy path にはないのです。問題となるのは、アイテムの処理中にプロセスがクラッシュした場合です。 

信頼できるキュー

障害シナリオから回復できる場合、キューは信頼できるとみなされます。コンシューマがクラッシュして処理中のアイテムが失われた場合、システムは信頼できません。まさにこうした状況に合わせてつくられたコマンドが、旧バージョンの Redis に追加されました。そのコマンドは、BRPOPLPUSH です。以前の実装で説明したように、アイテムをポップするだけではなく、アイテムを別のリストにプッシュすることもできます。以下の LPUSH と BRPOPLPUSH のコマンドを使用すると、信頼できるメッセージキューを設計できます。

- プロデューサーは番号 42 をリスト q1 にプッシュします。 

LPUSH q1 42

- コンシューマは q1 からアイテムを取得し、同時に c1 でバックアップを作成します。

BRPOPLPUSH q1 c1 1000

-- コンシューマがアイテムの処理に成功すると、バックアップを消去して次のアイテムに移動します。

DEL c1
BRPOPLPUSH q1 c1 1000

-- コンシューマがクラッシュした場合、システムはバックアップキュー c1 にアイテムのコピーを保持し、次の処理を決定できます。

この例では、q1 はキュー #1 を表し、c1 はコンシューマ #1 を表します。目的は、コンシューマが一意の識別子を保持するようにして、各コンシューマが独自のバックアップキューを持つようにすることです。実稼働環境では、ホスト名と PID を連結することで特定のコンシューマのバックアップキューの名前を形成できます。コンシューマがクラッシュした場合、バックアップキューにあるアイテムを使用してシステムが行うべきことはなんですか ?

  • 再試行?
  • 報告?
  • 無視?

これはアプリケーションと個々のユースケースによって決まります。いずれの場合も、キューは様々な障害シナリオに関しては対応できるようである必要があります。

メッセージキューの信頼性は高くなる可能性がありますが、実装が難しい他の便利な機能もあります。

Disque と本来備わった信頼性

Disque には、キューに関する信頼性において Redis より多くの利点がありました。その 1 つは、アイテムが正常に処理された後にコンシューマが確認する必要があったということでした。サーバーが時間内に ACK コマンドを受信しなかった場合、アイテムをエンキューできました。エンキューしない場合、実際にアイテムに起こることはすべてユーザーにより定義されていましたが、明確な確認メカニズムは Disque に組み込まれました。たとえば、コンシューマがアイテムを処理するのに時間がかかりすぎることを検出した場合、サーバーに通知してタイムアウトを延期できました。つまり Disque では、バックアップキューの処理はすでに行われていたため、バックアップキューを作成する必要はありませんでした。

Redis ストリームとコンシューマグループ

Redis 5 には、コンシューマグループという概念が導入されています。この目的は、ストリームにおいてどのアイテムを消費するのか、多くのコンシューマが同意することです。グループ内の各コンシューマは一意の識別子を保持し、サーバーはどのコンシューマがどのアイテムをフェッチしたのか記録します。

コンシューマは、XACK コマンドを送信して、アイテムが正常に処理されたことをサーバーに認識させることができます。コンシューマは、取得されたが承認されることのなかったアイテムのリストを確認することもでき、さらに保留中のアイテムの所有権を主張できます。コンシューマグループは、Disque における設計決定方法の基本的な要素です。

ここでは、ストリームとコンシューマグループの使用法を、段階的な例を用いて説明します。

まず、s1 キーでストリームを作成しましょう。

XADD s1 * a 1

s1 はストリームを含むことになるキーです。

*は、Redis に指示してアイテムの ID を作成する特別なプレースホルダーです。

a はフィールド名です。

1 はフィールド値です。

ストリームに送信される各メッセージはハッシュなので、複数のフィールド値のペアを送信できます。それでは、コンシューマグループを作成していきます。このグループを g1 と呼びましょう。

XGROUP CREATE s1 g1 0

s1 はストリームの名前です。

g1 はコンシューマグループの名前です。

0 は取得する最初のアイテムの ID です。

0 の代わりとなる別の共通の値は $ です。すでにキューにあるアイテムは関係ありません。つまり、この時点から追加されるすべてのアイテムが必要になります。アプリケーションがすでに認識した特定の ID を指定することもできます。

この時点で、ストリームとコンシューマグループが存在するので、アイテムの作成と消費を開始できます。1 つのコンシューマでは、以下のコマンドを実行することになります。

XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 >

g1 はコンシューマグループの名前です。

c1 は、グループ内にあるコンシューマの一意の識別子です。

COUNT 1 は取得するアイテムの数です。

s1 はストリームの名前です。

> はメッセージ ID の特別なプレースホルダーであり、このグループのコンシューマには配信されなかったメッセージを取得するようサーバーに指示します。

アイテムがストリームからフェッチされ、保留中のアイテムのリストを確認できるようになります。保留中のアイテムは、コンシューマによって取得されたアイテムですが、まだ処理済みであると確認されていないアイテムです。

XPENDING s1 g1

以上が基本構文であり、受信することになるレスポンスは以下のとおりです。

1 ) (整数) 1 # 保留中のアイテムの数
2 ) 「1540835652651-0」 # 保留中のアイテムのリスト内にある最小の ID
3 ) 「1540835652651-0」 # 保留中のアイテムのリスト内にある最大の ID
4) 1) 1) 「c1」 # コンシューマ識別子
    2 ) 「1」 #消費者 「c1」 の保留中のアイテムの数

この場合、保留中のアイテムは 1 つだけなので、最小の ID と最大の ID として同じ値を取得することになります。保留中のアイテムが複数の場合、保留中のアイテムを持つ各コンシューマに対し、1 つのエントリを取得します。

また、特定の開始 ID と終了 ID を取得したり、特定のコンシューマが取得した保留中のアイテムのリストを取得したりすることもできます。この場合、さまざまな情報を得ることができます。

たとえば、XPENDING の別の構文では、ID の範囲や取得するアイテムの数を送信できます。

XPENDING s1 g1 - + 10

s1 はストリームの名前です。

g1 はコンシューマグループの名前です。

- は、ストリーム内で存在しうる最小の ID を表します。

+ は、ストリーム内で存在しうる最大の ID を表します。

10 は取得するアイテムの数です。

取得結果は以下のとおりです。

1) 1) 「1540835652651-0」 # リスト内にある最初のアイテムの ID
    2) 「c1」 # 所有者のコンシューマ識別子
    3) (整数) 5129 # 「c1」 が主張してからのアイドル時間
    4) (整数) 1 # これまでの配信の数

配信の数は興味深いデータです。なぜなら、多くの再試行の後に処理できなかったアイテムを検出できるからです。この数に関しては、XCLAIM コマンドについて説明することで、より明確になります。しかし、まずは happy path について説明します。コンシューマはアイテムを処理すると、以下に示すような、別のコマンドを実行できます。

XACK s1 g1 1540835652651-0

s1 はストリームの名前です。

g1 はグループの名前です。

1540835652651-0 は、処理されたアイテムの ID です。

サーバーは、コンシューマから受信確認応答を受信すると、保留中のメッセージのリストからアイテムを削除します。コンシューマは、同時に複数のアイテムを取得でき、同じ XACK コマンドで複数のアイテム ID を確認することもできます。

この保留中のアイテムのリストは、この記事の冒頭でみた簡単な例におけるバックアップキューを置き換えるものです。アイテムが保留中のリストに長時間ある場合、コンシューマがこれを所有し再試行することがあります。これを適切に実行する方法としては、保留中のアイテムのリストを定期的にチェックし、各アイテムのアイドル時間を調べることが挙げられます。コンシューマがアイテムを引き継ぐことを決定した場合、XCLAIM コマンドを発行できます。

XCLAIM s1 g1 c2 300000 1540835652651-0

s1 はストリームの名前です。

g1 はグループの名前です。

c2 は、アイテムを所有しようとするコンシューマの識別子です。

300000 はアイドル時間のミリ秒単位のしきい値です。

1540835652651-0 は、保留中のアイテムの ID です。

アイドル時間のしきい値は、アイテムを所有できるようになるために必要な最小経過時間 (ミリ秒単位) です。つまり、ID が 1540835652651-0 のアイテムのアイドル時間が 300000 ミリ秒以上になると、サーバーは所有権を変更して、識別子 c2 を持つコンシューマに新たな所有権を割り当てることができます。アイテムの所有権が変更されると、アイドル時間がリセットされます。

コンシューマがアイテムの所有権を取得すると、XREADGROUP コマンドを使用してその所有権を読み取ることができます。

XREADGROUP GROUP g1 c2 COUNT 1 STREAMS s1 0

0 は、この場合も特別な ID です。サーバーは配信しなければならないアイテムを認識しているので、コンシューマとしては正確である必要はなく、単にプレースホルダーを送信するだけで大丈夫です。

ストリームのトリミング

永続的に成長するストリームを保持するのはよくありません。Redis のすべてがメモリに格納されるため、無制限に成長するデータ構造は最終的にすべての利用可能なリソースを消費する可能性が非常に高いです。成長するデータ構造に対処する最良の方法は、アプリケーションとしてどのような情報を保持するのかを決定することです。

ストリームには、2 つのオプションがあります。1 つは、XTRIM コマンドを呼び出すことです。

XTRIM s1 MAXLEN 1000

このコマンドは、最新の 1000 のアイテムのみを保持するようサーバーに指示します。小さい ID のアイテムから削除されます。保持するアイテムの数を厳密にする必要がない場合は、次の代替構文を使用できます。

XTRIM s1 MAXLEN ~ 1000

約 1000 のアイテムのみが必要であることをサーバーに認識させ、サーバーは最適なタイミングを選んでマクロノードを削除します。このアプローチにより、サーバーは最も効率的な方法で基礎となるデータ構造を操作できるようになります。

ストリームをトリミングするもう 1 つのオプションは、アイテムを追加するときに MAXLEN 引数を送信することです。たとえば、XADD への元々のコールを変更して、ストリームのサイズを約 1000 のアイテムに制限できます。

XADD s1 MAXLEN ~ 1000 * a 1

MAXLEN の構文は、XTRIM と XADD の両方で同じです。

まとめ

Redis Streams は、非常に多様なアプリケーション対して、最適な基盤要素となります。本記事では、信頼性の高い メッセージキュー を構築する方法、処理できなかったアイテムを再試行する方法、ストリームのサイズを制限する方法について解説しました。ストリームがある実装とそうでない実装を比較すると、この新しいデータ構造に移行することの利点は明確です。 この記事で紹介した方法以外にも多くのオプションがあるので、資料を必ず参照してください。この記事で、お客様に Redis の魅力が伝わることを期待しています。

AWS での完全マネージド型の Redis

Amazon では、完全マネージド型の Redis サービスである Redis 用 Amazon ElastiCache を提供しています。このサービスは、AWS 無料利用枠を使用して無料でお試しいただけます。Redis 用 Amazon ElastiCache を活用すれば、クラウド内で Redis のデプロイを簡単にセットアップ、操作、スケールできます。Amazon ElastiCache を使用すると、コスト効率が良く、サイズ変更が可能なハードウェア容量で、インターネット規模の Redis を数分でデプロイできます。 

無料の Redis 用 Amazon ElastiCache は 3 つの簡単なステップで使用を開始できます。

Redis 用 Amazon ElastiCache

サインアップ

Amazon ElastiCache 無料利用枠を利用します。
Redis 用 Amazon ElastiCache の詳細を見る

シンプルなチュートリアルで学ぶ

Redis クラスターの作成方法を学習します。
Redis 用 ElastiCache の使用開始

構築を開始する

ユーザーガイドを参照しながら構築を開始します。