Amazon Web Services ブログ

Amazon ElastiCache での集計機能のお知らせ

本記事は 2026 年 5 月 7 日に公開された “Announcing aggregations on Amazon ElastiCache” を翻訳したものです。

Amazon ElastiCache が集計クエリをサポートするようになり、単一のクエリでキャッシュ内のデータを直接フィルタリング、グループ化、変換、集計することがより簡単になりました。集計クエリを使用することで、テラバイト規模のデータに対してマイクロ秒単位の低レイテンシーで、最新の書き込みが反映された結果を返す、リアルタイムなアプリケーション体験を構築できます。データがすでに存在する場所で、アプリケーションが要求する速度で分析を行うことができ、別途分析レイヤーを用意する必要はありません。集計機能を使用すると、ElastiCache に保存済みのデータに対して、リアルタイムのリーダーボード、ファセット検索によるカタログブラウジング、運用レポート、探索的な分析クエリなどを構築できます。

ElastiCache 内のメモリ上で集約処理を直接実行することで、アーキテクチャの複雑さを軽減し、応答時間を改善できます。集約クエリはサーバー側で計算を実行するため、アプリケーションはデータをその場で分析し、最終的なサマリーのみを返すことができます。例えば、1 つの集約クエリで、製品カタログをフィルタリングして特定のカテゴリのデータを取得し、結果をブランドごとにグループ化し、各ブランドの平均評価を計算し、パフォーマンス上位 10 件のみを返すといったことが可能です。これらのクエリは、GROUPBY、REDUCE、APPLY、FILTER、SORTBY、LIMIT などのステージをパイプラインに連結することで構築でき、各ステージの出力が次のステージへの入力になります。これらのステージを任意の順序で組み合わせ、繰り返し使用することで、1 つのコマンドで複数ステップの分析ワークフローを構築できます。集約はプライマリ上で Read-after-Write 整合性 (書き込み後読み取り整合性) を提供するため、結果には最新の書き込みが反映され、クライアントコードを変更することなくシャード間でスケールします。本投稿では、集約によって実現できるユースケースを紹介し、ElastiCache for Valkey を使ってファセットブラウジングエンジンを構築しながら、その仕組みを解説します。

これらの集約機能は、ElastiCache version 9.0 for Valkey で、全文検索、完全一致検索、範囲検索、ベクトル検索機能 (Amazon ElastiCache での全文検索、完全一致検索、範囲検索、ハイブリッド検索 を参照) と並んで利用可能です。ElastiCache version 9.0 for Valkey ではまた、個々のフィールドに対するきめ細やかな TTL 制御を可能にするハッシュフィールドの有効期限機能や、最大 40% 向上したパイプラインスループットも導入されています。リリースの詳細については、Amazon ElastiCache 向け Valkey 9.0 のお知らせ をご覧ください。

集約の使用が適している場面

アプリケーションは、リアルタイムでフィルタリング、グループ化、集計する必要があるデータを ElastiCache に保存することがよくあります。例えば、E コマースプラットフォームでは、カタログ全体にわたってカテゴリ別の平均評価や商品数を計算します。ストリーミングサービスでは、ジャンル別の総視聴数、平均視聴時間、再生数上位の作品を算出し、トレンドフィードやレコメンデーションランキングを構築します。金融サービスでは、ユーザーや時間枠ごとに取引をグループ化して合計を計算し、しきい値違反を検出し、コンプライアンスレポートを生成します。アプリケーションは、ユーザー向けのエクスペリエンス、ライブ分析、運用レポートを支えるためにこのデータをリアルタイムに分析する必要があり、古いデータや遅い結果はユーザーエクスペリエンスを低下させます。デバッグや単発の調査のためにアドホックなクエリが必要な開発者は、別の分析レイヤーを設定したり、データをアプリケーションレイヤーにエクスポートしたりすることなく、ライブデータに対して直接集計を実行することもできます。集計は、次の 3 つの一般的なユースケースをサポートします。

カタログフィルタリングのためのファセット検索: E コマースプラットフォームでは、買い物客がブラウジングする際に、各フィルタの組み合わせに一致する商品数を表示します。買い物客がカテゴリや価格帯を選択すると、UI は残りのすべてのフィルタ値のカウントを即座に更新します。1 つの集計クエリで、一致するカタログをブランド、色、または評価ごとにグループ化し、グループごとのカウントを返すため、アプリケーションは事前計算や古いカウントのキャッシュなしに正確なファセット数を表示できます。集計はインメモリで直接実行されるため、数百万の商品にまたがる場合でも、これらのカウントはマイクロ秒のレイテンシで返されます。

リアルタイムのトレンドとランキング: ゲームプラットフォーム、ストリーミングサービス、マーケットプレイスでは、ライブのエンゲージメント指標に基づいてトレンドコンテンツや上位ランカーを表示します。従来、これにはスケジュールに従ってランキングを再計算するバックグラウンドジョブが必要で、データの陳腐化を招いていました。あるいは、大量の結果セットに対するアプリケーション側でのソートが必要で、レイテンシーが増加していました。単一の集計クエリで、コンテンツをカテゴリーごとにグループ化し、総視聴回数、エンゲージメントスコア、プレイヤーランクを計算して、上位の結果を返すことができます。インデックスは書き込み時に同期的に更新されるため、集計クエリはポーリング、キャッシュ無効化、定期的な再計算を行うことなく最新のデータを反映します。

運用レポートと分析: ElastiCache を高速アクセスのために利用するアプリケーションでは、同じデータに対する運用分析やレポートが必要になることがよくあります。例えば、セッションストアではデバイスごとの平均セッション時間を計算し、E コマースプラットフォームではステータスやフルフィルメント段階ごとの注文量を計算します。Aggregations は、別途の分析用クラスターをプロビジョニングしたり ETL パイプラインを維持したりすることなく、スケジュールに基づいて、またはオンデマンドで、これらのクエリをインメモリデータに対して直接実行します。

ElastiCache を使ったファセット検索とリアルタイム分析の構築

これらの機能を組み合わせて実演するために、メディアストリーミングプラットフォームである AnyOrganization 向けに、ファセットブラウジングと分析エンジンを構築します。AnyOrganization はコンテンツカタログを ElastiCache にハッシュキーとして保存しており、各映画タイトルにはジャンル、言語、スタジオ、評価、リアルタイムの視聴回数といったメタデータが含まれています。以下のコードでは、このデータに対して 3 つのクエリパターンを構築します。ファセットフィルタリング、ライブトレンドアイテム、そしてスタジオレベルのエンゲージメントレポートです。

前提条件

この記事の例では、Python と valkey-py クライアントライブラリを使用します。手順を実行するには、以下が必要です (所要時間の目安: 30 分):

この投稿の完全なサンプルコードは、Amazon ElastiCache samples GitHub リポジトリで入手できます。

git clone https://github.com/aws-samples/amazon-elasticache-samples.git
cd amazon-elasticache-samples/blogs/aggregations-blog

ElastiCache for Valkey クラスターのセットアップ

集計機能を利用するための ElastiCache クラスターは、AWS Management Console または AWS CLI で作成できます。集計機能は ElastiCache version 9.0 for Valkey 以降で利用可能です。以下の例では CLI を使用しています。

aws elasticache create-replication-group \
    --replication-group-id AnyOrganization-cache \
    --replication-group-description "AnyOrganization Valkey cluster" \
    --engine valkey \
    --engine-version 9.0 \
    --transit-encryption-enabled \
    --cache-node-type cache.r7g.large \
    --num-node-groups 2 \
    --replicas-per-node-group 1 \
    --multi-az-enabled \
    --automatic-failover-enabled

# --transit-encryption-enabled が設定されている場合、Python クライアント接続に
# ssl=True を追加します:
#   client = valkey.ValkeyCluster(..., ssl=True)

データへのインデックス作成

ElastiCache に保存されているデータに対して、catalog_index というインデックスを作成します。Genrelanguagestudio は、ファセットフィルタリング用の完全一致タグとしてインデックス化されます。Release_yearratingviews_24h は、範囲フィルタやソート用の数値フィールドとしてインデックス化されます。タイトルは、キーワード、プレフィックス、あいまい一致をサポートする全文検索可能なフィールドとしてインデックス化されます。

以下のコードは、valkey-py の search モジュールを使用して Valkey Search コマンドを構築し送信します。各 Python メソッド呼び出しは、ネットワーク越しに送信される Valkey コマンドに直接対応しています。例えば、client.ft("catalog_index").create_index(...)FT.CREATE コマンドを送信し、client.ft("catalog_index").aggregate(req)FT.AGGREGATE コマンドを送信します。各コードブロックの横に、対応する Valkey コマンドを示しています。

import valkey
from valkey.commands.search.field import TextField, TagField, NumericField
from valkey.commands.search.indexDefinition import IndexDefinition, IndexType

# : Insert your ElastiCache cluster's discovery endpoint
VALKEY_CLUSTER_ENDPOINT = "placeholder_cluster.cnxa6h.clustercfg.use1.cache.amazonaws.com"

client = valkey.ValkeyCluster(
    host=VALKEY_CLUSTER_ENDPOINT,
    port=6379,
    decode_responses=True,
    ssl=True)

client.ft("catalog_index").create_index(fields=[
        TextField("title"),
        TagField("genre"),
        TagField("language"),
        TagField("studio"),
        NumericField("release_year"),
        NumericField("rating"),
        NumericField("views_24h")],
    definition=IndexDefinition(prefix=["title:"],index_type=IndexType.HASH))

同等の Valkey コマンド:

FT.CREATE catalog_index ON HASH PREFIX 1 title: SCHEMA
title TEXT genre TAG language TAG studio TAG
release_year NUMERIC rating NUMERIC views_24h NUMERIC

ElastiCache for Valkey ストアにカタログデータを投入します。本記事では、ElastiCache GitHub リポジトリのサンプルデータを使用しますが、他のデータソースを使用することもできます。

import csv
import urllib.request
import io
import time

response = urllib.request.urlopen(
"https://raw.githubusercontent.com/aws-samples/amazon-elasticache-samples/main/blogs/aggregations-blog/catalog_data.csv")
reader = csv.DictReader(io.TextIOWrapper(response))

count = 0
for row in reader:
key = row.pop("id")
client.hset(key, mapping=row)
count += 1

print(f"Loaded {count} records")

インデックスはデータをロードする前でも後でも作成できます。プレフィックスに一致するキーが既に存在する場合、Valkey Search はそれらを自動的にインデックスにバックフィルします。

ファセットフィルター

以下の集計は、ユーザーが選択したフィルターを受け取り、一致する結果をジャンル、言語、評価、公開年でグループ化し、各グループのタイトル数を返します。これにより、UI は結果と並べてファセットの件数を表示できます。

from valkey.commands.search.aggregation import AggregateRequest, Desc
from valkey.commands.search import reducers

def get_facet_counts(filters):
    # Build query string from user-selected filters
    clauses = []
    if "genre" in filters:
        clauses.append(f"@genre:{{{filters['genre']}}}")
    if "language" in filters:
        clauses.append(f"@language:{{{filters['language']}}}")
    if "min_rating" in filters:
        clauses.append(f"@rating:[{filters['min_rating']} + inf]")
    query = " ".join(clauses) if clauses else "@rating:[-inf + inf]"

    # Run an aggregation for each facet dimension
    dimensions = ["genre", "language", "rating"]
    facets = {}
    for dim in dimensions:
        req = AggregateRequest(query) \
            .load(f"@{dim}") \
            .group_by(f"@{dim}", reducers.count().alias("count"))
        facets[dim] = client.ft("catalog_index").aggregate(req).rows
    return facets

# Example: user filters for dramas in english, get counts for each dimension
facets = get_facet_counts({"genre": "drama", "language": "english"})

# Example output:
# {'genre': [{'genre': 'drama', 'count': '6'}],
#  'language': [{'language': 'english', 'count': '6'}],
#  'rating': [{'rating': '4', 'count': '4'},
#             {'rating': '5', 'count': '2'}]}

同等の Valkey コマンド (1 つのファセットディメンション、英語のドラマでフィルタリングする場合):

FT.AGGREGATE catalog_index "@genre:{drama} @language:{english}"
    LOAD 1 @genre
    GROUPBY 1 @genre
    REDUCE COUNT 0 AS count

リアルタイムで急上昇中のアイテム

以下のコードは、ジャンルごとのトップトレンドタイトルを取得します。これは、ユーザーがコンテンツを視聴するとリアルタイムで更新される views_24h フィールドに対する集計によって実現されています。

def get_trending_by_genre(limit=10):
    # Get the highest view count per genre
    # sorted by most popular genre first
    req = AggregateRequest("@rating:[-inf + inf]") \
        .load("@genre", "@views_24h") \
        .group_by("@genre", reducers.max("@views_24h").alias("max_views")) \
        .sort_by(Desc("@max_views"), max=limit)
    return client.ft("catalog_index").aggregate(req).rows

trending_by_genre = get_trending_by_genre()

# Example output:
# [{'genre': 'action', 'max_views': '4500'},
#  {'genre': 'comedy', 'max_views': '3800'},
#  {'genre': 'thriller', 'max_views': '3600'},
#  {'genre': 'sci-fi', 'max_views': '3400'},
#  {'genre': 'drama', 'max_views': '3200'},
#  {'genre': 'animation', 'max_views': '3100'},
#  {'genre': 'romance', 'max_views': '2800'},
#  {'genre': 'horror', 'max_views': '2600'},
#  {'genre': 'documentary', 'max_views': '1900'}]

同等の Valkey コマンド (1 つのファセットディメンションで、英語のドラマをフィルタリングする場合):

FT.AGGREGATE catalog_index "@rating:[-inf + inf]"
    LOAD 2 @genre @views_24h
    GROUPBY 1 @genre
    REDUCE MAX 1 @views_24h AS max_views
    SORTBY 2 @max_views DESC MAX 10

リアルタイムトレンドアイテム

以下のコードは、ジャンルごとにトレンドの上位タイトルを取得するもので、ユーザーがコンテンツを視聴するとリアルタイムに更新される views_24h フィールドに対する集計によって実現されています。

def get_trending_by_genre(limit=10):
    # Get the highest view count per genre
    # sorted by most popular genre first
    req = AggregateRequest("@rating:[-inf + inf]") \
        .load("@genre", "@views_24h") \
        .group_by("@genre", reducers.max("@views_24h").alias("max_views")) \
        .sort_by(Desc("@max_views"), max=limit)
    return client.ft("catalog_index").aggregate(req).rows

trending_by_genre = get_trending_by_genre()

# Example output:
# [{'genre': 'action', 'max_views': '4500'},
#  {'genre': 'comedy', 'max_views': '3800'},
#  {'genre': 'thriller', 'max_views': '3600'},
#  {'genre': 'sci-fi', 'max_views': '3400'},
#  {'genre': 'drama', 'max_views': '3200'},
#  {'genre': 'animation', 'max_views': '3100'},
#  {'genre': 'romance', 'max_views': '2800'},
#  {'genre': 'horror', 'max_views': '2600'},
#  {'genre': 'documentary', 'max_views': '1900'}]

同等の Valkey コマンド:

FT.AGGREGATE catalog_index "@rating:[-inf + inf]"
    LOAD 2 @genre @views_24h
    GROUPBY 1 @genre
    REDUCE MAX 1 @views_24h AS max_views
    SORTBY 2 @max_views DESC MAX 10

オンデマンドエンゲージメントレポート

AnyOrganization は、制作スタジオ別のコンテンツパフォーマンスを測定するために、日次のレポートジョブを実行しています。次のコードは、同じインデックスに対する集計を使用して、タイトル数、平均評価、総エンゲージメントなどのスタジオレベルのメトリクスを計算します。

def get_studio_report():
    # Studio performance: title count, average rating, total 24h views
    req = AggregateRequest("@rating:[-inf + inf]") \
        .load("@studio", "@rating", "@views_24h") \
        .group_by("@studio", reducers.count().alias("title_count"),
                             reducers.avg("@rating").alias("avg_rating"),
                             reducers.sum("@views_24h").alias("total_views")) \
        .sort_by(Desc("@total_views"))
    return client.ft("catalog_index").aggregate(req).rows

studio_report = get_studio_report()

# Example output:
# [{'studio': 'StreamFlix Originals', 'title_count': '18',
#   'avg_rating': '4.3333333333', 'total_views': '46200'},
#  {'studio': 'Summit Pictures', 'title_count': '13',
#   'avg_rating': '3.8461538462', 'total_views': '30000'},
#  {'studio': 'Crimson Studios', 'title_count': '11',
#   'avg_rating': '4.4545454545', 'total_views': '23100'},
#  {'studio': 'Emerald Films', 'title_count': '8',
#   'avg_rating': '4', 'total_views': '13600'}]

同等の Valkey コマンド:

FT.AGGREGATE catalog_index "@rating:[-inf + inf]"
    LOAD 3 @studio @rating @views_24h
    GROUPBY 1 @studio
        REDUCE COUNT 0 AS title_count
        REDUCE AVG 1 @rating AS avg_rating
        REDUCE SUM 1 @views_24h AS total_views
    SORTBY 2 @total_views DESC

ベストプラクティス

集計クエリのレイテンシーとスループットを改善するには、各パイプラインステージを通過するドキュメント数を減らすために早い段階でフィルタリングします。マッチする範囲が広いクエリは、パイプラインに入るキーの数が増え、初期スキャンと初期ステージのコストが増加します。例えば、上記のファセットフィルタリングの例では、ユーザーのアクティブなフィルターをクエリ文字列で渡すことで、マッチするドキュメントのみが GROUPBY ステージに入ります。また、しきい値を満たさないグループを削除するために GROUPBY ステージの後に FILTER を追加することもできます。例えば、結果を返す前にタイトル数が 5 未満のジャンルを除外する場合などです。さらに、上位の結果のみが必要な場合は、SORTBYMAX を追加することで、トレンドアイテムの例で示すように、エンジンはワーキングセット全体をソートするのではなく、上位の結果のみを追跡します。

LOAD を使うと、インデックスに含まれていないフィールドであっても、基となるハッシュデータから直接フィールドを取得して集約パイプラインに取り込むことができます。例えば、ハッシュに actors フィールドを保存しているがインデックス化していない場合、クエリ実行時に LOAD で読み込み、それを使ってグループ化やソートを行うことができます。ただし、LOAD は一致するドキュメントごとに基となるキーから生データを取得する必要があるため、結果セットのサイズに応じてレイテンシーが増加します。このオーバーヘッドを避けるため、ロードするフィールドの数は最小限に抑えてください。

クリーンアップ

このウォークスルーのために ElastiCache クラスターを作成し、不要になった場合は、今後の課金を避けるために、次の AWS CLI コマンドを使用してクラスターを削除してください。

aws elasticache delete-replication-group --replication-group-id AnyOrganization-cache

はじめに

この記事では、ElastiCache のアグリゲーションについて、ファセットフィルタリング、ライブトレンドのレコメンデーション、オンデマンドのエンゲージメントレポートを取り上げ、これらすべてを単一の Valkey Search インデックス上に構築する方法を解説しました。アグリゲーションは、すべての商用 AWS リージョン、AWS GovCloud (US) リージョン、および中国リージョンにおいて、ElastiCache for Valkey バージョン 9.0 を実行するノードベースのクラスターで追加費用なしでご利用いただけます。Valkey は、Redis に代わる最も寛容なライセンスのオープンソースかつベンダー中立な選択肢であり、ElastiCache で推奨されるエンジンです。使い始めるには、AWS Management Console、AWS SDK、または AWS CLI を使用して、Valkey 9.0 以降の新しいクラスターを作成するか、既存のクラスターをアップグレードしてください。詳細については、ElastiCache のドキュメントをご覧ください。質問やフィードバックがある場合は、AWS re:Post for ElastiCache をご覧ください。

著者について

Chaitanya Nuthalapati

Chaitanya Nuthalapati

Chaitanya は AWS インメモリデータベースサービスのシニアテクニカルプロダクトマネージャーで、Amazon ElastiCache for Valkey に注力しています。以前は、生成 AI、機械学習、グラフネットワークを活用したソリューションを構築していました。仕事以外の時間では、Chaitanya は趣味を集めるのに忙しく、現在はテニス、スケートボード、パドルボードを楽しんでいます。

Karthik Subbarao

Karthik Subbarao

Karthik は Amazon ElastiCache のシニアソフトウェアエンジニアであり、オープンソースの Valkey プロジェクトに積極的に貢献しています。分散システム、データベース、Rust、そして全般的にソフトウェア開発/テクノロジーを通じたイノベーションに情熱を注いでいます。

Allen Samuels

Allen Samuels

Allen は AWS のプリンシパルエンジニアです。分散型で高性能なシステムに情熱を注いでいます。世界中を旅したりデュプリケートブリッジをプレイしたりしていない時は、カリフォルニア州サンノゼで過ごしています。

Siva Subramaniam

Siva Subramaniam

Siva は AWS のシニアソリューションアーキテクトで、技術リーダーシップとデータベースアーキテクチャにおいて 20 年の経験を持っています。お客様が AWS の専用データベースを使用して移行とイノベーションを実現できるよう支援しています。仕事以外では、Siva はクリケット、農作業、そして妻から料理を学ぶことを楽しんでいます。

ご指導いただいた Ian Childress 氏、そしてプロジェクト全体を通じて実装面で貢献いただいた Miles Song 氏に特に感謝いたします。


本記事は、Announcing aggregations on Amazon ElastiCache を翻訳したものです。翻訳は Solutions Architect の Hayato Tsutsumi が担当しました。