Amazon Web Services ブログ

Amazon Keyspaces (for Apache Cassandra) のキャッシュとしての Amazon ElastiCache の利用

本記事は 2024 年 11 月 12 日に公開された “Use Amazon ElastiCache as a cache for Amazon Keyspaces (for Apache Cassandra)” を翻訳したものです。


この投稿では、ブックアワードのデータを保存するために Amazon Keyspaces (for Apache Cassandra) テーブルを使用するアプリケーションのライトスルーキャッシュとして Amazon ElastiCache を使用する方法をご紹介します。Amazon Keyspaces にプログラムでアクセスするために Cassandra Python クライアントドライバー を使用し、ElastiCache クラスターに接続するために Redis クライアント を使用します。

Amazon Keyspaces は、スケーラブルで高可用性を備えた、フルマネージド型の Cassandra 互換データベースサービスで、どのようなスケールでも数ミリ秒の読み取りと書き込みのレスポンスタイムを提供します。
Amazon Keyspaces はサーバーレスであるため、クラスター内のノードを通じてワークロードのストレージと計算リソースをデプロイ、管理、維持する代わりに、テーブルに直接ストレージと読み取り/書き込みスループットリソースを割り当てます。

ほとんどのワークロードでは、Amazon Keyspaces が提供する 1 桁ミリ秒の応答時間で十分であり、Amazon Keyspaces が返す結果をキャッシュする必要はありません。
しかし、アプリケーションが読み取り操作でサブミリ秒の応答時間を必要とする場合や、読み取り集中型でありながらコストに敏感な場合、あるいはパーティションごとに 1 秒あたり 3,000 読み取りキャパシティユニット (RCU) を超えるような繰り返しの読み取りが必要な場合があります。
Amazon ElastiCache は、フルマネージド型で高可用性を備えた分散型の高速インメモリデータストアで、Amazon Keyspaces テーブルのキャッシュとして使用でき、読み取りレイテンシーをサブミリ秒レベルまで短縮し、読み取りスループットを向上させ、バックエンドデータベースのコストを増やすことなく、より高い負荷にスケールできます。
Amazon ElastiCache は、オープンソースのキーバリューストアである Valkey および Redis OSS と互換性があります。
この投稿で説明するアプローチとコードは、これらのエンジンの両方で動作します。

この投稿では、ライトスルーキャッシュ方式と遅延読み込みを使用しています。
ライトスルーキャッシュは、初期レスポンス時間を改善し、キャッシュされたデータを基盤となるデータベースと同期した状態に保ちます。
遅延読み込みでは、キャッシュミスが発生した場合にのみデータがキャッシュにロードされるため、キャッシュのリソース使用量が削減されます。
キャッシュ設計の詳細については、キャッシングパターンキャッシュ設計を参照してください。

前提条件

Amazon Keyspaces への接続の前提条件には、TLS 証明書のダウンロードと TLS を使用するための Python ドライバーの設定、関連する Python パッケージのダウンロード、そしてキースペースとテーブルへの接続設定が含まれます。

以下は、Amazon Keyspaces SigV4 認証プラグインを使用して Amazon Keyspaces テーブルに接続するための定型コードです。

from cassandra.cluster import *
 from ssl import SSLContext, PROTOCOL_TLSv1_2 , CERT_REQUIRED 
 from cassandra.auth import PlainTextAuthProvider 
 from cassandra_sigv4.auth import SigV4AuthProvider 
 from cassandra.query import SimpleStatement 
 import time 
 import boto3 

 ssl_context = SSLContext(PROTOCOL_TLSv1_2)
 ssl_context.load_verify_locations('/home/ec2-user/sf-class2-root.crt')
 ssl_context.verify_mode = CERT_REQUIRED 
 boto_session = boto3.Session()
 auth_provider = SigV4AuthProvider(boto_session)
 cluster = Cluster(['cassandra.us-east-1.amazonaws.com'],
                  ssl_context=ssl_context,
                  auth_provider=auth_provider,
                  port=9142)
 session = cluster.connect()

sigv4 プラグインを使用することが、推奨されるセキュリティのベストプラクティスです。ただし、Cassandra が認証とアクセス管理に使用する従来のユーザー名とパスワードとの下位互換性のために、サービス専用の認証情報を使用して Amazon Keyspaces に接続することもできます。

ElastiCache クラスターへの接続手順については、ElastiCache への接続 を参照してください。
以下は、ElastiCache クラスターに接続するための定型コードです:

from rediscluster import RedisCluster 
 import logging 

 logging.basicConfig(level=logging.ERROR)
 redis = RedisCluster(startup_nodes=[{"host": "keyspaces-cache.eeeccc.clustercfg.use1.cache.amazonaws.com",
                                     "port": "6379"}],
                     decode_responses=True,
                     skip_full_coverage_check=True)
 if redis.ping():
    logging.info("Connected to Redis")

Amazon Keyspaces テーブルスキーマ

この例では、catalog というキースペースにある book_awards という名前の Amazon Keyspaces テーブルを使用して、ブックアワードに関するデータを保存しています。
パーティションキーは award と year のカラムで構成されています。
次のスクリーンショットに示すように、このテーブルには category と rank という 2 つのクラスタリングキーカラムがあります。
このキースキーマにより、パーティション全体にデータが均等に分散され、この投稿で説明するアクセスパターンに対応できます。
Amazon Keyspaces のデータモデリングのベストプラクティスについては、データモデリングのベストプラクティス:データモデル設計のための推奨事項を参照してください。

Keyspaces Schema

以下のスクリーンショットは、このテーブルのサンプルデータを数行表示しています。

Sample rows in Keyspaces

次のセクションでは、Amazon Keyspaces の操作とそれらの操作のキャッシュ結果に関するサンプルコードスニペットを見ていきます。
このブログ記事のコードスニペットは参考用であり、入力の検証とエラー処理はサンプルには含まれていません。

単一行の INSERT および DELETE 操作

ライトスルーキャッシングプロセスでは、プライマリデータベースの更新直後にキャッシュが積極的に更新されます。
基本的なロジックは以下のようにまとめることができます:

  1. アプリケーションが Amazon Keyspaces の行を追加または削除します
  2. その直後に、キャッシュ内の行も追加または削除されます

以下の図は、ライトスルーキャッシング戦略を示しています。

Write-through Caching workflow

以下のサンプルコードは、book_awards データに対する INSERT および DELETE 操作に対して、ライトスルー戦略を実装する方法を示しています。

#Global variables 
 keyspace_name="catalog"
 table_name="book_awards"


#Write a row 
 def write_award(book_award):
    write_award_to_keyspaces(book_award)
    write_award_to_cache(book_award)

#write row to the Keyspaces table 
 def write_award_to_keyspaces(book_award):
    stmt=SimpleStatement(f"INSERT INTO {keyspace_name}.{table_name} (award, year, category, rank, author, book_title, publisher) VALUES (%s, %s, %s, %s, %s, %s, %s)",
                         consistency_level=ConsistencyLevel.LOCAL_QUORUM)
    session.execute(stmt,(book_award["award"],
                                book_award["year"],
                                book_award["category"],
                                book_award["rank"],
                                book_award["author"],
                                book_award["book_title"],
                                book_award["publisher"]))

#write row to the cache 
 def write_award_to_cache(book_award):
    #construct Redis key name 
    key_name=book_award["award"] + str(book_award["year"])+ book_award["category"] + str(book_award["rank"])

    #write to cache using Redis set, ex=300 sets TTL for this row to 300 seconds 
    redis.set(key_name, str(book_award), ex=300)


#Delete a row 
 def delete_award(award, year, category, rank):
    delete_award_from_keyspaces(award, year, category, rank)
    delete_award_from_cache(award, year, category, rank)

#delete row from Keyspaces table 
 def delete_award_from_keyspaces(award, year, category, rank):
    stmt = SimpleStatement(f"DELETE FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s ;",
                           consistency_level=ConsistencyLevel.LOCAL_QUORUM)
    session.execute(stmt, (award, int(year), category, int(rank)))

#delete row from cache 
 def delete_award_from_cache(award, year, category, rank):
    #construct Redis key name    
    key_name=award + str(year)+ category + str(rank)

    #delete the row from cache if it exists 
    if redis.get(key_name) is not None:
        book_award=redis.delete(key_name)

プライマリーキーによる単一のブックアワードの取得

遅延ロードを使用した基本的なデータ取得ロジックは、以下のようにまとめることができます。

  1. アプリケーションがデータベースからデータを読み取る必要がある場合、まずキャッシュをチェックしてデータが利用可能かどうかを確認します。データが利用可能な場合 (キャッシュヒット)、キャッシュされたデータが返され、呼び出し元にレスポンスが送信されます
  2. データが利用できない場合 (キャッシュミス):
    1. データベースにデータを問い合わせます
    2. データベースから取得したデータをキャッシュに格納し、そのデータを呼び出し元に返します

以下の図は、データ取得のロジックを示しています。

Data retrieval workflow

このユースケースで想定される最も一般的なアクセスパターンの 1 つは、すべてのプライマリキーの列が既知の場合にブックアワードを要求することです。

レイジーローディング戦略では、アプリケーションはまずキャッシュからリクエストされたデータの取得を試みます。キャッシュに行が見つからない場合、データベースから行を取得し、将来の使用のためにキャッシュします。

TTL (Time to Live) は、キーの有効期限を秒単位で指定する整数値です。Valkey や Redis OSS では、この値に秒またはミリ秒を指定できます。この例では TTL 値を 300 秒に設定していますが、アプリケーションのニーズに応じて設定を変更できます。

さらに、Python の time ライブラリを使用して、データベースからの結果取得とキャッシュからの結果取得の応答時間を比較することができます。

#Global variables 
 keyspace_name="catalog"
 table_name="book_awards"


#Read a row 
 def get_award(award, year, category, rank):
    #construct Redis key name from parameters 
    key_name=award + str(year)+ category + str(rank)
    start=time.time()
    book_award=redis.get(key_name)
    end=time.time()
    elapsed=(end - start)*1000 

    #if row not in cache, fetch it from Keyspaces table 
    if not book_award:
        print("Fetched from Cache: ", book_award)
        stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank=%s ;")
        start=time.time()
        res=session.execute(stmt, (award, int(year), category, int(rank)))
        end=time.time()
        elapsed=(end - start)*1000 
        if not res.current_rows:
            print("Fetched from Database: None")
            return None 
        else:
            #lazy-load into cache 
            book_award=redis.set(key_name, str(res.current_rows), ex=300)
            print("Fetched from Database in: ", elapsed, "ms")
            return res.current_rows 
    else:
        print("Fetched from Cache in: ", elapsed, "ms")
        return book_award 

複数のパラメータに基づく結果セットの取得

ここでのもう 1 つの一般的なアクセスパターンは、「X 年の Y カテゴリーにおける上位 N 件の受賞データを取得する」と想定されています。
この記事では、リクエストパラメータを連結し、この連結された文字列を、リクエストパラメータに一致する賞のリストの Redis キーとして使用します。

#Global variables 
 keyspace_name="catalog"
 table_name="book_awards"


#Read one or more rows based on parameters 
 def get_awards(parameters):
    #construct key name from parameters 
    key_name=""
    for p in parameters:
        key_name=key_name + str(p)
    
    start=time.time()
    book_awards=redis.lrange(key_name, 0, -1)
    end=time.time()
    elapsed=(end - start)*1000 

    #if result set not in cache, fetch it from Keyspaces table 
    if not book_awards:
        print("Fetched from Cache: ", book_awards)
        stmt = SimpleStatement(f"SELECT * FROM {keyspace_name}.{table_name} WHERE award=%s AND year=%s AND category=%s AND rank<=%s ;")
        start = time.time()
        res=session.execute(stmt, parameters)
        end=time.time()
        elapsed=(end - start)*1000 
        if not res.current_rows:
            print("Fetched from Database: None")
            return None 
        else:
            #lazy-load into cache 
            redis.rpush(key_name, str(res.current_rows))
            redis.expire(key_name, 300)
            print("Fetched from Database in: ", elapsed, "ms")
            return res.current_rows 
    else:
        print("Fetched from Cache: ", elapsed, "ms")
        return book_awards 

テストケース

最初のテストケースでは、単一行のデータ挿入、キャッシュヒット、キャッシュミス、データ削除のシナリオにおけるキャッシュと遅延ロードの動作を検証します。

def test_case_1():
    book_award={"award": "Golden Read",
                "year": 2021,
                "category": "sci-fi",
                "rank": 2,
                "author": "John Doe",
                "book_title": "Tomorrow is here",
                "publisher": "Ostrich books"}

    #insert an award to the DB and cache
    write_award(book_award)
    print("Test Case 1:")
    print("New book award inserted.")

    #cache hit - get award from cache
    print("\n")
    print("Verify cache hit:")
    res=get_award(book_award["award"],
              book_award["year"],
              book_award["category"],
              book_award["rank"])
    print(res)

    #let the cache entry expire
    print("\n")
    print("Waiting for cached entry to expire, sleeping for 300 seconds...")
    time.sleep(300)

    #cache miss - get award from DB and lazy load to cache
    print("\n")
    print("Entry expired in cache, award expected to be fetched from DB:")
    res=get_award(book_award["award"],
              book_award["year"],
              book_award["category"],
              book_award["rank"])
    print(res)

    #cache hit - get award from cache
    print("\n")
    print("Verify that award is lazy loaded into cache:")
    res=get_award(book_award["award"],
              book_award["year"],
              book_award["category"],
              book_award["rank"])
    print(res)

    #delete the award from cache and DB
    print("\n")
    print("Deleting book award.")
    delete_award(book_award["award"],
                 book_award["year"],
                 book_award["category"],
                 book_award["rank"])

    #confirm the award was deleted from cache and DB
    print("\n")
    print("Verify that the award was deleted from cache and DB:")
    res=get_award(book_award["award"],
              book_award["year"],
              book_award["category"],
              book_award["rank"])
    if res:
        print(res)

このテストケースを実行すると、予想通り以下のような出力が生成されます。
キャッシュからデータを取得する場合は 1 ミリ秒未満の往復時間が観測され、データベースへの問い合わせの場合はミリ秒単位の応答時間が観測されます。

Test Case 1:
 New book award inserted.


 Verify cache hit:
 Fetched from Cache in:  0.3809928894042969 ms 
{'award': 'Golden Read', 'year': 2021, 'category': 'sci-fi', 'rank': 2, 'author': 'John Doe', 'book_title': 'Tomorrow is here', 'publisher': 'Ostrich books'}


 Waiting for cached entry to expire, sleeping for 300 seconds...


 Entry expired in cache, award expected to be fetched from DB:
 Fetched from Cache:  None 
 Fetched from Database in:  14.202594757080078 ms 
 [Row(year=2021, award='Golden Read', category='sci-fi', rank=2, author='John Doe', book_title='Tomorrow is here', publisher='Ostrich books')] 


 Verify that award is lazy loaded into cache:
 Fetched from Cache in:  0.4191398620605469 ms 
 [Row(year=2021, award='Golden Read', category='sci-fi', rank=2, author='John Doe', book_title='Tomorrow is here', publisher='Ostrich books')] 


 Deleting book award.


 Verify that the award was deleted from cache and DB:
 Fetched from Cache:  None 
 Fetched from Database: None 

2 番目のテストケースでは、複数のパラメータに基づいて結果セットを取得する際のキャッシュと遅延ロードの動作を検証します。
このポストの Amazon Keyspaces テーブルスキーマ セクションで説明したように、Amazon Keyspaces テーブルには書籍の受賞データがあらかじめ読み込まれています。
このテストケースでは、データベースに新しい行を挿入する代わりに、事前に読み込まれたデータを扱います。

def test_case_2():
    print("Test Case 2:")
    print("Get top 3 Must Read book awards for year 2021 in the Sci-Fi category")
    print("\n")
    res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
    print(res)

    #cache-hit - get awards from cache 
    print("\n")
    print("Verify cache hit on subsequent query with same parameters:")
    res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
    print(res)

    #let the cache entry expire 
    print("\n")
    print("Waiting for cached entry to expire, sleeping for 300 seconds...")
    time.sleep(300)

    #cache miss - get award from DB and lazy load to cache 
    print("\n")
    print("Entry expired in cache, awards expected to be fetched from DB.")
    res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
    print(res)

    #cache hit - get award from cache 
    print("\n")
    print("Verify that awards are lazy loaded into cache:")
    res=get_awards(["Must Read", 2021, "Sci-Fi", 3])
    print(res)

遅延ロードとキャッシングのワークフローの動作を確認できます。
最初の呼び出しではキャッシュに結果が見つからないため、キャッシュの基盤となる Amazon Keyspaces テーブルからデータを取得し、キャッシュにロードします。
2 回目の呼び出しでは、キャッシュから結果を取得します。
キャッシュされた結果が期限切れになると、データベースから結果を再度取得し、キャッシュに遅延ロードされます。これにより、同じパラメータでの後続の get_awards 呼び出しでキャッシュからの高速な取得が可能になります。
キャッシュからのデータ取得では 1 ミリ秒未満の往復時間が、データベースへの往復ではミリ秒単位の往復時間が観察できます。

Test Case 2:
Get top 3 Must Read book awards for year 2021 in the Sci-Fi category


Fetched from Cache:  []
Fetched from Database in:  21.03400230407715 ms
[Row(year=2021, award='Must Read', category='Sci-Fi', rank=1, author='Polly Gon', book_title='The mystery of the 7th dimension', publisher='PublishGo'), Row(year=2021, award='Must Read', category='Sci-Fi', rank=2, author='Kai K', book_title='Time travellers guide', publisher='Publish123'), Row(year=2021, award='Must Read', category='Sci-Fi', rank=3, author='Mick Key', book_title='Key to the teleporter', publisher='Penguins')]


Verify cache hit on subsequent query with same parameters:
Fetched from Cache:  0.36835670471191406 ms
["[Row(year=2021, award='Must Read', category='Sci-Fi', rank=1, author='Polly Gon', book_title='The mystery of the 7th dimension', publisher='PublishGo'), Row(year=2021, award='Must Read', category='Sci-Fi', rank=2, author='Kai K', book_title='Time travellers guide', publisher='Publish123'), Row(year=2021, award='Must Read', category='Sci-Fi', rank=3, author='Mick Key', book_title='Key to the teleporter', publisher='Penguins')]"]


Waiting for cached entry to expire, sleeping for 300 seconds...


Entry expired in cache, awards expected to be fetched from DB.
Fetched from Cache:  []
Fetched from Database in:  32.64594078063965 ms
[Row(year=2021, award='Must Read', category='Sci-Fi', rank=1, author='Polly Gon', book_title='The mystery of the 7th dimension', publisher='PublishGo'), Row(year=2021, award='Must Read', category='Sci-Fi', rank=2, author='Kai K', book_title='Time travellers guide', publisher='Publish123'), Row(year=2021, award='Must Read', category='Sci-Fi', rank=3, author='Mick Key', book_title='Key to the teleporter', publisher='Penguins')]


Verify that awards are lazy loaded into cache:
Fetched from Cache:  0.3757476806640625 ms
["[Row(year=2021, award='Must Read', category='Sci-Fi', rank=1, author='Polly Gon', book_title='The mystery of the 7th dimension', publisher='PublishGo'), Row(year=2021, award='Must Read', category='Sci-Fi', rank=2, author='Kai K', book_title='Time travellers guide', publisher='Publish123'), Row(year=2021, award='Must Read', category='Sci-Fi', rank=3, author='Mick Key', book_title='Key to the teleporter', publisher='Penguins')]"]

スクリプト例

サンプルスクリプトとテスト関数は、この GitHub リポジトリで確認できます。

検討事項

この投稿では、ブックアワードのデータに対する 2 つの最も一般的なアクセスパターンの結果をキャッシュする基本的な実装を紹介します。
アクセスパターンの性質に応じて、他にもデータをキャッシュする方法があります:

  • パーティションキーの値のみに基づいて結果をキャッシュ(クライアント側でフィルタリング) – アプリケーションですべてのリクエストパラメータを処理する代わりに、パーティションキーの列のみ(例えば、賞と年)に基づいて結果セットをキャッシュすることができます。その他のフィルタリングはすべてクライアント側で処理されます。これは、異なるクエリ間で、このパーティションキー値を持つ少数の行のみを結果セットから除外する必要がある場合に有用です。結果セットを一度だけキャッシュし、クライアント側で異なるクラスタリング列の値やフィルタ条件を処理します。
  • すべてのキーパラメータを順序付け(例:昇順)してハッシュ化 – ハッシュ値をキャッシュ結果のキーとして使用できます。同じキーパラメータを使用して同じ結果が要求された場合、ハッシュは一貫性を保ち、キャッシュは必要な結果セットを返します。このオプションは、クエリが動的で、クエリ間でキー条件の順序が異なる場合に役立ちます。
  • すべてのクエリパラメータ(キー条件とフィルタ)を順序付け(例:昇順)してハッシュ化 – ハッシュ値をキャッシュ結果のキーとして使用できます。同じクエリパラメータが異なる順序で要求された場合でも、ハッシュは一貫性を保ち、キャッシュは必要な結果セットを返します。このオプションは、クエリパラメータとフィルタが動的で、クエリ間でパラメータの順序が異なる場合に役立ちます。

まとめ

このブログでは、Amazon Keyspaces にデータを保存し、1 ミリ秒未満の読み取り応答時間が必要な、読み取りの多いコストに敏感なアプリケーションのキャッシュとして Amazon ElastiCache を使用する方法を紹介しました。
Amazon ElastiCache を使用することで、クエリの性質に基づいてカスタムキャッシュ戦略を柔軟に設計できます。
また、適切な TTL 値と共に、独自のキャッシュへのデータ投入とエビクションのロジックを設定することもできます。

詳細については、ElastiCache クラスターの設計と管理をご参照ください。

この記事の翻訳は Solutions Architect の堤 勇人が担当しました。

著者について

Author Juhi Patil

Juhi Patil は、ロンドンを拠点とする NoSQL スペシャリストソリューションアーキテクトで、ビッグデータテクノロジーのバックグラウンドを持っています。
現在の役割では、お客様の Amazon Keyspaces および Amazon DynamoDB ベースのソリューションの設計、評価、最適化を支援しています。