AWS Big Data Blog

Amazon OpenSearch Ingestion 101: Set CloudWatch alarms for key metrics

Amazon OpenSearch Ingestion is a fully managed, serverless data pipeline that simplifies the process of ingesting data into Amazon OpenSearch Service and OpenSearch Serverless collections. Some key concepts include:

  • Source – Input component that specifies how the pipeline ingests the data. Each pipeline has a single source which can be either push-based and pull-based.
  • Processors – Intermediate processing units that can filter, transform, and enrich records before delivery.
  • Sink – Output component that specifies the destination(s) to which the pipeline publishes data. It can publish records to one or more destinations.
  • Buffer – It is the layer between the source and the sink. It serves as temporary storage for events, decoupling the source from the downstream processors and sinks. Amazon OpenSearch Ingestion also offers a persistent buffer option for push-based sources
  • Dead-letter queues (DLQs) – Configures Amazon Simple Storage Service (Amazon S3) to capture records that fail to write to the sink, enabling error handling and troubleshooting.

This end-to-end data ingestion service can help you collect, process, and deliver data to your OpenSearch environments without the need to manage underlying infrastructure.

This post provides an in-depth look at setting up Amazon CloudWatch alarms for OpenSearch Ingestion pipelines. It goes beyond our recommended alarms to help identify bottlenecks in the pipeline, whether that’s in the sink, the OpenSearch clusters data is being sent to, the processors, or the pipeline not pulling or accepting enough from the source. This post will help you proactively monitor and troubleshoot your OpenSearch Ingestion pipelines.

Overview

Monitoring your OpenSearch Ingestion pipelines is crucial for catching and addressing issues early. By understanding the key metrics and setting up the right alarms, you can proactively manage the health and performance of your data ingestion workflows. In the following sections, we provide details about alarm metrics for different sources, monitors, and sinks. The specific values for the threshold, period, and datapoints to alarm used for alarms can vary based on the individual use case and requirements.

Prerequisites

To create an OpenSearch Ingestion pipeline, refer to Creating Amazon OpenSearch Ingestion pipelines. For creating CloudWatch alarms, refer to Create a CloudWatch alarm based on a static threshold.

You can enable logging for OpenSearch Ingestion Pipeline, which captures various log messages during pipeline operations and ingestion activity, including errors, warnings, and informational messages. For details on enabling and monitoring pipeline logs, refer to Monitoring pipeline logs

Sources

The entry point of your pipeline is often where monitoring should begin. By setting appropriate alarms for source components, you can quickly identify ingestion bottlenecks or connection issues. The following table summarizes key alarm metrics for different sources.

Source Alarm Description Recommended Action
HTTP/ OpenTelemetry requestsTooLarge.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The request payload size of the client (data producer) is greater than the maximum request payload size, resulting in the status code HTTP 413. The default maximum request payload size is 10 MB for HTTP sources and 4 MB for OpenTelemetry sources. The limit for the HTTP sources can be increased for the pipelines with persistent buffer enabled. The chunk size for the client can be reduced so that the request payload doesn’t exceed the maximum size. You can examine the distribution of payload sizes of incoming requests using the payloadSize.sum metric.
HTTP requestsRejected.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The request was sent to the HTTP endpoint of the OpenSearch Ingestion pipeline by the client (data producer), but the request wasn’t accepted by the pipeline, and it rejected the request with the status code 429 in the response. For persistent issues, consider increasing the minimum OCUs for the pipeline to allocate additional resources for request processing.
Amazon S3 s3ObjectsFailed.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The pipeline is unable to read some objects from the Amazon S3 source. Refer to REF-003 in Reference Guide below.
Amazon DynamoDB Difference for totalOpenShards.max - activeShardsInProcessing.value
Threshold: >0
Statistic: Maximum (totalOpenShards.max) and Sum (activeShardsInProcessing.value)
Datapoints to Alarm: 3 out of 3.Additional Note: refer REF-004 for more details on configuring this specific alarm.
It monitors alignment between total open shards that should be processed by the pipeline and active shards currently in processing. The activeShardsInProcessing.value will go down periodically as shards close but should never misalign from ‘totalOpenShards.max’ for longer than a couple of minutes. If the alarm is triggered, you can consider stopping and starting the pipeline, this option resets the pipeline’s state, and the pipeline will restart with a new full export. It is non-destructive, so it does not delete your index or any data in DynamoDB. If you don’t create a fresh index before you do this, you might see a high number of errors from version conflicts because the export tries to insert older documents than the current _version in the index. You can safely ignore these errors. For root cause analysis on the misalignment, you can reach out to AWS Support
Amazon DynamoDB dynamodb.changeEventsProcessingErrors.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The number of processing errors for change events for a pipeline with stream processing for DynamoDB. If the metrics report increasing values, refer to REF-002 in Reference Guide below
Amazon DocumentDB documentdb.exportJobFailure.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The attempt to trigger an export to Amazon S3 failed. Review ERROR-level logs in the pipeline logs for entries beginning with “Received an exception during export from DocumentDB, backing off and retrying.” These logs contain the complete exception details indicating the root cause of the failure.
Amazon DocumentDB documentdb.changeEventsProcessingErrors.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The number of processing errors for change events for a pipeline with stream processing for Amazon DocumentDB. Refer to REF-002 in Reference Guide below
Kafka kafka.numberOfDeserializationErrors.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The OpenSearch Ingestion pipeline encountered deserialization errors while consuming a record from Kafka. Review WARN-level logs in the pipeline logs and verify serde_format is configured correctly in the pipeline configuration and the pipeline role has access to the AWS Glue Schema Registry (if used).
OpenSearch opensearch.processingErrors.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
Processing errors were encountered while reading from the index. Ideally, the OpenSearch Ingestion pipeline would retry automatically, but for unknown exceptions, it might skip processing. Refer to REF-001 or REF-002 in Reference Guide below, to get the exception details that resulted in processing errors.
Amazon Kinesis Data Streams kinesis_data_streams.recordProcessingErrors.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The OpenSearch Ingestion pipeline encountered an error while processing the records. If the metrics report increasing values, refer to REF-002 in Reference Guide below, which can help in identifying the cause.
Amazon Kinesis Data Streams kinesis_data_streams.acknowledgementSetFailures.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The pipeline encountered a negative acknowledgment while processing the streams, causing it to reprocess the stream. Refer to REF-001 or REF-002 in Reference Guide below.
Confluence confluence.searchRequestsFailed.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
While trying to fetch the content, the pipeline encountered the exception. Review ERROR-level logs in the pipeline logs for entries beginning with “Error while fetching content.” These logs contain the complete exception details indicating the root cause of the failure.
Confluence confluence.authFailures.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The number of UNAUTHORIZED exceptions received while establishing the connection Although the service should automatically renew tokens, if the metrics show an increasing value, review ERROR-level logs in the pipeline logs to identify why the token refresh is failing.
Jira jira.ticketRequestsFailed.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
While trying to fetch the issue, the pipeline encountered an exception. Review ERROR-level logs in the pipeline logs for entries beginning with “Error while fetching issue.” These logs contain the complete exception details indicating the root cause of the failure.
Jira jira.authFailures.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The number of UNAUTHORIZED exceptions received while establishing the connection. Although the service should automatically renew tokens, if the metrics show an increasing value, review ERROR-level logs in the pipeline logs to identify why the token refresh is failing.

Processors

The following table provides details about alarm metrics for different processors.

Processor Alarm Description Recommended Action
AWS Lambda aws_lambda_processor.recordsFailedToSentLambda.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
Some of the records could not be sent to Lambda. In the case of high values for this metric, refer to REF-002 in Reference Guide below.
AWS Lambda aws_lambda_processor.numberOfRequestsFailed.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The pipeline was unable to invoke the Lambda function. Although this situation should not occur under normal conditions, if it does, review Lambda logs and refer to REF-002 in Reference Guide below.
AWS Lambda aws_lambda_processor.requestPayloadSize.max
Threshold: >= 6292536
Statistic: MAXIMUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The payload size is exceeding the 6 MB limit, so the Lambda function can’t be invoked. Consider revisiting the batching thresholds in the pipeline configuration for the aws_lambda processor.
Grok grok.grokProcessingMismatch.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The incoming data doesn’t match the Grok pattern defined in the pipeline configuration. In the case of high values for this metric, review the Grok processor configurations and make sure the defined pattern matches according to the incoming data.
Grok grok.grokProcessingErrors.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The pipeline encountered an exception when extracting the information from the incoming data according to the defined Grok pattern. In the case of high values for this metric, refer to REF-002 in Reference Guide below.
Grok grok.grokProcessingTime.max
Threshold: >= 1000
Statistic: MAXIMUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The maximum amount of time that each individual record takes to match against patterns from the match configuration option. If the time taken is equal to or more than 1 second, check the incoming data and the Grok pattern. The maximum amount of time during which matching occurs is 30,000 milliseconds, which is controlled by the timeout_millis parameter.

Sinks and DLQs

The following table contains details about alarm metrics for different sinks and DLQs.

Sink Alarm Description Recommended Action
OpenSearch opensearch.bulkRequestErrors.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The number of errors encountered while sending a bulk request. Refer to REF-002 in Reference Guide below which can help to identify the exception details.
OpenSearch opensearch.bulkRequestFailed.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The number of errors received after sending the bulk request to the OpenSearch domain. Refer to REF-001 in Reference Guide below which can help to identify the exception details.
Amazon S3 s3.s3SinkObjectsFailed.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
The OpenSearch Ingestion pipeline encountered a failure while writing the object to Amazon S3. Verify that the pipeline role has the necessary permissions to write objects to the specified S3 key. Review the pipeline logs to identify the specific keys where failures occurred.
Monitor the s3.s3SinkObjectsEventsFailed.count metric for granular details on the number of failed write operations.
Amazon S3 DLQ s3.dlqS3RecordsFailed.count
Threshold: >0
Statistic: SUM
Period: 5 minutes
Datapoints to alarm: 1 out 1
For a pipeline with DLQ enabled, the records are either sent to the sink or to the DLQ (if they are unable to send to the sink). This alarm indicates the pipeline was unable to send the records to the DLQ due to some error. Refer to REF-002 in Reference Guide below which can help to identify the exception details.

Buffer

The following table contains details about alarm metrics for buffers.

Buffer Alarm Description Recommended Action
BlockingBuffer BlockingBuffer.bufferUsage.value
Threshold: >80
Statistic: AVERAGE
Period: 5 minutes
Datapoints to alarm: 1 out 1
The percent usage, based on the number of records in the buffer. To investigate further, check if the Pipeline is bottlenecked due to processors or sink by comparing timeElapsed.max metrics and analyzing bulkRequestLatency.max
Persistent persistentBufferRead.recordsLagMax.value
Threshold: > 5000
Statistic: AVERAGE
Period: 5 minutes
Datapoints to alarm: 1 out 1
The maximum lag in terms of number of records stored in the persistent buffer. If the value for bufferUsage is low, increase the maximum OCUs. If bufferUsage is also high [>80], investigate if pipeline is bottlenecked by processors or sink.

Reference Guide

The following provide guidance for resolving common pipeline issues along with general reference.

REF-001: WARN-level Log Review

Review WARN-level logs in the pipeline logs to identify the exception details.

REF-002: ERROR-level Log Review

Review ERROR-level logs in the pipeline logs to identify the exception details.

REF-003: S3 Objects Failed

When troubleshooting increasing s3ObjectsFailed.count values, monitor these specific metrics to narrow down the root cause:

  • s3ObjectsAccessDenied.count – This metric increments when the pipeline encounters Access Denied or Forbidden errors while reading S3 objects. Common causes include:
  • Insufficient permissions in the pipeline role.
  • Restrictive S3 bucket policy not allowing the pipeline role access.
  • For cross-account S3 buckets, incorrectly configured bucket_owners mapping.
  • s3ObjectsNotFound.count – This metric increments when the pipeline receives Not Found errors while attempting to read S3 objects.

For further assistance with the recommended actions, contact AWS support.

REF-004: Configuring Alarm for difference in totalOpenShards.max and activeShardsInProcessing.value for Amazon DynamoDB source.

  1. Open the CloudWatch console at https://console.aws.amazon.com/cloudwatch/.
  2. In the navigation pane, choose Alarms, All alarms.
  3. Choose Create alarm.
  4. Choose Select Metric.
  5. Select Source.
  6. In source, following JSON can be used after updating the <sub-pipeline-name>, <pipeline-name> and <region>.
    {
        "metrics": [
            [ { "expression": "m1-e1", "label": "Expression2", "id": "e2", "period": 900 } ],
            [ { "expression": "FLOOR((m2/15)+0.5)", "label": "Expression1", "id": "activeShardsInProcessing", "visible": false, "period": 900 } ],
            [ "AWS/OSIS", "<sub-pipeline-name>.dynamodb.totalOpenShards.max", "PipelineName", "<pipeline-name>", { "stat": "Maximum", "id": "m1", "visible": false } ],
            [ ".", "<sub-pipeline name>.dynamodb.activeShardsInProcessing.value", ".", ".", { "stat": "Average", "id": "m2", "visible": false } ]
        ],
        "view": "timeSeries",
        "stacked": false,
        "period": 900,
        "region": "<region>"
    }

Let’s review couple of scenarios based on the above metrics.

Scenario 1 – Understand and Lower Pipeline Latency

Latency within a pipeline is built up of three main components:

  • The time it takes to send documents via bulk requests to OpenSearch,
  • the time it takes for data to go through the pipeline processors, and
  • the time that data sits in the pipeline buffer

Bulk requests and processors (last two items in the previous list) are the root causes for why the buffer builds up and leads to latency.

To monitor how much data is being stored in the buffer, monitor the bufferUsage.value metric. The only way to lower latency within the buffer is to optimize the pipeline processors and sink bulk request latency, depending on which of those is the bottleneck.

The bulkRequestLatency metric measures the time taken to execute bulk requests, including retries, and can be used to monitor write performance to the OpenSearch sink. If this metric reports an unusually high value, it indicates that the OpenSearch sink may be overloaded, causing increased processing time. To troubleshoot further, review the bulkRequestNumberOfRetries.count metric to confirm whether the high latency is due to rejections from OpenSearch that are leading to retries, such as throttling (429 errors) or other reasons. If document errors are present, examine the configured DLQ to identify the failed document details. Additionally, the max_retries parameter can be configured in the pipeline configuration to limit the number of retries. However, if the documentErrors metric reports zero, the bulkRequestNumberOfRetries.count is also zero, and the bulkRequestLatency remains high, it is likely an indicator that the OpenSearch sink is overloaded. In this case, review the destination metrics for additional details.

If the bulkRequestLatency metric is low (for example, less than 1.5 seconds) and the bulkRequestNumberOfRetries metric is reported as 0, then the bottleneck is likely within the pipeline processors. To monitor the performance of the processors, review the <processorName>.timeElapsed.avg metric. This metric reports the time taken for the processor to complete processing of a batch of records. For example, if a grok processor is reporting a much higher value than other processors for timeElapsed, it may be due to a slow grok pattern that can be optimized or even replaced with a more performant processor, depending on the use case.

Scenario 2 – Understanding and Resolving Document Errors to OpenSearch

The documentErrors.count metric tracks the number of documents that failed to be sent by bulk requests. The failure can happen due to various reasons such as mapping conflicts, invalid data formats, or schema mismatches. When this metric reports a non-zero value, it indicates that some documents are being rejected by OpenSearch. To identify the root cause, examine the configured Dead Letter Queue (DLQ), which captures the failed documents along with error details. The DLQ provides information about why specific documents failed, enabling you to identify patterns such as incorrect field types, missing required fields, or data that exceeds size limits. For example, find the sample DLQ objects for common issues below:

Mapper parsing exception:

{"dlqObjects": [{
        "pluginId": "opensearch",
        "pluginName": "opensearch",
        "pipelineName": "<PipelineName>",
        "failedData": {
            "index": "<IndexName>",
            "indexId": null,
            "status": 400,
            "message": "failed to parse field [<fieldname>] of type [integer] in document with id '<DocumentId>'. Preview of field's value: 'N/A' caused by For input string: \"N/A\"",
            "document": {<OriginalDocument>}
        },
        "timestamp": "…"
    }]}

Here, OpenSearch cannot store the text string “N/A” in a field that is only for numbers, so it rejects the document and stores it in the DLQ.

Limit of total fields exceeded:

{"dlqObjects": [{
        "pluginId": "opensearch",
        "pluginName": "opensearch",
        "pipelineName": "<PipelineName>",
        "failedData": {
            "index": "<IndexName>",
            "indexId": null,
            "status": 400,
            "message": "Limit of total fields [<field limit>] has been exceeded",
            "document": {<OriginalDocument>}
        },
        "timestamp": "…"
    }]}

The index.mapping.total_fields.limit setting is the parameter that controls the maximum number of fields allowed in an index mapping, and exceeding this limit will cause indexing operations to fail. You can check if all those fields are required or leverage various processors provided by OpenSearch Ingestion to transform the data.

Once these issues are identified, you can either correct the source data, adjust the pipeline configuration to transform the data appropriately, or modify the OpenSearch index mapping to accommodate the incoming data format.

Clean up

When setting up alarms for monitoring your OpenSearch Ingestion pipelines, it’s important to be mindful of the potential costs involved. Each alarm you configure will incur charges based on the CloudWatch pricing model.

To avoid unnecessary expenses, we recommend carefully evaluating your alarm requirements and configuring them accordingly. Only set up the alarms that are essential for your use case, and regularly review your alarm configurations to identify and remove unused or redundant alarms.

Conclusion

In this post, we explored the comprehensive monitoring capabilities for OpenSearch Ingestion pipelines through CloudWatch alarms, covering key metrics across various sources, processors, and sinks. Although this post highlights the most critical metrics, there’s more to discover. For a deeper dive, refer to the following resources:

Effective monitoring through CloudWatch alarms is crucial for maintaining healthy ingestion pipelines and maintaining optimal data flow.


About the authors

Utkarsh Agarwal

Utkarsh Agarwal

Utkarsh is a Cloud Support Engineer in the Support Engineering team at AWS. He provides guidance and technical assistance to customers, helping them build scalable, highly available, and secure solutions in the AWS Cloud. In his free time, he enjoys watching movies, TV series, and of course cricket! Lately, he is also attempting to master foosball.

Ramesh Chirumamilla

Ramesh Chirumamilla

Ramesh is a Technical Manager with Amazon Web Services. In his role, Ramesh works proactively to help craft and execute strategies to drive customers’ adoption and use of AWS services. He uses his experience working with Amazon OpenSearch Service to help customers cost-optimize their OpenSearch domains by helping them right-size and implement best practices.

Taylor Gray

Taylor Gray

Taylor is a Software Engineer in the Amazon OpenSearch Ingestion team at Amazon Web Services. He has contributed many features within both Data Prepper and OpenSearch Ingestion to enable scalable solutions for customers. In his free time, he enjoys pickle ball, reading, and playing Rocket League.