AWS Architecture Blog
Reactive Microservices Architecture on AWS
Microservice-application requirements have changed dramatically in recent years. These days, applications operate with petabytes of data, need almost 100% uptime, and end users expect sub-second response times. Typical N-tier applications can’t deliver on these requirements.
Reactive Manifesto, published in 2014, describes the essential characteristics of reactive systems including: responsiveness, resiliency, elasticity, and being message driven.
Being message driven is perhaps the most important characteristic of reactive systems. Asynchronous messaging helps in the design of loosely coupled systems, which is a key factor for scalability. In order to build a highly decoupled system, it is important to isolate services from each other. As already described, isolation is an important aspect of the microservices pattern. Indeed, reactive systems and microservices are a natural fit.
Implemented Use Case
This reference architecture illustrates a typical ad-tracking implementation.
Many ad-tracking companies collect massive amounts of data in near-real-time. In many cases, these workloads are very spiky and heavily depend on the success of the ad-tech companies’ customers. Typically, an ad-tracking-data use case can be separated into a real-time part and a non-real-time part. In the real-time part, it is important to collect data as fast as possible and ask several questions including:, “Is this a valid combination of parameters?,””Does this program exist?,” “Is this program still valid?”
Because response time has a huge impact on conversion rate in advertising, it is important for advertisers to respond as fast as possible. This information should be kept in memory to reduce communication overhead with the caching infrastructure. The tracking application itself should be as lightweight and scalable as possible. For example, the application shouldn’t have any shared mutable state and it should use reactive paradigms. In our implementation, one main application is responsible for this real-time part. It collects and validates data, responds to the client as fast as possible, and asynchronously sends events to backend systems.
The non-real-time part of the application consumes the generated events and persists them in a NoSQL database. In a typical tracking implementation, clicks, cookie information, and transactions are matched asynchronously and persisted in a data store. The matching part is not implemented in this reference architecture. Many ad-tech architectures use frameworks like Hadoop for the matching implementation.
The system can be logically divided into the data collection partand the core data updatepart. The data collection part is responsible for collecting, validating, and persisting the data. In the core data update part, the data that is used for validation gets updated and all subscribers are notified of new data.
Components and Services
Main Application
The main application is implemented using Java 8 and uses Vert.x as the main framework. Vert.x is an event-driven, reactive, non-blocking, polyglot framework to implement microservices. It runs on the Java virtual machine (JVM) by using the low-level IO library Netty. You can write applications in Java, JavaScript, Groovy, Ruby, Kotlin, Scala, and Ceylon. The framework offers a simple and scalable actor-like concurrency model. Vert.x calls handlers by using a thread known as an event loop. To use this model, you have to write code known as “verticles.” Verticles share certain similarities with actors in the actor model. To use them, you have to implement the verticle interface. Verticles communicate with each other by generating messages in a single event bus. Those messages are sent on the event bus to a specific address, and verticles can register to this address by using handlers.
With only a few exceptions, none of the APIs in Vert.x block the calling thread. Similar to Node.js, Vert.x uses the reactor pattern. However, in contrast to Node.js, Vert.x uses several event loops. Unfortunately, not all APIs in the Java ecosystem are written asynchronously, for example, the JDBC API. Vert.x offers a possibility to run this, blocking APIs without blocking the event loop. These special verticles are called worker verticles. You don’t execute worker verticles by using the standard Vert.x event loops, but by using a dedicated thread from a worker pool. This way, the worker verticles don’t block the event loop.
Our application consists of five different verticles covering different aspects of the business logic. The main entry point for our application is the HttpVerticle, which exposes an HTTP-endpoint to consume HTTP-requests and for proper health checking. Data from HTTP requests such as parameters and user-agent information are collected and transformed into a JSON message. In order to validate the input data (to ensure that the program exists and is still valid), the message is sent to the CacheVerticle.
This verticle implements an LRU-cache with a TTL of 10 minutes and a capacity of 100,000 entries. Instead of adding additional functionality to a standard JDK map implementation, we use Google Guava, which has all the features we need. If the data is not in the L1 cache, the message is sent to the RedisVerticle. This verticle is responsible for data residing in Amazon ElastiCache and uses the Vert.x-redis-client to read data from Redis. In our example, Redis is the central data store. However, in a typical production implementation, Redis would just be the L2 cache with a central data store like Amazon DynamoDB. One of the most important paradigms of a reactive system is to switch from a pull- to a push-based model. To achieve this and reduce network overhead, we’ll use Redis pub/sub to push core data changes to our main application.
Vert.x also supports direct Redis pub/sub-integration, the following code shows our subscriber-implementation:
vertx.eventBus().<JsonObject>consumer(REDIS_PUBSUB_CHANNEL_VERTX, received -> { JsonObject value = received.body().getJsonObject("value"); String message = value.getString("message"); JsonObject jsonObject = new JsonObject(message); eb.send(CACHE_REDIS_EVENTBUS_ADDRESS, jsonObject); }); redis.subscribe(Constants.REDIS_PUBSUB_CHANNEL, res -> { if (res.succeeded()) { LOGGER.info("Subscribed to " + Constants.REDIS_PUBSUB_CHANNEL); } else { LOGGER.info(res.cause()); } });
The verticle subscribes to the appropriate Redis pub/sub-channel. If a message is sent over this channel, the payload is extracted and forwarded to the cache-verticle that stores the data in the L1-cache. After storing and enriching data, a response is sent back to the HttpVerticle, which responds to the HTTP request that initially hit this verticle. In addition, the message is converted to ByteBuffer, wrapped in protocol buffers, and send to an Amazon Kinesis Data Stream.
The following example shows a stripped-down version of the KinesisVerticle:
public class KinesisVerticle extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(KinesisVerticle.class); private AmazonKinesisAsync kinesisAsyncClient; private String eventStream = "EventStream"; @Override public void start() throws Exception { EventBus eb = vertx.eventBus(); kinesisAsyncClient = createClient(); eventStream = System.getenv(STREAM_NAME) == null ? "EventStream" : System.getenv(STREAM_NAME); eb.consumer(Constants.KINESIS_EVENTBUS_ADDRESS, message -> { try { TrackingMessage trackingMessage = Json.decodeValue((String)message.body(), TrackingMessage.class); String partitionKey = trackingMessage.getMessageId(); byte [] byteMessage = createMessage(trackingMessage); ByteBuffer buf = ByteBuffer.wrap(byteMessage); sendMessageToKinesis(buf, partitionKey); message.reply("OK"); } catch (KinesisException exc) { LOGGER.error(exc); } }); }
Kinesis Consumer
This AWS Lambda function consumes data from an Amazon Kinesis Data Stream and persists the data in an Amazon DynamoDB table. In order to improve testability, the invocation code is separated from the business logic. The invocation code is implemented in the class KinesisConsumerHandler and iterates over the Kinesis events pulled from the Kinesis stream by AWS Lambda. Each Kinesis event is unwrapped and transformed from ByteBuffer to protocol buffers and converted into a Java object. Those Java objects are passed to the business logic, which persists the data in a DynamoDB table. In order to improve duration of successive Lambda calls, the DynamoDB-client is instantiated lazily and reused if possible.
Redis Updater
From time to time, it is necessary to update core data in Redis. A very efficient implementation for this requirement is using AWS Lambda and Amazon Kinesis. New core data is sent over the AWS Kinesis stream using JSON as data format and consumed by a Lambda function. This function iterates over the Kinesis events pulled from the Kinesis stream by AWS Lambda. Each Kinesis event is unwrapped and transformed from ByteBuffer to String and converted into a Java object. The Java object is passed to the business logic and stored in Redis. In addition, the new core data is also sent to the main application using Redis pub/sub in order to reduce network overhead and converting from a pull- to a push-based model.
The following example shows the source code to store data in Redis and notify all subscribers:
public void updateRedisData(final TrackingMessage trackingMessage, final Jedis jedis, final LambdaLogger logger) { try { ObjectMapper mapper = new ObjectMapper(); String jsonString = mapper.writeValueAsString(trackingMessage); Map<String, String> map = marshal(jsonString); String statusCode = jedis.hmset(trackingMessage.getProgramId(), map); } catch (Exception exc) { if (null == logger) exc.printStackTrace(); else logger.log(exc.getMessage()); } } public void notifySubscribers(final TrackingMessage trackingMessage, final Jedis jedis, final LambdaLogger logger) { try { ObjectMapper mapper = new ObjectMapper(); String jsonString = mapper.writeValueAsString(trackingMessage); jedis.publish(Constants.REDIS_PUBSUB_CHANNEL, jsonString); } catch (final IOException e) { log(e.getMessage(), logger); } }
Similarly to our Kinesis Consumer, the Redis-client is instantiated somewhat lazily.
Infrastructure as Code
As already outlined, latency and response time are a very critical part of any ad-tracking solution because response time has a huge impact on conversion rate. In order to reduce latency for customers world-wide, it is common practice to roll out the infrastructure in different AWS Regions in the world to be as close to the end customer as possible. AWS CloudFormation can help you model and set up your AWS resources so that you can spend less time managing those resources and more time focusing on your applications that run in AWS.
You create a template that describes all the AWS resources that you want (for example, Amazon EC2 instances or Amazon RDS DB instances), and AWS CloudFormation takes care of provisioning and configuring those resources for you. Our reference architecture can be rolled out in different Regions using an AWS CloudFormation template, which sets up the complete infrastructure (for example, Amazon Virtual Private Cloud (Amazon VPC), Amazon Elastic Container Service (Amazon ECS) cluster, Lambda functions, DynamoDB table, Amazon ElastiCache cluster, etc.).
Conclusion
In this blog post we described reactive principles and an example architecture with a common use case. We leveraged the capabilities of different frameworks in combination with several AWS services in order to implement reactive principles—not only at the application-level but also at the system-level. I hope I’ve given you ideas for creating your own reactive applications and systems on AWS.