AWS Database Blog

Streaming Changes in a Database with Amazon Kinesis

Emmanuel Espina is a software development engineer at Amazon Web Services.

In this blog post, I will discuss how to integrate a central relational database with other systems by streaming its modifications through Amazon Kinesis.

The following diagram shows a common architectural design in distributed systems: a central data store referred to as the “single source of truth” and several derived “satellite” systems that consume it.

[Diagram: a central “single source of truth” data store with several derived satellite systems consuming it]

You could apply this architecture with a relational database as the central data store, taking advantage of its transactional capabilities to maintain data integrity. A derived system in this context could be a full-text search system that observes the single source of truth for changes, transforms and filters those modifications, and finally updates its internal indexes. Another example could be a columnar store better suited to OLAP queries. In general, any system that needs to take action when individual rows of the central relational system are modified is a good candidate to become a derived data store.

A naive implementation for these kinds of architectures will have the derived systems issuing queries periodically to retrieve modified rows, essentially polling the central database with a SELECT-based query.
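
For contrast, here is a rough sketch of that polling approach in Python. It assumes a hypothetical Users table with a last_modified timestamp column and the PyMySQL client library; neither is part of the setup described later in this post.

import time
import pymysql

# Poll the central database for rows changed since the last check.
# The Users table and last_modified column are hypothetical examples.
connection = pymysql.connect(host="<HOST>", user="<USER>",
                             password="<PASSWORD>", database="kinesistest")
last_seen = "1970-01-01 00:00:00"

while True:
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT idUsers, Name, last_modified FROM Users "
            "WHERE last_modified > %s ORDER BY last_modified",
            (last_seen,))
        for id_users, name, last_modified in cursor.fetchall():
            last_seen = str(last_modified)
            # push the modified row to the derived system here
    time.sleep(60)  # wait before asking the database again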

A better implementation for this architecture is one that uses an asynchronous stream of updates. Databases usually have a transaction log where all row changes are stored. If this stream of changes is exposed to external observer systems, they can attach to it and start processing and filtering row modifications. I will show a basic implementation of this scheme using MySQL as the central database and Amazon Kinesis as the message bus.

Normally, the MySQL binlog is exposed to read replicas, which read all of the changes on the master and then apply them locally. In this post, I am going to create a generalized read replica that publishes changes to an Amazon Kinesis stream instead of applying them to a local database.


One important detail of this method is that the consumers won’t receive SQL queries. Those can be exposed too, but in general observers won’t be very interested in SQL unless they maintain a SQL-compatible replica of the data themselves. Instead, they will receive modified entities (rows) one by one. The benefits of this approach are that consumers do not need to understand SQL and the single source of truth does not need to know who will be consuming its changes. That means that different teams can work without coordinating with each other on the required data format. Even better, given the capability of Amazon Kinesis clients to read from a specific point in time, each consumer processes messages at its own pace. This is why a message bus is one of the least coupled ways to integrate your systems.

In the example used in this post, the row fetcher is a regular Python process that attaches to the central database, simulating a read replica.

The database can be either Amazon RDS or any installation of MySQL. In the case of RDS, the fetcher process must be installed on a different host (for example, EC2) because it is not possible to install custom software on RDS instance hosts. For external installations, the fetcher process can be installed on the same host as the database.

Prepare the master MySQL instance

The MySQL master (the single source of truth) must be configured as if it were a master for regular replication. The binlog must be enabled and set to ROW format so that individual modified rows are received. (Otherwise, you would end up with SQL statements only.) For more information, see The Binary Log on the MySQL site.

To enable the binlog, add these two lines to your my.cnf configuration file:
log_bin=<path to binlog>
binlog_format=ROW

It is possible to get row-based logging by setting the transaction isolation level to READ-COMMITTED at the global or session level for all connections (for example, using init_connect or a database API like JDBC).

If you are using RDS (MySQL 5.6 or later), things are easy! You can create the required configuration by enabling periodic backups (binlogs are disabled if backups are not enabled) and setting the parameter group variable binlog_format to ROW. (You can do this from the RDS Dashboard under Parameter Groups.)

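If you prefer to script the parameter change instead of using the console, the following boto3 sketch does the same thing. It assumes a hypothetical custom parameter group named my-mysql-params that is already attached to your instance.

import boto3

rds = boto3.client("rds")

# Switch binlog_format to ROW in the custom parameter group attached to the instance.
# "my-mysql-params" is a placeholder; use the name of your own parameter group.
rds.modify_db_parameter_group(
    DBParameterGroupName="my-mysql-params",
    Parameters=[{
        "ParameterName": "binlog_format",
        "ParameterValue": "ROW",
        "ApplyMethod": "immediate",
    }])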

Add permissions

If you are using the default user created by RDS, you might already have these permissions. If not, you’ll need to create a user with REPLICATION SLAVE permissions. For information, see Creating a User for Replication.

mysql> CREATE USER 'repl'@'%.mydomain.com' IDENTIFIED BY 'slavepass';
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%.mydomain.com';

Create an Amazon Kinesis stream

You need an Amazon Kinesis stream and boto3 client credentials. For information about client credentials, see the Boto 3 documentation.

Open the Amazon Kinesis console and choose Create Stream.


Enter the name of your stream and the number of shards. In this example, there is a single shard.


After a few minutes, your stream will be ready to accept row modifications!
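
If you prefer to create the stream programmatically, a minimal boto3 sketch looks like this (replace <STREAM_NAME> with the name you will use in the publisher script later in this post):

import boto3

kinesis = boto3.client("kinesis")

# Create a single-shard stream and block until it becomes ACTIVE.
kinesis.create_stream(StreamName="<STREAM_NAME>", ShardCount=1)
kinesis.get_waiter("stream_exists").wait(StreamName="<STREAM_NAME>")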

Assign permissions to your CLI user

You can use AWS Identity and Access Management (IAM) to give permissions to the CLI user that will access this stream.


In this example, that user is KinesisRDSIntegration. You can create a user or use an existing one, but you need to add permissions for writing to the Amazon Kinesis stream.


You can create a policy specific to your stream. This example uses a standard policy that gives complete access to Amazon Kinesis.

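If you would rather scope permissions down to this single stream, the following boto3 sketch attaches an inline policy that allows only describing and writing to it. The account ID, region, policy name, and ARN below are placeholders, not values taken from the console:

import json
import boto3

iam = boto3.client("iam")

# Inline policy that only allows describing and writing to one stream.
# Replace the account ID, region, and <STREAM_NAME> with your own values.
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords"],
        "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<STREAM_NAME>",
    }],
}

iam.put_user_policy(
    UserName="KinesisRDSIntegration",
    PolicyName="kinesis-stream-write",
    PolicyDocument=json.dumps(policy))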

Connecting to the master and publishing changes

To install libraries required by the Python publisher, run the following command:

pip install mysql-replication boto3

For more detailed instructions, see:

https://github.com/noplay/python-mysql-replication

https://boto3.readthedocs.io/en/latest/guide/quickstart.html

Here is the Python script that performs the magic. Remember to replace the <HOST>, <PORT>, <USER>, <PASSWORD>, and <STREAM_NAME> placeholders with the values for your configuration.

import json
import boto3

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)


def main():
    kinesis = boto3.client("kinesis")

    # Connect to the master as if this process were a replica and subscribe
    # only to row-level insert, update, and delete events.
    stream = BinLogStreamReader(
        connection_settings={
            "host": "<HOST>",
            "port": <PORT>,
            "user": "<USER>",
            "passwd": "<PASSWORD>"},
        server_id=100,
        blocking=True,
        resume_stream=True,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    for binlogevent in stream:
        for row in binlogevent.rows:
            # Publish each modified row to the stream as a JSON record.
            event = {
                "schema": binlogevent.schema,
                "table": binlogevent.table,
                "type": type(binlogevent).__name__,
                "row": row,
            }

            kinesis.put_record(StreamName="<STREAM_NAME>",
                               Data=json.dumps(event),
                               PartitionKey="default")
            print(json.dumps(event))


if __name__ == "__main__":
    main()

This script will publish each modified row as an Amazon Kinesis record, serialized in JSON format.
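
Note that put_record makes one API call per modified row. If your tables receive many writes, one possible variation (not part of the original script) is to buffer events and flush them in batches with put_records. The buffer size below is arbitrary, and a production publisher would also check the response for failed records:

# Sketch of batching: collect events and send up to 100 at a time.
# Kinesis accepts at most 500 records per put_records call.
buffer = []

def publish(kinesis, event):
    buffer.append({"Data": json.dumps(event), "PartitionKey": "default"})
    if len(buffer) >= 100:
        kinesis.put_records(StreamName="<STREAM_NAME>", Records=buffer)
        buffer.clear()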

Consuming the messages

Now you are ready to consume the modified records. Any consumer code would work. If you use the code in this post, you will get messages in this format:

  • {"table": "Users", "row": {"values": {"Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
  • {"table": "Users", "row": {"values": {"Name": "Bar user", "idUsers": 124}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
  • {"table": "Users", "row": {"before_values": {"Name": "Foo User", "idUsers": 123}, "after_values": {"Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}

Summary

In this blog post, I have shown how to expose the stream of changes to a database’s records using a fake read replica and Amazon Kinesis. Many data-oriented companies use architectures similar to this one. The example provided in this post, while not ready for a real production environment, can be used to experiment with this integration style and improve the scaling capabilities of your enterprise architecture. The most complex part is probably the one that Amazon Kinesis already solves behind the scenes; you only need to provide the glue!

Additional resources

What every software engineer should know about real-time data’s unifying abstraction

All aboard the Databus: LinkedIn’s scalable consistent change data capture platform