AWS Big Data Blog

View summarized data with Amazon OpenSearch Service Index Transforms

Amazon OpenSearch Service recently announced support for Index Transforms. You can use Index Transforms to extract meaningful information from an existing index, and store the aggregated information in a new index. The key benefit of Index Transforms is faster retrieval of data by performing aggregations, grouping in advance, and storing those results in summarized views. For example, you can run continuous aggregations on ecommerce order data to summarize and learn the spending behaviors of your customers. With Index Transforms, you have the flexibility to select specific fields from the source index. You can also run Index Transform jobs on indices that don’t have a timestamp field.

There are two ways to configure Index Transform jobs: by using the OpenSearch Dashboards UI or the Index Transform REST APIs. In this post, we discuss these two methods and share some best practices.

Use the OpenSearch Dashboards UI

To configure an Index Transform job in the Dashboards UI, first identify the source index you want to transform. You can also use sample ecommerce orders data available on the OpenSearch Dashboards home page.

  1. After you log in to OpenSearch Dashboards, choose Home in the navigation pane, then choose Add sample data.
  2. Choose Add Data to create a sample index (for example, opensearch_dashboards_sample_data_ecommerce).
  3. Launch OpenSearch Dashboards and on the menu bar, choose Index Management.
  4. Choose Transform Jobs in the navigation pane.
  5. Choose Create Transform Job.
  6. Specify the Index Transform job name and select the recently created sample ecommerce index as the source.
  7. Choose an existing index or create a new one when selecting the target index.
  8. Optionally, choose Edit data filter to run transformations only on filtered data. For this post, we run transformations on products sold more than 10 times but less than 200.
  9. Choose Next.

The sample ecommerce source index has over 50 fields. We only want to select the fields that are relevant to tracking the sales data by product category.

  1. Select the fields category.keyword, total_quantity, and products.price. The Index Transform wizard lets you filter for the specific fields of interest and then select transform operations on those fields.
  2. Because we want to aggregate by product category, choose the plus sign next to the field category.keyword and choose Group by terms.
  3. Similarly, choose Aggregate by max, min, avg for the products.price field and Aggregate by sum for the total_quantity field.

The Index Transform wizard previews the transformed fields on sample data for quick review. You can also rename the transformed fields to give them more descriptive names.

Currently, Index Transform jobs support histogram, date_histogram, and terms groupings. For more information about groupings, see Bucket aggregations. For metrics aggregations, you can choose from sum, avg, max, min, value_count, percentiles, and scripted_metric.
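The wizard selections described above correspond to the groups and aggregations sections of a transform definition. The following is a minimal Python sketch that builds those two sections, assuming the source field names from the sample ecommerce index. The helper name and the target field names (such as category and max_price) are illustrative assumptions and do not come from this post.

```python
import json


def build_groups_and_aggregations():
    """Build the grouping and metric sections of a transform definition.

    Target field names are hypothetical; rename them to suit your index.
    """
    # Group documents by product category using a terms grouping.
    groups = [
        {"terms": {"source_field": "category.keyword", "target_field": "category"}}
    ]
    # Metric aggregations computed for each product category.
    aggregations = {
        "max_price": {"max": {"field": "products.price"}},
        "min_price": {"min": {"field": "products.price"}},
        "avg_price": {"avg": {"field": "products.price"}},
        "total_quantity_sold": {"sum": {"field": "total_quantity"}},
    }
    return groups, aggregations


if __name__ == "__main__":
    groups, aggregations = build_groups_and_aggregations()
    print(json.dumps({"groups": groups, "aggregations": aggregations}, indent=2))
```

Each key under aggregations becomes a field in the target index, which is why descriptive names are worth choosing up front.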

Scripted metrics are useful when you need to calculate a value based on an existing attribute of the document, for example, finding the latest follower count on a continuous social feed or finding the customer who placed the first order over a certain amount on a particular day. Scripted metrics are written in Painless, a simple, secure scripting language designed specifically for use with search platforms.

The following example script finds the first customer who placed an order valued at $100 or more.

{
   "init_script": "state.timestamp_earliest = -1l; state.order_total = 0l; state.customer_id = ''",
   "map_script": "if (!doc['order_date'].empty) { def current_date = doc['order_date'].getValue().toInstant().toEpochMilli(); def order_total = doc['taxful_total_price'].getValue(); if ((current_date < state.timestamp_earliest && order_total >= 100) || (state.timestamp_earliest == -1 && order_total >= 100)) { state.timestamp_earliest = current_date; state.customer_id = doc['customer_id'].getValue();}}",
   "combine_script": "return state",
   "reduce_script": "def customer_id = ''; def earliest_timestamp = -1L; for (s in states) { if (s.timestamp_earliest != -1 && (earliest_timestamp == -1 || s.timestamp_earliest < earliest_timestamp)) { earliest_timestamp = s.timestamp_earliest; customer_id = s.customer_id; } } return customer_id"
}

Scripted metrics run in four phases:

  • Initialize phase (init_script) – Optional initialization phase where shard level variables can be initialized.
  • Map phase (map_script) – Runs the code on each collected document.
  • Combine phase (combine_script) – Returns the results from all shards or nodes to the coordinator node.
  • Reduce phase (reduce_script) – Produces the final result by processing the results from all shards.
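To make the four phases concrete, the following Python sketch simulates them outside of OpenSearch for the "first customer with an order of at least $100" example. It is an illustration of the control flow only, not how the engine is implemented. The function names are hypothetical, each inner list stands in for the documents on one shard, and order_date is assumed to already be an epoch-millisecond number.

```python
def init_state():
    # Initialize phase: per-shard state; -1 means no qualifying order seen yet.
    return {"timestamp_earliest": -1, "customer_id": ""}


def map_document(state, doc, threshold=100):
    # Map phase: runs once for each document collected on the shard.
    if doc.get("order_date") is None:
        return
    qualifies = doc["taxful_total_price"] >= threshold
    is_earlier = (
        state["timestamp_earliest"] == -1
        or doc["order_date"] < state["timestamp_earliest"]
    )
    if qualifies and is_earlier:
        state["timestamp_earliest"] = doc["order_date"]
        state["customer_id"] = doc["customer_id"]


def combine(state):
    # Combine phase: each shard hands its state back to the coordinator.
    return state


def reduce_states(states):
    # Reduce phase: pick the earliest qualifying order across all shard states,
    # skipping shards that never saw a qualifying order.
    customer_id, earliest = "", -1
    for s in states:
        if s["timestamp_earliest"] == -1:
            continue
        if earliest == -1 or s["timestamp_earliest"] < earliest:
            earliest = s["timestamp_earliest"]
            customer_id = s["customer_id"]
    return customer_id


def first_qualifying_customer(shards, threshold=100):
    states = []
    for docs in shards:
        state = init_state()
        for doc in docs:
            map_document(state, doc, threshold)
        states.append(combine(state))
    return reduce_states(states)


if __name__ == "__main__":
    shards = [
        [
            {"order_date": 3000, "taxful_total_price": 150.0, "customer_id": "c1"},
            {"order_date": 1000, "taxful_total_price": 50.0, "customer_id": "c2"},
        ],
        [{"order_date": 2000, "taxful_total_price": 120.0, "customer_id": "c3"}],
        [],  # a shard with no documents
    ]
    print(first_qualifying_customer(shards))
```

In the sample data, the earliest order overall (c2) is below the threshold and one shard is empty, so the reduce step has to ignore both when choosing the result.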

If your use case involves multiple complex scripted metrics calculations, plan to perform calculations prior to ingesting data into the OpenSearch Service domain.

  1. In the last step, specify the schedule for the Index Transform job, for example every 12 hours.
  2. On the Advanced tab, you can modify the pages per run.

This setting indicates the data that can be processed in each search request. Raising this number can increase the memory utilization and lead to higher latency. We recommend using the default setting (1000 pages per run).

  1. Review all the selections and choose Create to schedule the Index Transform job.

Index Transform jobs are enabled by default and run based on the selected schedule. Choose Refresh to view the status of the Index Transform job.

After the job runs successfully, you can view the details around the number of documents processed, and the time taken to index and search the data.

You can also view the contents of the target index by calling the _search API from the OpenSearch Dev Tools console.
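As a rough illustration, the following Python sketch assembles the method, path, and body of such a _search request without sending it. The target index name ecommerce_category_summary and the sort field total_quantity_sold are hypothetical placeholders; substitute the target index and field names you chose when creating the job.

```python
import json


def build_target_search(target_index, sort_field=None, size=10):
    """Return (method, path, body) for a _search request on the target index."""
    body = {"size": size, "query": {"match_all": {}}}
    if sort_field:
        # Sort the summarized rows by a computed field, highest value first.
        body["sort"] = [{sort_field: {"order": "desc"}}]
    return "GET", f"/{target_index}/_search", body


if __name__ == "__main__":
    method, path, body = build_target_search(
        "ecommerce_category_summary", sort_field="total_quantity_sold"
    )
    # The printed request line and body can be pasted into the Dev Tools console.
    print(method, path)
    print(json.dumps(body, indent=2))
```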

Use REST APIs

You can also use the Index Transform APIs to create, update, start, and stop Index Transform jobs. For example, refer to the Create Transform API to create an Index Transform job that runs every minute. The Index Transform API lets you customize the job interval to meet your specific requirements.
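The following Python sketch shows what a request body for the Create Transform API (PUT _plugins/_transform/<job_id>) might look like for a job that runs every minute. It only builds and prints the JSON and does not call the domain. The target index name, the target field names, and the range filter on total_quantity are assumptions made for illustration, and the expected unit of start_time should be confirmed against the API reference.

```python
import json


def build_transform_job(source_index, target_index, groups, aggregations,
                        start_time, period=1, unit="Minutes",
                        page_size=1000, data_filter=None):
    """Assemble a request body for PUT _plugins/_transform/<job_id>."""
    return {
        "transform": {
            "enabled": True,
            "description": "Summarize ecommerce sales by product category",
            # Run the job every `period` units, for example every 1 minute.
            "schedule": {
                "interval": {"period": period, "unit": unit, "start_time": start_time}
            },
            "source_index": source_index,
            "target_index": target_index,
            # Only documents matching this query are transformed.
            "data_selection_query": data_filter or {"match_all": {}},
            # Pages per run; 1000 is the default recommended in this post.
            "page_size": page_size,
            "groups": groups,
            "aggregations": aggregations,
        }
    }


if __name__ == "__main__":
    body = build_transform_job(
        source_index="opensearch_dashboards_sample_data_ecommerce",
        target_index="ecommerce_category_summary",  # hypothetical name
        groups=[{"terms": {"source_field": "category.keyword",
                           "target_field": "category"}}],
        aggregations={"total_quantity_sold": {"sum": {"field": "total_quantity"}}},
        start_time=1633987988049,  # placeholder timestamp
        # Assumed interpretation of "sold more than 10 times but less than 200".
        data_filter={"range": {"total_quantity": {"gt": 10, "lt": 200}}},
    )
    print("PUT _plugins/_transform/kibana_ecommerce_transform_job")
    print(json.dumps(body, indent=2))
```

Changing period and unit is enough to move the same job to a different interval, such as every 12 hours.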

Use the following API to get details of your scheduled Index Transform job:

GET _plugins/_transform/kibana_ecommerce_transform_job

To view the status and run statistics of a previously run Index Transform job, use the _explain API:

GET _plugins/_transform/kibana_ecommerce_transform_job/_explain

We get the following response from our API call:

{
  "kibana_ecommerce_transform_job" : {
    "metadata_id" : "uA45cToY8nOCsVSyCZs2yA",
    "transform_metadata" : {
      "transform_id" : "kibana_ecommerce_transform_job",
      "last_updated_at" : 1633987988049,
      "status" : "finished",
      "failure_reason" : null,
      "stats" : {
        "pages_processed" : 2,
        "documents_processed" : 7409,
        "documents_indexed" : 6,
        "index_time_in_millis" : 56,
        "search_time_in_millis" : 7
      }
    }
  }
}
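If you poll this API from a script, a small helper can reduce the response to the fields you care about. The following Python sketch parses the response shown above. The helper name and the shape of its summary are illustrative choices, not part of the API.

```python
import json

# The _explain response shown above, embedded as a string for illustration.
EXPLAIN_RESPONSE = """
{
  "kibana_ecommerce_transform_job": {
    "metadata_id": "uA45cToY8nOCsVSyCZs2yA",
    "transform_metadata": {
      "transform_id": "kibana_ecommerce_transform_job",
      "last_updated_at": 1633987988049,
      "status": "finished",
      "failure_reason": null,
      "stats": {
        "pages_processed": 2,
        "documents_processed": 7409,
        "documents_indexed": 6,
        "index_time_in_millis": 56,
        "search_time_in_millis": 7
      }
    }
  }
}
"""


def summarize_explain(response, job_id):
    """Reduce an _explain response to a small status summary."""
    meta = response[job_id]["transform_metadata"]
    stats = meta["stats"]
    return {
        "status": meta["status"],
        "failed": meta["failure_reason"] is not None,
        "documents_processed": stats["documents_processed"],
        "documents_indexed": stats["documents_indexed"],
        # Combined time spent searching the source and indexing the target.
        "total_time_ms": stats["index_time_in_millis"] + stats["search_time_in_millis"],
    }


if __name__ == "__main__":
    summary = summarize_explain(
        json.loads(EXPLAIN_RESPONSE), "kibana_ecommerce_transform_job"
    )
    print(summary)
```

In this response, 7,409 source documents were summarized into 6 documents in the target index.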

To delete an existing Index Transform job, disable the job and then issue the Delete API:

POST _plugins/_transform/kibana_ecommerce_transform_job/_stop

DELETE _plugins/_transform/kibana_ecommerce_transform_job

Response:
{
  "took" : 12,
  "errors" : false,
  "items" : [
    {
      "delete" : {
        "_index" : ".opendistro-ism-config",
        "_type" : "_doc",
        "_id" : "kibana_ecommerce_transform_job",
        "_version" : 3,
        "result" : "deleted",
        "forced_refresh" : true,
        "_shards" : {
          "total" : 2,
          "successful" : 2,
          "failed" : 0
        },
        "_seq_no" : 35091,
        "_primary_term" : 1,
        "status" : 200
      }
    }
  ]
}

Best practices

Index Transform jobs are ideal for continuously aggregating data and maintaining summarized views, instead of repeating complex aggregations at query time. Each job is designed to run on an entire index or set of indices, not only on the changes between job runs.

Consider the following best practices when using Index Transforms:

  • Avoid running Index Transform jobs on rotating indexes with index patterns, because the job scans all documents in those indices at each run. Use APIs to create a new Index Transform job for each rotating index.
  • Factor in additional compute capacity if your Index Transform job involves multiple aggregations, because this process can be CPU intensive. For example, if your job scans 5 indices with 3 shards each and takes 5 minutes to complete, then a minimum of 17 vCPUs (5*3=15 for reading the source indices and 2 for writing to the target index, considering 1 replica) is required for those 5 minutes.
  • Try to schedule Index Transform jobs at non-peak times to minimize the impact on real-time search queries.
  • Make sure that there is sufficient storage for the target indexes. The size of the target index depends on the cardinality of the chosen group-by terms and the number of attributes computed as part of the transform. Make sure you have the storage overhead discussed in our sizing guide.
  • Monitor and adjust the OpenSearch Service cluster configurations.
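The vCPU arithmetic in the compute capacity example above can be written as a small helper. This is a back-of-the-envelope Python sketch, assuming one vCPU per source shard read and one vCPU per target shard copy written, with a target index that has a single primary shard. The function name and parameters are illustrative.

```python
def estimate_min_vcpus(source_indices, shards_per_index,
                       target_primary_shards=1, target_replicas=1):
    """Estimate the minimum vCPUs busy while a transform job runs.

    Assumes one vCPU per source shard read and one per target shard copy written.
    """
    read_vcpus = source_indices * shards_per_index
    # Each target primary shard is written once, plus once per replica.
    write_vcpus = target_primary_shards * (1 + target_replicas)
    return read_vcpus + write_vcpus


if __name__ == "__main__":
    # 5 indices x 3 shards = 15 for reads, plus 2 for writes with 1 replica.
    print(estimate_min_vcpus(source_indices=5, shards_per_index=3))
```

The same helper gives a quick sense of how the requirement grows if you point a job at more indices or add replicas to the target index.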

Conclusion

This post describes how you can use OpenSearch Index Transforms to aggregate specific fields from an existing index and store the summarized data into a new index using the OpenSearch Dashboards UI or Index Transform REST APIs. The Index Transform feature is powered by OpenSearch, an open-source search and analytics engine that makes it easy for you to perform interactive log analytics, real-time application monitoring, website search, and more. Index Transforms are available on all domains running Amazon OpenSearch Service 1.0 or greater, across 25 AWS Regions globally.


About the Authors

Viral Shah is a Principal Solutions Architect with the AWS Data Lab team based out of New York, NY. He has over 20 years of experience working with enterprise customers and startups, primarily in the data and database space. He loves to travel and spend quality time with his family.

Arun Lakshmanan is a Search Specialist Solution Architect at AWS based out of Chicago, IL.