AWS Big Data Blog

Integrating MongoDB’s Application Data Platform with Amazon Kinesis Data Firehose

September 2022 – This post has been updated to reflect a change in the product name, with updated screenshots.

Amazon Kinesis Data Firehose now supports MongoDB's application data platform, MongoDB Atlas, as one of its delivery destinations. This native integration between Kinesis Data Firehose and Atlas provides a managed, secure, scalable, and fault-tolerant delivery mechanism into MongoDB's integrated suite of cloud database and data services, which accelerates and simplifies how you build with data.

With the release of Kinesis Data Firehose HTTP endpoint delivery, you can now stream your data through Amazon Kinesis Data Streams or push data directly to Kinesis Data Firehose, and configure it to deliver data to MongoDB Atlas. You can also configure Kinesis Data Firehose to transform the data before delivering it to its destination. You don't have to write applications or manage resources to read data and push it to MongoDB. It's all managed by AWS, making it easier to estimate costs based on your data volume.
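For example, a producer application can push records to the delivery stream with a single API call. The following is a minimal sketch using the AWS SDK for JavaScript v3; the stream name, Region, and event shape are placeholders for your own values:

import { FirehoseClient, PutRecordCommand } from "@aws-sdk/client-firehose";

const client = new FirehoseClient({ region: "us-east-1" });

async function sendEvent(event) {
  // Firehose records carry raw bytes; Kinesis Data Firehose Base64-encodes
  // them before delivering to the HTTP endpoint destination.
  const command = new PutRecordCommand({
    DeliveryStreamName: "kdf-to-mongodb", // placeholder delivery stream name
    Record: { Data: Buffer.from(JSON.stringify(event)) },
  });
  return client.send(command);
}

await sendEvent({ deviceId: "sensor-42", temperature: 21.7 });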

In this post, we discuss how to integrate Kinesis Data Firehose and MongoDB Atlas and demonstrate how to stream data from your source directly to your MongoDB database.

The following diagram depicts the overall architecture of the solution. We configure Kinesis Data Firehose to push the data to an event-driven, serverless JavaScript function in MongoDB Realm. MongoDB Realm provides a fully managed, serverless backend with application services that help manage the flow of data between MongoDB Atlas and applications. To connect to Kinesis Data Firehose, we use MongoDB Realm's HTTPS endpoints, which run the function when the endpoint is invoked. The HTTPS endpoint parses the JSON message from Kinesis Data Firehose and inserts the parsed records into the MongoDB Atlas database.

Integrating Kinesis Data Firehose and MongoDB Atlas

Kinesis Data Firehose is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, which minimizes the amount of storage used at the destination and increases security.

With Kinesis Data Firehose, you can transform your records before delivering them to the destination. In addition, Kinesis Data Firehose enables you to buffer data (based on size or time) before delivering it to the final destination. In case of delivery failures, Kinesis Data Firehose can store your failed records in an Amazon Simple Storage Service (Amazon S3) bucket to prevent data loss.
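If you enable record transformation, Kinesis Data Firehose invokes an AWS Lambda function with a batch of Base64-encoded records and expects each record back with a recordId, a result, and the transformed data. The following Node.js sketch illustrates that contract; the enrichment itself (adding a processedAt field) is only an example:

// Sketch of a Kinesis Data Firehose record-transformation Lambda function.
// Every incoming recordId must be returned with a result of
// "Ok", "Dropped", or "ProcessingFailed".
exports.handler = async (event) => {
  const records = event.records.map((record) => {
    const payload = JSON.parse(Buffer.from(record.data, "base64").toString("utf8"));
    payload.processedAt = new Date().toISOString(); // illustrative enrichment
    return {
      recordId: record.recordId,
      result: "Ok",
      data: Buffer.from(JSON.stringify(payload)).toString("base64"),
    };
  });
  return { records };
};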

MongoDB Atlas is a platform that can be used across a range of online transactional processing (OLTP) and data analytics applications. MongoDB Atlas allows developers to address popular use cases such as Internet of Things (IoT), mobile apps, payments, single view, customer data management, and many more. In all of these cases, developers spend a significant amount of time delivering data to MongoDB Atlas from various data sources. This integration significantly reduces that development effort by using the Kinesis Data Firehose HTTP endpoint integration to ingest data into MongoDB Atlas.

Creating a MongoDB Realm Application

  1. Log in to your MongoDB cloud account. If you don't have an account, you can sign up for a free account.
  2. Navigate to the Realm tab on MongoDB Atlas.
  3. Create an HTTPS endpoint on the Realm tab in MongoDB Atlas.
  4. Click Add an Endpoint.
  5. For Route, enter an appropriate name for the endpoint.
  6. For Function, click +New Function and provide an appropriate name.

  7. In the function editor, enter the following code:
exports = function(payload, response) {

    // Decode a Base64 string without relying on Node's Buffer,
    // which may not be available in the Realm function runtime
    const decodeBase64 = (s) => {
        var e = {}, i, b = 0, c, x, l = 0, a, r = '', w = String.fromCharCode, L = s.length;
        var A = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
        for (i = 0; i < 64; i++) { e[A.charAt(i)] = i; }
        for (x = 0; x < L; x++) {
            c = e[s.charAt(x)]; b = (b << 6) + c; l += 6;
            while (l >= 8) { ((a = (b >>> (l -= 8)) & 0xff) || (x < (L - 2))) && (r += w(a)); }
        }
        return r;
    };

    var fullDocument = JSON.parse(payload.body.text());

    const firehoseAccessKey = payload.headers["X-Amz-Firehose-Access-Key"];
    // Debug logging of the expected key; remove in production
    console.log('should be: ' + context.values.get("KDFH_SECRET_KEY"));

    // Check that the shared secret matches to validate the request source
    if (firehoseAccessKey == context.values.get("KDFH_SECRET_KEY")) {

        var collection = context.services.get("Cluster0").db("kdf").collection("kdf-test");

        // Each record's data field is Base64-encoded JSON
        fullDocument.records.forEach((record) => {
            const document = JSON.parse(decodeBase64(record.data));
            const status = collection.insertOne(document);
            console.log("got status: " + status);
        });

        // Firehose expects a JSON response echoing the request ID
        response.setStatusCode(200);
        const s = JSON.stringify({
            requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
            timestamp: (new Date()).getTime()
        });
        response.addHeader("Content-Type", "application/json");
        response.setBody(s);
        console.log("response JSON:" + s);
        return;
    } else {
        response.setStatusCode(500);
        response.setBody(JSON.stringify({
            requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
            timestamp: (new Date()).getTime(),
            errorMessage: "Error authenticating"
        }));
        return;
    }
};

The preceding code is a simplified implementation of the HTTPS endpoint: it inserts records one at a time, and error handling has been abbreviated for readability. For more information about the full implementation, see Using MongoDB Realm HTTPS Endpoints with Amazon Kinesis Data Firehose.
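Because Kinesis Data Firehose delivers records in batches, one way to reduce round trips to Atlas is to insert each batch in a single call. The following sketch shows what the insert loop could look like with insertMany, assuming the same fullDocument, decodeBase64, and collection as in the preceding function:

// Sketch: insert the whole Firehose batch in one call instead of
// one insertOne per record
const documents = fullDocument.records.map(
    (record) => JSON.parse(decodeBase64(record.data))
);
if (documents.length > 0) {
    const result = collection.insertMany(documents);
    console.log("inserted count: " + result.insertedIds.length);
}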

This HTTPS endpoint uses the Values & Secrets feature of MongoDB Realm to store the shared secret key.

  8. Leave other options at their default.
  9. On the Realm side navigation, choose Functions and click the function that you just created. Set Authentication to System.

    As stated above, because the HTTPS endpoint validates the shared secret within the function, we can remove the need for additional application authentication. To create the secret being used, we need to create a secret in Realm.
  10. On the Realm tab, choose Values & Secrets.

  11. On the Secrets tab, choose Create New Secret/Add a Secret.

  12. Enter the Secret Name and Secret Value, and choose Save. The Secret Name entered here must be the same name used in the HTTPS endpoint code.

  13. On the Values tab, choose Create New Value/Add a Value.

  14. Enter the Value Name.
  15. For Value Type, select Secret.
  16. For Secret Name, choose the secret you created.

  17. Choose Save.

You can now use the secret in your HTTPS Endpoint function.

  18. Choose REVIEW & DEPLOY.

Creating a Kinesis Data Firehose delivery stream to MongoDB

  1. Log in to the AWS Management Console and search for Kinesis.
  2. On the Kinesis Data Firehose console, choose Create delivery stream.
  3. For Delivery stream name, enter a name.
  4. For Source, choose Direct PUT or other sources.
  5. Choose Next.

  6. On the Process records page, keep all settings at their default and choose Next.
  7. From the Third-party partner drop-down menu, choose MongoDB Cloud.

  8. For MongoDB Realm Webhooks HTTP Endpoint URL, enter the URL of the Realm app that you created in the MongoDB cloud console.
  9. For API Key, enter the secret value stored in the MongoDB Secrets section.
  10. For Content encoding, leave it as Disabled.
  11. For S3 backup mode, select Failed data only.
  12. For S3 bucket, enter the S3 bucket for delivery of log events that exceeded the retry duration. Alternatively, you can create a new bucket by choosing Create new.
  13. Choose Next.
  14. For MongoDB buffer conditions, accept the default MongoDB and Amazon S3 buffer conditions for your stream. Note that the buffer size must be a value between 1 MiB and 16 MiB. Review the limits in the MongoDB Atlas documentation.

  15. In the IAM role section, configure permissions for your delivery stream by choosing Create or update IAM role.
  16. Choose Next.
  17. Review your settings and choose Create delivery stream.
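If you prefer to script this setup instead of using the console, the delivery stream can also be created programmatically. The following sketch uses the AWS SDK for JavaScript v3; the endpoint URL, access key, role ARN, and bucket ARN are placeholders you would replace with your own values:

import { FirehoseClient, CreateDeliveryStreamCommand } from "@aws-sdk/client-firehose";

const client = new FirehoseClient({ region: "us-east-1" });

await client.send(new CreateDeliveryStreamCommand({
  DeliveryStreamName: "kdf-to-mongodb",               // placeholder name
  DeliveryStreamType: "DirectPut",
  HttpEndpointDestinationConfiguration: {
    EndpointConfiguration: {
      Name: "MongoDB Cloud",
      Url: "https://<your-realm-https-endpoint-url>", // from your Realm app
      AccessKey: "<your KDFH_SECRET_KEY value>",
    },
    BufferingHints: { SizeInMBs: 1, IntervalInSeconds: 60 }, // 1-16 MiB
    RoleARN: "arn:aws:iam::123456789012:role/firehose-delivery-role",
    S3BackupMode: "FailedDataOnly",
    S3Configuration: {
      RoleARN: "arn:aws:iam::123456789012:role/firehose-delivery-role",
      BucketARN: "arn:aws:s3:::my-firehose-backup-bucket",
    },
  },
}));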

As part of the HTTP endpoint integration, Kinesis Data Firehose supports only HTTPS endpoints. The server-side TLS/SSL certificate must be signed by a trusted certificate authority (CA) and is used for verification by Kinesis Data Firehose.

The body of the request that is delivered from Kinesis Data Firehose is a JSON document with the following schema:

"$schema": http://json-schema.org/draft-07/schema#

title: FirehoseCustomHttpsEndpointRequest
description: >
  The request body that the Firehose service sends to
  custom HTTPS endpoints.
type: object
properties:
  requestId:
    description: >
      Same as the value in the X-Amz-Firehose-Request-Id header,
      duplicated here for convenience.
    type: string
  timestamp:
    description: >
      The timestamp (milliseconds since epoch) at which the Firehose
      server generated this request.
    type: integer
  records:
    description: >
      The actual records of the Delivery Stream, carrying 
      the customer data.
    type: array
    minItems: 1
    maxItems: 10000
    items:
      type: object
      properties:
        data:
          description: >
            The data of this record, in Base64. Note that empty
            records are permitted in Firehose. The maximum allowed
            size of the data, before Base64 encoding, is 1024000
            bytes; the maximum length of this field is therefore
            1365336 chars.
          type: string
          minLength: 0
          maxLength: 1365336

required:
  - requestId
  - records
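For illustration, a request body conforming to this schema might look like the following; all values are hypothetical, and the data field is the Base64 encoding of the JSON document {"hello":"world"}:

{
  "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
  "timestamp": 1664459120123,
  "records": [
    { "data": "eyJoZWxsbyI6IndvcmxkIn0=" }
  ]
}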

The records are delivered as a collection based on the BufferingHints configured on the Firehose delivery stream. The delivery-side service endpoint created on MongoDB Realm has to either process these records one by one before inserting them into MongoDB collections, or use the MongoDB bulk APIs.

When Kinesis Data Firehose is set up with an HTTP endpoint destination to MongoDB Cloud, you can push data into Kinesis Data Firehose using the Kinesis Agent or an AWS SDK from your application. Kinesis Data Firehose is also integrated with other AWS data sources such as Kinesis Data Streams, AWS IoT, Amazon CloudWatch Logs, and Amazon CloudWatch Events.

To test the integration, use the testing option on the Kinesis Data Firehose console and test with sample data. After the time configured in BufferingHints, log in to your Atlas platform and navigate to your Database/Collection to see the ingested records.
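To verify delivery outside the Atlas UI, you can also query the collection directly, for example with the Node.js MongoDB driver. This is a sketch; the connection string is a placeholder, and the database and collection names match the earlier example:

// Sketch: read back the most recently inserted documents
import { MongoClient } from "mongodb";

const client = new MongoClient("mongodb+srv://<user>:<password>@cluster0.example.mongodb.net");

try {
  const docs = await client
    .db("kdf")
    .collection("kdf-test")
    .find()
    .sort({ _id: -1 }) // ObjectIds are roughly insertion-ordered
    .limit(5)
    .toArray();
  console.log(docs);
} finally {
  await client.close();
}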

Conclusion

In this post, we showed how easy it is to ingest data into MongoDB Atlas using a Kinesis Data Firehose HTTP endpoint. This integration has many use cases. For example, you can stream Internet of Things (IoT) data directly into the MongoDB Atlas platform with minimal code using the Kinesis Data Firehose HTTP endpoint integration. Try MongoDB Atlas on AWS.


About the Authors

Anusha Dharmalingam is a Solutions Architect at Amazon Web Services, with a passion for Application Development and Big Data solutions. Anusha works with enterprise customers to help them architect, build, and scale applications to achieve their business goals.

Igor Alekseev is a Partner Solution Architect at AWS in Data and Analytics. Igor works with strategic partners, helping them build complex, AWS-optimized architectures. Prior to joining AWS, as a Data/Solution Architect, he implemented many projects in big data, including several data lakes in the Hadoop ecosystem. As a data engineer, he was involved in applying AI/ML to fraud detection and office automation. Igor's projects were in a variety of industries including communications, finance, public safety, manufacturing, and healthcare. Earlier, Igor worked as a full stack engineer/tech lead.