AWS Architecture Blog
Field Notes: How to Scale OpenTravel Messaging Architecture with Amazon Kinesis Data Streams
The travel industry relies on OpenTravel messaging systems to collect and distribute travel data—like hotel inventory and pricing—to many independent ecommerce travel sites. These travel sites need immediate access to the most current hotel inventory and pricing data. This allows shoppers access to the available rooms at the right prices. Each time a room is reserved, unreserved, or there is a price change, an ordered message with minimal latency must be sent from the hotel system to the travel site.
Overview of Solution
In this blog post, we will describe the architectural considerations needed to scale a low latency FIFO messaging system built with Amazon Kinesis Data Streams.
The architecture must satisfy the following constraints:
- Messages must arrive in order to each destination, with respect to their hotel code.
- Messages are delivered to multiple destinations independently.
Kinesis Data Streams is a managed service that enables real-time processing of streaming records. The service provides ordering of records, as well as the ability to read and replay records in the same order to multiple Amazon Kinesis applications. Kinesis data stream applications can also consume data in parallel from a stream through parallelization of consumers.
We will start with an architecture that supports one hotel with one destination, and scale it to support 1,000 hotels and 100 destinations (38,051 records per second). With the OpenTravel use case, the order of messages matters only within a stream of messages from a given hotel.
Smallest scale: One hotel with one processed delivery destination. One million messages a month.
Full scale: 1,000 hotels with 100 processed delivery destinations. 100 billion messages a month.
In the example application, OpenTravel XML message data is the record payload. The record is written into the stream with a producer. The record is read from the stream shard by the consumer and sent to several HTTP destinations.
Each Kinesis data stream shard supports writes up to 1,000 records per second, up to a maximum data write total of 1 MB per second. Each shard supports reads up to five transactions per second, up to a maximum data read total of 2 MB per second. There is no upper quota on the number of streams you can have in an account.
Streams | Shards | Writes/Input limit | Reads/Output limit |
---|---|---|---|
1 | 1 | 1 MB per second; 1,000 records per second | 2 MB per second |
1 | 500 | 500 MB per second; 500,000 records per second | 1,000 MB per second |
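The table rows follow directly from multiplying the per-shard quotas by the shard count. A minimal sketch of that arithmetic, assuming the per-shard figures quoted above; the `StreamCapacity` type and `stream_capacity` function are illustrative names, not part of any AWS SDK:

```python
from dataclasses import dataclass

# Per-shard quotas as quoted in the text above.
WRITE_MB_PER_SHARD = 1
WRITE_RECORDS_PER_SHARD = 1_000
READ_MB_PER_SHARD = 2


@dataclass(frozen=True)
class StreamCapacity:
    write_mb_per_second: int
    write_records_per_second: int
    read_mb_per_second: int


def stream_capacity(shards: int) -> StreamCapacity:
    """Aggregate capacity of one stream with the given number of shards."""
    if shards < 1:
        raise ValueError("a stream needs at least one shard")
    return StreamCapacity(
        write_mb_per_second=shards * WRITE_MB_PER_SHARD,
        write_records_per_second=shards * WRITE_RECORDS_PER_SHARD,
        read_mb_per_second=shards * READ_MB_PER_SHARD,
    )


print(stream_capacity(1))
print(stream_capacity(500))
```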
OpenTravel message
In the following OpenTravel message, there is only one field that is important to our use case: HotelCode. In OTA (OpenTravel Alliance) messaging, order matters, but it only matters in the context of a given hotel, specified by the HotelCode. As we scale up our solution, we will use the HotelCode as a partition key. Each destination receives the same messages. If we are sending to 100 destinations, then we will send the message 100 times. The average message size is 4 KB.
<HotelAvailNotif>
<request>
<POS>
<Source>
<RequestorID ID = "1000">
</RequestorID>
<BookingChannel>
<CompanyName Code = "ClientTravelAgency1">
</CompanyName>
</BookingChannel>
</Source>
</POS>
<AvailStatusMessages HotelCode="100">
<AvailStatusMessage BookingLimit="9">
<StatusApplicationControl Start="2013-12-20" End="2013-12-25" RatePlanCode="BAR" InvCode="APT" InvType="ROOM" Mon="true" Tue="true" Weds="true" Thur="false" Fri="true" Sat="true" Sun="true"/>
<LengthsOfStay ArrivalDateBased="true">
<LengthOfStay Time="2" TimeUnit="Day" MinMaxMessageType="MinLOS"/>
<LengthOfStay Time="8" TimeUnit="Day" MinMaxMessageType="MaxLOS"/>
</LengthsOfStay>
<RestrictionStatus Status="Open" SellThroughOpenIndicator="false" MinAdvancedBookingOffset="5"/>
</AvailStatusMessage>
</AvailStatusMessages>
</request>
</HotelAvailNotif>
Source: https://github.com/XML-Travelgate/xtg-content-articles-pub/blob/master/docs/hotel-push/HotelPUSH.md
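As an illustration of how a producer might derive the partition key, the sketch below reads the HotelCode attribute from a trimmed copy of the message and builds the arguments for a Kinesis `put_record` call. The function names and the stream name are assumptions made for this example, and the sample has no XML namespace, which real OpenTravel payloads may include:

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the sample message above (no XML namespace assumed).
SAMPLE = """<HotelAvailNotif>
  <request>
    <AvailStatusMessages HotelCode="100">
      <AvailStatusMessage BookingLimit="9"/>
    </AvailStatusMessages>
  </request>
</HotelAvailNotif>"""


def hotel_code(xml_text: str) -> str:
    """Return the HotelCode attribute of the AvailStatusMessages element."""
    root = ET.fromstring(xml_text)
    node = root.find(".//AvailStatusMessages")
    if node is None or "HotelCode" not in node.attrib:
        raise ValueError("message has no AvailStatusMessages HotelCode")
    return node.attrib["HotelCode"]


def put_record_params(stream_name: str, xml_text: str) -> dict:
    """Keyword arguments for boto3's kinesis put_record call."""
    return {
        "StreamName": stream_name,          # hypothetical stream name
        "Data": xml_text.encode("utf-8"),   # the OpenTravel XML payload
        "PartitionKey": hotel_code(xml_text),
    }


params = put_record_params("opentravel-ingest", SAMPLE)
print(params["PartitionKey"])
# With AWS credentials configured, the record could then be sent with:
#   boto3.client("kinesis").put_record(**params)
```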
Producer and consumer message error handling
Asynchronous message processing presents challenges with error handling, because an error can be logged to the producer or consumer application logs. Short-lived errors can be resolved with a delayed retry. However, there are cases when the data is bad and the message will always cause an error; these messages should be added to a dead letter queue (DLQ).
Producer retries and consumer retries are some of the reasons why records may be delivered more than once. Kinesis Data Streams does not automatically remove duplicate records. If the destination needs a strict guarantee of uniqueness, the message must have a primary key so that duplicates can be removed when it is processed in the client (Handling Duplicate Records). In most cases, the destination can mitigate duplicate messages by processing the same message multiple times in a way that produces the same result (idempotence). If a destination endpoint becomes unavailable or is not performant, the consumer should back off and retry until the endpoint is available. The failure of a single destination endpoint should not impact the delivery performance of messages to other destinations.
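The error-handling rules above can be sketched as a single delivery function. This is a minimal illustration, not the post's implementation: the `TransientError` and `PermanentError` classes, the `deliver` function, the in-memory `seen` set, and the `dead_letters` list are all hypothetical stand-ins for a real destination client, a durable deduplication store, and a DLQ.

```python
import time
from typing import Callable, List, Set, Tuple


class TransientError(Exception):
    """Hypothetical: the destination is temporarily unavailable."""


class PermanentError(Exception):
    """Hypothetical: the message is bad and will never succeed."""


def deliver(
    message_id: str,
    payload: bytes,
    send: Callable[[bytes], None],
    seen: Set[str],
    dead_letters: List[Tuple[str, bytes]],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Deliver one message at most once per message_id."""
    if message_id in seen:
        return "duplicate"  # already delivered; skip the repeat
    for attempt in range(max_attempts):
        try:
            send(payload)
        except PermanentError:
            # Bad data never succeeds, so park it in the DLQ.
            dead_letters.append((message_id, payload))
            return "dead-lettered"
        except TransientError:
            # Exponential backoff before the next attempt.
            sleep(base_delay * 2 ** attempt)
            continue
        seen.add(message_id)
        return "delivered"
    # Endpoint still down: the caller should stop and retry this
    # record later so that per-hotel ordering is preserved.
    return "unavailable"
```

Returning "unavailable" instead of dead-lettering keeps a slow destination from losing messages; the caller is expected to resume from the same record.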
Messaging with one hotel and one destination
When starting at the smallest scale (one hotel and one destination), the system must support one million messages per month. This volume corresponds to 4 GB of message data per month at a rate of 0.3805 records per second, or 0.0015 MB per second, for both input and output. For the system to process one hotel to one destination, it needs at least one producer, one stream, one shard, and a single consumer.
Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output |
---|---|---|---|---|---|
1 hotel, 1 destination (4-KB average record size) | 1 | 1 | 1 | 0.3805 records per second, or 0.0015 MB per second | 0.3805 records per second, or 0.0015 MB per second |
Maximum stream limits | 1 | 1 | 1 | 250 (4-KB records) per second; 1 MB per second (per stream) | 500 (4-KB records) per second; 2 MB per second (per stream) |
With this design, the single shard supports a maximum data write total of 1 MB per second. A PutRecords request supports up to 500 records, and each record in the request can be as large as 1 MB. Because the average message size is 4 KB, the 1 MB per second limit allows about 250 records per second to be written to the shard.
The single shard can also support consumer reads up to five transactions per second, up to a maximum data read total of 2 MB per second. The producer writes to the shard in FIFO order. A consumer pulls from the shard and delivers to the destination in the same order as received. In this one hotel and one destination example, the consumer read capacity would be 500 records per second.
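The per-second rates quoted in this post can be reproduced from the monthly volumes. The sketch below assumes an average month of 365/12 days and 1,024 KB per MB, which is the convention that matches the published figures; the function name is illustrative:

```python
# Average month: 365 days / 12 = 2,628,000 seconds.
SECONDS_PER_MONTH = 365 * 24 * 3600 / 12


def monthly_to_rate(messages_per_month: float, avg_kb: float = 4.0):
    """Return (records per second, MB per second) for a monthly volume."""
    records_per_second = messages_per_month / SECONDS_PER_MONTH
    mb_per_second = records_per_second * avg_kb / 1024
    return records_per_second, mb_per_second


for label, volume in [
    ("1 hotel, 1 destination", 1e6),
    ("1,000 hotels, 1 destination", 1e9),
    ("1,000 hotels, 100 destinations (output)", 1e11),
]:
    rps, mbps = monthly_to_rate(volume)
    print(f"{label}: {rps:.4f} records/s, {mbps:.4f} MB/s")
```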
Messaging with 1,000 hotels and one destination
Next, we will maintain a single destination and scale the number of hotels from one hotel to 1,000 hotels. This scale increases the producer inputs from one million messages to one billion messages a month. This volume expects an input of 4 TB of message data per month at a rate of 380 records per second, and 1.486 MB per second.
Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output |
---|---|---|---|---|---|
1,000 hotels, 1 destination (4-KB average record size) | 1 | 2 | 1 | 380 records per second, or 1.486 MB per second | 380 records per second, or 1.486 MB per second |
Maximum stream limits | 1 | 2 | 1 | 500 (4-KB records) per second; 2 MB per second (per stream) | 1,000 (4-KB records) per second; 4 MB per second (per stream) |
The volume of incoming messages increased to 1.486 MB per second, which exceeds the 1 MB per second write limit of a single shard and requires one additional shard. A shard can be split using the API to increase the overall throughput capacity. The write capacity is increased by distributing messages across the shards. We will use the HotelCode as a partition key to maintain the order of messages with respect to a given hotel. This distributes the writes across the two shards. The messages for each HotelCode are written into the same shard and maintain FIFO order. Using two shards, with the HotelCode as the partition key, doubles the stream write capacity to 2 MB per second.
A single consumer can read the records from each shard at 2 MB per second per shard. The consumer iterates through each shard, processing the records in FIFO order based on the HotelCode. It then sends messages to the destination in the same order they were sent from the producer.
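Kinesis Data Streams maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch below illustrates why every message for one HotelCode lands in the same shard. It assumes the shards split the hash space into equal ranges, which may not hold after manual splits or merges, and the function names are illustrative:

```python
import hashlib
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key: str) -> int:
    """MD5 of the partition key, read as a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for(partition_key: str, shard_count: int) -> int:
    """Index of the shard that owns the key, assuming equal hash ranges."""
    return hash_key(partition_key) * shard_count // HASH_SPACE


# The same HotelCode always maps to the same shard, which keeps its
# messages in FIFO order.
assert shard_for("100", 2) == shard_for("100", 2)

# Rough view of how 1,000 hypothetical hotel codes spread over 2 shards.
spread = Counter(shard_for(str(code), 2) for code in range(1, 1001))
print(dict(spread))
```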
Messaging with 1,000 hotels and five destinations
As we scale up further, we maintain 1,000 hotels and scale the delivery to five destinations. This increases the consumer reads from one billion messages to five billion messages a month. It has the same input of 4 TB of message data per month at a rate of 380 records per second, and 1.486 MB per second. The output is now about 20 TB of message data per month (five times the 4 TB input) at a rate of 1,902.5 records per second, and 7.43 MB per second.
Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output |
---|---|---|---|---|---|
1,000 hotels, 5 destinations (4-KB average record size) | 1 | 4 | 5 | 380 records per second, or 1.486 MB per second | 1,902.5 records per second, or 7.43 MB per second |
Maximum stream capacity | 1 | 4 | 5 | 1,000 (4-KB records) per second (per stream); 4 MB per second (per stream) | 2,000 (4-KB records) per second (per stream) with a standard consumer; 8 MB per second (per stream) |
To support this scale, we increase the number of consumers to match the destinations. A dedicated consumer for each destination allows the system to have committed capacity to each destination. If a destination fails or becomes slow, it will not impact other destination consumers.
We increase to four shards to support the additional read throughput required for the increased number of destination consumers. Each destination consumer iterates through the messages in each shard in FIFO order based on the HotelCode. It then sends the messages to the assigned destination, maintaining the hotel-specific ordering. As in the previous 1,000-to-1 design, the messages for each HotelCode are written into the same shard while maintaining FIFO order.
Because HotelCode is the partition key, the distribution of messages per HotelCode should be monitored with metrics such as WriteProvisionedThroughputExceeded and ReadProvisionedThroughputExceeded to confirm that traffic is spread evenly across shards. If the distribution is uneven, a busy HotelCode may require a dedicated shard. In the following examples, it is assumed that there is an even distribution of messages.
Messaging with 1,000 hotels and 100 destinations
With 1,000 hotels and 100 destinations, the system processes 100 billion messages per month. It continues to have the same number of incoming messages, 4 TB of message data per month, at a rate of 380 records per second, and 1.486 MB per second. The number of destinations has increased from five to 100, resulting in 390 TB of outgoing message data per month, at a rate of 38,051 records per second, and 148.6 MB per second.
Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output |
---|---|---|---|---|---|
1,000 hotels, 100 destinations (4-KB average record size) | 1 | 78 | 100 | 380 records per second, or 1.486 MB per second | 38,051 records per second, or 148.6 MB per second |
Maximum stream capacity | 1 | 78 | 100 | 19,500 (4-KB records) per second (per stream); 78 MB per second (per stream) | 39,000 (4-KB records) per second (per stream) with a standard consumer; 156 MB per second (per stream) |
The number of shards increases from four to 78 to provide the read throughput needed for 100 consumers: 78 shards give a read capacity of 156 MB per second.
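A rough lower bound on the shard count for standard consumers can be derived from the per-shard quotas of 1 MB per second and 1,000 records per second for writes and 2 MB per second for reads. The sketch below is only an estimate under those assumptions. It ignores the limit of five read transactions per second per shard and leaves no headroom, which is why it returns 75 for the 100-destination case while the post provisions 78. The function name is illustrative:

```python
import math


def min_shards(input_mb_s: float, output_mb_s: float, input_rps: float = 0.0) -> int:
    """Lower bound on shards for a stream read by standard consumers."""
    return max(
        1,
        math.ceil(input_mb_s / 1.0),    # 1 MB/s write limit per shard
        math.ceil(input_rps / 1000.0),  # 1,000 records/s write limit per shard
        math.ceil(output_mb_s / 2.0),   # 2 MB/s shared read limit per shard
    )


print(min_shards(0.0015, 0.0015, 0.3805))  # 1 hotel, 1 destination
print(min_shards(1.486, 1.486, 380))       # 1,000 hotels, 1 destination
print(min_shards(1.486, 7.43, 380))        # 1,000 hotels, 5 destinations
print(min_shards(1.486, 148.6, 380))       # 1,000 hotels, 100 destinations
```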
Next, we will look at a different architecture that uses a different type of consumer, the enhanced fan-out consumer, which improves stream throughput and processing efficiency.
Lambda enhanced fan-out consumers
Enhanced fan-out consumers can increase the per shard read consumption throughput through event-based consumption, reduce latency with parallelization, and support error handling. The enhanced fan-out consumer increases the read capacity of consumers from a shared 2 MB per second, to a dedicated 2 MB per second for each consumer.
When using Lambda as an enhanced fan-out consumer, you can use the Event Source Mapping Parallelization Factor to process one shard with up to 10 concurrent Lambda invocations per consumer. Records with the same partition key (HotelCode) are always handled by the same parallel invocation sequence, so order is maintained for each hotel. A batch for a given partition key completes before the next batch for that key is processed.
Consumers can gain performance improvements by setting a larger batch size for each invocation. When setting the Event Source Mapping batch size, the consumer can set the maximum number of records that the Lambda will retrieve from the stream at the time of invoking the function. The Event Source Mapping batch window can be used to adjust the time to wait to gather records for the batch before invoking the function.
The Lambda Event Source Mapping has a setting, ReportBatchItemFailures, that lets the function report which record in a batch failed so that Lambda can checkpoint at the last successfully processed record. The next invocation then starts the batch at the failed record. This repeats until the failed record reaches the maximum number of retries or expires. If this feature is enabled and a failure occurs, Lambda prioritizes checkpointing over other configured mechanisms to minimize duplicate processing.
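A minimal handler sketch for this checkpointing behavior, assuming the function response type ReportBatchItemFailures is enabled on the mapping. The `process` function is a hypothetical placeholder for the real delivery step. Stopping at the first failure is a design choice made here to preserve ordering, because Lambda retries from the lowest reported sequence number:

```python
import base64


def process(payload: bytes) -> None:
    """Hypothetical delivery step; raises when the message cannot be sent."""
    if not payload.strip():
        raise ValueError("empty OpenTravel message")


def handler(event, context):
    """Process Kinesis records in order and report the first failure."""
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"])
        try:
            process(payload)
        except Exception:
            # Report the failed record and stop, so that later records
            # are not delivered ahead of it.
            sequence = record["kinesis"]["sequenceNumber"]
            return {"batchItemFailures": [{"itemIdentifier": sequence}]}
    # An empty list tells Lambda the whole batch succeeded.
    return {"batchItemFailures": []}
```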
Lambda has built-in support for sending information about records that are too old or have exhausted their retries to an on-failure destination, which can be an Amazon Simple Queue Service (Amazon SQS) queue used as a DLQ or an Amazon SNS topic. The Lambda consumer can be configured with maximum record retries and maximum record age, so repeated failures are sent to a DLQ and handled with a separate Lambda function.
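The settings described in this section map onto parameters of the Lambda `CreateEventSourceMapping` API. The sketch below collects them in one place. The ARNs, function name, and numeric values are placeholders chosen for illustration and are not recommendations from this post; the helper function names are also assumptions:

```python
def event_source_mapping_params(consumer_arn: str, function_name: str, dlq_arn: str) -> dict:
    """Keyword arguments for boto3's lambda create_event_source_mapping."""
    return {
        "EventSourceArn": consumer_arn,        # enhanced fan-out consumer ARN
        "FunctionName": function_name,
        "StartingPosition": "LATEST",
        "BatchSize": 100,                      # max records per invocation
        "MaximumBatchingWindowInSeconds": 1,   # wait time to fill a batch
        "ParallelizationFactor": 10,           # concurrent batches per shard
        "MaximumRetryAttempts": 3,             # retries before giving up
        "MaximumRecordAgeInSeconds": 3600,     # discard records older than this
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
        "DestinationConfig": {"OnFailure": {"Destination": dlq_arn}},
    }


def create_mapping(params: dict) -> dict:
    """Create the mapping; needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the sketch loads without boto3

    return boto3.client("lambda").create_event_source_mapping(**params)


params = event_source_mapping_params(
    consumer_arn="arn:aws:kinesis:us-east-1:111122223333:stream/example/consumer/dest-1:1",
    function_name="deliver-to-destination-1",
    dlq_arn="arn:aws:sqs:us-east-1:111122223333:example-dlq",
)
print(sorted(params))
```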
Messaging with Lambda fan-out consumers at scale
In the 1,000 hotel and 100 destination scenario, we will scale by shard and stream. Kinesis Data Streams has a hard quota of 20 enhanced fan-out consumers per stream. With one consumer per destination, 100 fan-out consumers will need five streams. The producer will write to one stream, which has a consumer that writes to the five streams that our destination consumers are reading from.
Hotels/Destinations | Streams | Shards | Consumers (fan-out) | Input | Output |
---|---|---|---|---|---|
1,000 hotels, 100 destinations (4-KB average record size) | 5 | 10 (2 each stream) | 100 | 380 records per second, or 1.486 MB per second | 38,051 records per second, or 148.6 MB per second |
Maximum stream capacity | 5 | 10 (2 each stream) | 100 | 500 (4-KB records) per second per stream; 2 MB per second per stream | 40,000 (4-KB records) per second per stream; 200,000 (4-KB records) per second with 5 streams; 40 MB per second per stream; 200 MB per second with 5 streams |
This architecture increases throughput to 200 MB per second with only 12 shards (two in the stream the producer writes to, plus 10 across the five destination streams), compared to 78 shards with standard consumers.
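The stream and shard counts for the fan-out design follow from the 20-consumer quota. A small sketch of that arithmetic, assuming two shards per stream as in the table and assuming the producer-facing stream also has two shards; the function name and the ingest assumption are illustrative:

```python
import math


def fan_out_layout(
    destinations: int,
    consumers_per_stream: int = 20,  # enhanced fan-out quota per stream
    shards_per_stream: int = 2,
    ingest_shards: int = 2,          # assumed size of the producer-facing stream
) -> dict:
    """Streams and shards needed with one fan-out consumer per destination."""
    streams = math.ceil(destinations / consumers_per_stream)
    destination_shards = streams * shards_per_stream
    return {
        "destination_streams": streams,
        "destination_shards": destination_shards,
        "total_shards": destination_shards + ingest_shards,
    }


print(fan_out_layout(100))
```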
Conclusion
In this blog post, we explained how to use Kinesis Data Streams to process low latency ordered messages at scale with OpenTravel data. We reviewed the aspects of efficient processing and message consumption, scaling considerations, and error handling scenarios. We explored multiple architectures, one dimension of scale at a time, to demonstrate several considerations for scaling the OpenTravel messaging system.