AWS Compute Blog

Amazon Kinesis Firehose Data Transformation with AWS Lambda

February 12, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.


September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.


This post is written by Shiva Narayanaswamy, Solution Architect.

Amazon Kinesis Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, or Amazon Elasticsearch Service (Amazon ES). You configure your data producers to send data to Firehose, and it automatically delivers the data to the specified destination. You can send data to your delivery stream using the Amazon Kinesis Agent or the Firehose API with the AWS SDK.
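
For example, a producer can put records through the Firehose API with the AWS SDK for JavaScript. The following is a minimal sketch; the region and delivery stream name are placeholders for your own values:

'use strict';

// Minimal sketch of a producer putting a single record with the AWS SDK for
// JavaScript. The region and delivery stream name are placeholders.
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

const record = { TICKER_SYMBOL: 'TGT', SECTOR: 'RETAIL', CHANGE: 2.14, PRICE: 68.26 };

firehose.putRecord({
    DeliveryStreamName: 'demo-stream', // placeholder stream name
    Record: { Data: JSON.stringify(record) + '\n' }
}, (err, data) => {
    if (err) console.log('Put failed:', err);
    else console.log('Record ID:', data.RecordId);
});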

Customers have told us that they want to perform light preprocessing or mutation of the incoming data stream before writing it to the destination. Other use cases might include normalizing data produced by different producers, adding metadata to the record, or converting incoming data to a format suitable for the destination. Today, customers deliver data to an intermediate destination, such as an S3 bucket, and use S3 event notifications to trigger a Lambda function that performs the transformation before delivering the data to the final destination.

In this post, I introduce data transformation capabilities for your delivery streams, which let you seamlessly transform incoming source data and deliver the transformed data to your destinations.

Introducing Firehose Data Transformations

With the Firehose data transformation feature, you can now specify a Lambda function that performs transformations directly on the stream when you create a delivery stream.

When you enable Firehose data transformation, Firehose buffers incoming data and invokes the specified Lambda function asynchronously with each buffered batch. The transformed data is sent from Lambda back to Firehose for buffering and then delivered to the destination. You can also choose to enable source record backup, which backs up all untransformed records to your S3 bucket concurrently while delivering transformed records to the destination.
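
The contract between Firehose and the function is straightforward: the incoming event carries a records array of base64-encoded data, and the response must return every recordId with a result of Ok, Dropped, or ProcessingFailed. The following minimal sketch is an identity transformation that returns each record unchanged:

'use strict';

/* Identity transformation: echoes every record back unchanged. This only
   illustrates the request/response contract that Firehose expects. */
exports.handler = (event, context, callback) => {
    const output = event.records.map((record) => ({
        recordId: record.recordId, // must match the ID Firehose assigned
        result: 'Ok',              // 'Ok', 'Dropped', or 'ProcessingFailed'
        data: record.data,         // base64-encoded payload
    }));
    callback(null, { records: output });
};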

To get you started, we provide the following Lambda blueprints, which you can adapt to suit your needs:

  • Apache Log to JSON
  • Apache Log to CSV
  • Syslog to JSON
  • Syslog to CSV
  • General Firehose Processing

Setting up Firehose Data Transformation

Now I’m going to walk you through the setup of a Firehose stream with data transformation.

In the Firehose console, create a new delivery stream with an existing S3 bucket as the destination.

In the Configuration section, enable data transformation, and choose the generic Firehose processing Lambda blueprint, which takes you to the Lambda console.

Edit the code inline, and paste the following Lambda function, which I’m using to demonstrate the Firehose data transformation feature. Choose a timeout of 5 minutes. This function matches the records in the incoming stream against a regular expression. On a match, it parses the JSON record. The function then does the following:

  • Picks only the RETAIL sector and drops the rest (filtering)
  • Adds a TIMESTAMP to the record (mutation)
  • Converts from JSON to CSV (transformation)
  • Passes the processed record back into the stream for delivery
'use strict';
console.log('Loading function');

/* Stock Ticker format parser */
const parser = /^\{\"TICKER_SYMBOL\"\:\"[A-Z]+\"\,\"SECTOR\"\:"[A-Z]+\"\,\"CHANGE\"\:[-.0-9]+\,\"PRICE\"\:[-.0-9]+\}/;

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries 

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {

        const entry = Buffer.from(record.data, 'base64').toString('utf8');
        let match = parser.exec(entry);
        if (match) {
            const parsed_match = JSON.parse(match[0]);
            const milliseconds = new Date().getTime();
            /* Add timestamp and convert to CSV */
            const result = `${milliseconds},${parsed_match.TICKER_SYMBOL},${parsed_match.SECTOR},${parsed_match.CHANGE},${parsed_match.PRICE}`+"\n";
            const payload = Buffer.from(result, 'utf8').toString('base64');
            if (parsed_match.SECTOR != 'RETAIL') {
                /* Dropped event, notify and leave the record intact */
                dropped++;
                return {
                    recordId: record.recordId,
                    result: 'Dropped',
                    data: record.data,
                };
            }
            else {
                /* Transformed event */
                success++;  
                return {
                    recordId: record.recordId,
                    result: 'Ok',
                    data: payload,
                };
            }
        }
        else {
            /* Failed event, notify the error and leave the record intact */
            console.log("Failed event : "+ record.data);
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
        /* This transformation is the "identity" transformation, the data is left intact 
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: record.data,
        } */
    });
    console.log(`Processing completed. Successful records: ${success}, dropped records: ${dropped}, failed records: ${failure}.`);
    callback(null, { records: output });
};

In the Firehose console, choose the newly created Lambda function. Enable source record backup, and choose the same S3 bucket and an appropriate prefix. Firehose delivers the raw data stream to this bucket under this prefix.

Choose an S3 buffer size of 1 MB and a buffer interval of 60 seconds. Create a Firehose delivery IAM role.
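
Because data transformation is enabled, the delivery role needs permission to invoke the Lambda function in addition to writing to the S3 bucket. A minimal policy sketch follows; the bucket name, account ID, and function ARN are placeholders:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::my-destination-bucket",
        "arn:aws:s3:::my-destination-bucket/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
      ],
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:firehose-transform"
    }
  ]
}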

Review the configuration and create the Firehose delivery stream.
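
If you prefer to script the setup rather than use the console, the same configuration can be expressed with the Firehose CreateDeliveryStream API. The following sketch uses the AWS SDK for JavaScript; all names and ARNs are placeholders for your own resources:

'use strict';

// Sketch of creating an equivalent delivery stream programmatically.
// All names and ARNs below are placeholders.
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

firehose.createDeliveryStream({
    DeliveryStreamName: 'demo-stream',
    ExtendedS3DestinationConfiguration: {
        RoleARN: 'arn:aws:iam::123456789012:role/firehose_delivery_role',
        BucketARN: 'arn:aws:s3:::my-destination-bucket',
        BufferingHints: { SizeInMBs: 1, IntervalInSeconds: 60 },
        ProcessingConfiguration: {
            Enabled: true,
            Processors: [{
                Type: 'Lambda',
                Parameters: [{
                    ParameterName: 'LambdaArn',
                    ParameterValue: 'arn:aws:lambda:us-east-1:123456789012:function:firehose-transform'
                }]
            }]
        },
        S3BackupMode: 'Enabled',
        S3BackupConfiguration: {
            RoleARN: 'arn:aws:iam::123456789012:role/firehose_delivery_role',
            BucketARN: 'arn:aws:s3:::my-destination-bucket',
            Prefix: 'source-backup/'
        }
    }
}, (err, data) => {
    if (err) console.log('Create failed:', err);
    else console.log('Delivery stream ARN:', data.DeliveryStreamARN);
});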

Testing Firehose Data Transformation

You can use the AWS Management Console to ingest simulated stock ticker data. The console runs a script in your browser to put sample records in your Firehose delivery stream. This enables you to test the configuration of your delivery stream without having to generate your own test data. The following is an example from the simulated data:

{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-0.05,"PRICE":84.51}
{"TICKER_SYMBOL":"TGT","SECTOR":"RETAIL","CHANGE":2.14,"PRICE":68.26}

To test the Firehose data transformation, the Lambda function created in the previous section adds a timestamp to the records and delivers only the stocks from the “RETAIL” sector. This test demonstrates the ability to add metadata to the records in the incoming stream, and also to filter records out of the delivery stream.

Choose the newly created Firehose delivery stream, and choose Test with demo data, Start sending demo data.

Firehose provides CloudWatch metrics about the delivery stream. Additional metrics to monitor the data processing feature are also now available.

The destination S3 bucket now contains the prefixes with the source data backup and the processed stream. Download a file of the processed data, and verify that the records contain the timestamp and the “RETAIL” sector data, as follows:

1483504691599,ABC,RETAIL,0.92,21.28
1483504691600,TGT,RETAIL,-1.2,61.89
1483504691600,BFH,RETAIL,-0.79,15.86
1483504691600,MJN,RETAIL,-0.27,129.37
1483504691600,WMT,RETAIL,-2.4,76.39

Conclusion

With the Firehose data transformation feature, you now have a powerful, scalable way to perform data transformations on streaming data. You can create a data lake with the raw data, and simultaneously transform data to be consumed in a suitable format by a Firehose destination.

For more information about Firehose, see the Amazon Kinesis Firehose Developer Guide.

If you have any questions or suggestions, please comment below.