AWS Database Blog

Announcing aggregations on Amazon ElastiCache

Amazon ElastiCache now supports aggregation queries, making it easier to filter, group, transform, and summarize data directly in your cache with a single query. You can use aggregation queries to build real-time application experiences with latencies as low as microseconds over terabytes of data and results reflecting completed writes. Analyze the data where it already lives, at the speed your application demands, without a separate analytics layer. You can use aggregations to build real-time leaderboards, faceted catalog browsing, operational reporting, and exploratory analytical queries on data already stored in ElastiCache.

By running aggregations directly in-memory within ElastiCache, you can reduce architectural complexity and improve response times. Aggregation queries perform computations on the server, so applications can analyze data in place and return only the final summary. For example, a single aggregation query can filter a product catalog to get data from a specific category, group results by brand, compute the average rating for each brand, and return only the top ten performers. You can build these queries by chaining stages such as GROUPBY, REDUCE, APPLY, FILTER, SORTBY, and LIMIT into a pipeline, where the output of each stage feeds into the next. You can combine these stages in the order you choose and repeat them to build multi-step analytical workflows in a single command. Aggregations provide read-after-write consistency on primaries, so results reflect completed writes and scale across shards without any client code changes. This post walks through the use cases that aggregations unlock, and shows how they work by building a faceted browsing engine using ElastiCache for Valkey.
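
As a minimal sketch of how such a pipeline is assembled, the following Python helper builds the argument list for the product catalog example described above. The index name products_index, the fields category, brand, and rating, and the helper build_top_brands_args are all hypothetical and used for illustration only; the helper is not part of any client library.

```python
def build_top_brands_args(category, top_n=10):
    """Build FT.AGGREGATE arguments: filter -> group -> reduce -> sort.

    Index and field names are hypothetical, for illustration only.
    """
    return [
        "FT.AGGREGATE", "products_index",
        f"@category:{{{category}}}",        # query: keep one category
        "LOAD", "2", "@brand", "@rating",   # fields the later stages need
        "GROUPBY", "1", "@brand",           # one group per brand
        "REDUCE", "AVG", "1", "@rating", "AS", "avg_rating",
        "SORTBY", "2", "@avg_rating", "DESC", "MAX", str(top_n),
    ]


args = build_top_brands_args("headphones")
print(" ".join(args))
# With a connected client (assumption): client.execute_command(*args)
```

Each stage consumes the output of the one before it, so the server returns at most top_n rows instead of the full catalog.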

These aggregation capabilities are available in ElastiCache version 9.0 for Valkey, alongside full-text, exact-match, range, and vector search capabilities (see Full-text, exact-match, range, and hybrid search on ElastiCache). ElastiCache version 9.0 for Valkey also introduces hash field expiration for fine-grained TTL control on individual fields and up to 40% higher pipelined throughput. For the full release details, see Announcing Valkey 9.0 for ElastiCache.

When to use aggregations

Applications often store data on ElastiCache that they need to filter, group, and summarize in real time. For example, e-commerce platforms compute average ratings and product counts by category across their catalog. Streaming services compute total views, average watch time, and top performers by genre to power trending feeds and recommendation rankings. Financial services group transactions by user and time window to compute totals, flag threshold breaches, and generate compliance reports. Applications need to analyze this data in real time to power user-facing experiences, live analytics, and operational reporting where stale or slow results degrade the user experience. Developers who need on-the-fly queries for debugging or one-off investigations can also run aggregations directly against live data without setting up a separate analytics layer or exporting data to the application layer. Aggregations support three common use cases:

Faceted search for catalog filtering: E-commerce platforms show shoppers how many products match each filter combination as they browse. When a shopper selects a category or price range, the UI updates counts for every remaining filter value instantly. A single aggregation query groups the matching catalog by brand, color, or rating and returns counts per group, so the application displays accurate facet numbers without pre-computing or caching stale counts. Because aggregations run directly in-memory, these counts return with microsecond latency even across millions of products.

Real-time trends and rankings: Gaming platforms, streaming services, and marketplaces surface trending content and top performers based on live engagement metrics. Traditionally this requires background jobs that recompute rankings on a schedule, introducing staleness, or application-side sorting over large result sets, which adds latency. A single aggregation query can group content by category, compute total views, engagement scores or player ranks, and return the top results. Because indexes update synchronously on writes, aggregation queries reflect the latest data with no polling, cache invalidation, or scheduled recomputation.

Operational reporting and analytics: Applications that use ElastiCache for fast access often need operational analytics and reporting on the same data. For example, session stores compute average session duration by device, and e-commerce platforms compute order volumes by status and fulfillment stage. Aggregations run these queries directly against in-memory data on a schedule or on demand, without provisioning a separate analytics cluster or maintaining an ETL pipeline.

Building faceted search and real-time analysis with ElastiCache

To demonstrate these capabilities together, we build a faceted browsing and analytics engine for AnyOrganization, a media streaming platform. AnyOrganization stores its content catalog in ElastiCache as hash keys, with each movie title containing metadata like genre, language, studio, ratings, and real-time view counts. The following code builds three query patterns on this data: faceted filtering, live trending items, and studio-level engagement reporting.

Prerequisites

The examples in this post use Python with the valkey-py client library. To follow along, you need the following (estimated time: 30 minutes):

The complete sample code for this post is available in the Amazon ElastiCache samples GitHub repository.

git clone https://github.com/aws-samples/amazon-elasticache-samples.git 
cd amazon-elasticache-samples/blogs/aggregations-blog

Set up an ElastiCache for Valkey cluster

To use aggregations, create an ElastiCache cluster with the AWS Management Console or the AWS CLI. Aggregations are available for ElastiCache version 9.0 for Valkey or later. The following example uses the CLI.

aws elasticache create-replication-group \
    --replication-group-id AnyOrganization-cache \
    --replication-group-description "AnyOrganization Valkey cluster" \
    --engine valkey \
    --engine-version 9.0 \
    --transit-encryption-enabled \
    --cache-node-type cache.r7g.large \
    --num-node-groups 2 \
    --replicas-per-node-group 1 \
    --multi-az-enabled \
    --automatic-failover-enabled 
    
# If --transit-encryption-enabled is set, add ssl=True to the
# Python client connection:
#   client = valkey.ValkeyCluster(..., ssl=True)

Create an index on the data

Create an index called catalog_index over the data stored in ElastiCache. Genre, language, and studio are indexed as exact-match tags for faceted filtering. Release_year, rating, and views_24h are indexed as numeric fields for range filters and sorting. The title is indexed as a full-text searchable field that supports keyword, prefix, and fuzzy matching.

The following code uses valkey-py’s search module to build and send Valkey Search commands. Each Python method call translates directly to a Valkey command sent over the wire. For example, client.ft("catalog_index").create_index(...) sends the FT.CREATE command, and client.ft("catalog_index").aggregate(req) sends the FT.AGGREGATE command. The equivalent Valkey commands are shown alongside each code block.

import valkey
from valkey.commands.search.field import TextField, TagField, NumericField
from valkey.commands.search.indexDefinition import IndexDefinition, IndexType

# <Input required>: Insert your ElastiCache cluster's discovery endpoint
VALKEY_CLUSTER_ENDPOINT = "placeholder_cluster.cnxa6h.clustercfg.use1.cache.amazonaws.com"

client = valkey.ValkeyCluster(
    host=VALKEY_CLUSTER_ENDPOINT,
    port=6379,
    decode_responses=True,
    ssl=True)

client.ft("catalog_index").create_index(fields=[
        TextField("title"),
        TagField("genre"),
        TagField("language"),
        TagField("studio"),
        NumericField("release_year"),
        NumericField("rating"),
        NumericField("views_24h")],
    definition=IndexDefinition(prefix=["title:"],index_type=IndexType.HASH))

Equivalent Valkey command:

FT.CREATE catalog_index ON HASH PREFIX 1 title: SCHEMA
title TEXT genre TAG language TAG studio TAG
release_year NUMERIC rating NUMERIC views_24h NUMERIC

Populate the ElastiCache for Valkey store with the catalog data. For the purposes of this post, this example uses sample data from the ElastiCache GitHub repository, but you can use other data sources.

import csv
import io
import urllib.request

# Sample catalog from the ElastiCache samples repository
response = urllib.request.urlopen(
    "https://raw.githubusercontent.com/aws-samples/amazon-elasticache-samples/main/blogs/aggregations-blog/catalog_data.csv")
reader = csv.DictReader(io.TextIOWrapper(response, encoding="utf-8"))

count = 0
for row in reader:
    # The id column becomes the hash key; the remaining columns become fields
    key = row.pop("id")
    client.hset(key, mapping=row)
    count += 1

print(f"Loaded {count} records")

You can create the index before or after loading data. If keys matching the prefix already exist, Valkey Search backfills them into the index automatically.

Faceted filters

The following code takes the user’s selected filters and runs one aggregation per facet dimension (genre, language, and rating). Each aggregation groups the matching results by that dimension and returns the count of titles in each group, so the UI can display facet counts alongside the results.

from valkey.commands.search.aggregation import AggregateRequest, Desc
from valkey.commands.search import reducers

def get_facet_counts(filters):
    # Build query string from user-selected filters
    clauses = []
    if "genre" in filters:
        clauses.append(f"@genre:{{{filters['genre']}}}")
    if "language" in filters:
        clauses.append(f"@language:{{{filters['language']}}}")
    if "min_rating" in filters:
        clauses.append(f"@rating:[{filters['min_rating']} +inf]")
    query = " ".join(clauses) if clauses else "@rating:[-inf +inf]"

    # Run an aggregation for each facet dimension
    dimensions = ["genre", "language", "rating"]
    facets = {}
    for dim in dimensions:
        req = AggregateRequest(query) \
            .load(f"@{dim}") \
            .group_by(f"@{dim}", reducers.count().alias("count"))
        facets[dim] = client.ft("catalog_index").aggregate(req).rows
    return facets

# Example: user filters for dramas in English, get counts for each dimension
facets = get_facet_counts({"genre": "drama", "language": "english"})

# Example output:
# {'genre': [{'genre': 'drama', 'count': '6'}],
#  'language': [{'language': 'english', 'count': '6'}],
#  'rating': [{'rating': '4', 'count': '4'},
#             {'rating': '5', 'count': '2'}]}

Equivalent Valkey command (for one facet dimension, filtering for English dramas):

FT.AGGREGATE catalog_index "@genre:{drama} @language:{english}"
    LOAD 1 @genre
    GROUPBY 1 @genre
    REDUCE COUNT 0 AS count

Live trending items

The following code retrieves the highest 24-hour view count in each genre, ranked with the most-viewed genre first. It aggregates over the views_24h field, which updates in real time as users watch content.

def get_trending_by_genre(limit=10):
    # Get the highest view count per genre
    # sorted by most popular genre first
    req = AggregateRequest("@rating:[-inf +inf]") \
        .load("@genre", "@views_24h") \
        .group_by("@genre", reducers.max("@views_24h").alias("max_views")) \
        .sort_by(Desc("@max_views"), max=limit) 
    return client.ft("catalog_index").aggregate(req).rows
    
trending_by_genre = get_trending_by_genre()

# Example output:
# [{'genre': 'action', 'max_views': '4500'},
#  {'genre': 'comedy', 'max_views': '3800'},
#  {'genre': 'thriller', 'max_views': '3600'},
#  {'genre': 'sci-fi', 'max_views': '3400'},
#  {'genre': 'drama', 'max_views': '3200'},
#  {'genre': 'animation', 'max_views': '3100'},
#  {'genre': 'romance', 'max_views': '2800'},
#  {'genre': 'horror', 'max_views': '2600'},
#  {'genre': 'documentary', 'max_views': '1900'}]

Equivalent Valkey command:

FT.AGGREGATE catalog_index "@rating:[-inf +inf]"
    LOAD 2 @genre @views_24h
    GROUPBY 1 @genre
    REDUCE MAX 1 @views_24h AS max_views
    SORTBY 2 @max_views DESC MAX 10

On-demand engagement reporting 

AnyOrganization runs daily reporting jobs to measure performance of content by production studio. The following code computes studio-level metrics including title count, average rating, and total engagement, using aggregations over the same index.

def get_studio_report():
    # Studio performance: title count, average rating, total 24h views
    req = AggregateRequest("@rating:[-inf +inf]") \
        .load("@studio", "@rating", "@views_24h") \
        .group_by("@studio", reducers.count().alias("title_count"),
                             reducers.avg("@rating").alias("avg_rating"),
                             reducers.sum("@views_24h").alias("total_views")) \
        .sort_by(Desc("@total_views"))
    return client.ft("catalog_index").aggregate(req).rows

studio_report = get_studio_report()

# Example output:
# [{'studio': 'StreamFlix Originals', 'title_count': '18',
#   'avg_rating': '4.3333333333', 'total_views': '46200'},
#  {'studio': 'Summit Pictures', 'title_count': '13',
#   'avg_rating': '3.8461538462', 'total_views': '30000'},
#  {'studio': 'Crimson Studios', 'title_count': '11',
#   'avg_rating': '4.4545454545', 'total_views': '23100'},
#  {'studio': 'Emerald Films', 'title_count': '8',
#   'avg_rating': '4', 'total_views': '13600'}]

Equivalent Valkey command:

FT.AGGREGATE catalog_index "@rating:[-inf +inf]"
    LOAD 3 @studio @rating @views_24h
    GROUPBY 1 @studio
        REDUCE COUNT 0 AS title_count
        REDUCE AVG 1 @rating AS avg_rating
        REDUCE SUM 1 @views_24h AS total_views
    SORTBY 2 @total_views DESC
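
To make the reducer arithmetic concrete, the following pure-Python sketch reproduces what GROUPBY with the COUNT, AVG, and SUM reducers computes. The sample rows are invented for illustration and are not from the sample dataset. Treat it as a mental model that can double as a reference in unit tests, not as a description of how ElastiCache executes the query.

```python
from collections import defaultdict


def studio_report(rows):
    """Group rows by studio; compute title count, average rating, total views."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["studio"]].append(row)

    report = []
    for studio, members in groups.items():
        ratings = [float(m["rating"]) for m in members]
        views = [int(m["views_24h"]) for m in members]
        report.append({
            "studio": studio,
            "title_count": len(members),                # REDUCE COUNT
            "avg_rating": sum(ratings) / len(ratings),  # REDUCE AVG
            "total_views": sum(views),                  # REDUCE SUM
        })
    # SORTBY @total_views DESC
    return sorted(report, key=lambda r: r["total_views"], reverse=True)


# Hypothetical rows, stored as strings like hash fields
sample_rows = [
    {"studio": "Studio A", "rating": "4", "views_24h": "1000"},
    {"studio": "Studio A", "rating": "5", "views_24h": "3000"},
    {"studio": "Studio B", "rating": "3", "views_24h": "2500"},
]
for line in studio_report(sample_rows):
    print(line)
```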

Best practices

To improve aggregation query latencies and throughput, filter early to reduce the number of documents flowing through each pipeline stage. A broader match in the query increases the number of keys entering the pipeline, adding cost to the initial scan and early stages. For example, the faceted filtering example above passes the user’s active filters in the query string so only matching documents enter the GROUPBY stage. You can also add FILTER after GROUPBY stages to prune groups that don’t meet a threshold, for example dropping genres with fewer than 5 titles before returning results. Further, when you only need the top results, add MAX to SORTBY so the engine tracks only the top results rather than sorting the entire working set, as the trending items example shows.
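
The pruning and top-results advice above could look like the following sketch, which builds the command arguments as a plain list. The threshold of five titles comes from the example in the text. The FILTER expression syntax shown here is an assumption to verify against the Valkey Search documentation, and build_popular_genres_args is a hypothetical helper.

```python
def build_popular_genres_args(filters_query, min_titles=5, top_n=10):
    """FT.AGGREGATE arguments that filter early, prune small groups,
    and keep only the top results."""
    return [
        "FT.AGGREGATE", "catalog_index",
        filters_query,                              # narrow the match up front
        "LOAD", "1", "@genre",
        "GROUPBY", "1", "@genre",
        "REDUCE", "COUNT", "0", "AS", "title_count",
        "FILTER", f"@title_count >= {min_titles}",  # assumed syntax: drop small groups
        "SORTBY", "2", "@title_count", "DESC", "MAX", str(top_n),
    ]


args = build_popular_genres_args("@language:{english}")
print(args)
# With the client created earlier in this post (assumption):
#   client.execute_command(*args)
```

Placing FILTER after GROUPBY means it evaluates one row per genre rather than one row per title, and MAX bounds the work done by the sort.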

You can use LOAD to pull fields directly from the underlying hash data into the aggregation pipeline even if they aren’t part of the index. For example, if you store an actors field in your hash but didn’t index it, you can LOAD it at query time and then group or sort by it. However, LOAD requires fetching raw data from the underlying key for every matching document, adding latency that grows with result set size. Keep the number of loaded fields to a minimum to avoid this overhead.

Clean up

If you created an ElastiCache cluster for this walkthrough and no longer need it, delete the cluster using the following AWS CLI command to avoid incurring future charges:

aws elasticache delete-replication-group --replication-group-id AnyOrganization-cache

Getting started

In this post, we explored aggregations for ElastiCache, covering faceted filtering, live trending recommendations, and on-demand engagement reporting, all built on a single Valkey Search index. Aggregations are available in all commercial AWS Regions, AWS GovCloud (US) Regions, and China Regions, for node-based clusters running ElastiCache version 9.0 for Valkey at no additional cost. Valkey is the most permissive open source and vendor-neutral alternative to Redis and the recommended engine on ElastiCache. To get started, create a new cluster with Valkey 9.0 or later or upgrade an existing cluster using the AWS Management Console, AWS SDK, or AWS CLI. To learn more, visit the ElastiCache documentation. For questions and feedback, visit AWS re:Post for ElastiCache.


About the authors

Chaitanya Nuthalapati

Chaitanya is a Senior Technical Product Manager in AWS In-Memory Database Services, focused on Amazon ElastiCache for Valkey. Previously, he built solutions with generative AI, machine learning, and graph networks. Off the clock, Chaitanya is busy collecting hobbies, which currently include tennis, skateboarding, and paddle-boarding.

Karthik Subbarao

Karthik is a Senior Software Engineer at Amazon ElastiCache and an active contributor to the open-source Valkey project. He is passionate about distributed systems, databases, Rust, and, in general, innovating through software development / technology.

Allen Samuels

Allen is a Principal Engineer at AWS. He is passionate about distributed, performant systems. When not traveling the world for pleasure or playing duplicate bridge, Allen can be found in San Jose, California.

Siva Subramaniam

Siva is a Senior Solutions Architect at AWS with two decades of experience in technical leadership and database architecture. He helps customers migrate and innovate with purpose-built databases on AWS. Outside of work, Siva enjoys playing cricket, farming, and learning to cook from his wife.

Special thanks to Ian Childress for his guidance, and to Miles Song for his hands-on engineering contributions throughout the project.