AWS Database Blog

Use the AWS InfluxDB migration script to migrate your InfluxDB OSS 2.x data to Amazon Timestream for InfluxDB

This post is a joint collaboration between Improving and AWS and is being cross-published on both the Improving blog and the AWS Database Blog.

AWS has partnered with InfluxData to launch Amazon Timestream for InfluxDB, a managed version of the popular InfluxDB 2.x open source time series database engine. Many InfluxDB customers have asked for a fast and simple way to migrate their database to the managed service. We built the AWS InfluxDB migration script to help you with the migration. You can use the AWS InfluxDB migration script to migrate InfluxDB data, including buckets, dashboards, tasks, and other key-value data.

The script, along with more detailed documentation, can be found in the Amazon Timestream tools repository.

In this post, we demonstrate how to use the AWS InfluxDB migration script to migrate your data from your existing InfluxDB OSS 2.x instances to Timestream for InfluxDB. At the end of this post, we show one way to perform a live migration, with additional AWS resources.

Solution overview

The following architecture diagram provides a high-level view of the solution.

The architecture design of the AWS InfluxDB migration script.

The AWS InfluxDB migration script can migrate specific buckets and their metadata, migrate multiple buckets from multiple organizations, or perform a full migration.

The script works by creating a local backup of the source data. You can also choose to mount an Amazon Simple Storage Service (Amazon S3) bucket as well as other options. Unless otherwise specified, the data is kept in influxdb-backup-<timestamp> directories, one for each migration. You are responsible for setting up and managing the system running the migration script.

The script provides a number of options and configurations, including mounting S3 buckets to limit local storage usage during migration and choosing which organizations to use during migration. Resources used in examples in this post incur costs. Refer to AWS pricing for more information.

Prerequisites

The InfluxDB migration script uses InfluxDB CLI features and the InfluxDB v2 API to complete a migration. To run the migration script, you need an environment with the following characteristics:

  • A machine running either Windows, Linux, or macOS.
  • The operator token of your source InfluxDB OSS 2.x instance stored in an environment variable named INFLUX_SRC_TOKEN.
  • The operator token of your destination Timestream for InfluxDB instance stored in an environment variable named INFLUX_DEST_TOKEN. Refer to Connecting to an Amazon Timestream for InfluxDB instance for connection information, including tokens.
  • Python 3.7 or up.
  • The AWS SDK for Python (Boto3) and influxdb_client Python libraries.
  • The Influx CLI installed and added to PATH.
  • Enough disk space to store exported data locally, if not using an S3 bucket.
  • A network connection to both source and destination instances.
  • Optionally, Mountpoint for Amazon S3 on Linux or rclone on Windows and macOS for the use of local S3 bucket mounting during migration, to save local storage use.

When your environment meets the prerequisites, you can migrate a single bucket by running the following code:

python3 influx_migration.py \
    --src-host http(s)://<source address>:8086 \
    --src-bucket <source bucket> \
    --dest-host http(s)://<destination address>:8086

You can use the help option to view the available configuration options for the script by running the following code:

python3 influx_migration.py -h

The following is a high-level overview of possible configuration options for the script:

usage: influx_migration.py [-h] [--src-bucket SRC_BUCKET] [--dest-bucket DEST_BUCKET] [--src-host SRC_HOST] --dest-host DEST_HOST [--full] [--confirm-full] [--src-org SRC_ORG] [--dest-org DEST_ORG] [--csv] [--retry-restore-dir RETRY_RESTORE_DIR] [--dir-name DIR_NAME] [--log-level LOG_LEVEL] [--skip-verify] [--s3-bucket S3_BUCKET] [--allow-unowned-s3-bucket]

You can find a detailed description of every available option in the AWS InfluxDB migration script README.

Run the AWS InfluxDB migration script

After all the prerequisites have been met, follow these steps to run the migration script:

  1. Use the terminal app of your choice and run the Python script to transfer data from the source InfluxDB instance to the destination InfluxDB instance. Provide host addresses and ports as CLI options and make sure the INFLUX_SRC_TOKEN and INFLUX_DEST_TOKEN environment variables have been set, as mentioned in the prerequisites. The default port InfluxDB uses is 8086. For example:
    python3 influx_migration.py \
        --src-host http(s)://<source address>:8086 \
        --src-bucket <source bucket> \
        --dest-host http(s)://<destination address>:8086

    To make sure your data was successfully migrated from your existing instance to the new one, perform the following validation steps:

  2. Access the InfluxDB UI of your Timestream for InfluxDB instance and inspect the buckets.
  3. List buckets by running the following Influx CLI command:
    influx bucket list \
        -t <destination token> \
        --host http(s)://<destination address>:8086 \
        --org <organization name>
  4. Use the Influx CLI and run the following two InfluxQL queries to view the contents of a bucket and verify the correct number of records have been migrated.
    influx v1 shell \
        -t <destination token> \
        --host http(s)://<destination address>:8086 \
        --org <organization name>
    SELECT * FROM <migrated bucket>.<retention period>.<measurement name> LIMIT 100
    SELECT COUNT(*) FROM <migrated bucket>.<retention period>.<measurement name>
  5. Run a flux query using the following code:
    influx query \
        -t <destination token> \
        --host http(s)://<destination address>:8086 \
        --org <organization name> \
        'from(bucket: "<migrated bucket>")
            |> range(start: <desired start>, stop: <desired stop>)'

    Adding |> count() to the query is also a way to verify the correct number of records have been migrated.

Example run

The following steps show how you can set up the InfluxDB migration script and use it to migrate a single bucket:

  1. Open the terminal app of your choice and make sure the required prerequisites are properly installed.
    AWS InfluxDB migration script pre requisites
  2. Navigate to where the migration script is located.
    Amazon Timestream Tools repository navigation example
  3. Prepare the following information:
    1. Name of the source bucket (to be migrated).
    2. Optionally, a new bucket name for the migrated bucket in the destination server.
    3. Root token for the source and destination influx instances.
    4. Host address of the source and destination influx instances.
    5. Optionally, an S3 bucket name and credentials. AWS Command Line Interface (AWS CLI) credentials should be set in the OS environment variables.
      # AWS credentials
      export AWS_ACCESS_KEY_ID="xxx"
      export AWS_SECRET_ACCESS_KEY="xxx"
  4. Construct the command as follows:
    python3 influx_migration.py \
        --src-bucket <existing source bucket name> \
        --dest-bucket <new destination bucket name> \
        --src-host http(s)://<source address>:8086 \
        --dest-host http(s)://<destination address>:8086 \
        --s3-bucket <S3 bucket name>(optional) \
        --log-level debug
  5. Run the script:
    AWS InfluxDB migration script execution example

When the script has finished running, you can check the newly migrated bucket for data integrity. The file performance.txt, in the same directory where the script was run from, contains some basic information on how long each step took.

Clean up

Backup data will remain in your selected folder after the migration process is complete. You may want to remove these backup directories; by default, they are named influxdb-backup-<timestamp> and are stored locally in the same directory that the script is run in.

If you used the --s3-bucket option, these directories will be stored in the S3 bucket you indicated. To delete this S3 bucket, complete the following steps:

  1. On the Amazon S3 console, in the navigation pane, choose Buckets.
  2. Select the bucket you used for your migration from the list of buckets.
  3. Choose Delete.
  4. If the bucket isn’t empty, choose Empty bucket.
    1. Enter permanently delete in the text input field.
    2. Choose Empty.
    3. Upon successful deletion, choose delete bucket configuration in the banner at the top of the screen or restart the process at step 1.
  5. Enter the name of the bucket in the text input field and choose Delete bucket.

Live migrations

A migration from InfluxDB OSS 2.x to Timestream for InfluxDB using the AWS InfluxDB migration script incurs downtime while data is being migrated, and clients are required to be reconfigured to use Timestream for InfluxDB. If your current production workload requires a near-zero downtime migration of data, you have the option to perform near-zero downtime live migrations of InfluxDB workloads to Timestream for InfluxDB. The following diagram shows the steps of the live migration. All resources should be configured before starting the process.

The process to perform a live migration works as follows:

  1. An AWS Signature Version 4 proxy signs the client requests made to Amazon API Gateway. API Gateway supports alternative authentication methods besides AWS Signature Version 4, but AWS Signature Version 4 is recommended.
  2. API Gateway proxies the requests to the source and target databases. The clients’ access to the gateway is controlled using AWS Identity and Access Management (IAM) authorization; clients must use AWS Signature Version 4 to sign their requests.

    API Gateway forwards the write requests and read requests to different AWS resources. Write requests are forwarded to the Kinesis writer AWS Lambda function, which writes to an Amazon Kinesis data stream (using PutRecord or PutRecords to transform the request into a stream record), and read requests are forwarded to the query Lambda function.

    The inclusion of the query Lambda function is necessary in order to minimize stale reads. For each read request, the query Lambda function uses a secret to determine which database to read from. After the migration is complete and the Timestream for InfluxDB instance is up to date with the latest ingested records, the secret value the query Lambda function uses to make requests should be updated with the destination host address, causing the read requests to be sent to the destination Timestream for InfluxDB instance.

    Authorization header values containing InfluxDB tokens in client requests are left as is; this means that the destination Timestream for InfluxDB instance will need to have the same tokens and organizations as the source instance. This is possible using the --full option for the InfluxDB migration script. At the time of writing, this live migration architecture doesn’t support partial migrations.

    If you are not already using an API gateway in front of your source database, you will need to switch your data producers and consumers to use this solution’s API Gateway endpoint, after setting up and testing the live migration infrastructure. Configuring your application to using an API Gateway endpoint will incur a short downtime. At the end of the migration process, the API gateway that proxies read and write traffic to your target Timestream for InfluxDB instance will remain in place. If you want to replace or remove this piece of additional infrastructure, the change will incur some downtime, approximately equal to the time it takes to reconfigure your application to connect directly to the target database.

  3. Requests, intact and containing header values such as InfluxDB tokens, are stored in a Kinesis data stream using the Kinesis writer Lambda function.

    The following code snippet shows one possible implementation for the Kinesis writer Lambda function:

    import boto3
    import json
    import os
    import secrets
    import string
    
    PARTITION_KEY_LENGTH = 30
    
    def lambda_handler(event, context):
        kinesis = boto3.client('kinesis')
        # Randomize partition key to write to shards evenly
        random_partition_key = ''.join(secrets.choice(string.ascii_letters +
            string.digits)
            for i in range(PARTITION_KEY_LENGTH) )
        try:
            # put_records does not guarantee record ordering, put_record does
            response = kinesis.put_record(
                StreamName=os.environ["STREAM_NAME"],
                Data=json.dumps(event).encode('utf-8'), # Request from Amazon API Gateway
                PartitionKey=random_partition_key
            )
            return {
                'statusCode': 200,
                'body': json.dumps('Record successfully inserted')
            }
        except Exception:
            return {
                'statusCode': 500,
                'body': json.dumps('Error calling PutRecord')
            }

    By default, the stream stores these records for 24 hours. Data streams can be sharded for increased throughput and parallel data processing. The Kinesis writer Lambda function should use a random partition key to distribute incoming requests across Kinesis data stream shards evenly and not overload a single shard. Writing across shards means that when these shards are read from, a small amount of eventual consistency is introduced, because records may be written to instances that are out of order. However, due to being time series databases, InfluxDB and Timestream for InfluxDB will still order records according to timestamp. Records are processed in the exact order in which they are stored on a per-shard basis if PutRecord is used; PutRecords, however, doesn’t guarantee record order.

    Two Lambda functions, each with a function instance per shard, process the stream records. The source writer Lambda function writes data to the source InfluxDB instance; the destination writer Lambda function writes data to the target Timestream for InfluxDB instance. At the start of the migration process, the destination writer Lambda function should be deployed in a throttled state, which helps prevent it from processing stream records. It will be enabled after the target database has been seeded with data from the source database. After it’s enabled, the destination writer Lambda function will begin applying records from the stream to the target database, starting with the earliest records in the stream.

    The following code snippet shows one possible implementation for the source writer Lambda function:

    import base64
    import json
    import requests
    import os
    
    print('Loading InfluxDB-Source-Write-Function')
    
    def lambda_handler(event, context):
        # Get database url
        headers = {"X-Aws-Parameters-Secrets-Token": os.environ.get('AWS_SESSION_TOKEN')}
        secrets_extension_endpoint = "http://localhost:2773" + \
            "/secretsmanager/get?secretId=<secret name>"
        r = requests.get(secrets_extension_endpoint, headers=headers)
    
        print("Extracting secrets")
        secrets = json.loads(json.loads(r.text)["SecretString"])
        url = secrets["SOURCE_ENDPOINT"]
        
        response = None
        for record in event['Records']:
            # Kinesis data is base64 encoded so decode here
            payload =  json.loads(base64.b64decode(record['kinesis']['data']).decode('utf-8'))
            bucket = payload["queryStringParameters"]["bucket"]
            org = payload["queryStringParameters"]["org"]
            headers = {
                'Accept': 'application/json',
                'Authorization': payload['headers']['x-original-authorization'],
                'Content-Type': payload['headers']['content-type']
            }
            response = requests.post(url + f'/api/v2/write?bucket={bucket}&org={org}',
                data=payload['body'], headers=headers)
        if response is None:
            return response
        else:
            return {
                "isBase64Encoded": False,
                "statusCode": response.status_code,
                "headers": dict(response.headers),
                "body": f"{response.text}"
            }

    The implementation for the destination writer Lambda function would be the same, except instead of reading the SOURCE_ENDPOINT secret, it would read the DESTINATION_ENDPOINT secret.

    These two Lambda functions are independent of one another, and their instances process records at their own rate. Although records in a shard are processed in order, there is no total ordering over the entire stream. As a result, the order in which records are written to the source or target database may differ from the order in which the records were received into the stream. If each record includes the time it was emitted by a data producer, however, the total ordering of the time series in the source and target databases will not be affected.

  4. The query Lambda function receives read requests and forwards them to either the source or target database, replacing Authorization header values that contain InfluxDB tokens as necessary. The endpoint is configured with an endpoint value retrieved from AWS Secrets Manager.

    The following code snippet is one way to implement the query Lambda function:

    import json
    import os
    import requests
    
    print('Loading Read-Function')
    
    def lambda_handler(event, context):
        # Get database url
        headers = {"X-Aws-Parameters-Secrets-Token": os.environ.get('AWS_SESSION_TOKEN')}
        secrets_extension_endpoint = "http://localhost:2773" + \
            "/secretsmanager/get?secretId=<secret name>"
        r = requests.get(secrets_extension_endpoint, headers=headers)
    
        print("Extracting secrets")
        secrets = json.loads(json.loads(r.text)["SecretString"])
        url = secrets["SOURCE_ENDPOINT"]
    
        print("Sending query")
        headers = event["headers"]
        headers["Authorization"] = headers["x-original-authorization"]
        org = event["queryStringParameters"]["org"]
        response = requests.post(url + f'/api/v2/query?org={org}',
            json=json.loads(event["body"]), headers=headers)
        return {
            "statusCode": response.status_code,
            "headers": dict(response.headers),
            "body": response.text
        }
  5. Secrets Manager is used to hold source and target database endpoint addresses. Secrets Manager provides a single configuration point for the entire migration process. You can redirect requests to either the source or target database without incurring downtime by changing the endpoints in Secrets Manager.
  6. The migration can begin after the infrastructure has been installed, read requests are flowing to the source database through the query Lambda function, and writes are flowing to the source database through the Kinesis data stream and the source writer Lambda function.

    At this point, you use the AWS InfluxDB migration script to perform a full migration, migrating data, dashboards, tokens, tasks, users, and other key-value data from the source instance to the Timestream for InfluxDB target by using the --full option. Clients will continue to experience uninterrupted reads and writes while this migration is taking place.

    The following is an example of how to use the InfluxDB migration script to perform a full migration:

    python3 influx_migration.py \
        --src-host http(s)://<source address>:8086 \
        --dest-host http(s)://<destination address>:8086 \
        --full
  7. After you have verified on the environment running the AWS InfluxDB migration script that the full migration is complete, you can remove the throttle from the destination writer Lambda function. This function will begin ingesting records stored in the data stream, starting with the earliest record stored in the stream, to the Timestream for InfluxDB instance. Some of the early writes in the stream will already be present in the data that has been migrated to the target instance using the migration script. This is not a problem because InfluxDB, and therefore Timestream for InfluxDB, automatically handles duplicate data points
  8. You can verify that the target Timestream for InfluxDB instance has caught up with writes in the stream by inspecting the destination writer Lambda function’s Amazon CloudWatch IteratorAge metric. Values close to zero for this metric indicate that the function is processing records almost as soon as they arrive in the stream.

    Another way to verify whether your Timestream for InfluxDB instance has caught up is to use a flux query to show the timestamp of the latest record, assuming writes are using current timestamps:

    from(bucket: “<bucket name>”)
        |> range(start: 1680-01-01T00:00:00:00Z, stop: now())
        |> filter(fn: (r) => r[“_measurement”] == “<measurement>”)
        |> keep(columns: [“_time”])
        |> sort(columns: [“_time”], desc: false)
        |> last(column: “_time”)

    When you’re satisfied that the target database has caught up, you can redirect read requests to the target by changing the destination endpoint for requests in Secrets Manager. At this point, you can disable or remove the source writer Lambda function. The live migration is now complete.

The resources that you need to add in order to perform a live migration are as follows:

  • An API gateway, if not already used, to make performing a live migration and switching to Timestream for InfluxDB simple. For more details, refer to Security best practices in Amazon API Gateway.
  • The AWS SigV4 proxy to authorize requests to the API gateway.
  • A Kinesis data stream to allow dual ingestion for new data.
  • Three Lambda functions to read and write data.
  • A secret containing the source and destination host addresses, to be used by the Lambda functions.

For all resources, refer to Security best practices in IAM.

Considerations

Consider the following before implementing a live migration:

  • Make sure your self-managed InfluxDB and Timestream for InfluxDB instances are properly configured.
  • Your data, including tokens, buckets, organizations, users, and other key-value data, will be overwritten in your Timestream for InfluxDB instance when performing a full migration with the InfluxDB migration script.
  • Make sure your infrastructure is flexible, for example, a new set of services can be deployed between the data sources and readers and your InfluxDB instances.
  • During the migration, your application will be able to continue writing and querying time series data. However, the migration process requires that your application doesn’t perform other actions, such as creating or deleting users, organizations, buckets, and dashboards, against the source system while the migration is running.
  • This procedure assumes that you are familiar with the AWS InfluxDB migration script, and that your source and target environments satisfy the requirements for using the script.
  • The procedure described here assumes that your Timestream for InfluxDB instance has a public endpoint. If your Timestream for InfluxDB instance is private, you will need to install additional resources, such as VPC endpoints and an Amazon Elastic Compute Cloud (Amazon EC2) instance, in your Timestream for InfluxDB VPC.

Clean up

Make sure to remove or throttle your source writer Lambda function, because it’s no longer useful after the live migration is complete.

Refer to the previous Cleanup section for instructions on how to clean up stored migration data created in step six of the live migration process.

Conclusion

The AWS InfluxDB migration script helps you migrate your InfluxDB OSS 2.x data to Timestream for InfluxDB. The script offers a number of options to help you customize your migrations, including defining source and destination organizations, mounting an S3 bucket locally to limit local storage use, and the option to use CSV files for migration. If a live migration is necessary, you can use the method described in this post with additional AWS components.

Download the AWS InfluxDB migration script from the Amazon Timestream tools repository to start migrating your data from InfluxDB to Timestream for InfluxDB.


About the Author

Trevor Bonas author photoTrevor Bonas is a Software Developer at Improving. Trevor has experience with time series databases and ODBC driver development. In his free time, Trevor writes fiction and dabbles in game development.