Tag: AWS Lambda

Preprocessing Data in Amazon Kinesis Analytics with AWS Lambda

Many customers use Amazon Kinesis to ingest, analyze, and persist their streaming data.  One of the easiest ways to gain real-time insights into your streaming data is to use Kinesis Analytics.  It enables you to query the data in your stream or build entire streaming applications using SQL.  Customers use Kinesis Analytics for things like filtering, aggregation, and anomaly detection.

Kinesis Analytics now gives you the option to preprocess your data with AWS Lambda.  This gives you a great deal of flexibility in defining what data gets analyzed by your Kinesis Analytics application. You can also define how that data is structured before it is queried by your SQL.

In this post, I discuss some common use cases for preprocessing, and walk you through an example to help highlight its applicability.

Common use cases

There are many reasons why you might choose to preprocess data before starting your analysis.  Because you build your preprocessing logic with Lambda, your preprocessor can do anything supported by Lambda.  However, there are some specific use cases that lend themselves well to preprocessing, such as data enrichment and data transformation.


In some scenarios, you may need to enhance your streaming data with additional information before you perform your SQL analysis.  Kinesis Analytics gives you the ability to use data from Amazon S3 in your Kinesis Analytics application, using the Reference Data feature. However, you cannot use other data sources from within your SQL query.

To add dynamic data to your streaming data, you can preprocess with a Lambda function, and retrieve the data from the data store of your choosing.  For example, consider a scenario where you’re streaming some data about users of your application, including their IP address.  You want to do some real-time analysis on the geographic locations of your customers.  In this example, your preprocessing Lambda function uses your data source for geolocation information to retrieve the user’s city, state or province, and country, based on the IP address that was included in the streaming record.  You then enrich the record with that information and now your SQL query can use those attributes in its aggregation.
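As a sketch of that enrichment step, the following function assumes a hypothetical `lookup_geo` data source and record shape; in practice, the lookup would query whatever geolocation data store you choose:

```python
import json

def lookup_geo(ip_address):
    # Hypothetical geolocation lookup -- in practice this would query your
    # own geo data store (DynamoDB, ElastiCache, a third-party API, ...).
    sample = {"203.0.113.10": {"city": "Seattle", "state_or_province": "WA", "country": "US"}}
    return sample.get(ip_address, {"city": None, "state_or_province": None, "country": None})

def enrich_record(record):
    """Add city/state/country attributes based on the record's IP address."""
    enriched = dict(record)
    enriched.update(lookup_geo(record.get("ip_address", "")))
    return enriched

# Example: a streaming record about one user of your application
record = {"user_id": "u-123", "ip_address": "203.0.113.10"}
print(json.dumps(enrich_record(record)))
```

After enrichment, your SQL query can aggregate on the `city`, `state_or_province`, and `country` attributes directly.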


Because Kinesis Analytics uses SQL to analyze your data, the structure of your streaming records must be mapped to a schema.  If your records are JSON or CSV, Kinesis Analytics automatically creates a schema.  However, if your JSON records contain complex nested arrays, you may need to customize how the record structure is mapped to a flattened schema.  Further, Kinesis Analytics is unable to automatically parse formats such as GZIP, protobuf, or Avro.

If your input records are unstructured text, Kinesis Analytics creates a schema, but it consists of a single column representing your entire record.  To remedy these complexities, use Lambda to transform and convert your streaming data so that it more easily maps to a schema that can be queried by the SQL in your Kinesis Analytics application.

Assume that you’re streaming raw Apache access log data from a web fleet to a Kinesis stream, and you want to use Kinesis Analytics to detect anomalies in your HTTP response codes.  In this example, you want to detect when your stream contains an unusually large number of 500 response codes.  This may indicate that something has gone wrong somewhere in your application, and as a result, Apache is returning 500 responses to clients.  This is typically not a good customer experience.

An example Apache access log record looks like this: - - [28/Sep/2017:11:18:59 -0400] "PUT /explore HTTP/1.1" 200 2742 "-" "Mozilla/5.0 (Windows; U; Windows NT 6.3) AppleWebKit/538.0.1 (KHTML, like Gecko) Chrome/20.0.872.0 Safari/538.0.1"

Although its structure is well-defined, it is not JSON or CSV, so it doesn’t map to a defined schema in Kinesis Analytics.  To use Kinesis Analytics with raw Apache log records, you can transform them to JSON or CSV with a preprocessing Lambda function.  For example, you can convert each record to a simple JSON document that easily maps to a schema:

{
	"client_ip": "",
	"request_time": "28/Sep/2017:11:18:59 -0400",
	"method": "PUT",
	"resource": "/explore",
	"protocol": "HTTP/1.1",
	"response": 200,
	"response_size": 2742,
	"user-agent": "Mozilla/5.0 (Windows; U; Windows NT 6.3) AppleWebKit/538.0.1 (KHTML, like Gecko) Chrome/20.0.872.0 Safari/538.0.1"
}


To illustrate where the preprocessing step takes place within Kinesis Analytics, take a high-level look at the architecture.

Kinesis Analytics continuously reads data from your Kinesis stream or Kinesis Firehose delivery stream.  For each batch of records that it retrieves, the Lambda processor subsystem manages how each batch gets passed to your Lambda function.  Your function receives a list of records as input.  Within your function, you iterate through the list and apply your business logic to accomplish your preprocessing requirements (such as data transformation).

The input model to your preprocessing function varies slightly, depending on whether the data was received from a stream or delivery stream.
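In either case, each input record carries base64-encoded data, and the function returns each record tagged with a result of Ok, Dropped, or ProcessingFailed.  The following is a minimal sketch that applies the Apache-log-to-JSON transformation described earlier; the regex covers only the fields shown in the example record, and the exact event shape should be checked against the current service documentation:

```python
import base64
import json
import re

# Matches the common Apache access log fields shown in the example record.
LOG_PATTERN = re.compile(
    r'^(?P<client_ip>\S+) \S+ \S+ \[(?P<request_time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<resource>\S+) (?P<protocol>[^"]+)" '
    r'(?P<response>\d{3}) (?P<response_size>\d+|-)'
)

def transform_line(line):
    """Convert one raw Apache log line to a flat JSON-ready dict."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    doc = match.groupdict()
    doc["response"] = int(doc["response"])
    doc["response_size"] = 0 if doc["response_size"] == "-" else int(doc["response_size"])
    return doc

def handler(event, context):
    """Kinesis Analytics preprocessing entry point (sketch)."""
    output = []
    for record in event["records"]:
        line = base64.b64decode(record["data"]).decode("utf-8")
        doc = transform_line(line)
        if doc is None:
            # Unparseable records are flagged rather than silently passed on.
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
        else:
            data = base64.b64encode(json.dumps(doc).encode("utf-8")).decode("utf-8")
            output.append({"recordId": record["recordId"],
                           "result": "Ok",
                           "data": data})
    return {"records": output}
```

The `recordId` in each output record lets the Lambda processor subsystem correlate your transformed data back to the original input record.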


Build a Serverless Architecture to Analyze Amazon CloudFront Access Logs Using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

Nowadays, it’s common for a web server to be fronted by a global content delivery service, like Amazon CloudFront. This type of front end accelerates delivery of websites, APIs, media content, and other web assets to provide a better experience to users across the globe.

The insights gained by analyzing Amazon CloudFront access logs help improve website availability through bot detection and mitigation, optimizing web content based on the devices and browsers used to view your webpages, reducing perceived latency by caching popular objects closer to their viewers, and so on. This results in a significant improvement in the overall perceived experience for the user.

This blog post provides a way to build a serverless architecture to generate some of these insights. To do so, we analyze Amazon CloudFront access logs both at rest and in transit through the stream. This serverless architecture uses Amazon Athena to analyze large volumes of CloudFront access logs (on the scale of terabytes per day), and Amazon Kinesis Analytics for streaming analysis.

The analytic queries in this blog post focus on three common use cases:

  1. Detection of common bots using the user agent string
  2. Calculation of current bandwidth usage per Amazon CloudFront distribution per edge location
  3. Determination of the current top 50 viewers
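For the first use case, the core check is a match against known bot signatures in the user agent string; the streaming SQL expresses this with pattern predicates on the user agent column.  The same check can be sketched in Python as follows (the signature list is illustrative, not exhaustive):

```python
import re

# Illustrative signatures only -- production bot lists are much longer.
BOT_PATTERN = re.compile(r"(bot|crawler|spider|slurp|curl|wget)", re.IGNORECASE)

def is_bot(user_agent):
    """Return True if the user agent string matches a known bot signature."""
    return bool(BOT_PATTERN.search(user_agent or ""))

print(is_bot("Googlebot/2.1 (+http://www.google.com/bot.html)"))  # True
print(is_bot("Mozilla/5.0 (Windows NT 10.0; Win64; x64)"))        # False
```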

However, you can easily extend the architecture described here to power dashboards for monitoring and reporting, and to trigger alarms based on deeper insights gained by processing and analyzing the logs. Some examples are dashboards for cache performance, usage and viewer patterns, and so on.

Following we show a diagram of this architecture.


Build a Healthcare Data Warehouse Using Amazon EMR, Amazon Redshift, AWS Lambda, and OMOP

In the healthcare field, data comes in all shapes and sizes. Despite efforts to standardize terminology, some concepts (e.g., blood glucose) are still often depicted in different ways. This post demonstrates how to convert an openly available dataset called MIMIC-III, which consists of de-identified medical data for about 40,000 patients, into an open source data model known as the Observational Medical Outcomes Partnership (OMOP) Common Data Model (CDM). It describes the architecture and steps for analyzing data across various disconnected sources of health datasets so you can start applying Big Data methods to health research.

Before designing and deploying a healthcare application on AWS, make sure that you read through the AWS HIPAA Compliance whitepaper. This covers the information necessary for processing and storing protected health information (PHI).

Note: If you arrived at this page looking for more info on the movie Mimic 3: Sentinel, you might not enjoy this post.

OMOP overview

The OMOP CDM helps standardize healthcare data and makes it easier to analyze outcomes at a large scale. The CDM is gaining a lot of traction in the health research community, which is deeply involved in developing and adopting a common data model. Community resources are available for converting datasets, and there are software tools to help unlock your data after it’s in the OMOP format. The great advantage of converting data sources into a standard data model like OMOP is that it allows for streamlined, comprehensive analytics and helps remove the variability associated with analyzing health records from different sources.

OMOP ETL with Apache Spark

Observational Health Data Sciences and Informatics (OHDSI) provides the OMOP CDM in a variety of formats, including Apache Impala, Oracle, PostgreSQL, and SQL Server. (See the OHDSI Common Data Model repo in GitHub.) In this scenario, the data is moved to AWS to take advantage of the unbounded scale of Amazon EMR and serverless technologies, and the variety of AWS services that can help make sense of the data in a cost-effective way—including Amazon Machine Learning, Amazon QuickSight, and Amazon Redshift.

This example demonstrates an architecture that can be used to run SQL-based extract, transform, load (ETL) jobs to map any data source to the OMOP CDM. It uses MIMIC ETL code provided by Md. Shamsuzzoha Bayzid. The code was modified to run in Amazon Redshift.

Getting access to the MIMIC-III data

Before you can retrieve the MIMIC-III data, you must request access on the PhysioNet website, which is hosted on Amazon S3 as part of the Amazon Web Services (AWS) Public Dataset Program. However, you don’t need access to the MIMIC-III data to follow along with this post.

Solution architecture and loading process

The following diagram shows the architecture that is used to convert the MIMIC-III dataset to the OMOP CDM.


How Eliza Corporation Moved Healthcare Data to the Cloud

This is a guest post by Laxmikanth Malladi, Chief Architect at NorthBay. NorthBay is an AWS Advanced Consulting Partner and an AWS Big Data Competency Partner.

“Pay-for-performance” in healthcare pays providers more to keep the people under their care healthier. This is a departure from fee-for-service where payments are for each service used. Pay-for-performance arrangements provide financial incentives to hospitals, physicians, and other healthcare providers to carry out improvements and achieve optimal outcomes for patients.

Eliza Corporation, a company that focuses on health engagement management, acts on behalf of healthcare organizations such as hospitals, clinics, pharmacies, and insurance companies. This allows them to engage people at the right time, with the right message, and in the right medium. By meeting them where they are in life, Eliza can capture relevant metrics and analyze the overall value provided by healthcare.

Eliza analyzes more than 200 million such outreaches per year, primarily through outbound phone calls with interactive voice responses (IVR) and other channels. For Eliza, outreach results are the questions and responses that form a decision tree, with each question and response captured as a pair:

<question, response>: <“Did you visit your physician in the last 30 days?” , “Yes”>

This type of data is characteristic of and distinctive to Eliza, and it poses challenges in processing and analysis. For example, you can’t store it in a table with fixed columns.

The majority of data at Eliza takes the form of outreach results captured as a set of <attribute> and <attribute value> pairs. Other data sets at Eliza include structured data for the members to target for outreach. This data is received from various systems that include customers, claims data, pharmacy data, electronic medical records (EMR/EHR) data, and enrichment data. The data Eliza deals with to keep the business running presents considerable variety and quality challenges.

NorthBay was chosen as the big data partner to architect and implement a data infrastructure to improve the overall performance of Eliza’s process. NorthBay architected a data lake on AWS for Eliza’s use case and implemented the majority of the data lake components by following the best practice recommendations from the AWS white paper “Building a Data Lake on AWS.”

In this post, I discuss some of the practical challenges faced during the implementation of the data lake for Eliza and the corresponding details of the ways we solved these issues with AWS. The challenges we faced involved the variety of data and a need for a common view of the data.

Data transformation

This section highlights some of the transformations done to overcome the challenges related to data obfuscation, cleansing, and mapping.

The following architecture depicts the flow for each of these processes.


  • The Amazon S3 manifest file or time-based event triggers an AWS Lambda function.
  • The Lambda function launches an AWS Data Pipeline orchestration process passing the relevant parameters.
  • The Data Pipeline process creates a transient Amazon EMR resource and submits the appropriate Hadoop job.
  • The Hadoop job is configured to read the relevant metadata tables from Amazon DynamoDB and uses AWS KMS for encrypt/decrypt operations.
  • Using the metadata, the Hadoop job transforms the input data to put results in the appropriate S3 location.
  • When the Hadoop job is complete, an Amazon SNS topic is notified for further processing.
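The second step above, a Lambda function launching the Data Pipeline orchestration, might be sketched as follows.  The pipeline ID and parameter IDs are placeholders for whatever your Data Pipeline definition declares:

```python
def build_parameter_values(bucket, key):
    """Map the triggering S3 object to Data Pipeline parameters.

    The parameter ids ('myInputBucket', 'myInputKey') are hypothetical --
    use the ids your own pipeline definition declares.
    """
    return [
        {"id": "myInputBucket", "stringValue": bucket},
        {"id": "myInputKey", "stringValue": key},
    ]

def handler(event, context):
    """Triggered by the S3 manifest file or a time-based event."""
    import boto3  # imported lazily so the helper above stays testable offline

    record = event["Records"][0]["s3"]
    params = build_parameter_values(record["bucket"]["name"],
                                    record["object"]["key"])
    client = boto3.client("datapipeline")
    # 'df-EXAMPLE' is a placeholder pipeline id.
    client.activate_pipeline(pipelineId="df-EXAMPLE", parameterValues=params)
```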


Building Event-Driven Batch Analytics on AWS

Karthik Sonti is a Senior Big Data Architect with AWS Professional Services

Modern businesses typically collect data from internal and external sources at various frequencies throughout the day. These data sources could be franchise stores, subsidiaries, or new systems integrated as a result of mergers and acquisitions.

For example, a retail chain might collect point-of-sale (POS) data from all franchise stores three times a day to get insights into sales, as well as to identify the right number of staff at a given time in any given store. Because each franchise functions as an independent business, the format and structure of the data might not be consistent across the board. Depending on the geographical region, each franchise might provide data at a different frequency, and the analysis of these datasets must wait until all the required data has been provided by the individual franchises (hence, event-driven). The individual data volumes received from each franchise are usually small, but the velocity at which the data is generated and the collective volume can be challenging to manage.

In this post, I walk you through an architectural approach as well as a sample implementation on how to collect, process, and analyze data for event-driven applications in AWS.


The architecture diagram below depicts the components and the data flow needed for an event-driven batch analytics system. At a high level, this approach leverages Amazon S3 for storing source, intermediate, and final output data; AWS Lambda for intermediate file-level ETL and state management; Amazon RDS as the persistent state store; Amazon EMR for aggregated ETL (heavy lifting, consolidated transformation, and loading engine); and Amazon Redshift as the data warehouse hosting the data needed for reporting.

In this architecture, each location on S3 stores data at a certain state of transformation. When new data is placed at a specific location, an S3 event is raised that triggers a Lambda function responsible for the next transformation in the chain. You can use this event-driven approach to create sophisticated ETL processes, and to syndicate data availability at a given point in the chain.
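A minimal sketch of one such chained Lambda function follows.  The stage prefixes are hypothetical, and the identity "transformation" is a placeholder for your actual file-level ETL logic:

```python
# Hypothetical stage prefixes -- each S3 "location" in the chain.
STAGES = ["incoming/", "cleansed/", "aggregated/"]

def next_stage_key(key):
    """Return the S3 key for the next transformation stage, or None at the end."""
    for current, following in zip(STAGES, STAGES[1:]):
        if key.startswith(current):
            return following + key[len(current):]
    return None

def handler(event, context):
    """Triggered by an S3 event when new data lands at a stage location."""
    import boto3  # lazy import keeps next_stage_key testable offline

    s3 = boto3.client("s3")
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        target = next_stage_key(key)
        if target is None:
            continue  # final stage -- nothing further to trigger
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        # Apply this stage's file-level transformation here; the identity
        # copy below is only a placeholder.
        s3.put_object(Bucket=bucket, Key=target, Body=body)
```

Writing the transformed object to the next prefix raises the next S3 event, which is what chains the stages together.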



Processing VPC Flow Logs with Amazon EMR

Michael Wallman is a senior consultant with AWS ProServ

It’s easy to understand network patterns in small AWS deployments where software stacks are well defined and managed. But as teams and usage grow, it gets harder to understand which systems communicate with each other, and on what ports. This often results in overly permissive security groups.

In this post, I show you how to gain valuable insight into your network by using Amazon EMR and Amazon VPC Flow Logs. The walkthrough implements a pattern often found in network equipment called ‘Top Talkers’, an ordered list of the heaviest network users, but the model can also be used for many other types of network analysis. Customers have successfully used this process to lock down security groups, analyze traffic patterns, and create network graphs.

VPC Flow Logs

VPC Flow Logs enables the capture of IP information flowing to and from network interfaces within a VPC.  Each Flow Log record includes the 5-tuple (source address and port, destination address and port, and protocol) that identifies an Internet protocol (IP) flow, along with several other fields:

version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status

VPC Flow Logs can be enabled on a single network interface, on a subnet, or an entire VPC. When enabled on the VPC, it begins log collection on all network interfaces within that VPC. In large deployments of tens of thousands of instances, Flow Logs can easily generate terabytes of compressed log data per hour!
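Using the record layout above, the "Top Talkers" aggregation at the heart of this walkthrough can be sketched as follows.  In the post this runs at scale on EMR; the sample records here are synthetic:

```python
from collections import Counter

# Field order follows the Flow Log record format shown above.
FIELDS = ("version account-id interface-id srcaddr dstaddr srcport "
          "dstport protocol packets bytes start end action log-status").split()

def parse_flow_record(line):
    """Split one space-delimited Flow Log record into a field dict."""
    return dict(zip(FIELDS, line.split()))

def top_talkers(lines, n=10):
    """Rank source addresses by total bytes sent -- the 'Top Talkers' pattern."""
    totals = Counter()
    for line in lines:
        rec = parse_flow_record(line)
        # Only count complete, accepted flows.
        if rec.get("log-status") == "OK" and rec.get("action") == "ACCEPT":
            totals[rec["srcaddr"]] += int(rec["bytes"])
    return totals.most_common(n)

sample = [
    "2 123456789010 eni-abc123de 10.0.0.1 10.0.0.2 443 49152 6 10 8400 1418530010 1418530070 ACCEPT OK",
    "2 123456789010 eni-abc123de 10.0.0.3 10.0.0.2 443 49153 6 20 1200 1418530010 1418530070 ACCEPT OK",
    "2 123456789010 eni-abc123de 10.0.0.1 10.0.0.2 443 49154 6 5 600 1418530010 1418530070 ACCEPT OK",
]
print(top_talkers(sample))  # 10.0.0.1 leads with 9000 bytes
```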

To process this data at scale, this post takes you through the steps in the following graphic.


Data Lake Ingestion: Automatically Partition Hive External Tables with AWS

Songzhi Liu is a Professional Services Consultant with AWS

The data lake concept has become more and more popular among enterprise customers because it collects data from different sources and stores it where it can be easily combined, governed, and accessed.

On the AWS cloud, Amazon S3 is a good candidate for a data lake implementation because it provides durable, large-scale data storage. Amazon EMR provides transparent scalability and seamless compatibility with many big data applications on Hadoop. However, no matter what kind of storage or processing is used, data must be defined.

In this post, I introduce a simple data ingestion and preparation framework based on AWS Lambda, Amazon DynamoDB, and Apache Hive on EMR for data from different sources landing in S3. This solution lets Hive pick up new partitions as data is loaded into S3 because Hive by itself cannot detect new partitions as data lands.

Apache Hive

Hive is a great choice because it serves as a general data interfacing language, thanks to its well-designed Metastore and related projects like HCatalog. Many other Hadoop applications, such as Pig, Spark, and Presto, can leverage the schemas defined in Hive.

Moreover, external tables make Hive a great data definition language to define the data coming from different sources on S3, such as streaming data from Amazon Kinesis, log files from Amazon CloudWatch and AWS CloudTrail, or data ingested using other Hadoop applications like Sqoop or Flume.

To maximize the efficiency of data organization in Hive, you should leverage external tables and partitioning. By properly partitioning the data, you can greatly reduce the amount of data that needs to be retrieved and improve efficiency during ETL and other types of analysis.

Solving the problem with AWS services

For many of the aforementioned services or applications, data is loaded periodically (for example, one batch every 15 minutes). Because Hive external tables don’t pick up new partitions automatically, you need to update and add new partitions manually, which is difficult to manage at scale. A framework based on Lambda, DynamoDB, and S3 can assist with this challenge.

Architectural diagram

As data is ingested from different sources to S3, new partitions are added by this framework and become available in the predefined Hive external tables.
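The core of such a framework is mapping a newly landed S3 key to an ALTER TABLE statement that registers the partition.  A sketch, assuming a hypothetical key layout of <table>/dt=<timestamp>/<file>:

```python
import re

# Hypothetical layout: s3://bucket/<table>/dt=YYYY-MM-DD-HH-mm/file
KEY_PATTERN = re.compile(r"^(?P<table>[^/]+)/dt=(?P<dt>[^/]+)/")

def add_partition_statement(bucket, key):
    """Build the HiveQL that registers the partition for a newly landed object."""
    match = KEY_PATTERN.match(key)
    if match is None:
        return None
    table, dt = match.group("table"), match.group("dt")
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (dt='{dt}') "
        f"LOCATION 's3://{bucket}/{table}/dt={dt}/'"
    )

print(add_partition_statement("datalake", "weblogs/dt=2017-09-28-11-15/part-0000.gz"))
```

In the full framework, the Lambda function derives this statement from metadata kept in DynamoDB and submits it to Hive on the EMR cluster, so the external table picks up the partition as soon as the data lands.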


Simplify Management of Amazon Redshift Snapshots using AWS Lambda

Ian Meyers is a Solutions Architecture Senior Manager with AWS

Amazon Redshift is a fast, fully managed, petabyte-scale data warehouse that makes it simple and cost-effective to analyze all your data using your existing business intelligence tools. A cluster is automatically backed up to Amazon S3 by default, and three automatic snapshots of the cluster are retained for 24 hours. You can also convert these automatic snapshots to ‘manual’, which means they are kept forever. Snapshots are incremental, so they only store the changes made since the last snapshot was taken, and are very space efficient.

You can restore manual snapshots into new clusters at any time, or you can use them to do table restores, without having to use any third-party backup/recovery software. (For an overview of how to build systems that use disaster recovery best practices, see the AWS white paper Using AWS for Disaster Recovery.)

When creating cluster backups for a production system, you must carefully consider two dimensions:

  • RTO: Recovery Time Objective. How long does it take to recover from a disaster recovery scenario?
  • RPO: Recovery Point Objective. When you have recovered, to what point in time will the system be consistent?

Recovery Time Objective

When using Amazon Redshift, your RTO is determined by the node type you are using, how many of those nodes you have, and the size of the data they store. It is vital that you practice restoration from snapshots created on the cluster to correctly determine Recovery Time Objective. It is also important that you re-test the restore performance any time you resize the cluster or your data volume changes significantly.

Recovery Point Objective

Automated backups are triggered based on a threshold of blocks changed or after a certain amount of time. For a cluster with minimal changes to data, a backup is taken approximately every 8 hours. For a cluster that churns a massive amount of data, backups can be taken several times per hour. If your data churn rate isn’t triggering automated backups at a frequency that satisfies your RPO, you can use this utility to supplement the automated backups with additional manual snapshots and guarantee the targeted RPO.
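A minimal sketch of such a supplemental snapshot function, assuming a scheduled trigger (the cluster identifier and naming scheme are illustrative):

```python
from datetime import datetime, timezone

def snapshot_identifier(cluster_id, now):
    """Build a unique, sortable snapshot name for the given point in time."""
    return f"{cluster_id}-manual-{now.strftime('%Y%m%d-%H%M%S')}"

def handler(event, context):
    """Invoked on a schedule (for example, hourly) to supplement automated backups."""
    import boto3  # lazy import keeps snapshot_identifier testable offline

    cluster_id = event.get("cluster_id", "my-cluster")  # placeholder default
    boto3.client("redshift").create_cluster_snapshot(
        SnapshotIdentifier=snapshot_identifier(cluster_id,
                                               datetime.now(timezone.utc)),
        ClusterIdentifier=cluster_id,
    )
```

Because snapshots are incremental, taking them more frequently adds little storage cost while tightening the RPO.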

What’s New?


Real-time in-memory OLTP and Analytics with Apache Ignite on AWS

Babu Elumalai is a Solutions Architect with AWS

Organizations are generating tremendous amounts of data, and they increasingly need tools and systems that help them use this data to make decisions. The data has both immediate value (for example, trying to understand how a new promotion is performing in real time) and historic value (trying to understand the month-over-month revenue of launched offers on a specific product).

The Lambda architecture (not AWS Lambda) helps you gain insight into immediate and historic data by having a speed layer and a batch layer.  You can use the speed layer for real-time insights and the batch layer for historical analysis.

In this post, we’ll walk through how to:

  1. Build a Lambda architecture using Apache Ignite
  2. Use Apache Ignite to perform ANSI SQL on real-time data
  3. Use Apache Ignite as a cache for online transaction processing (OLTP) reads

To illustrate these approaches, we’ll discuss a simple order-processing application. We will extend the architecture to implement analytics pipelines and then look at how to use Apache Ignite for real-time analytics.

A classic online application

Let’s assume that you’ve built a system to handle the order-processing pipeline for your organization. You have an immutable stream of order documents that are persisted in the OLTP data store. You use Amazon DynamoDB to store the order documents coming from the application.

Below is an example order payload for this system:

{'BillAddress': '5719 Hence Falls New Jovannitown  NJ 31939', 'BillCity': 'NJ', 'ShipMethod': '1-day', 'UnitPrice': 14, 'BillPostalCode': 31939, 'OrderQty': 1, 'OrderDate': 20160314050030, 'ProductCategory': 'Healthcare'}

{'BillAddress': '89460 Johanna Cape Suite 704 New Fionamouth  NV 71586-3118', 'BillCity': 'NV', 'ShipMethod': '1-hour', 'UnitPrice': 3, 'BillPostalCode': 71586, 'OrderQty': 1, 'OrderDate': 20160314050030, 'ProductCategory': 'Electronics'}

Here is example code that I used to generate sample order data like the preceding and write the sample orders into DynamoDB.
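The linked example code is not reproduced here, but a sketch of generating and writing such orders might look like the following.  The field ranges, category lists, and table name are illustrative:

```python
import random

CATEGORIES = ["Healthcare", "Electronics", "Books", "Apparel"]
SHIP_METHODS = ["1-hour", "1-day", "2-day"]

def generate_order(rng):
    """Produce one sample order document shaped like the payloads above."""
    return {
        "BillAddress": f"{rng.randint(1, 99999)} Example Street",  # synthetic
        "BillCity": rng.choice(["NJ", "NV", "WA"]),
        "ShipMethod": rng.choice(SHIP_METHODS),
        "UnitPrice": rng.randint(1, 100),
        "BillPostalCode": rng.randint(10000, 99999),
        "OrderQty": rng.randint(1, 5),
        "OrderDate": 20160314050030,  # fixed timestamp for the sketch
        "ProductCategory": rng.choice(CATEGORIES),
    }

def write_orders(table_name, count=10):
    """Write generated orders into a DynamoDB table (name is a placeholder)."""
    import boto3  # lazy import keeps generate_order testable offline

    table = boto3.resource("dynamodb").Table(table_name)
    rng = random.Random()
    with table.batch_writer() as batch:
        for _ in range(count):
            batch.put_item(Item=generate_order(rng))
```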

The illustration following shows the current architecture for this example.


From SQL to Microservices: Integrating AWS Lambda with Relational Databases

Bob Strahan is a Senior Consultant with AWS Professional Services

AWS Lambda has emerged as an excellent compute platform for modern microservices architectures, driving dramatic advancements in flexibility, resilience, scale, and cost effectiveness. Many customers can take advantage of this transformational technology from within their existing relational database applications. In this post, we explore how to integrate your Amazon EC2-hosted Oracle or PostgreSQL database with AWS Lambda, allowing your database application to use a microservices architecture.

Here are a few reasons why you might find this capability useful:

  • Instrumentation: Use database triggers to call a Lambda function when important data is changed in the database. Your Lambda function can easily integrate with Amazon CloudWatch, allowing you to create custom metrics, dashboards and alarms based on changes to your data.
  • Outbound streaming: Again, use triggers to call Lambda when key data is modified. Your Lambda function can post messages to other AWS services such as Amazon SQS, Amazon SNS, Amazon SES, or Amazon Kinesis Firehose, to send notifications, trigger external workflows, or to push events and data to downstream systems, such as an Amazon Redshift data warehouse.
  • Access external data sources: Call Lambda functions from within your SQL code to retrieve data from external web services, read messages from Amazon Kinesis streams, query data from other databases, and more.
  • Incremental modernization: Improve agility, scalability, and reliability, and eliminate database vendor lock-in by evolving in steps from an existing monolithic database design to a well-architected, modern microservices approach. You can use a microservices architecture to migrate business logic embodied in database procedures into database-agnostic Lambda functions while preserving compatibility with remaining SQL packages.

I’ll revisit these scenarios in Part 2, but first you need to establish the interface that enables SQL code to invoke Lambda functions.
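That interface is the subject of Part 2, but at bottom it wraps the standard Lambda Invoke call.  A sketch of the call a database-side wrapper ultimately makes (the payload shape is hypothetical):

```python
import json

def build_trigger_payload(table, operation, row):
    """Shape the JSON document a database trigger passes to Lambda (hypothetical)."""
    return {"table": table, "operation": operation, "row": row}

def invoke_lambda(function_name, payload, asynchronous=False):
    """Invoke a Lambda function on behalf of SQL code.

    With asynchronous=True the caller does not wait for the result, which is
    usually what a database trigger wants.
    """
    import boto3  # lazy import keeps build_trigger_payload testable offline

    client = boto3.client("lambda")
    response = client.invoke(
        FunctionName=function_name,
        InvocationType="Event" if asynchronous else "RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )
    if asynchronous:
        return None
    return json.loads(response["Payload"].read())
```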