AWS Big Data Blog
Normalize data with Amazon Elasticsearch Service ingest pipelines
September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service.
Amazon OpenSearch Service is a fully managed service that makes it easy for you to deploy, secure, and run Elasticsearch cost-effectively at scale. Search and log analytics are the two most popular use cases for Amazon OpenSearch Service. In log analytics at scale, a common pattern is to create indexes from multiple sources. In these use cases, how can you ensure that all the incoming data follows a specific, predefined format if it’s operationally not feasible to apply checks in each data source? You can use Elasticsearch ingest pipelines to normalize all the incoming data and create indexes with the predefined format.
What’s an ingest pipeline?
An ingest pipeline lets you use some of your Amazon OpenSearch Service domain's processing power to apply a set of processors to documents during indexing. An ingest pipeline applies its processors in order, with the output of one processor feeding the next processor in the pipe. You define a pipeline with the Elasticsearch _ingest API. The following screenshot illustrates this architecture.
To see the ingest pipelines that are already defined in your Amazon OpenSearch Service domain, enter the following code:
GET _ingest/pipeline/
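You create or update a pipeline with a PUT call to the same API, and you can test it with the _simulate endpoint before using it. The following is a minimal sketch; the pipeline and field names are illustrative:

PUT _ingest/pipeline/my_example_pipeline
{
  "description": "Adds the ingest timestamp to each document",
  "processors": [
    {
      "set": {
        "field": "doc_received_date",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

POST _ingest/pipeline/my_example_pipeline/_simulate
{
  "docs": [
    {
      "_index": "index-2021.09.08.10",
      "_source": { "message": "sample log line" }
    }
  ]
}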
Solution overview
In this post, we discuss three log analytics use cases where data normalization is a common technique.
We create three pipelines and normalize the data for each use case. The following diagram illustrates this architecture.
Use case 1
In this first use case, your Amazon OpenSearch Service domain has three sources: Logstash, Fluentd, and AWS Lambda. Your Logstash source sends the data to an index with the name index-YYYY.MM.DD.HH (hours at the end). When the Fluentd source has an error, it creates an index named index-YYYY.MM.DD (missing the hours). Your domain creates indexes for both formats, which is not what you intended.
One way to correct the index name is to calculate the hour of the ingested data and append it to the index name. If you can't identify any pattern, or you find further issues with the index name, you need to segregate the data to a different index (for example, format_error) for further analysis.
Use case 2
If your application uses time-series data and analyzes data from fixed time windows, your data sources can sometimes send data from a prior time window. In this use case, you need to check for the incoming data and discard data that doesn’t fit in the current time window.
Use case 3
In some use cases, the value for a key can contain large strings with common prefixes. End users typically use wildcard characters (*) with the prefix to search on these fields. If your application or Kibana dashboards contain several wildcard queries, they can increase CPU utilization and overall search latency. You can address this by extracting the prefixes from the values and creating a new field with the keyword data type. You can then use term queries on the keyword field and improve search performance.
Pipeline 1: pipeline_normalize_index
The default pipeline for incoming data is pipeline_normalize_index. This pipeline performs the following actions:
- Checks if the incoming data belongs to the current date.
- Checks if the data has any errors in the index name.
- Segregates the data:
  - If it doesn't find any errors, it pushes the data to pipeline_normalize_data.
  - If it finds errors, it pushes the data to pipeline_fix_index.
Checking the index date
In this step, you create an ingest pipeline using a script processor, which lets you write a script and execute it within the pipeline.
Use the Set processor to add _ingest.timestamp to doc_received_date and compare the index date to the document received date. The script processor lets you write the script in Painless; you can create a script that checks whether the index date matches doc_received_date. The script processor also lets you access the ingest document through the ctx variable. See the following code:
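This sketch shows the two processors; it assumes index names that start with index-YYYY.MM.DD and an ISO-8601 _ingest.timestamp, and the flag field index_date_matches is an illustrative name:

{
  "set": {
    "field": "doc_received_date",
    "value": "{{_ingest.timestamp}}"
  }
},
{
  "script": {
    "source": """
      // index-YYYY.MM.DD... -> YYYY.MM.DD
      String indexDate = ctx._index.substring(6, 16);
      // 2021-09-08T10:15:30.123Z -> 2021.09.08
      String receivedDate = ctx.doc_received_date.substring(0, 10).replace('-', '.');
      ctx.index_date_matches = indexDate.equals(receivedDate);
    """
  }
}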
Checking for index name errors
You can use the same script processor from the previous step to check if the index name matches the format index-YYYY.MM.DD.HH or index-YYYY.MM.DD. See the following code:
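The following sketch classifies the index name by its length rather than with a regular expression (regex support for Painless isn't always enabled); the field index_name_format is an illustrative name:

{
  "script": {
    "source": """
      // index-YYYY.MM.DD.HH is 19 characters, index-YYYY.MM.DD is 16
      String idx = ctx._index;
      if (idx.startsWith('index-') && idx.length() == 19) {
        ctx.index_name_format = 'with_hours';
      } else if (idx.startsWith('index-') && idx.length() == 16) {
        ctx.index_name_format = 'missing_hours';
      } else {
        ctx.index_name_format = 'error';
      }
    """
  }
}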
Segregating the data
If the index date doesn't match the _ingest.timestamp, you can drop the request using the drop processor. If the index name doesn't match the format index-YYYY.MM.DD, you can segregate the data to the pipeline pipeline_verify_index_date and proceed to the pipeline pipeline_normalize_data. If the conditions aren't met, you can proceed to the pipeline pipeline_indexformat_errors or assign a default index such as indexing_errors. If no issues are found, you proceed to the pipeline pipeline_normalize_data. See the following code:
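The segregation itself can be expressed with conditional drop and pipeline processors, as in this sketch (the conditions reference the illustrative flag fields set by the earlier script processor, and the routing mirrors the pipeline names used in this post):

{
  "drop": {
    "if": "ctx.index_date_matches == false"
  }
},
{
  "pipeline": {
    "if": "ctx.index_name_format == 'missing_hours'",
    "name": "pipeline_fix_index"
  }
},
{
  "pipeline": {
    "if": "ctx.index_name_format == 'error'",
    "name": "pipeline_indexformat_errors"
  }
},
{
  "pipeline": {
    "if": "ctx.index_name_format == 'with_hours'",
    "name": "pipeline_normalize_data"
  }
}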
The following code is an example pipeline:
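Putting the pieces together, pipeline_normalize_index might look like the following sketch; the routing targets match the pipeline names used in this post, but the flag fields and substring offsets are illustrative and depend on your index naming convention:

PUT _ingest/pipeline/pipeline_normalize_index
{
  "description": "Checks the index date and name format, then drops or routes the document",
  "processors": [
    {
      "set": {
        "field": "doc_received_date",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "source": """
          // Classify the index name: 19 characters = index-YYYY.MM.DD.HH, 16 = index-YYYY.MM.DD
          String idx = ctx._index;
          if (idx.startsWith('index-') && idx.length() == 19) {
            ctx.index_name_format = 'with_hours';
          } else if (idx.startsWith('index-') && idx.length() == 16) {
            ctx.index_name_format = 'missing_hours';
          } else {
            ctx.index_name_format = 'error';
          }
          // Compare the date embedded in the index name with the ingest date
          if (ctx.index_name_format != 'error') {
            String indexDate = idx.substring(6, 16);
            String receivedDate = ctx.doc_received_date.substring(0, 10).replace('-', '.');
            ctx.index_date_matches = indexDate.equals(receivedDate);
          }
        """
      }
    },
    { "drop": { "if": "ctx.index_date_matches == false" } },
    { "pipeline": { "if": "ctx.index_name_format == 'missing_hours'", "name": "pipeline_fix_index" } },
    { "pipeline": { "if": "ctx.index_name_format == 'error'", "name": "pipeline_indexformat_errors" } },
    { "pipeline": { "if": "ctx.index_name_format == 'with_hours'", "name": "pipeline_normalize_data" } }
  ]
}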
Pipeline 2: pipeline_normalize_data
The pipeline pipeline_normalize_data fixes the index data. It extracts the prefix from the defined field and creates a new field that you can use for term queries.
In this step, you use a grok processor to extract prefixes from the existing fields and create a new field that you can use for term queries. The output of this pipeline creates the index. See the following code of an example pipeline:
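The following sketch assumes a field named request_path whose values share a leading path segment (for example, /api/...); the field names request_path and request_prefix are illustrative:

PUT _ingest/pipeline/pipeline_normalize_data
{
  "description": "Extracts a common prefix into a separate field that can be queried as a keyword",
  "processors": [
    {
      "grok": {
        "field": "request_path",
        "patterns": ["^/%{WORD:request_prefix}/"],
        "ignore_failure": true
      }
    }
  ]
}

With the default dynamic mapping, the new string field also gets a keyword sub-field (request_prefix.keyword), which you can target with term queries instead of wildcard queries.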
Pipeline 3: pipeline_fix_index
This pipeline fixes the index name. The index name errors identified in pipeline_normalize_index (Pipeline 1) are the incoming data points for this pipeline. pipeline_fix_index uses the script processor to run a Painless script that extracts the hours (HH) from the _ingest.timestamp and appends them to the _index. See the following code of the example pipeline:
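The following sketch assumes the document arrives from pipeline_normalize_index with doc_received_date already set from _ingest.timestamp (for example, 2021-09-08T10:15:30.123Z):

PUT _ingest/pipeline/pipeline_fix_index
{
  "description": "Appends the ingest hour to index names that are missing it",
  "processors": [
    {
      "script": {
        "source": """
          // doc_received_date was set from _ingest.timestamp in pipeline_normalize_index;
          // substring(11, 13) takes the HH portion of the ISO-8601 timestamp
          String hour = ctx.doc_received_date.substring(11, 13);
          ctx._index = ctx._index + '.' + hour;
        """
      }
    }
  ]
}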
Adding the default pipeline to the index template
After creating all the pipelines, add the default pipeline to the index template. See the following code:
"default_pipeline" : "pipeline_normalize_index"
Summary
You can normalize data, fix indexing errors, and segregate operational data and anomalies by using ingest pipelines. Although you can use one pipeline with several processors (depending on the use case), ingest pipelines provide an efficient way to use compute and operational resources by eliminating unwanted indexes.
About the Authors
Vijay Injam is a Data Architect with Amazon Web Services.
Kevin Fallis is an AWS specialist search solutions architect. His passion at AWS is to help customers leverage the correct mix of AWS services to achieve success for their business goals. His after-work activities include family, DIY projects, carpentry, playing drums, and all things music.