AWS Big Data Blog

Use Amazon Kinesis Data Streams to deliver real-time data to Amazon OpenSearch Service domains with Amazon OpenSearch Ingestion

In this post, we show how to use Amazon Kinesis Data Streams to buffer and aggregate real-time streaming data for delivery into Amazon OpenSearch Service domains and collections using Amazon OpenSearch Ingestion. You can use this approach for a variety of use cases, from real-time log analytics to integrating application messaging data for real-time search. We focus on the use case of centralized log aggregation for an organization that has a compliance need to archive and retain its log data.

Kinesis Data Streams is a fully managed, serverless data streaming service that stores and ingests various streaming data in real time at any scale. For log analytics use cases, Kinesis Data Streams enhances log aggregation by decoupling producer and consumer applications, and providing a resilient, scalable buffer to capture and serve log data. This decoupling provides advantages over traditional architectures. As log producers scale up and down, Kinesis Data Streams can be scaled dynamically to persistently buffer log data. This prevents load changes from impacting an OpenSearch Service domain, and provides a resilient store of log data for consumption. It also allows for multiple consumers to process log data in real time, providing a persistent store of real-time data for applications to consume. This allows the log analytics pipeline to meet Well-Architected best practices for resilience (REL04-BP02) and cost (COST09-BP02).

OpenSearch Ingestion is a serverless pipeline that provides powerful tools for extracting, transforming, and loading data into an OpenSearch Service domain. OpenSearch Ingestion integrates with many AWS services, and provides ready-made blueprints to accelerate ingesting data for a variety of analytics use cases into OpenSearch Service domains. When paired with Kinesis Data Streams, OpenSearch Ingestion allows for sophisticated real-time analytics of data, and helps reduce the undifferentiated heavy lifting of creating a real-time search and analytics architecture.

Solution overview

In this solution, we consider a common use case: centralized log aggregation for an organization. Organizations might consider a centralized log aggregation approach for a variety of reasons. Many organizations have compliance and governance requirements that stipulate what data needs to be logged, and how long log data must be retained and remain searchable for investigations. Other organizations seek to consolidate application and security operations, and provide common observability toolsets and capabilities across their teams.

To meet such requirements, you need to collect data from log sources (producers) in a scalable, resilient, and cost-effective manner. Log sources may vary between application and infrastructure use cases and configurations, as illustrated in the following table.

Log Producer       | Example                         | Example Producer Log Configuration
Application Logs   | AWS Lambda                      | Amazon CloudWatch Logs
Application Agents | FluentBit                       | Amazon OpenSearch Ingestion
AWS Service Logs   | Amazon Web Application Firewall | Amazon S3

The following diagram illustrates an example architecture.

You can use Kinesis Data Streams for a variety of these use cases. You can configure Amazon CloudWatch Logs to send data to Kinesis Data Streams using a subscription filter (see Real-time processing of log data with subscriptions). If you send data with Kinesis Data Streams for analytics use cases, you can use OpenSearch Ingestion to create a scalable, extensible pipeline to consume your streaming data and write it to OpenSearch Service indexes. Kinesis Data Streams provides a buffer that can support multiple consumers, configurable retention, and built-in integration with a variety of AWS services. For other use cases, where data is stored in Amazon Simple Storage Service (Amazon S3) or written by an agent such as FluentBit, the data can go directly to OpenSearch Ingestion without an intermediate buffer, thanks to OpenSearch Ingestion’s built-in persistent buffers and automatic scaling.

Standardizing logging approaches reduces development and operational overhead for organizations. For example, you might standardize on all applications logging to CloudWatch Logs when feasible, and also handle Amazon S3 logs where CloudWatch Logs is unsupported. This reduces the number of use cases that a centralized team needs to handle in their log aggregation approach, and reduces the complexity of the log aggregation solution. For more sophisticated development teams, you might standardize on using FluentBit agents to write data directly to OpenSearch Ingestion to lower cost when log data doesn’t need to be stored in CloudWatch.

This solution focuses on using CloudWatch Logs as a data source for log aggregation. For the Amazon S3 log use case, see Using an OpenSearch Ingestion pipeline with Amazon S3. For agent-based solutions, see the agent-specific documentation for integration with OpenSearch Ingestion, such as Using an OpenSearch Ingestion pipeline with Fluent Bit.

Prerequisites

Several key pieces of infrastructure used in this solution are required to ingest data into OpenSearch Service with OpenSearch Ingestion:

  • A Kinesis data stream to aggregate the log data from CloudWatch
  • An OpenSearch domain to store the log data

When creating the Kinesis data stream, we recommend starting with On-Demand mode. This will allow Kinesis Data Streams to automatically scale the number of shards needed for your log throughput. After you identify the steady state workload for your log aggregation use case, we recommend moving to Provisioned mode, using the number of shards identified in On-Demand mode. This can help you optimize long-term cost for high-throughput use cases.
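As a rough sizing aid for the move from On-Demand to Provisioned mode, the following sketch estimates a shard count from the peak write load you observed. It relies on the per-shard write limits of 1 MB per second and 1,000 records per second. The function name and the headroom multiplier are assumptions made for illustration and are not part of the original solution.

```python
import math

# Per-shard write limits for Kinesis Data Streams in Provisioned mode.
MAX_MB_PER_SEC_PER_SHARD = 1.0
MAX_RECORDS_PER_SEC_PER_SHARD = 1000


def estimate_shards(peak_mb_per_sec: float, peak_records_per_sec: float,
                    headroom: float = 1.25) -> int:
    """Estimate provisioned shards for a peak write load.

    `headroom` is a hypothetical safety multiplier (25% by default).
    """
    by_bytes = peak_mb_per_sec * headroom / MAX_MB_PER_SEC_PER_SHARD
    by_records = peak_records_per_sec * headroom / MAX_RECORDS_PER_SEC_PER_SHARD
    # The stricter of the two limits decides; a stream needs at least one shard.
    return max(1, math.ceil(max(by_bytes, by_records)))
```

For example, a peak of 8 MB per second and 3,000 records per second yields `estimate_shards(8, 3000) == 10`, because the byte limit is the binding constraint.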

In general, we recommend using one Kinesis data stream for your log aggregation workload. OpenSearch Ingestion supports up to 96 OCUs per pipeline, and 24,000 characters per pipeline definition file (see OpenSearch Ingestion quotas). This means that each pipeline can support a Kinesis data stream with up to 96 shards, because each OCU processes one shard. Using one Kinesis data stream simplifies the overall process to aggregate log data into OpenSearch Service, and simplifies the process for creating and managing subscription filters for log groups.
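The two quotas above can be turned into a quick pre-flight check. This is a minimal sketch that assumes one OCU per shard, as described in this post; the function name is hypothetical, and the quota values should be confirmed against the current OpenSearch Ingestion quotas page.

```python
MAX_OCUS_PER_PIPELINE = 96      # quota cited in this post
MAX_DEFINITION_CHARS = 24_000   # quota cited in this post


def fits_single_pipeline(shard_count: int, pipeline_definition: str) -> bool:
    """Return True when one pipeline can cover the stream (one OCU per shard)."""
    return (shard_count <= MAX_OCUS_PER_PIPELINE
            and len(pipeline_definition) <= MAX_DEFINITION_CHARS)
```

If the check fails, that is a signal to consider splitting the workload across several streams and pipelines.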

Depending on the scale of your log workloads, and the complexity of your OpenSearch Ingestion pipeline logic, you may consider more Kinesis data streams for your use case. For example, you may consider one stream for each major log type in your production workload. Having log data for different use cases separated into different streams can help reduce the operational complexity of managing OpenSearch Ingestion pipelines, and allows you to scale and deploy changes to each log use case separately when required.

To create a Kinesis data stream, see Create a data stream.

To create an OpenSearch domain, see Creating and managing Amazon OpenSearch domains.

Configure log subscription filters

You can implement CloudWatch log group subscription filters at the account level or log group level. In both cases, we recommend creating a subscription filter with a random distribution method to make sure log data is evenly distributed across Kinesis data stream shards.

Account-level subscription filters are applied to all log groups in an account, and can be used to subscribe all log data to a single destination. This works well if you want to store all your log data in OpenSearch Service using Kinesis Data Streams. There is a limit of one account-level subscription filter per account. Using Kinesis Data Streams as the destination also allows you to have multiple log consumers to process the account log data when relevant. To create an account-level subscription filter, see Account-level subscription filters.

Log group-level subscription filters are applied on each log group. This approach works well if you want to store a subset of your log data in OpenSearch Service using Kinesis Data Streams, and if you want to use multiple different data streams to store and process multiple log types. There is a limit of two log group-level subscription filters per log group. To create a log group-level subscription filter, see Log group-level subscription filters.
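A log group-level subscription filter with random distribution could be created programmatically along the following lines. This sketch only builds the arguments for the CloudWatch Logs PutSubscriptionFilter API; the filter name is a hypothetical placeholder, and the role ARN is assumed to be a role that CloudWatch Logs can assume to write to your stream.

```python
def subscription_filter_params(log_group: str, stream_arn: str, role_arn: str,
                               filter_name: str = "to-kinesis") -> dict:
    """Build arguments for the CloudWatch Logs PutSubscriptionFilter API."""
    return {
        "logGroupName": log_group,
        "filterName": filter_name,   # hypothetical filter name
        "filterPattern": "",         # empty pattern matches every log event
        "destinationArn": stream_arn,
        "roleArn": role_arn,         # role CloudWatch Logs assumes to write to the stream
        "distribution": "Random",    # spread records evenly across shards
    }
```

With AWS credentials configured, the result can be passed to boto3 as `boto3.client("logs").put_subscription_filter(**params)`.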

After you create your subscription filter, verify that log data is being sent to your Kinesis data stream. On the Kinesis Data Streams console, choose the link for your stream name.

Choose a shard with Starting position set as Trim horizon, and choose Get records.

You should see records with a unique Partition key column value and a binary Data column. The data appears as binary because CloudWatch compresses log data in gzip format before sending it to the stream.
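To inspect a record outside the console, you can decompress and parse it yourself. The following sketch assumes you already have the raw record bytes (for example, the `Data` field returned by a GetRecords call); the function names are illustrative.

```python
import gzip
import json


def decode_cloudwatch_record(data: bytes) -> dict:
    """Decompress and parse one Kinesis record written by a subscription filter."""
    return json.loads(gzip.decompress(data))


def log_messages(payload: dict) -> list:
    """Return the raw message of each log event; control messages carry none."""
    if payload.get("messageType") != "DATA_MESSAGE":
        return []
    return [event["message"] for event in payload.get("logEvents", [])]
```

If you copied the record as base64 text, for example from AWS CLI output, decode it with `base64.b64decode` before passing it to `decode_cloudwatch_record`.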

Configure an OpenSearch Ingestion pipeline

Now that you have a Kinesis data stream and CloudWatch subscription filters to send data to the data stream, you can configure your OpenSearch Ingestion pipeline to process your log data. To begin, you create an AWS Identity and Access Management (IAM) role that allows read access to the Kinesis data stream and read/write access to the OpenSearch domain. The role you use to create the pipeline (your manager role) requires iam:PassRole permissions for the pipeline role created in this step.

  1. Create an IAM role with the following permissions to read from your Kinesis data stream and access your OpenSearch domain:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "allowReadFromStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:ListStreams",
                    "kinesis:ListStreamConsumers",
                    "kinesis:RegisterStreamConsumer",
                    "kinesis:SubscribeToShard"
                ],
                "Resource": [
                    "arn:aws:kinesis:{region}:{account-id}:stream/{stream-name}"
                ]
            },
            {
                "Sid": "allowAccessToOS",
                "Effect": "Allow",
                "Action": [
                    "es:DescribeDomain",
                    "es:ESHttp*"
                ],
                "Resource": [
                    "arn:aws:es:{region}:{account-id}:domain/{domain-name}",
                    "arn:aws:es:{region}:{account-id}:domain/{domain-name}/*"
                ]
            }
        ]
    }
  2. Give your role a trust policy that allows access from osis-pipelines.amazonaws.com:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "osis-pipelines.amazonaws.com"
                    ]
                },
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {
                        "aws:SourceAccount": "{account-id}"
                    },
                    "ArnLike": {
                        "aws:SourceArn": "arn:aws:osis:{region}:{account-id}:pipeline/*"
                    }
                }
            }
        ]
    }

For a pipeline to write data to a domain, the domain must have a domain-level access policy that allows the pipeline role to access it. If your domain uses fine-grained access control, the IAM role must also be mapped to a backend role in the OpenSearch Service security plugin that allows access to create and write to indexes.
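A domain-level access policy for the pipeline role might look like the output of the following sketch. It reuses the actions from the pipeline role policy above; the function name and all argument values are placeholders, and you should adapt the policy to your own security requirements.

```python
import json


def domain_access_policy(region: str, account_id: str, domain_name: str,
                         pipeline_role_name: str) -> str:
    """Render a domain access policy that lets the pipeline role reach the domain."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            # The pipeline role created earlier is the only principal allowed here.
            "Principal": {"AWS": f"arn:aws:iam::{account_id}:role/{pipeline_role_name}"},
            "Action": ["es:DescribeDomain", "es:ESHttp*"],
            "Resource": f"arn:aws:es:{region}:{account_id}:domain/{domain_name}/*",
        }],
    }
    return json.dumps(policy, indent=2)
```

Calling `print(domain_access_policy("us-west-2", "111122223333", "example-domain", "example-pipeline-role"))` prints a JSON document you can review before applying it to the domain.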

  1. After you create your pipeline role, on the OpenSearch Service console, choose Pipelines under Ingestion in the navigation pane.
  2. Choose Create pipeline.
  3. Search for Kinesis in the blueprints, select the Kinesis Data Streams blueprint, and choose Select blueprint.
  4. Under Pipeline settings, enter a name for your pipeline, and set Max capacity for the pipeline to be equal to the number of shards in your Kinesis data stream.

If you’re using On-Demand mode for the data stream, choose a capacity equal to the current number of shards in the stream. This use case doesn’t require a persistent buffer, because Kinesis Data Streams provides a persistent buffer for the log data, and OpenSearch Ingestion tracks its position in the Kinesis data stream over time, preventing data loss on restarts.
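To find the current number of shards for an On-Demand stream, you can count the open shards in a ListShards response. The sketch below works on the `Shards` list from that response; the function name is illustrative.

```python
def count_open_shards(shards: list) -> int:
    """Count shards still accepting writes in a ListShards response.

    A closed shard (left behind by a split or merge) has an
    EndingSequenceNumber in its SequenceNumberRange; an open shard does not.
    """
    return sum(
        1 for shard in shards
        if "EndingSequenceNumber" not in shard.get("SequenceNumberRange", {})
    )
```

With boto3, the input would come from `boto3.client("kinesis").list_shards(StreamName=...)["Shards"]`, following `NextToken` when the stream has more shards than fit in one response.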

  1. Under Pipeline configuration, update the pipeline source settings to use your Kinesis data stream name and pipeline IAM role Amazon Resource Name (ARN).

For full configuration information, see the OpenSearch Ingestion documentation for the Kinesis Data Streams source. For most configurations, you can use the default values. By default, the pipeline will write batches of 100 documents every 1 second, and will subscribe to the Kinesis data stream from the latest position in the stream using enhanced fan-out, checkpointing its position in the stream every 2 minutes. You can adjust this behavior as desired to tune how frequently the consumer checkpoints and where it begins in the stream, and to use polling instead of enhanced fan-out to reduce costs. The following source configuration sets initial_position to EARLIEST so that the pipeline starts from the oldest record available in the stream:

  source:
    kinesis-data-streams:
      acknowledgments: true
      codec:
        # JSON codec supports parsing nested CloudWatch events into
        # individual log entries that will be written as documents to
        # OpenSearch
        json:
          key_name: "logEvents"
          # These keys contain the metadata sent by CloudWatch Subscription Filters
          # in addition to the individual log events:
          # https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
          include_keys: ['owner', 'logGroup', 'logStream' ]
      streams:
        # Update to use your Kinesis Stream name used in your Subscription Filters:
        - stream_name: "KINESIS_STREAM_NAME"
          # Can customize initial position if you don't want OSI to consume the entire stream:
          initial_position: "EARLIEST"
          # Compression will always be gzip for CloudWatch, but will vary for other sources:
          compression: "gzip"
      aws:
        # Provide the Role ARN with access to KDS. This role should have a trust relationship with osis-pipelines.amazonaws.com
        # This must be the same role used below in the Sink configuration.
        sts_role_arn: "PIPELINE_ROLE_ARN"
        # Provide the region of the Data Stream.
        region: "REGION"
  1. Update the pipeline sink settings to include your OpenSearch domain endpoint URL and pipeline IAM role ARN.

The IAM role ARN must be the same for both the OpenSearch Service sink definition and the Kinesis Data Streams source definition. You can control what data gets indexed in different indexes using the index definition in the sink. For example, you can use metadata about the Kinesis data stream name to index by data stream (${getMetadata("stream_name")}), or you can use document fields to index data depending on the CloudWatch log group or other document data (${path/to/field/in/document}). In this example, we use three document-level fields (data_stream.type, data_stream.dataset, and data_stream.namespace) to index our documents, and create these fields in our pipeline processor logic in the next section:

  sink:
    - opensearch:
        # Provide an AWS OpenSearch Service domain endpoint
        hosts: [ "OPENSEARCH_ENDPOINT" ]
        # Route log data to different target indexes depending on the log context:
        index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          # This role must be the same as the role used above for Kinesis.
          sts_role_arn: "PIPELINE_ROLE_ARN"
          # Provide the region of the domain.
          region: "REGION"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false
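To see how the index template in the sink maps a document to an index name, the following sketch imitates the substitution in plain Python. It is an illustration of the behavior described above, not the OpenSearch Ingestion implementation, and the helper names are hypothetical.

```python
import re

# Matches placeholders such as ${data_stream/type}.
_FIELD = re.compile(r"\$\{([^}]+)\}")


def lookup(document: dict, path: str):
    """Follow a slash-separated path such as 'data_stream/type' into nested dicts."""
    value = document
    for part in path.strip("/").split("/"):
        value = value[part]  # raises KeyError when the field is missing
    return value


def resolve_index(template: str, document: dict) -> str:
    """Substitute each ${path} placeholder with the matching document field."""
    return _FIELD.sub(lambda m: str(lookup(document, m.group(1))), template)
```

For a document whose data_stream fields are `logs`, `lambda`, and `default`, the template used in the sink resolves to `ss4o_logs-lambda-default`.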

Finally, you can update the pipeline configuration to include processor definitions to transform your log data before writing documents to the OpenSearch domain. For example, this use case adopts Simple Schema for Observability (SS4O) and uses the OpenSearch Ingestion pipeline to create the desired schema for SS4O. This includes adding common fields to associate metadata with the indexed documents, as well as parsing the log data to make data more searchable. This use case also uses the log group name to identify different log types as datasets, and uses this information to write documents to different indexes depending on their use cases.

  1. Rename the CloudWatch event timestamp to mark the observed timestamp when the log was generated using the rename_keys processor, and add the current timestamp as the processed timestamp when OpenSearch Ingestion handled the record using the date processor:
      #  Processor logic is used to change how log data is parsed for OpenSearch.
      processor:
        - rename_keys:
            entries:
            # Include CloudWatch timestamp as the observation timestamp - the time the log
            # was generated and sent to CloudWatch:
            - from_key: "timestamp"
              to_key: "observed_timestamp"
        - date:
            # Include the current timestamp that OSI processed the log event:
            from_time_received: true
            destination: "processed_timestamp"
  2. Use the add_entries processor to include metadata about the processed document, including the log group, log stream, account ID, AWS Region, Kinesis data stream information, and dataset metadata:
        - add_entries:
            entries:
            # Support SS4O common log fields (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
            - key: "cloud/provider"
              value: "aws"
            - key: "cloud/account/id"
              format: "${owner}"
            - key: "cloud/region"
              # Replace with the AWS Region of your log sources:
              value: "us-west-2"
            - key: "aws/cloudwatch/log_group"
              format: "${logGroup}"
            - key: "aws/cloudwatch/log_stream"
              format: "${logStream}"
            # Include default values for the data_stream:
            - key: "data_stream/namespace"
              value: "default"
            - key: "data_stream/type"
              value: "logs"
            - key: "data_stream/dataset"
              value: "general"
            # Include metadata about the source Kinesis message that contained this log event:
            - key: "aws/kinesis/stream_name"
              value_expression: "getMetadata(\"stream_name\")"
            - key: "aws/kinesis/partition_key"
              value_expression: "getMetadata(\"partition_key\")"
            - key: "aws/kinesis/sequence_number"
              value_expression: "getMetadata(\"sequence_number\")"
            - key: "aws/kinesis/sub_sequence_number"
              value_expression: "getMetadata(\"sub_sequence_number\")"
  3. Use conditional expression syntax to update the data_stream.dataset fields depending on the log source, to control what index the document is written to, and use the delete_entries processor to delete the original CloudWatch document fields that were renamed:
        - add_entries:
            entries:
            # Update the data_stream fields based on the log event context - in this case
            # classifying the log events by their source (CloudTrail or Lambda).
            # Additional logic could be added to classify the logs by business or application context:
            - key: "data_stream/dataset"
              value: "cloudtrail"
              add_when: "contains(/logGroup, \"cloudtrail\") or contains(/logGroup, \"CloudTrail\")"
              overwrite_if_key_exists: true
            - key: "data_stream/dataset"
              value: "lambda"
              add_when: "contains(/logGroup, \"/aws/lambda/\")"
              overwrite_if_key_exists: true
            - key: "data_stream/dataset"
              value: "apache"
              add_when: "contains(/logGroup, \"/apache/\")"
              overwrite_if_key_exists: true
        # Remove the default CloudWatch fields, as we re-mapped them to SS4O fields:
        - delete_entries:
            with_keys:
              - "logGroup"
              - "logStream"
              - "owner"
  4. Parse the log message fields to allow structured and JSON data to be more searchable in the OpenSearch indexes using the grok and parse_json processors.

Grok processors use pattern matching to parse data from structured text fields. For examples of built-in Grok patterns, see java-grok patterns and dataprepper grok patterns.

    # Use Grok parser to parse non-JSON apache logs
    - grok:
        grok_when: "/data_stream/dataset == \"apache\""
        match:
          message: ['%{COMMONAPACHELOG_DATATYPED}']
        target_key: "http"
    # Attempt to parse the log data as JSON to support field-level searches in the OpenSearch index:
    - parse_json:
        # Parse root message object into aws.cloudtrail to match SS4O standard for SS4O logs
        source: "message"
        destination: "aws/cloudtrail"
        parse_when: "/data_stream/dataset == \"cloudtrail\""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for Lambda function logs - can also set up Grok support
        # for Lambda function logs to capture non-JSON logging function data as searchable fields
        source: "message"
        destination: "aws/lambda"
        parse_when: "/data_stream/dataset == \"lambda\""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for general logs
        source: "message"
        destination: "body"
        parse_when: "/data_stream/dataset == \"general\""
        tags_on_failure: ["json_parse_fail"]
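The classification and JSON parsing steps can be prototyped locally before you deploy the pipeline. The following sketch mirrors the add_when rules and the parse_json destinations in plain Python. It is a simplified stand-in, not the pipeline's actual processor code: in particular, it stores failure tags in a `tags` key on the event dictionary, which is an assumption made for readability.

```python
import json


def classify_dataset(log_group: str) -> str:
    """Mirror the add_when rules: later matches overwrite earlier ones."""
    dataset = "general"  # default set by the first add_entries processor
    if "cloudtrail" in log_group or "CloudTrail" in log_group:
        dataset = "cloudtrail"
    if "/aws/lambda/" in log_group:
        dataset = "lambda"
    if "/apache/" in log_group:
        dataset = "apache"
    return dataset


# Destination path for parsed JSON, per dataset (apache logs go through grok instead).
JSON_DESTINATIONS = {
    "cloudtrail": ("aws", "cloudtrail"),
    "lambda": ("aws", "lambda"),
    "general": ("body",),
}


def parse_message(event: dict, dataset: str) -> dict:
    """Parse event['message'] as JSON into the dataset's destination, tagging failures."""
    path = JSON_DESTINATIONS.get(dataset)
    if path is None:
        return event
    try:
        parsed = json.loads(event["message"])
    except (KeyError, TypeError, ValueError):
        event.setdefault("tags", []).append("json_parse_fail")
        return event
    target = event
    for part in path[:-1]:
        target = target.setdefault(part, {})
    target[path[-1]] = parsed
    return event
```

Running sample log groups and messages through these functions is a quick way to check which index a log event would be routed to and where its parsed fields would land.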

When it’s all put together, your pipeline configuration will look like the following code:

version: "2"
kinesis-pipeline:
  source:
    kinesis-data-streams:
      acknowledgments: true
      codec:
        # JSON codec supports parsing nested CloudWatch events into
        # individual log entries that will be written as documents to
        # OpenSearch
        json:
          key_name: "logEvents"
          # These keys contain the metadata sent by CloudWatch Subscription Filters
          # in addition to the individual log events:
          # https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
          include_keys: ['owner', 'logGroup', 'logStream' ]
      streams:
        # Update to use your Kinesis Stream name used in your Subscription Filters:
        - stream_name: "KINESIS_STREAM_NAME"
          # Can customize initial position if you don't want OSI to consume the entire stream:
          initial_position: "EARLIEST"
          # Compression will always be gzip for CloudWatch, but will vary for other sources:
          compression: "gzip"
      aws:
        # Provide the Role ARN with access to KDS. This role should have a trust relationship with osis-pipelines.amazonaws.com
        # This must be the same role used below in the Sink configuration.
        sts_role_arn: "PIPELINE_ROLE_ARN"
        # Provide the region of the Data Stream.
        region: "REGION"
        
  #  Processor logic is used to change how log data is parsed for OpenSearch.
  processor:
    - rename_keys:
        entries:
        # Include CloudWatch timestamp as the observation timestamp - the time the log
        # was generated and sent to CloudWatch:
        - from_key: "timestamp"
          to_key: "observed_timestamp"
    - date:
        # Include the current timestamp that OSI processed the log event:
        from_time_received: true
        destination: "processed_timestamp"
    - add_entries:
        entries:
        # Support SS4O common log fields (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
        - key: "cloud/provider"
          value: "aws"
        - key: "cloud/account/id"
          format: "${owner}"
        - key: "cloud/region"
          # Replace with the AWS Region of your log sources:
          value: "us-west-2"
        - key: "aws/cloudwatch/log_group"
          format: "${logGroup}"
        - key: "aws/cloudwatch/log_stream"
          format: "${logStream}"
        # Include default values for the data_stream:
        - key: "data_stream/namespace"
          value: "default"
        - key: "data_stream/type"
          value: "logs"
        - key: "data_stream/dataset"
          value: "general"
        # Include metadata about the source Kinesis message that contained this log event:
        - key: "aws/kinesis/stream_name"
          value_expression: "getMetadata(\"stream_name\")"
        - key: "aws/kinesis/partition_key"
          value_expression: "getMetadata(\"partition_key\")"
        - key: "aws/kinesis/sequence_number"
          value_expression: "getMetadata(\"sequence_number\")"
        - key: "aws/kinesis/sub_sequence_number"
          value_expression: "getMetadata(\"sub_sequence_number\")"
    - add_entries:
        entries:
        # Update the data_stream fields based on the log event context - in this case
        # classifying the log events by their source (CloudTrail or Lambda).
        # Additional logic could be added to classify the logs by business or application context:
        - key: "data_stream/dataset"
          value: "cloudtrail"
          add_when: "contains(/logGroup, \"cloudtrail\") or contains(/logGroup, \"CloudTrail\")"
          overwrite_if_key_exists: true
        - key: "data_stream/dataset"
          value: "lambda"
          add_when: "contains(/logGroup, \"/aws/lambda/\")"
          overwrite_if_key_exists: true
        - key: "data_stream/dataset"
          value: "apache"
          add_when: "contains(/logGroup, \"/apache/\")"
          overwrite_if_key_exists: true
    # Remove the default CloudWatch fields, as we re-mapped them to SS4O fields:
    - delete_entries:
        with_keys:
          - "logGroup"
          - "logStream"
          - "owner"
    # Use Grok parser to parse non-JSON apache logs
    - grok:
        grok_when: "/data_stream/dataset == \"apache\""
        match:
          message: ['%{COMMONAPACHELOG_DATATYPED}']
        target_key: "http"
    # Attempt to parse the log data as JSON to support field-level searches in the OpenSearch index:
    - parse_json:
        # Parse root message object into aws.cloudtrail to match SS4O standard for SS4O logs
        source: "message"
        destination: "aws/cloudtrail"
        parse_when: "/data_stream/dataset == \"cloudtrail\""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for Lambda function logs - can also set up Grok support
        # for Lambda function logs to capture non-JSON logging function data as searchable fields
        source: "message"
        destination: "aws/lambda"
        parse_when: "/data_stream/dataset == \"lambda\""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for general logs
        source: "message"
        destination: "body"
        parse_when: "/data_stream/dataset == \"general\""
        tags_on_failure: ["json_parse_fail"]

  sink:
    - opensearch:
        # Provide an AWS OpenSearch Service domain endpoint
        hosts: [ "OPENSEARCH_ENDPOINT" ]
        # Route log data to different target indexes depending on the log context:
        index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          # This role must be the same as the role used above for Kinesis.
          sts_role_arn: "PIPELINE_ROLE_ARN"
          # Provide the region of the domain.
          region: "REGION"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false
  1. When your configuration is complete, choose Validate pipeline to check your pipeline syntax for errors.
  2. In the Pipeline role section, optionally enter a suffix to create a unique service role that will be used to start your pipeline run.
  3. In the Network section, select VPC access.

For a Kinesis Data Streams source, you don’t need to select a virtual private cloud (VPC), subnets, or security groups. OpenSearch Ingestion only requires these attributes for HTTP data sources that are located within a VPC. For Kinesis Data Streams, OpenSearch Ingestion uses AWS PrivateLink to read from Kinesis Data Streams and write to OpenSearch domains or serverless collections.

  1. Optionally, enable CloudWatch logging for your pipeline.
  2. Choose Next to review and create your pipeline.

If you’re using account-level subscription filters for CloudWatch Logs in the account where OpenSearch Ingestion is running, exclude the pipeline’s log group from the account-level subscription. Otherwise, the OpenSearch Ingestion pipeline logs could cause a recursive loop with the subscription filter, leading to high volumes of log data ingestion and cost.
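One way to express that exclusion is through the selection criteria of the account-level policy. The following sketch builds the arguments for the CloudWatch Logs PutAccountPolicy API; the policy name is a hypothetical placeholder, and you need to supply the actual name of your pipeline's log group.

```python
import json


def account_filter_params(stream_arn: str, role_arn: str,
                          excluded_log_groups: list,
                          policy_name: str = "all-logs-to-kinesis") -> dict:
    """Build arguments for the CloudWatch Logs PutAccountPolicy API."""
    params = {
        "policyName": policy_name,   # hypothetical policy name
        "policyType": "SUBSCRIPTION_FILTER_POLICY",
        "scope": "ALL",
        "policyDocument": json.dumps({
            "DestinationArn": stream_arn,
            "RoleArn": role_arn,
            "FilterPattern": "",     # empty pattern matches every log event
            "Distribution": "Random",
        }),
    }
    if excluded_log_groups:
        # Keep the pipeline's own log group out of the subscription to avoid a loop.
        params["selectionCriteria"] = (
            "LogGroupName NOT IN " + json.dumps(excluded_log_groups)
        )
    return params
```

With AWS credentials configured, the result can be passed to boto3 as `boto3.client("logs").put_account_policy(**params)`.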

  1. In the Review and create section, choose Create pipeline.

When your pipeline enters the Active state, you’ll see logs begin to populate in your OpenSearch domain or serverless collection.

Monitor the solution

To maintain the health of the log ingestion pipeline, there are several key areas to monitor:

  • Kinesis Data Streams metrics – You should monitor the following metrics:
    • FailedRecords – Indicates an issue in CloudWatch subscription filters writing to the Kinesis data stream. Reach out to AWS Support if this metric stays at a non-zero level for a sustained period.
    • ThrottledRecords – Indicates your Kinesis data stream needs more shards to accommodate the log volume from CloudWatch.
    • ReadProvisionedThroughputExceeded – Indicates your Kinesis data stream has more consumers consuming read throughput than supplied by the shard limits, and you may need to move to an enhanced fan-out consumer strategy.
    • WriteProvisionedThroughputExceeded – Indicates your Kinesis data stream needs more shards to accommodate the log volume from CloudWatch, or that your log volume is being unevenly distributed to your shards. Make sure the subscription filter distribution strategy is set to random, and consider enabling enhanced shard-level monitoring on the data stream to identify hot shards.
    • RateExceeded – Indicates that a consumer is incorrectly configured for the stream, and there may be an issue in your OpenSearch Ingestion pipeline causing it to subscribe too often. Investigate your consumer strategy for the Kinesis data stream.
    • MillisBehindLatest – Indicates the enhanced fan-out consumer isn’t keeping up with the load in the data stream. Investigate the OpenSearch Ingestion pipeline OCU configuration and make sure there are sufficient OCUs to accommodate the Kinesis data stream shards.
    • IteratorAgeMilliseconds – Indicates the polling consumer isn’t keeping up with the load in the data stream. Investigate the OpenSearch Ingestion pipeline OCU configuration and make sure there are sufficient OCUs to accommodate the Kinesis data stream shards, and investigate the polling strategy for the consumer.
  • CloudWatch subscription filter metrics – You should monitor the following metrics:
    • DeliveryErrors – Indicates an issue in CloudWatch subscription filter delivering data to the Kinesis data stream. Investigate data stream metrics.
    • DeliveryThrottling – Indicates insufficient capacity in the Kinesis data stream. Investigate data stream metrics.
  • OpenSearch Ingestion metrics – For recommended monitoring for OpenSearch Ingestion, see Recommended CloudWatch alarms.
  • OpenSearch Service metrics – For recommended monitoring for OpenSearch Service, see Recommended CloudWatch alarms for Amazon OpenSearch Service.
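As a starting point for alerting on consumer lag, the following sketch builds the arguments for a CloudWatch PutMetricAlarm call on the stream-level iterator age metric. The alarm name, the five-minute threshold, and the evaluation window are assumptions chosen for illustration; tune them to your own latency requirements.

```python
def iterator_age_alarm_params(stream_name: str,
                              threshold_ms: int = 300_000) -> dict:
    """Build PutMetricAlarm arguments for a consumer-lag alarm on the stream."""
    return {
        "AlarmName": f"{stream_name}-iterator-age",  # hypothetical alarm name
        "Namespace": "AWS/Kinesis",
        "MetricName": "GetRecords.IteratorAgeMilliseconds",
        "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
        "Statistic": "Maximum",
        "Period": 60,               # seconds
        "EvaluationPeriods": 5,     # five consecutive periods above the threshold
        "Threshold": float(threshold_ms),
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",
    }
```

With AWS credentials configured, the result can be passed to boto3 as `boto3.client("cloudwatch").put_metric_alarm(**params)`.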

Clean up

Make sure you clean up unwanted AWS resources created while following this post in order to prevent additional billing for these resources. Follow these steps to clean up your AWS account:

  1. Delete your Kinesis data stream.
  2. Delete your OpenSearch Service domain.
  3. Use the DeleteAccountPolicy API to remove your account-level CloudWatch subscription filter.
  4. Delete your log group-level CloudWatch subscription filter:
    1. On the CloudWatch console, select the desired log group.
    2. On the Actions menu, choose Subscription Filters and Delete all subscription filter(s).
  5. Delete the OpenSearch Ingestion pipeline.
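The cleanup steps above could be scripted roughly as follows. This sketch takes already-created boto3 clients as arguments so that it stays free of credentials handling; every resource name is a placeholder you supply. Because the pipeline registers an enhanced fan-out consumer by default, the sketch passes `EnforceConsumerDeletion=True` when deleting the stream, which is an assumption about your setup.

```python
def clean_up(kinesis, opensearch, logs, osis, *, stream_name, domain_name,
             account_policy_name, log_group, filter_name, pipeline_name):
    """Delete the resources from this post, in the order listed above.

    Each client argument is expected to be the matching boto3 client
    (`kinesis`, `opensearch`, `logs`, `osis`).
    """
    # Registered enhanced fan-out consumers block deletion unless enforced.
    kinesis.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
    opensearch.delete_domain(DomainName=domain_name)
    logs.delete_account_policy(policyName=account_policy_name,
                               policyType="SUBSCRIPTION_FILTER_POLICY")
    logs.delete_subscription_filter(logGroupName=log_group, filterName=filter_name)
    osis.delete_pipeline(PipelineName=pipeline_name)
```

If you created only one kind of subscription filter, remove the call that does not apply before running the function.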

Conclusion

In this post, you learned how to create a serverless ingestion pipeline to deliver CloudWatch logs in real time to an OpenSearch domain or serverless collection using OpenSearch Ingestion. You can use this approach for a variety of real-time data ingestion use cases, and add it to existing workloads that use Kinesis Data Streams for real-time data analytics.

For other use cases for OpenSearch Ingestion and Kinesis Data Streams, consider the following:

To continue improving your log analytics use cases in OpenSearch, consider using some of the pre-built dashboards available in Integrations in OpenSearch Dashboards.


About the authors

M Mehrtens has been working in distributed systems engineering throughout their career, working as a Software Engineer, Architect, and Data Engineer. In the past, M has supported and built systems to process terabytes of streaming data at low latency, run enterprise machine learning pipelines, and share data across teams seamlessly with varying data toolsets and software stacks. At AWS, they are a Sr. Solutions Architect supporting US Federal Financial customers.

Arjun Nambiar is a Product Manager with Amazon OpenSearch Service. He focuses on ingestion technologies that enable ingesting data from a wide variety of sources into Amazon OpenSearch Service at scale. Arjun is interested in large-scale distributed systems and cloud-centered technologies, and is based out of Seattle, Washington.

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.