Redis Streams and Message Queues

Learn about Redis Streams and build reliable message queues.

This blog post is the next in the series of open-source Redis 5 articles. It discusses Redis Streams, a new feature in the open-source release of Redis 5. Its the second article on Redis Streams. You can read the first article on Redis Streams here.

Redis and message queues

“I usually talk about Redis,” said Salvatore Sanfilippo, creator of the in-memory database. Then, after a dramatic pause, added: “But not this time”.

That was three years ago.

Sanfilippo was addressing an audience in Paris for a conference on scalability, and his presentation was about Disque, a message broker he had just released. He was compelled to work on message queues because he disliked how Redis was being used for that purpose. Disque offered features and guarantees not found in Redis for dealing with messages and background jobs. Unfortunately, Disque never got enough traction and it was eventually discontinued.

Since then, Redis has changed substantially. It has incorporated the notion of modules —and indeed Disque may return at some point in the form of a Redis module— and it now has a stream data type, a key building block for message brokers.

Let's review what you could build with previous versions of Redis and preview what you would gain by using streams.

Message queues

A message queue is conceptually a list. A producer pushes an element from one side, a consumer reads from the other. Multiple producers and consumers can interact with the same queue. In Redis, a rudimentary message queue can be easily implemented with the commands LPUSH (which means "push from the left") and RPOP (which means "pop from the right"). In the best-case scenario —the happy path— the consumer pops an item, works on it, and once it's done, the customer is ready to consume and process the next item. A slight improvement would be to use a blocking command for reading. So instead of RPOP you could use BRPOP. If the list is empty, the consumer blocks and waits for an element to arrive. If the timeout elapses, the consumer can retry. So far, so good for this simplistic implementation. The problem, though, doesn't lie in the happy path. The issue is what happens when a process crashes while processing an item. 

Reliable queues

A queue is reliable if it can recover from a failure scenario. If a consumer crashes and the item it was processing is lost, the system is unreliable. A command was added to a previous version of Redis that is tailor-made for this exact situation. The command is BRPOPLPUSH. It not only pops an item, as discussed in the previous implementation, but it also pushes the item to another list. With the commands LPUSH and BRPOPLPUSH, you can design a reliable message queue:

- A producer pushes the number 42 to the list q1. 

LPUSH q1 42

- A consumer grabs an item from q1 while creating a backup at c1.

BRPOPLPUSH q1 c1 1000

-- If the consumer succeeds in processing the item, it clears the backup and moves onto the next item.

DEL c1
BRPOPLPUSH q1 c1 1000

-- If the consumer crashes, the system has a copy of the item in the backup queue c1 and can decide what to do next.

In the example, q1 stands for queue #1 and c1 stands for consumer #1. The idea is for consumers to have unique identifiers so that each consumer has its own backup queue. In a production implementation, the name of the backup queue for a given consumer can be formed by concatenating its host name and its PID. What should the system do with an item found in a backup queue once a consumer crashes?

  • Retry?
  • Report?
  • Ignore?

That's up to the application and depends on the particular use case. In any case, the queue should be agnostic when it comes to failure scenarios.

While you may end up with a reliable message queue, there are some other useful features that are more difficult to implement.

Disque and built-in reliability

Disque had many advantages over Redis in terms of reliability when it comes to queues. One of them was the fact that consumers had to acknowledge once an item had been successfully processed. If the server didn't receive the ACK command in time, the item could be enqueued. Or not: actually whatever happened with the item was user-defined, but the explicit acknowledgement mechanism was built right into Disque. For instance, when a consumer detected that it was taking too long to process an item, it was able to notify the server and postpone the timeout. In other words, with Disque there was no need to create backup queues because that use case was already handled.

Redis Streams and consumer groups

Redis 5 introduces the concept of consumer groups. The idea is that many consumers can agree on which items they will consume from a stream. Each consumer in a group has a unique identifier, and the server keeps track of which consumer fetched which item.

Consumers can send XACK commands to let the server know that an item was successfully processed. Consumers can also check the list of items that were retrieved but never acknowledged, and they can claim ownership over pending items. Consumer groups are fundamental for approaching the design decisions that were present in Disque.

Here's a step by step example of how to get started with streams and consumer groups.

First, let's create a stream at key s1:

XADD s1 * a 1

s1 is the key that will contain the stream.

* is a special placeholder that instructs Redis to create an ID for the item.

a is a field name.

1 is a field value.

Each message sent to a stream is a hash, so you can send multiple field-value pairs. Now you can create a consumer group. Let's call it g1:

XGROUP CREATE s1 g1 0

s1 is the name of the stream.

g1 is the name of the consumer group.

0 is the id of the first item you want to retrieve.

Instead of 0, another common value is $, which means you don't care about items already in the queue: you want every item arriving from now on. You can also specify a particular ID, one the application has already seen.

Now that you have the stream and the consumer group, you can start producing and consuming items. From the perspective of one consumer, it would run this command:

XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 >

g1 is the name of the consumer group.

c1 is a unique identifier for the consumer within the group.

COUNT 1 is the number of items you want to retrieve.

s1 is the name of the stream.

> is a special placeholder for a message ID, and it tells the server that you want to retrieve messages that were never delivered to consumers in this group.

Now that an item was fetched from the stream, you can check the list of pending items. The pending items are those that were retrieved by consumers but not yet acknowledged as processed.

XPENDING s1 g1

That's the basic syntax, and this is the response you will get:

1) (integer) 1 # Number of pending items
2) "1540835652651-0" # Smallest ID in the list of pending items
3) "1540835652651-0" # Largest ID in the list of pending items
4) 1) 1) "c1" # Consumer identifier
    2) "1" # Number of pending items for consumer "c1"

In this case, as there is only one pending item, you get the same value for the smallest and largest ID. With more pending items, you would get one entry for each consumer having pending items.

It is also possible to ask for a particular start and end ID, or ask for the list of pending items retrieved by a particular consumer. In those cases, you will get different information.

For example, another syntax for XPENDING allows you to send a range of IDs and a count of the items you want to retrieve:

XPENDING s1 g1 - + 10

s1 is the name of the stream.

g1 is the name of the consumer group.

- represents the smallest possible ID in a stream.

+ represents the largest possible ID in a stream.

10 is the number of items you want to retrieve.

And this is the result you will get:

1) 1) "1540835652651-0" # ID of the first item in the list
    2) "c1" # Consumer identifier of the owner
    3) (integer) 5129 # Idle time since it was claimed by "c1"
    4) (integer) 1 # Number of deliveries so far

The number of deliveries is interesting because it allows you to detect items that couldn't be processed after many retries. The behavior of that counter will be clear once we discuss the XCLAIM command. But first we will look at the happy path. Once the consumer has processed the item, it can run this other command:

XACK s1 g1 1540835652651-0

s1 is the name of the stream.

g1 is the name of the group.

1540835652651-0 is the ID of the processed item.

Once the server receives the acknowledgement from the consumer, it removes the item from the list of pending messages. Consumers can fetch more than one item at the same time, and they can also acknowledge more than one item ID with the same XACK command.

The list of pending items is what replaces the backup queues in the simple example we explored at the beginning of this article. If an item has been in the pending list for too long, a consumer may decide to claim the ownership and retry it. The proper way to do it would be for consumers to check periodically the list of pending items and examine the idle time of each item. If a consumer decides to take over an item, it can issue an XCLAIM command:

XCLAIM s1 g1 c2 300000 1540835652651-0

s1 is the name of the stream.

g1 is the name of the group.

c2 is the identifier of the consumer who wants to own the item.

300000 is the idle time threshold in milliseconds.

1540835652651-0 is the ID of the pending item.

The idle time threshold is the minimum elapsed time (in milliseconds) for an item to be claimed. In other words: if the item with ID 1540835652651-0 has an idle time of 300000 milliseconds or more, the server can change its ownership and assign it to the consumer with the identifier c2. When an item changes ownership, its idle time is reset.

Once a consumer has acquired the ownership of an item, it can read it with the XREADGROUP command:

XREADGROUP GROUP g1 c2 COUNT 1 STREAMS s1 0

The 0 is again a special ID. The server knows which item it has to deliver, so the consumer doesn't need to be precise, it can just send a placeholder.

Trimming the stream

It is a bad idea to have a stream that grows forever. As everything in Redis is stored in memory, a data structure that grows unbounded is very likely to eventually consume all the available resources. The best way to deal with growing data structures is for the application to decide what information it wants to keep.

When it comes to streams, there are two options. One is to call the XTRIM command:

XTRIM s1 MAXLEN 1000

It tells the server to keep only the most recent 1000 items. The items with lower IDs are then removed. If you don't need to be strict about the number of items to keep, you can use this alternative syntax:

XTRIM s1 MAXLEN ~ 1000

It tells the server you only need approximately 1000 items, and the server will wait for the right moment to remove a macro node. This approach lets the server manipulate the underlying data structure in the most efficient way.

The other option for trimming a stream is to send the MAXLEN argument when adding an item. For example, you can modify the original call to XADD to limit the size of the stream to approximately 1000 items:

XADD s1 MAXLEN ~ 1000 * a 1

The syntax for MAXLEN is the same in both XTRIM and XADD.

Conclusion

Redis Streams are a great building block for very diverse applications. In this case, you saw how to build reliable message queues, how to retry items that couldn't be processed, and how to limit the size of the stream. By comparing implementations with and without streams, the advantages of moving to this new data structure are evident. There are more options than what we were able to expose in this article, so make sure to explore the documentation. Hopefully, this introduction has piqued your interest.

Fully managed Redis on AWS

Amazon offers a fully managed Redis service, Amazon ElastiCache for Redis, available for trial at no cost with the AWS Free Tier. Amazon ElastiCache for Redis makes it easy to set up, operate, and scale Redis deployments in the cloud. With Amazon ElastiCache, you can deploy internet-scale Redis deployments in minutes, with cost-efficient and resizable hardware capacity. 

Get started with free Amazon ElastiCache for Redis in three easy steps:

Amazon Elasticache for Redis

Sign up

Get access to the Amazon ElastiCache Free Tier.
Learn Amazon Elasticache for Redis

Learn with simple tutorials

Explore how to create a Redis cluster.
Get Started with ElastiCache for Redis

Start building

Begin building with help from the user guide.