AWS Database Blog

Ingest CSV data to Amazon DynamoDB using AWS Lambda

In this post, we explore a streamlined solution that uses AWS Lambda and Python to read and ingest CSV data into an existing Amazon DynamoDB table. This approach adheres to organizational security restrictions, supports infrastructure as code (IaC) for table management, and provides an event-driven process for ingesting CSV datasets into DynamoDB.

For many organizations operating in highly regulated environments, maintaining comprehensive audit trails of data processing is not just beneficial—it’s often mandatory for compliance. This solution addresses that need by automatically documenting both successful transactions and failed records, providing the transparency required for regulatory validation and reconciliation activities. By creating distinct outputs for processed and unprocessed items, the system provides the evidence necessary to satisfy auditors across financial services, healthcare, and other highly regulated industries where data handling documentation may be required.

Key requirements this solution addresses include:

  • Programmatically ingest CSV data into DynamoDB using an extract, transform, and load (ETL) pipeline
  • Continuously append data to an existing DynamoDB table
  • Extend the solution to on-premises environments, not just AWS
  • Use an event-driven approach as new data is available for ingestion
  • Alleviate dependence on the AWS Management Console or manual processes
  • Produce audit trails that capture point-in-time snapshots of the transformed data

This solution is ideal for small to medium sized datasets (roughly 1,000 to 1 million rows per file). If you need to resume interrupted import tasks, ingest very large datasets, or run jobs longer than the 15-minute Lambda execution limit, consider using an AWS Glue ETL job instead of AWS Lambda.

Solution overview

The data ingestion workflow is as follows:

  1. Use the AWS Command Line Interface (AWS CLI) or schedule a file transfer to upload CSV data to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Amazon S3 Event Notifications triggers a Lambda function.
  3. The Lambda function reads the CSV file from the S3 bucket, appends the data to an existing DynamoDB table, and persists transformed data to a JSON object in the original bucket.

The following diagram shows the end-to-end data ingestion workflow.

To set up the solution, use the following high-level steps:

  1. Create an AWS Identity and Access Management (IAM) role.
  2. Create a Lambda function and upload the Python code.
  3. Create an S3 bucket and event notification to trigger the Lambda function.
  4. Create a DynamoDB table.
  5. Generate a sample CSV file.
  6. Upload to the S3 bucket to import the CSV file to the DynamoDB table.
  7. Explore the DynamoDB table items.

In this example, we are using small resource sizing. As you scale the dataset size, consider the following:

  • Increase Lambda memory up to the 10 GB maximum. This also increases CPU allocation and network bandwidth.
  • Set the Lambda timeout up to its maximum of 15 minutes (900 seconds).
  • Increase Lambda ephemeral storage up to 10 GB if you temporarily write large CSV files to local storage during processing.

*During testing, this solution ingested a CSV file containing 1 million rows in 9 minutes with 5,120 MB of memory configured.
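If you need to adjust these settings after the function exists, one option is the update-function-configuration command. The values below are illustrative only (they assume the $LAMBDA_FUNCTION environment variable defined later in the walkthrough), not recommendations for every workload:

aws lambda update-function-configuration \
--function-name $LAMBDA_FUNCTION \
--memory-size 5120 \
--timeout 900 \
--ephemeral-storage '{"Size": 10240}'
Bash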

If your files are too large to process within the 15-minute Lambda limit, you have complex transformations to perform, or you need to resume interrupted jobs, use an AWS Glue ETL job instead.

Prerequisites

To deploy this solution, you can use a development computer or AWS CloudShell. You need the following:

  • At a minimum, AWS CLI 2.23.12. For instructions, see Getting started with the AWS CLI.
  • At a minimum, Python 3.13.1.
  • This walkthrough uses Linux operating system syntax and commands. If you use Microsoft Windows, you will need to translate the commands to PowerShell.
  • Set your default AWS Region in the AWS CLI, as shown in the first step of the walkthrough.

You will need an IAM role with the appropriate permissions to configure Lambda, Amazon S3, IAM, and DynamoDB.

Create an IAM role

To create a project folder on your development computer, run the following code:

mkdir csv-to-ddb 
cd csv-to-ddb
Bash

To create an IAM role, follow these steps:

  1. Gather and set the environment variables. Be sure to replace the placeholder values with your DynamoDB table name, Lambda function name, S3 bucket name, and IAM role name:
    ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
    aws configure set region us-east-1
    export REGION=$(aws configure get region)
    export DDB_TABLE=<your-dynamodb-table-name>
    export LAMBDA_FUNCTION=<your-lambda-function>
    export S3_BUCKET=<your-s3-bucket>
    export IAM_ROLE=<iam-role-name>
    Bash
  2. Create a trust policy for the Lambda execution role:
    cat << 'EOF' > trust-policy.json
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
    EOF
    Bash
  3. Create the IAM role using the trust policy:
    aws iam create-role --role-name $IAM_ROLE --assume-role-policy-document file://trust-policy.json
    Bash
  4. Run the following commands to create and attach a least privilege policy to the Lambda execution role. This policy is scoped down to only the permissions needed to read objects from Amazon S3, write the JSON copy and unprocessed-item files to specific prefixes, and batch write items to the DynamoDB table.
    cat <<EOF > csv-to-ddb-policy.json
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "dynamodb:BatchWriteItem"
          ],
          "Resource": "arn:aws:dynamodb:$REGION:$ACCOUNT:table/$DDB_TABLE"
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::$S3_BUCKET/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:PutObject"
          ],
          "Resource": [
            "arn:aws:s3:::$S3_BUCKET/json-copy/*",
            "arn:aws:s3:::$S3_BUCKET/unprocessed/*"
          ]
        }
      ]
    }
    EOF
    
    Bash
    aws iam create-policy \
    --policy-name DynamoDBWriteS3ReadPolicy \
    --policy-document file://csv-to-ddb-policy.json
    Bash
    aws iam attach-role-policy \
    --role-name $IAM_ROLE \
    --policy-arn arn:aws:iam::$ACCOUNT:policy/DynamoDBWriteS3ReadPolicy
    Bash
    aws iam attach-role-policy \
    --role-name $IAM_ROLE \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
    Bash
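    Optionally, you can confirm that both policies are now attached to the role before moving on:
    aws iam list-attached-role-policies --role-name $IAM_ROLE
    Bash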

Create a Lambda function and upload the Python code

To create the Lambda function and upload the Python code, follow these steps:

  1. Define an environment variable with the newly created IAM role’s Amazon Resource Name (ARN). This will be used when creating your Lambda function.
    export LAMBDA_EXECUTION_ROLE_ARN=$(aws iam get-role --role-name $IAM_ROLE --query 'Role.Arn' --output text)
    Bash
  2. From your project folder, run the following code snippet to create your Python script. This Python code is the Lambda function handler: it processes a CSV file uploaded to an S3 bucket, converts it into a list of dictionaries, writes the data to a DynamoDB table, and then uploads a JSON representation of the processed data back to the S3 bucket. The function is triggered by an S3 event and operates in an event-driven manner:
    cat << 'EOF' > import.py
    import logging
    import boto3
    import os
    import csv
    import json
    import time
    from io import StringIO
    from urllib.parse import unquote_plus
    
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    
    region = os.getenv('AWS_REGION')
    table_name = os.getenv('DYNAMO_DB_TABLE_NAME')
    dynamo_endpoint_url = f"https://dynamodb.{region}.amazonaws.com"
    
    ddbClient = boto3.resource('dynamodb', endpoint_url=dynamo_endpoint_url)
    s3client = boto3.client('s3')
    
    def lambda_handler(event, context):
        logger.info("Received Event: %s", event)
        ddbTable = ddbClient.Table(table_name)
    
        # Get the object from the event
        bucket = event['Records'][0]['s3']['bucket']['name']
        # S3 event object keys are URL-encoded, so decode before using
        key = unquote_plus(event['Records'][0]['s3']['object']['key'])
        csv_filename = os.path.basename(key)
    
        logger.info("Bucket Name extracted from the event: %s", bucket)
        logger.info("Object Key extracted from the event: %s", key)
    
        try:
            # Get the CSV object from S3
            csv_object = s3client.get_object(Bucket=bucket, Key=key)
    
            # Read and parse CSV data
            try:
                csv_data = csv_object['Body'].read().decode('utf-8-sig')
                csv_reader = csv.DictReader(StringIO(csv_data))
    
                # Convert CSV to list of dictionaries
                rows = []
                for row in csv_reader:
                    # Remove empty strings and clean up the data
                    cleaned_row = {k: v for k, v in row.items() if v is not None and v != ''}
                    if cleaned_row:  # Only append if the row has data
                        rows.append(cleaned_row)
    
                logger.info(f"Successfully parsed {len(rows)} rows from CSV")
    
            except Exception as e:
                error_msg = f"Error parsing CSV: {str(e)}"
                logger.error(error_msg)
                return {
                    "statusCode": 400,
                    "body": json.dumps({"error": error_msg})
                }
    
            # Write to DynamoDB with retry logic for unprocessed items
            batch_size = 25
            unprocessed_items = []
    
            for idx in range(0, len(rows), batch_size):
                batch = rows[idx:idx + batch_size]
                request_items = {table_name: [{'PutRequest': {'Item': item}} for item in batch]}
                retries = 0
    
                # Retry logic with exponential backoff
                while retries <= 3:
                    resp = ddbClient.meta.client.batch_write_item(RequestItems=request_items)
                    unp = resp.get('UnprocessedItems', {}).get(table_name, [])
                    if not unp:
                        break
                    request_items = {table_name: unp}
                    retries += 1
                    time.sleep(2 ** retries)
                    logger.warning(f"Retry {retries} for {len(unp)} unprocessed items")
    
                # Handle any remaining unprocessed items
                if unp:
                    items = [r['PutRequest']['Item'] for r in unp]
                    save_unprocessed(bucket, csv_filename, items)
                    unprocessed_items.extend(items)
    
            logger.info("Data written to DynamoDB table successfully.")
    
            # Create JSON object with array (excluding unprocessed items)
            processed_items = []
            unprocessed_item_set = {json.dumps(item, sort_keys=True) for item in unprocessed_items}
            for item in rows:
                if json.dumps(item, sort_keys=True) not in unprocessed_item_set:
                    processed_items.append(item)
    
            json_object = {"data": processed_items}
            json_data = json.dumps(json_object, indent=2)
    
            # Create the JSON key with just the filename
            json_key = f"json-copy/{csv_filename.replace('.csv', '.json')}"
            s3client.put_object(Body=json_data, Bucket=bucket, Key=json_key)
            logger.info(f"JSON data uploaded to {bucket}/{json_key}")
    
            return {
                "statusCode": 200,
                "body": json.dumps({
                    "processed_rows": len(processed_items),
                    "unprocessed_rows": len(unprocessed_items),
                    "unprocessed_file": f"unprocessed/{csv_filename}.unprocessed.json" if unprocessed_items else None,
                    "json_copy": json_key
                })
            }
    
        except Exception as e:
            error_msg = f"Error processing file: {str(e)}"
            logger.error(error_msg)
            return {
                "statusCode": 500,
                "body": json.dumps({"error": error_msg})
            }
    
    def save_unprocessed(bucket, fname, items):
        """Save items that couldn't be written to DynamoDB"""
        key = f"unprocessed/{fname}.unprocessed.json"
        try:
            existing = json.loads(s3client.get_object(Bucket=bucket, Key=key)['Body'].read())
        except s3client.exceptions.NoSuchKey:
            existing = []
        except Exception as e:
            logger.warning(f"Error reading existing unprocessed items: {str(e)}")
            existing = []
    
        existing.extend(items)
        s3client.put_object(Bucket=bucket, Key=key, Body=json.dumps(existing))
        logger.info(f"Saved {len(items)} unprocessed items to {key}")
    EOF
    Bash

    This Lambda function implements an ETL pipeline triggered by CSV file uploads to Amazon S3. The function extracts data from the triggering CSV file using the Amazon S3 client and performs data transformation by parsing the CSV content with the DictReader class and sanitizing it by removing empty values and null fields. It then loads the rows into your table in batches of 25 items using the DynamoDB BatchWriteItem operation. Any items that fail to write are retried up to 3 more times with an exponential backoff strategy; items that still cannot be written are saved to the unprocessed/ prefix in the S3 bucket for later reconciliation. The implementation uses AWS SDK batch operations for DynamoDB writes to optimize throughput, incorporates error handling with logging, and maintains a fully serverless architecture pattern by using environment variables for configuration management. The code demonstrates serverless integration patterns by chaining AWS services (Amazon S3 to Lambda to DynamoDB to Amazon S3) and implementing a dual-destination data pipeline: operational data in DynamoDB and a JSON audit copy in Amazon S3.

  3. Zip your Python code so you can create your Lambda function with it:
    zip import.zip import.py
    Bash
  4. Create your function using the zip file that contains your Python code:
    aws lambda create-function \
    --function-name $LAMBDA_FUNCTION \
    --runtime python3.13 \
    --zip-file fileb://import.zip \
    --handler import.lambda_handler \
    --role $LAMBDA_EXECUTION_ROLE_ARN \
    --timeout 180 \
    --memory-size 2048 \
    --tracing-config Mode=Active \
    --architectures arm64
    Bash
  5. Add environment variables to your Lambda function:
    aws lambda update-function-configuration \
    --function-name $LAMBDA_FUNCTION \
    --environment "Variables={DYNAMO_DB_TABLE_NAME=$DDB_TABLE}"
    Bash
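    If the configuration update is rejected because the function from step 4 is still initializing, you can optionally wait for it to become active and then confirm the environment variable:
    aws lambda wait function-active-v2 --function-name $LAMBDA_FUNCTION
    aws lambda get-function-configuration \
    --function-name $LAMBDA_FUNCTION \
    --query 'Environment.Variables'
    Bash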

Create an S3 bucket and event notification to trigger the Lambda function

To create an S3 bucket and event notification to trigger the Lambda function, follow these steps:

  1. Create the S3 bucket for importing your CSV data:
    aws s3 mb s3://$S3_BUCKET --region $REGION
    Bash
  2. Grant the S3 bucket permission to invoke the Lambda function:
    aws lambda add-permission \
    --function-name $LAMBDA_FUNCTION \
    --statement-id s3-lambda-permission \
    --action "lambda:InvokeFunction" \
    --principal s3.amazonaws.com \
    --source-arn "arn:aws:s3:::$S3_BUCKET" \
    --source-account $ACCOUNT
    Bash
  3. Create the S3 bucket notification configuration:
    echo '{
      "LambdaFunctionConfigurations": [
        {
          "LambdaFunctionArn": "arn:aws:lambda:'$REGION':'$ACCOUNT':function:'$LAMBDA_FUNCTION'",
          "Events": ["s3:ObjectCreated:*"]
        }
      ]
    }' > notify.json
    aws s3api put-bucket-notification-configuration \
    --bucket $S3_BUCKET \
    --notification-configuration file://notify.json
    Bash
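    To verify the configuration, you can retrieve the bucket’s notification settings:
    aws s3api get-bucket-notification-configuration --bucket $S3_BUCKET
    Bash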

Create a DynamoDB table

To create a DynamoDB table, run the following command from your development computer with the necessary prerequisites installed:

aws dynamodb create-table \
--table-name $DDB_TABLE \
--attribute-definitions \
AttributeName=account,AttributeType=S \
AttributeName=offer_id,AttributeType=S \
--key-schema \
AttributeName=account,KeyType=HASH \
AttributeName=offer_id,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST
Bash

This command will work on Linux operating systems. For Windows PowerShell, replace each backslash line-continuation character with a backtick (`).
The command creates an on-demand table with a partition key named account and a sort key named offer_id. Primary keys in DynamoDB can be either simple (partition key only) or composite (partition key and sort key). The primary key must be unique for each item and should distribute activity evenly across the table’s partitions. For our use case, we combine the account number and offer ID to give each item a unique identifier that can be queried.

Because this is a batch use case, an on-demand table allows you to pay only for the requests that you make during processing. On-demand tables scale automatically to accommodate your traffic levels, so you don’t have to manage scaling policies or plan capacity. By default, new on-demand tables can serve up to 4,000 write request units and 12,000 read request units. If you plan on exceeding these values from the start, you can explore using warm throughput as a solution.
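Table creation typically completes in seconds. Before loading data, you can optionally wait for the table and confirm that it is active:

aws dynamodb wait table-exists --table-name $DDB_TABLE
aws dynamodb describe-table --table-name $DDB_TABLE --query 'Table.TableStatus'
Bash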

Generate a sample CSV file

To generate a sample data.csv file, follow these steps:

  1. Using your development computer, create the following Python script:
    cat << 'EOF' > make-csv.py
    import csv
    import random
    import string
    import datetime
    
    account_type_codes = [''.join(random.choices(string.ascii_uppercase, k=2)) for _ in range(26)]
    offer_type_ids = [''.join(random.choices(string.digits, k=8)) for _ in range(100)]
    risk_levels = ['high', 'medium', 'low']
    
    data_rows = []
    for _ in range(1500):
        account = ''.join(random.choices(string.digits, k=8))
        offer_id = ''.join(random.choices(string.digits, k=12))
        catalog_id = ''.join(random.choices(string.digits, k=18))
        account_type_code = random.choice(account_type_codes)
        offer_type_id = random.choice(offer_type_ids)
        created = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=random.randint(0, 365))
        expire = created + datetime.timedelta(days=random.randint(7, 365))
        risk = random.choice(risk_levels)
        data_row = [
            account,
            offer_id,
            catalog_id,
            account_type_code,
            offer_type_id,
            created.isoformat(),
            expire.isoformat(),
            risk
        ]
        data_rows.append(data_row)
    
    with open('data.csv', mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([
            'account',
            'offer_id',
            'catalog_id',
            'account_type_code',
            'offer_type_id',
            'created',
            'expire',
            'risk'
        ])
        writer.writerows(data_rows)
    
    print("CSV file generated successfully.")
    EOF
    
    Bash
  2. Run the following Python script:
    python3 make-csv.py
    Bash

    A file called data.csv with 1,500 rows will be created in your project folder. The following code shows an example:

    account,offer_id,catalog_id,account_type_code,offer_type_id,created,expire,risk
    43469653,626503640435,649413151048141733,GE,07052721,2024-07-22T16:23:06.771968+00:00,2025-04-15T16:23:06.771968+00:00,high
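
    Optionally, you can sanity check the generated file before uploading it. The line count includes the header row:
    wc -l data.csv
    head -n 3 data.csv
    Bash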

Upload to the S3 bucket to import the CSV file to the DynamoDB table

To upload the CSV file to the S3 bucket and import it into the DynamoDB table, run the following command:

aws s3 cp data.csv s3://$S3_BUCKET/
Bash

The file is uploaded to your bucket, generating an S3 event notification that triggers the Lambda function. The Python Lambda function processes CSV files uploaded to Amazon S3, imports the data into DynamoDB for operational access, and simultaneously stores a JSON version in Amazon S3 for analytical purposes and auditing.

Explore table items

To explore the table items, use the following two CLI commands. They return a count of all items inserted as well as the first 10 items from your table:

aws dynamodb scan \
--table-name $DDB_TABLE \
--select COUNT
Bash
aws dynamodb scan \
--table-name $DDB_TABLE \
--limit 10
Bash
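Because the table uses a composite primary key, you can also fetch a single item by supplying both the partition key and the sort key. The following command is a sketch; replace the account and offer_id values with ones that exist in your randomly generated data (the values shown come from the earlier example row):

aws dynamodb get-item \
--table-name $DDB_TABLE \
--key '{"account": {"S": "43469653"}, "offer_id": {"S": "626503640435"}}'
Bash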

The following screenshots show an example of the table queries.

You have just built a serverless ETL pipeline that automatically imports CSV data into DynamoDB. The solution uses Lambda and Amazon S3 event triggers to create a zero-maintenance data ingestion workflow.

Monitoring and optimization

Once you have provisioned all of the resources, it’s important to monitor both your DynamoDB table’s performance and your ETL Lambda function. Use Amazon CloudWatch metrics to track table-level metrics such as consumed read and write capacity, latency, and throttled requests, and use CloudWatch Logs to debug your function through the error logging it emits. This provides insight into how both the DynamoDB table and the Lambda function perform while the ETL process runs.
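For example, you can tail the function’s logs from the CLI while an import runs. This assumes the default log group name that Lambda creates for the function:

aws logs tail /aws/lambda/$LAMBDA_FUNCTION --since 15m
Bash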

As the size of your input file grows, scale the memory and ephemeral storage allotted to your Lambda function accordingly so that it continues to run consistently and efficiently. When sizing your Lambda function, you can use the AWS Lambda Power Tuning tool to test different memory configurations and optimize for both performance and cost.

Clean up

When you’ve finished, clean up the resources associated with the example deployment to avoid incurring unwanted charges:

aws lambda delete-function --function-name $LAMBDA_FUNCTION
Bash
aws s3 rm "s3://$S3_BUCKET" --recursive
aws s3api delete-bucket --bucket "$S3_BUCKET"
Bash
aws iam detach-role-policy \
--role-name $IAM_ROLE \
--policy-arn arn:aws:iam::$ACCOUNT:policy/DynamoDBWriteS3ReadPolicy
Bash
aws iam detach-role-policy \
--role-name $IAM_ROLE \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Bash
aws iam delete-role --role-name $IAM_ROLE
aws iam delete-policy --policy-arn arn:aws:iam::$ACCOUNT:policy/DynamoDBWriteS3ReadPolicy
aws dynamodb delete-table --table-name $DDB_TABLE
Bash
cd ..
rm -r csv-to-ddb 
Bash

Conclusion

In this post, we presented a solution that combines Python for data manipulation with Lambda for interacting with DynamoDB, enabling programmatic ingestion of structured CSV data into an existing DynamoDB table.

You can use this solution to integrate on-premises data pipelines with an AWS environment, for either migration or continuous delivery purposes.


About the Author

Bill Pfeiffer is a Sr. Solutions Architect at AWS, focused on helping customers design, implement, and evolve secure and cost optimized cloud infrastructure. He is passionate about solving business challenges with technical solutions. Outside of work, Bill enjoys traveling the US with his family in their RV and exploring the outdoors.

Mike Wells is a Solutions Architect at AWS who works with financial services customers. He helps organizations design resilient, secure cloud solutions that meet strict regulatory requirements while maximizing AWS benefits. When not architecting cloud solutions, Mike is an avid runner and enjoys spending time with family and friends.