Learn about Redis Streams and build history preserving message brokers, message queues, unified logs, and chat systems.
This blog post is the next in the series of open-source Redis 5 blogs. It discusses Redis Streams, a new feature in the open-source release of Redis 5.
You probably remember the first time you connected to an Internet Relay Chat (IRC). It felt intimate. It was futuristic and low tech at the same time. Joining a channel was like entering a room where people had been discussing for hours, only that you had no idea what the topic was. You would eventually catch up and join the conversation, but once you left, you would have no way of knowing how the conversation evolved. Like in real life.
IRC hasn't changed much over the years, but modern collaboration tools don't let you miss a single sentence. They keep logs for each conversation, so that you can catch up with the discussion or return to see what you missed after a previous session. That difference between old and modern chat tools is the analogy we will use to compare two Redis features: Pub/Sub and Streams.
Redis incorporated the publish-subscribe pattern in version 2.0.0. The pattern allows clients to subscribe to one or more channels and receive messages as long as they are connected. Other clients act as publishers who send their messages to one or more channels. If it sounds similar to IRC, it's because IRC also implements the publish-subscribe pattern.
If you need to keep a history, then the publish-subscribe pattern as implemented in Redis with PUBLISH/SUBSCRIBE is not enough. Sure, Redis is a data structures server and you can combine the messaging from Pub/Sub with lists or hashes to build the basic solution you need, but starting with version 5.0.0 you can use Streams, a new datatype implemented with a log data structure and a set of very flexible commands.
A very basic introduction to Redis Streams
Conceptually, a Stream in Redis is a list where you can append entries. Each entry has a unique ID and a value. The ID is auto-generated by default, and it includes a timestamp. The value is a hash. You can query ranges or use blocking commands to read entries as they come. Typical of Redis, you can combine different ingredients to get the result you need. As Niklaus Wirth once said, programs are algorithms plus data structures, and Redis already gives you a bit of both.
Redis streams are ideal for building history preserving message brokers, message queues, unified logs, and chat systems. Unlike Pub/Sub messages which are fire and forget, Redis streams preserve messages in perpetuity. Redis streams implement consumer groups, a feature that allows a group of clients to cooperate when consuming elements from a stream. For example, consumers in a group can lookup items by ID, have to acknowledge the processing of an item, or claim ownership of a pending message. We will explore consumer groups in a future blog post.
Chatting with Redis
Here's one way of sending messages to a channel on IRC:
/notice #foo hey!
And this is how you publish a message in Redis:
PUBLISH #foo "hey!"
It looks extremely similar and the behavior is almost the same. Now, if you want to read messages sent to an IRC channel, you need to join it first:
Likewise if you want to read messages published in Redis:
Still very similar. In both cases, clients can read incoming messages as long as they are connected. If they want to stop receiving messages, they have commands for that too. The IRC version:
The Redis counterpart:
Note that the name #foo is significant in IRC, but not in Redis. You could have used any other string.
Now, let's implement the same behavior with Streams. Publishing a message would look like this:
XADD #foo * msg "hey!"
What's going on here? There's a lot to explain, so let's get started:
XADD is the command we have to use for creating a stream and pushing entries.
#foo is the name of a Redis key that will represent the stream. The name is arbitrary, and we are using #foo just to mimic the channel names on IRC.
* is a special value that takes the place of an ID. It tells Redis to generate a new ID greater than any previous one.
msg is a field. As the value of an entry is a hash, you have to send at least one field and one value. In this example, we chose msg as the field.
"hey!" is the value for msg.
The command looks overly complex for this simple use case. That's because it's very flexible, as you will see below. For now, let's move on to reading messages:
XREAD BLOCK 1000 STREAMS #foo $
The command itself doesn’t make much sense until you break it down to its components:
XREAD is the command for fetching entries.
BLOCK 1000 means the client will block if there are no entries and it will time out after 1000 milliseconds if nothing comes in.
STREAMS is a directive that accepts a list of keys followed by a list of IDs. In our example, we are sending one key and one pseudo ID. Read below.
#foo is the name of a Redis key that will represent the stream.
$ is a pseudo ID that represents any ID created after our command blocked. It means we want to ignore any previous entries in the stream, and we will focus only on entries that arrive from now on.
Whether you get an entry or the command times out, you need to call XREAD again in order to fetch more entries. That means XREAD will usually be called inside a loop. If you want to stop reading entries, you just have to break out of the loop.
You have now seen how to get the same result using IRC, Pub/Sub and Streams. There are other uses for Streams, and incidentally you have also seen how to use a stream as a log: push entries, then use the XREAD command to read the entries as they come in. The version we provided is the equivalent of using tail -f on a file. (If you are not familiar with tail -f, run man tail for more info).
History preserving chat
Now we can model a modern chat application that lets you catch up with previous conversations. In this version, each client will be able to read all the messages, regardless if the client was connected when the messages arrived on the stream.
If you want to implement a chat where people can see previous conversations, as well as new messages when they arrive, you need the clients to keep track of the last message seen. In this case, clients need to remember the ID of the last entry they read. What a great opportunity to explore Stream IDs. You need to know how they are created and how they can be used.
Let's go back to the command for creating an entry:
XADD #foo * msg "hey!"
If you feed it to Redis 5 you will get this reply:
That's the new entry's ID, and it follows this format:
The first part is a timestamp in milliseconds, the second part is a sequence number. If Redis receives more than one XADD command during the same millisecond, the sequence number is incremented. That guarantees that no two entries will have the same ID, and also that any new ID will be greater than all previous IDs.
Now you can revisit the XREAD command from the point of view of a history-aware client. Let's say your client is connecting for the first time and it wants to read all previous messages. The command will look as follows:
XREAD STREAMS #foo 0-0
This time the client doesn't block; it just asks for all entries in #foo with IDs great than 0-0. Just to clarify, 0-0 is not a valid ID: even if you create your own IDs instead of letting Redis generate them, Redis will reject 0-0 because it's a special value. And because its use is frequent, Redis accepts 0 as a shorthand, so you can as well write the command like this:
XREAD STREAMS #foo 0
And the result you get from Redis:
1) 1) "#foo"
2) 1) 1) 1536495531827-0
2) 1) "msg"
That's what you will see if you use redis-cli. If you call the same command from, say, Python, you get this structure:
>> r.execute_command("XREAD", "STREAMS", "#foo", "0") => [["#foo", [["1536495531827-0", ["msg", "hey!"]]]]]
Let's expand the result to see its actual shape:
It's an array of streams (in this case, with only one element "#foo"), where each stream has an associated array of entries composed as an ID and a list of field/value pairs.
Continuing with the chat metaphor, the important piece of information you need to save is the last ID you've seen, so you have to store "1536495531827-0" in your program. When you are ready to check for newer entries, simply call XREAD again and pass the last ID you saw:
>>r.execute_command("XREAD", "STREAMS", "#foo", "1536495531827-0") => nil
The call returned nil because there were no new entries. Had you received any new entries, you would have updated the ID of the last-seen entry and your user-facing application would have presented the newly arrived messages as unread.
In summary, Redis Streams is an exciting new feature to build history preserving message brokers, message queues, unified logs, and chat systems. Interacting with Streams may seem complex, but this and future articles will give you a better understanding of the commands and the different use cases that can be covered with this new feature. Stay tuned for more on this topic!
Fully managed Redis on AWS
Amazon offers a fully managed Redis service, Amazon ElastiCache for Redis, available for trial at no cost with the AWS Free Tier. Amazon ElastiCache for Redis makes it easy to set up, operate, and scale Redis deployments in the cloud. With Amazon ElastiCache, you can deploy internet-scale Redis deployments in minutes, with cost-efficient and resizable hardware capacity.
Get started with free Amazon ElastiCache for Redis in three easy steps: