Redis Streams を使用する

Redis ストリームの概要について学び、履歴を保持するメッセージブローカー、メッセージキュー、統合ログ、チャットシステムを構築します。

今回のブログ記事は、オープンソースの Redis 5 のブログシリーズの続きです。ここでは Redis 5 のオープンソース版の新機能である Redis ストリームを取り上げます。

インターネットリレーチャット (IRC) に初めて接続したときのことは、まだ記憶に新しいのではないでしょうか。打ち解けた雰囲気で、未来的でありながら、ローテクな印象もありました。チャンネルに参加することは、何時間も議論が続いていた場所に、その議論の主題を理解しないまま足を踏み入れるようなところがありました。主題を理解できると自分も会話に参加するのですが、一度退室してしまうと、その後会話がどのように発展していったかは知る由もありません。まるで現実の人生のようです。

時間が経過し、IRC それ自体に大きな変化は起きていませんが、最新のコラボレーションツールによって、行われたやりとりを一行たりとも聞き逃すことはなくなりました。こうしたツールは各会話のログを記録しますので、ユーザーは会話に追いついたり、前回のセッション後の聞き逃した会話をさかのぼって確認したりすることが可能です。旧式のチャットツールと最新のチャットツールの違いは、Redis の 2 つの機能、Pub/Sub とストリームを比較する際に使用されるアナロジーにあります。

Redis には、バージョン 2.0.0 からパブリッシュ/サブスクライブパターンが組み込まれました。このパターンにより、クライアントは 1 つ以上のチャネルをサブスクライブし、それらと接続している間はメッセージを受信することができます。他のクライアントは発行者として機能し、メッセージを 1 つ以上のチャネルに送信します。IRC に似ているように感じられるのは、IRC にもパブリッシュ/サブスクライブパターンが実装されているためです。

履歴を保持する必要がある場合、Redis に PUBLISH/SUBSCRIBE として実装されているパブリッシュ/サブスクライブパターンだけでは十分ではありません。Redis はデータ構造サーバーなので、Pub/Sub からのメッセージングをリストやハッシュと組み合せて必要な基本ソリューションを構築することはもちろん可能です。しかし、バージョン 5.0.0 からは、ログのデータ構造や一連の柔軟性の高いコマンドを実装した、新しいデータタイプであるストリームを使用できるようになります。

Redis ストリームの基本的概要

Redis ストリームは、概念的にはエントリを追加することができるリストです。各エントリは一意の ID と値を持っています。ID はデフォルトで自動生成され、タイムスタンプが含まれます。値はハッシュ値です。範囲内にクエリを行ったり、ブロックコマンドを使って着信のたびにエントリを読み取ったりできます。Redis は一般的に、異なる要素を組み合せることで必要な結果が得られます。かつてニクラウス・ヴィルトが語ったように、プログラムとはアルゴリズムにデータ構造を追加したものですが、Redis はその両方をすでにユーザーに提供しています。

Redis ストリームは、履歴を保持するメッセージブローカー、メッセージキュー、統合ログ、チャットシステムの構築に適しています。いったん送信されると忘れ去られる Pub/Sub メッセージとは異なり、Redis ストリームはメッセージを永続的に保持します。Redis ストリームは、ストリームの要素を使用する際にクライアントのグループが協働することを可能にするコンシューマーグループという機能を実装しています。たとえば、グループ内のコンシューマーは、ID 別に項目を検索したり、項目の処理を確認したり、または保留中のメッセージの所有権を主張したりすることができます。コンシューマーグループについては今後のブログ記事で解説します。

Redis によるチャット

以下は、IRC のチャネルにメッセージを送信する方法の 1 つです。

/notice #foo hey!

そして以下は、Redis でメッセージを発行する方法です。

PUBLISH #foo "hey!"

非常によく似ています。動作もほとんど同じです。IRC に送信されたメッセージを読み取りたいときは、まずそこに参加する必要があります。

/join #foo

Redis に発行されたメッセージを読み取る場合も同じです。

SUBSCRIBE #foo

こちらも非常によく似ています。両方とも、接続している限り、クライアントは着信したメッセージを読み取れます。メッセージの受信を停止したい場合は、そのためのコマンドも用意されています。IRC の場合は以下です。

/part #foo

Redis で同じ働きをするのは以下です。

UNSUBSCRIBE #foo

#foo という名前は IRC では重要ですが Redis では重要ではありません。他の文字列を使うことも可能です。

それでは、同じ動作をストリームで実装してみましょう。メッセージの発行は以下のようになります。

XADD #foo * msg "hey!"

ここでは何が行われているのでしょうか? 説明すべきことがたくさんありますので、さっそく始めます。

XADD は、ストリームの作成とエントリの発行に使用されるコマンドです。

#foo は、Redis キーの名前で、ストリームを表しています。この名前は任意です。ここでは、IRC のチャネル名を模した #foo を使用しています。

* は、ID の代わりを務める特別な値です。新しい ID をこれまでより大きな値で作成するよう Redis に伝えます。

msg は、フィールドです。エントリの値はハッシュ値であるため、1 つ以上のフィールドと 1 つの値を送信する必要があります。上記の例では、msg がフィールドに選択されています。

"hey!" は、msg の値です。

このコマンドは、今回の単純なユースケースには複雑過ぎるかもしれません。以下に見ていく通り、これは非常に柔軟性の高いコマンドだからです。ひとまず、メッセージの読み取りへ進みましょう。

XREAD BLOCK 1000 STREAMS #foo $

このコマンドは、その要素まで分解してみないとあまり意味がわかりません。

XREAD は、エントリを取得するためのコマンドです。

BLOCK 1000 は、クライアントが、エントリが全くなければブロックを行い、着信が全くなければ 1000 ミリ秒後にタイムアウトすることを意味します。

STREAMS は、キーのリストとそれに続く ID のリストを受け入れるディレクティブです。ここでは、1 つのキーと 1 つの疑似 ID を送信します。以下をお読みください。

#foo は、Redis キーの名前で、ストリームを表しています。

$ は、疑似 ID で、コマンドがブロックされた後に作成された ID を表しています。つまり、ストリームに存在するそれ以前のエントリをすべて無視し、それ以降に着信したエントリにのみフォーカスすることを意味します。

エントリを取得してもコマンドがタイムアウトしても、さらにエントリを取得するには XREAD を再度呼び出す必要があります。つまり、XREAD は通常、ループ内で呼び出されます。エントリの読み取りを停止したいときは、ループから抜け出せば停止できます。

IRC、Pub/Sub、ストリームを使って同じ結果を得る方法についてみてきました。ストリームには他にも用途があります。ストリームをログとして使用する方法、つまり、エントリをプッシュし、その後 XREAD コマンドを使って着信のたびにエントリを読み取る方法についてもみてきました。当社が提供したバージョンは、ファイルで tail -f を使用する場合と同等です。(tail -f を使い慣れていない場合、詳細を入手するには man tail を実行します)。

履歴保持チャット

過去に交わされた会話に追いつくことができる、最新のチャットアプリケーションをモデル化してみましょう。このバージョンでは、各クライアントは、接続されているか否かにかかわらず、メッセージがストリームに届いたときにすべてのメッセージを読み取ることができます。

過去に交わされた会話だけでなく、入室時点で着信した新規メッセージも閲覧可能なチャットを実装したいときは、クライアントが最後に閲覧されたメッセージを追跡する必要があります。その場合、クライアントは最後に読み取ったエントリの ID を覚えておかなければなりません。ストリーム ID について学ぶこれ以上の機会はありません。ID がどのように作成され使用されるのか、知っておきましょう。

まず、エントリを作成するためのコマンドをおさらいしましょう。

XADD #foo * msg "hey!"

これを Redis 5 にフィードすると、次の応答が返ってきます。

1536495531827-0

これがエントリの新しい ID です。以下のフォーマットに従っています。

<timestamp>-<sequence>

前半はタイムスタンプ (単位はミリ秒) 、後半はシーケンス番号です。Redis が同一のミリ秒内に 2 つ以上の XADD コマンドを受信する場合、シーケンス番号の値が増加します。これにより、2 つのエントリに同じ ID が付くことはありません。また、新しい ID はそれ以前のどの ID よりも大きい値になります。

それでは、履歴を認識したクライアント側の視点で、XREAD コマンドについておさらいします。クライアントを接続するのは初めてで、現時点までのすべてのメッセージの読み取りを希望しているものと仮定します。コマンドは以下のようになります。

XREAD STREAMS #foo 0-0

今回、クライアントはブロックしていません。#foo にある、ID の値が 0-0 よりも大きいすべてのエントリを呼び出します。確認ですが、0-0 は有効な ID ではありません。Redis に ID を作成させずに自分で ID を作成しても Redis はこの値を拒否します。0-0 は特別な値であるためです。また、頻繁に使用されるため、Redis は 0 を省略形として受け入れます。したがって、コマンドは次のように記述することも可能です。

XREAD STREAMS #foo 0

Redis から得られる結果は以下の通りです。

1) 1) "#foo"
    2) 1) 1) 1536495531827-0
        2) 1) "msg"
            2) "hey!"

redis-cli を使用するとこのような表記になります。たとえば Python などから同じコマンドを呼び出すと、次のような構造になります。

>> r.execute_command("XREAD", "STREAMS", "#foo", "0")
=> [["#foo", [["1536495531827-0", ["msg", "hey!"]]]]]

結果を展開して、実際の形状を確認してみましょう。

[
    [
    "#foo",
        [
            [
            "1536495531827-0",
                [
                "msg",
                "hey!"
                ]
            ]
        ]
    ]
]

これはストリームの配列です (ここでは、要素は "#foo" のみです)。各ストリームには、ID およびフィールド/値のペアのリストから構成された、対応するエントリの配列があります。

次はチャットのメタファーです。保存すべき重要情報は最後に閲覧した ID です。したがって、自分のプログラムに "1536495531827-0" を保存する必要があります。新しいエントリをチェックする準備が整ったら、再度 XREAD を呼び出して、最後に閲覧した ID を渡します。

>>r.execute_command("XREAD", "STREAMS", "#foo", "1536495531827-0")
=> nil

新しいエントリは存在しないので、呼び出しの応答には nil が返ります。新しいエントリを受信していた場合は、最後に閲覧したエントリの ID を更新します。すると、ユーザーに対応しているアプリケーションには、新たに到着した未読のメッセージが表示されます。

まとめ

Redis ストリームは、履歴を保持するメッセージブローカー、メッセージキュー、統合ログ、チャットシステムを構築するための優れた新機能です。ストリームの操作は一見複雑ですが、今回の記事や今後の記事をお読みいただければ、この新機能がカバーしているコマンドやさまざまな使用事例についてさらに理解を深めていただくことができます。引き続き、本テーマに関する情報をお伝えしてまいります。

AWS での完全マネージド型の Redis

Amazon では、完全マネージド型の Redis サービスである Redis 用 Amazon ElastiCache を提供しています。このサービスは、AWS 無料利用枠を使用して無料でお試しいただけます。Redis 用 Amazon ElastiCache を活用すれば、クラウド内で Redis のデプロイを簡単にセットアップ、操作、スケールできます。Amazon ElastiCache を使用すると、コスト効率が良く、サイズ変更が可能なハードウェア容量で、インターネット規模の Redis を数分でデプロイできます。 

無料の Redis 用 Amazon ElastiCache は 3 つの簡単なステップで使用を開始できます。

Redis 用 Amazon ElastiCache

サインアップ

Amazon ElastiCache 無料利用枠を利用します。
Redis 用 Amazon ElastiCache の詳細を見る

シンプルなチュートリアルで学ぶ

Redis クラスターの作成方法を学習します。
Redis 用 ElastiCache の使用開始

構築を開始する

ユーザーガイドを参照しながら構築を開始します。