AWS Big Data Blog

Integrating MongoDB’s Application Data Platform with Amazon Kinesis Data Firehose

March 2023 – This post was reviewed and updated for accuracy.

February 9, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.

Amazon Kinesis Data Firehose now supports MongoDB’s application data platform, MongoDB Atlas, as one of its delivery destinations. This native integration between Kinesis Data Firehose and Atlas gives customers a managed, secure, scalable, and fault-tolerant mechanism for delivering data into MongoDB’s integrated suite of cloud database and data services, which accelerate and simplify how you build with data.

With the release of Kinesis Data Firehose HTTP endpoint delivery, you can now stream your data through Amazon Kinesis Data Streams or push data directly to Kinesis Data Firehose and configure it to deliver data to MongoDB Atlas. You can also configure Kinesis Data Firehose to transform the data before delivering it to its destination. You don’t have to write applications and manage resources to read data and push it to MongoDB. It’s all managed by AWS, which makes it easier to estimate costs based on your data volume.

In this post, we discuss how to integrate Kinesis Data Firehose and MongoDB Atlas and demonstrate how to stream data from your source directly to your MongoDB database.

The following diagram depicts the overall architecture of the solution. We configure Kinesis Data Firehose to push the data to an event-driven serverless JavaScript function in MongoDB App Services. MongoDB App Services provides a fully managed, serverless backend with application services that help manage the flow of data between MongoDB Atlas and applications. To connect to Kinesis Data Firehose, we use a MongoDB App Services HTTPS endpoint, which executes the function whenever the endpoint is invoked. The function parses the JSON message from Kinesis Data Firehose and inserts the parsed records into the MongoDB Atlas database.

Integrating Kinesis Data Firehose and MongoDB Atlas

Kinesis Data Firehose is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, which minimizes the amount of storage used at the destination and increases security.

As part of Kinesis Data Firehose, you can transform your records before delivering them to the destination. In addition, Kinesis Data Firehose enables you to buffer data (based on size or time) before delivering to the final destination. In case of delivery failures, Kinesis Data Firehose can store your failed records in an Amazon Simple Storage Service (Amazon S3) bucket to prevent data loss.

MongoDB Atlas is a platform that can be used across a range of Online Transactional Processing (OLTP) and data analytics applications. MongoDB Atlas allows developers to address popular use cases such as Internet of Things (IoT), mobile apps, payments, single view, customer data management, and many more. In all of those cases, developers spend a significant amount of time delivering data to MongoDB Atlas from various data sources. This integration significantly reduces the development effort by using the Kinesis Data Firehose HTTP endpoint integration to ingest data into MongoDB Atlas.

Creating a MongoDB App Services Application

  1. Log in to your MongoDB cloud account. If you do not have an account, you can sign up for a free account.
  2. Navigate to the App Services tab.
  3. Create a new App if you don’t have one, or select an existing App.
  4. Create an HTTPS endpoint on the App Services in MongoDB Atlas.
  5. Select HTTPS Endpoints.
  6. Click Add an Endpoint.
  7. For Route, enter an appropriate name for the endpoint.
  8. For Function, click +New Function and provide an appropriate name.
  9. In the function editor, enter the following code:
    exports = async function(payload, response) {

        // Compact Base64 decoder; returns the decoded text of one record.
        const decodeBase64 = (s) => {
            var e={},i,b=0,c,x,l=0,a,r='',w=String.fromCharCode,L=s.length
            var A="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
            for(i=0;i<64;i++){e[A.charAt(i)]=i}
            for(x=0;x<L;x++){
                c=e[s.charAt(x)];b=(b<<6)+c;l+=6
                while(l>=8){((a=(b>>>(l-=8))&0xff)||(x<(L-2)))&&(r+=w(a))}
            }
            return r
        }

        // Firehose sends one JSON body per batch of records.
        const fullDocument = JSON.parse(payload.body.text());
        const requestId = payload.headers["X-Amz-Firehose-Request-Id"][0];
        const firehoseAccessKey = payload.headers["X-Amz-Firehose-Access-Key"];

        response.addHeader("Content-Type", "application/json");

        // Check that the shared secret matches to validate the request source.
        // Loose equality (==) is kept from the original: header values appear to
        // arrive as arrays (note the [0] above), and == coerces a one-element
        // array to its string.
        if (firehoseAccessKey == context.values.get("MyValue")) {
            const collection = context.services.get("mongodb-atlas").db("kdf").collection("kdf-test");

            // Insert records one at a time, waiting for each insert to finish
            // so that a failed insert is not reported to Firehose as a success.
            for (const record of fullDocument.records) {
                const document = JSON.parse(decodeBase64(record.data));
                const status = await collection.insertOne(document);
                console.log("got status: " + JSON.stringify(status));
            }

            const s = JSON.stringify({
                requestId: requestId,
                timestamp: (new Date()).getTime()
            });
            response.setStatusCode(200);
            response.setBody(s);
            console.log("response JSON:" + s);
        } else {
            response.setStatusCode(500);
            response.setBody(JSON.stringify({
                requestId: requestId,
                timestamp: (new Date()).getTime(),
                errorMessage: "Error authenticating"
            }));
        }
    };

The preceding code is a simplified implementation of the HTTPS Endpoint. It inserts records one at a time, and its error handling is abbreviated for readability. For more information about the full implementation, see Using MongoDB Realm HTTPS Endpoints with Amazon Kinesis Data Firehose.

Make sure the Atlas service name, database, and collection correspond to your environment. You can obtain that information by navigating to App Services and then selecting Linked Data Sources.
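
To make the decoding step more concrete, the following is a minimal sketch that can be run locally in Node.js, outside of App Services. It builds a sample request body in the shape that Kinesis Data Firehose sends and decodes each record back into a document. The names `toRecord`, `sampleBody`, and `decodeRecords` and all sample values are hypothetical, and Node's built-in `Buffer` stands in for the hand-written decoder in the function.

```javascript
// Local Node.js sketch (not App Services code): decode the records in a
// Firehose-style request body. Names and sample values are illustrative.

// Encode an object the way a record's `data` field carries it:
// JSON text first, then Base64.
const toRecord = (obj) => ({
  data: Buffer.from(JSON.stringify(obj), "utf8").toString("base64"),
});

// Hypothetical request body with two records.
const sampleBody = JSON.stringify({
  requestId: "example-request-id",
  timestamp: 1700000000000,
  records: [
    toRecord({ sensor: "a1", temperature: 21.5 }),
    toRecord({ sensor: "b2", temperature: 19 }),
  ],
});

// Parse the body and turn every Base64 record back into a document.
function decodeRecords(bodyText) {
  const body = JSON.parse(bodyText);
  return body.records.map((record) =>
    JSON.parse(Buffer.from(record.data, "base64").toString("utf8"))
  );
}

const documents = decodeRecords(sampleBody);
console.log(documents);
```

Each decoded document is what the endpoint function would pass to the collection insert.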

This HTTPS Endpoint uses the values and secrets of MongoDB App Services.

  1. Leave other options at their default.
  2. On the side navigation, choose Functions and click on the function that you just created. Set the Authentication to System.

Because the function itself validates a shared secret sent with each request, no additional Application Authentication is needed. The function reads that secret from MongoDB App Services, so we need to create it there.

  1. In the App Services side navigation, choose Values and then Create New Value.
  2. Enter the Name and select Secret, provide a value, and click Save.
  3. On the Values tab, choose Create New Value/Add a Value. Enter the Value Name and select Link to Secret. Then choose the secret you created earlier and save.

You can now use the secret in your HTTPS Endpoint function.
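
As an illustration of how the function can use that value, here is a small sketch of the shared-secret check that runs locally in Node.js. The `context` object below is only a stand-in for the global that App Services provides, the secret string is made up, and `isAuthorized` is a hypothetical helper name. The value name `MyValue` matches the function shown earlier.

```javascript
// Local stand-in for the App Services `context` global (assumption for
// illustration only). In App Services, do not declare `context` yourself.
const context = {
  values: {
    get: (name) => (name === "MyValue" ? "example-shared-secret" : undefined),
  },
};

// Header values may arrive as a string or as an array of strings, so
// normalize to a single string before comparing with the stored secret.
function isAuthorized(headers) {
  const raw = headers["X-Amz-Firehose-Access-Key"];
  const accessKey = Array.isArray(raw) ? raw[0] : raw;
  const expected = context.values.get("MyValue");
  return (
    typeof accessKey === "string" &&
    expected !== undefined &&
    accessKey === expected
  );
}

const okRequest = isAuthorized({
  "X-Amz-Firehose-Access-Key": ["example-shared-secret"],
});
const wrongKey = isAuthorized({ "X-Amz-Firehose-Access-Key": ["wrong"] });
const missingKey = isAuthorized({});
console.log(okRequest, wrongKey, missingKey);
```

Normalizing the header first allows a strict comparison, and a request without the header is rejected rather than compared against an undefined value.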

Creating a Kinesis Data Firehose delivery stream to MongoDB

  1. Log in to the AWS Management Console and search for Kinesis.
  2. On the Kinesis Data Firehose console, choose Create delivery stream.
  3. For Delivery stream name, enter a name.
  4. For Source, choose Direct PUT or other sources.
  5. For Destination, choose MongoDB Cloud.
  6. For MongoDB Realm’s HTTP Endpoint URL, enter the URL of the MongoDB Atlas App Services HTTPS endpoint that you created in the MongoDB Atlas console.
  7. For API Key, enter the secret value stored in MongoDB Atlas.
  8. For Content encoding, leave it as Disabled.
  9. For S3 backup mode, select Failed data only.
  10. For S3 bucket, enter the S3 bucket for delivery of log events that exceeded the retry duration. Alternatively, you can create a new bucket by choosing Create new.
  11. For MongoDB buffer conditions, accept the default MongoDB and Amazon S3 buffer conditions for your stream. Note that the buffer size should be a value between 1 MiB and 16 MiB. Review the limits in the MongoDB Atlas documentation.
  12. In the Advanced settings section, configure permissions for your delivery stream by choosing Create or update IAM role.
  13. Choose Create delivery stream.

As part of HTTP endpoint integration, Kinesis Data Firehose only supports HTTPS endpoints. The server-side TLS/SSL certificate must be signed by a trusted Certificate Authority (CA) and is used for verification by Kinesis Data Firehose.

The body of the request that is delivered from Kinesis Data Firehose is a JSON document with the following schema:

"$schema": http://json-schema.org/draft-07/schema#

title: FirehoseCustomHttpsEndpointRequest
description: >
  The request body that the Firehose service sends to
  custom HTTPS endpoints.
type: object
properties:
  requestId:
    description: >
      Same as the value in the X-Amz-Firehose-Request-Id header,
      duplicated here for convenience.
    type: string
  timestamp:
    description: >
      The timestamp (milliseconds since epoch) at which the Firehose
      server generated this request.
    type: integer
  records:
    description: >
      The actual records of the Delivery Stream, carrying 
      the customer data.
    type: array
    minItems: 1
    maxItems: 10000
    items:
      type: object
      properties:
        data:
          description: >
            The data of this record, in Base64. Note that empty
            records are permitted in Firehose. The maximum allowed
            size of the data, before Base64 encoding, is 1024000
            bytes; the maximum length of this field is therefore
            1365336 chars.
          type: string
          minLength: 0
          maxLength: 1365336

required:
  - requestId
  - records
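
The constraints in this schema can be checked by hand before any record is processed. The following is a minimal sketch that mirrors the required fields, types, and size limits listed above. It is hand-written rather than based on a JSON Schema library, and `validateFirehoseRequest` is a hypothetical helper name.

```javascript
// Limits taken from the schema above.
const MAX_RECORDS = 10000;
const MAX_DATA_LENGTH = 1365336;

// Returns a list of problems; an empty list means the body passed.
function validateFirehoseRequest(body) {
  if (body === null || typeof body !== "object" || Array.isArray(body)) {
    return ["body must be an object"];
  }
  const errors = [];
  if (typeof body.requestId !== "string") {
    errors.push("requestId is required and must be a string");
  }
  if (body.timestamp !== undefined && !Number.isInteger(body.timestamp)) {
    errors.push("timestamp must be an integer");
  }
  if (!Array.isArray(body.records)) {
    errors.push("records is required and must be an array");
    return errors;
  }
  if (body.records.length < 1 || body.records.length > MAX_RECORDS) {
    errors.push(`records must contain between 1 and ${MAX_RECORDS} items`);
  }
  body.records.forEach((record, index) => {
    if (record === null || typeof record !== "object") {
      errors.push(`records[${index}] must be an object`);
      return;
    }
    // `data` is not listed as required, and empty strings are permitted.
    if (
      record.data !== undefined &&
      (typeof record.data !== "string" || record.data.length > MAX_DATA_LENGTH)
    ) {
      errors.push(
        `records[${index}].data must be a string of at most ${MAX_DATA_LENGTH} characters`
      );
    }
  });
  return errors;
}

console.log(
  validateFirehoseRequest({ requestId: "r-1", timestamp: 1, records: [{ data: "e30=" }] })
);
console.log(validateFirehoseRequest({ records: [] }));
```

An endpoint could run such a check first and return an error response when the list is not empty, although whether that is worthwhile depends on how much you trust the sender.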

The records are delivered as a collection based on the BufferingHints configured on the Firehose delivery stream. The endpoint function created in MongoDB Atlas App Services must either process these records one by one, inserting each into a MongoDB collection, or insert the whole batch using the MongoDB Bulk APIs.
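
The batch approach can be sketched as follows: decode every record first, then write the whole batch with a single `insertMany` call instead of one `insertOne` call per record. This is an illustration that runs locally in Node.js, not the authors' implementation. `fakeCollection` is a stand-in for an App Services collection handle, `insertBatch` is a hypothetical helper name, and `Buffer` is used for decoding only because the sketch runs outside App Services.

```javascript
// Local stand-in for a collection handle (assumption for illustration).
// It mimics the result shape of insertMany with an `insertedIds` array.
const fakeCollection = {
  stored: [],
  async insertMany(docs) {
    this.stored.push(...docs);
    return { insertedIds: docs.map((_, i) => i) };
  },
};

// Decode all records in the batch, then insert them with one call.
async function insertBatch(collection, records) {
  const documents = records.map((record) =>
    JSON.parse(Buffer.from(record.data, "base64").toString("utf8"))
  );
  if (documents.length === 0) {
    return 0; // nothing to insert
  }
  const result = await collection.insertMany(documents);
  return result.insertedIds.length;
}

// Hypothetical batch of three Base64-encoded records.
const records = [{ device: 1 }, { device: 2 }, { device: 3 }].map((doc) => ({
  data: Buffer.from(JSON.stringify(doc), "utf8").toString("base64"),
}));

const inserted = insertBatch(fakeCollection, records);
inserted.then((count) => console.log("inserted", count));
```

A single bulk write reduces round trips to the database, at the cost of having to decide how to respond when only part of a batch fails.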

When Kinesis Data Firehose is set up with an HTTP endpoint destination to MongoDB Cloud, you can push data into Kinesis Data Firehose from your application using the Kinesis Agent or the AWS SDK. Kinesis Data Firehose is also integrated with other AWS data sources such as Kinesis Data Streams, AWS IoT, Amazon CloudWatch Logs, and Amazon CloudWatch Events.
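
For the SDK route, the sketch below shows one way to shape application events into inputs for the Firehose PutRecordBatch operation, which accepts up to 500 records per call. No AWS call is made here. The stream name is a placeholder, `toPutRecordBatchInputs` is a hypothetical helper name, and the sketch does not enforce the per-request size limit.

```javascript
// PutRecordBatch accepts up to 500 records per call.
const MAX_RECORDS_PER_BATCH = 500;

// Split events into chunks and wrap each chunk in the parameter shape
// that PutRecordBatch expects: a stream name plus a list of { Data }.
function toPutRecordBatchInputs(streamName, events) {
  const inputs = [];
  for (let i = 0; i < events.length; i += MAX_RECORDS_PER_BATCH) {
    const chunk = events.slice(i, i + MAX_RECORDS_PER_BATCH);
    inputs.push({
      DeliveryStreamName: streamName,
      Records: chunk.map((event) => ({
        // Data is sent as bytes; here each event is serialized as JSON.
        Data: Buffer.from(JSON.stringify(event), "utf8"),
      })),
    });
  }
  return inputs;
}

// Hypothetical events and placeholder stream name.
const events = Array.from({ length: 1200 }, (_, id) => ({ id }));
const inputs = toPutRecordBatchInputs("my-delivery-stream", events);
console.log(inputs.map((input) => input.Records.length));
```

Each resulting input could then be passed to a PutRecordBatch call in the AWS SDK of your choice.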

To test the integration, use the testing option on the Kinesis Data Firehose console and test with sample data. After the time configured in BufferingHints, log in to your Atlas platform and navigate to your Database/Collection to see the ingested records.

Conclusion

In this post, we showed how to ingest data into MongoDB Atlas using a Kinesis Data Firehose HTTP endpoint. This integration has many use cases. For example, you can stream Internet of Things (IoT) data directly into the MongoDB Atlas platform with minimal code using the Amazon Kinesis Data Firehose HTTP endpoint integration. Try MongoDB Atlas on AWS here.


About the Authors

Anusha Dharmalingam is a Solutions Architect at Amazon Web Services, with a passion for Application Development and Big Data solutions. Anusha works with enterprise customers to help them architect, build, and scale applications to achieve their business goals.

Igor Alekseev is a Partner Solution Architect at AWS in Data and Analytics. Igor works with strategic partners helping them build complex, AWS-optimized architectures. Prior to joining AWS, as a Data/Solution Architect, he implemented many projects in Big Data, including several data lakes in the Hadoop ecosystem. As a Data Engineer, he was involved in applying AI/ML to fraud detection and office automation. Igor’s projects were in a variety of industries including communications, finance, public safety, manufacturing, and healthcare. Earlier, Igor worked as a full stack engineer/tech lead.