亚马逊AWS官方博客

从 AWS DynamoDB 迁移数据到 AWS Aurora MySQL

一、概述

虽然 AWS DynamoDB 提供了高性能、可扩展的 NoSQL 解决方案,但在某些业务场景下存在局限性:

复杂查询需求:

  • AWS DynamoDB 不支持复杂的 JOIN 操作
  • 缺乏灵活的多条件查询能力
  • 不支持完整的 SQL 语法

业务适配性:

  • 现有系统可能深度依赖 SQL 特性
  • 团队更熟悉关系型数据库操作
  • 需要与其他 MySQL 系统集成

成本考虑:

  • AWS DynamoDB 按读写容量计费
  • 大量复杂查询可能导致成本升高
  • 数据规模增长时预算难以控制

基于以上因素,客户要求将数据迁移至 MySQL来满足以下要求:

  • 提供更灵活的查询能力
  • 降低开发维护成本
  • 更好地满足业务需求
  • 实现与现有系统的无缝集成

二、方案概述

以下是基于 AWS Glue 从 AWS DynamoDB 迁移数据到 AWS Aurora MySQL 的方案概述(使用 Crawler 爬取 AWS Aurora MySQL 表结构)。

项目概述:

使用 AWS Glue 实现从 AWS DynamoDB 到 AWS Aurora MySQL 的数据自动化迁移方案。

技术架构:

  • 数据源:AWS DynamoDB
  • 目标库:AWS Aurora MySQL 数据库
  • ETL 工具:AWS Glue
  • 开发语言:Python/PySpark

实现步骤:

目标数据库结构探索

  • 使用 AWS Glue Crawler 爬取 AWS Aurora MySQL 表结构
  • 将 AWS Aurora MySQL 表结构信息保存到 AWS Glue Data Catalog
  • 用于后续 ETL 作业的 schema 映射

ETL 作业开发

  • 创建 Glue ETL Job
  • 从 AWS DynamoDB 读取源数据
  • 根据爬取的 AWS Aurora MySQL 表结构进行数据转换
  • 写入 AWS Aurora MySQL 目标表

任务调度

  • 配置 Job 触发器
  • 设置运行参数和资源配置

三、方案部署

如下是基于亚马逊云科技已有产品,确保数据能够快速且平滑地过渡到 AWS Aurora MySQL 环境的具体操作步骤,方便客户快速搭建环境以及配置数据过渡。

为 AWS Glue Crawler 创建一个安全组,方便抓取 AWS Aurora MySQL 的数据库表结构

aws ec2 create-security-group --group-name glue-crawler-sg --description "Security group for Glue Crawler" --vpc-id vpc-0a12ed3c51c66f3c8

通过 describe 命令获取已经创建的安全组

SG_ID=$(aws ec2 describe-security-groups --filters "Name=group-name,Values=glue-crawler-sg" --query 'SecurityGroups[0].GroupId' --output text)

增加自身关联,增加出入规则

aws ec2 authorize-security-group-ingress --group-id $SG_ID --protocol all --source-group $SG_ID --port -1

创建 AWS Glue 的数据库连接(注意此处需要根据已有的数据库连接构件如下命令)

JDBC_CONNECTION_URL: jdbc:mysql://xxx-aurora-serverless-v2.cluster-cadhydlq2qny.ap-southeast-1.rds.amazonaws.com:3306/xxx_db

SECRET_ID: rds!cluster-e71e8aba-5c72-47e3-bc08-a38cf735c6e9

本文使用 Secret Manager 的方式保存数据库密码,所以要找到对应的 ID。

SubnetId: 所在子网 subnet-0937f925b8018d822

SecurityGroupIdList: 安全组列表 sg-0021c104d18dc4222

AvailabilityZone: 可用区 ap-southeast-1a

aws glue create-connection --connection-input '{ "Name": "mysql-connection", "ConnectionType": "JDBC", "ConnectionProperties": { "JDBC_CONNECTION_URL": "jdbc:mysql://xxx-aurora-serverless-v2.cluster-cadhydlq2qny.ap-southeast-1.rds.amazonaws.com:3306/xxx_db", "JDBC_ENFORCE_SSL": "false", "SECRET_ID": "rds!cluster-e71e8aba-5c72-47e3-bc08-a38cf735c6e9" }, "PhysicalConnectionRequirements": { "SubnetId": "subnet-0937f925b8018d822", "SecurityGroupIdList": ["sg-0021c104d18dc4222"], "AvailabilityZone": "ap-southeast-1a" } }' 

接下来我们根据导出 AWS DynamoDB 数据,再导入到 AWS Aurora MySQL 数据库,所需要的权限进行最小力度的角色的创建。

创建一个 create_role.sh 脚本文件

#!/bin/bash
# 设置变量
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ROLE_NAME="XXX-GlueJobRole"
# 创建信任策略
cat << EOF > trust-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "glue.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
EOF
# 创建权限策略
cat << EOF > permission-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:*",
                "s3:*",
                "cloudwatch:*",
                "logs:*",
                "ec2:*",
                "secretsmanager:GetSecretValue",
                "rds-db:connect"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": "*"
        }
    ]
}
EOF
# 创建角色
aws iam create-role \
    --role-name $ROLE_NAME \
    --assume-role-policy-document file://trust-policy.json
# 创建策略
aws iam create-policy \
    --policy-name XXX-GlueJobPolicy \
    --policy-document file://permission-policy.json
# 附加策略
aws iam attach-role-policy \
    --role-name $ROLE_NAME \
    --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/XXX-GlueJobPolicy
aws iam attach-role-policy \
    --role-name $ROLE_NAME \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
# 清理临时文件
rm trust-policy.json permission-policy.json

ROLE_ARN=$(aws iam get-role --role-name $ROLE_NAME --query 'Role.Arn' --output text) 

echo "Role ARN: $ROLE_ARN"

echo "Role creation completed!"

创建 Crawler 获取数据库结构并创建 table

aws glue create-crawler \
--name "mysql-crawler" \
--role "XXX-GlueJobRole" \
--targets '{
"JdbcTargets": [
{
"ConnectionName": "mysql-connection",
"Path": "xxx_db/xxx_files",
"Exclusions": [],
"EnableAdditionalMetadata": []
}
]
}' \
--database-name "xxx-mysql" \
--recrawl-policy '{
"RecrawlBehavior": "CRAWL_EVERYTHING"
}' \
--schema-change-policy '{
"UpdateBehavior": "UPDATE_IN_DATABASE",
"DeleteBehavior": "DEPRECATE_IN_DATABASE"
}' \
--lineage-configuration '{
"CrawlerLineageSettings": "DISABLE"
}' 

启动 Crawler 来抓取表结构

aws glue start-crawler --name "mysql-crawler"

将会在 Table 中获取到 Table 的结构

创建 AWS S3 存储代码和 AWS Glue 的一些临时文件信息,创建 create_job.sh 脚本, 只需要修改您的 region 即可,当然请根据您的表结构进行一些代码的修改。

#!/bin/bash

# 设置变量
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION="ap-southeast-1"
BUCKET_NAME="aws-glue-assets-${ACCOUNT_ID}-${REGION}"

# 创建 AWS S3 存储桶
aws s3api create-bucket \
    --bucket ${BUCKET_NAME} \
    --region ${REGION} \
    --create-bucket-configuration LocationConstraint=${REGION}
    
# 创建必要的文件夹结构
aws s3api put-object --bucket ${BUCKET_NAME} --key scripts/
aws s3api put-object --bucket ${BUCKET_NAME} --key temporary/
aws s3api put-object --bucket ${BUCKET_NAME} --key sparkHistoryLogs/

cat << 'EOF' > etl_script.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, udf, lit, when
from pyspark.sql.types import *
import uuid

# 获取作业参数
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'glue_database',       # AWS Glue 数据目录数据库名
    'glue_table',          # AWS Glue 数据目录表名
    'dynamodb_table_arn',  # AWS DynamoDB 表 ARN
    's3_bucket',           # AWS S3 存储桶名称
    's3_prefix'            # AWS S3 前缀路径
])

# 创建 UUID 生成器 UDF
def generate_uuid():
    return str(uuid.uuid4())

generate_uuid_udf = udf(generate_uuid, StringType())

# 初始化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 从 AWS DynamoDB 读取数据
dyf_dynamodb = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.s3.bucket": args['s3_bucket'],
        "dynamodb.s3.prefix": args['s3_prefix'],
        "dynamodb.tableArn": args['dynamodb_table_arn'],
        "dynamodb.unnestDDBJson": True
    },
    transformation_ctx="AmazonDynamoDB_node1729663842557"
)

# 转换为 DataFrame
df = dyf_dynamodb.toDF()

# 打印现有的列名
print("现有列名:", df.columns)

# 添加 UUID 作为 id 列
df_with_id = df.withColumn("id", generate_uuid_udf())

# 为所有可能缺失的列添加默认值
columns_with_defaults = {
    "fileId": ("string", ""),
    "recipientEmail": ("string", ""),
    "dateAdded": ("timestamp", None),
    "expiryDate": ("timestamp", None),
    "dateShared": ("timestamp", None),
    "downloadCount": ("int", 0),
    "downloadLimit": ("int", 0),
    "downloads": ("string", ""),
    "fileName": ("string", ""),
    "folderId": ("string", ""),
    "notify": ("boolean", False),
    "ownerEmail": ("string", ""),
    "ownerId": ("string", ""),
    "ownerName": ("string", ""),
    "classification": ("string", ""),
    "ipLimit": ("string", ""),
    "documentId": ("string", ""),
    "stampId": ("string", ""),
    "stampDate": ("timestamp", None),
    "pwKey": ("string", ""),
    "pwErrorDetail": ("string", ""),
    "pwStatus": ("string", ""),
    "size": ("long", 0),
    "type": ("string", ""),
    "userType": ("string", ""),
    "downloadType": ("string", "")
}

# 添加缺失的列并设置默认值
df_with_defaults = df_with_id
for col_name, (col_type, default_value) in columns_with_defaults.items():
    if col_name not in df_with_id.columns:
        if col_type == "timestamp" and default_value is None:
            df_with_defaults = df_with_defaults.withColumn(col_name, lit(None).cast("timestamp"))
        else:
            df_with_defaults = df_with_defaults.withColumn(col_name, lit(default_value))

# 数据类型转换
select_expressions = []
for col_name, (col_type, _) in columns_with_defaults.items():
    select_expressions.append(col(col_name).cast(col_type))

# 添加 id 列到选择表达式
select_expressions.insert(0, col("id").cast("string"))

# 执行选择和类型转换
df_processed = df_with_defaults.select(select_expressions)

# 数据验证
print("记录总数:", df_processed.count())
print("Schema 信息:")
df_processed.printSchema()

# 显示样本数据
print("样本数据:")
df_processed.show(5)

# 转回 DynamicFrame
dyf_processed = DynamicFrame.fromDF(df_processed, glueContext, "processed_data")

# 写入 AWS Aurora MySQL
glueContext.write_dynamic_frame.from_catalog(
    frame=dyf_processed,
    database=args['glue_database'],
    table_name=args['glue_table'],
    transformation_ctx="MySQL_node1729671565717",
    additional_options={
        "writeMode": "append"
    }
)

job.commit()

EOF

# 上传脚本到 AWS S3
aws s3 cp etl_script.py s3://${BUCKET_NAME}/scripts/xxx_etl.py

# 验证上传
aws s3 ls s3://${BUCKET_NAME}/scripts/

# 设置存储桶策略
cat << EOF > bucket-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GlueAccess",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::${ACCOUNT_ID}:role/XXX-GlueJobRole"
            },
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::${BUCKET_NAME}",
                "arn:aws:s3:::${BUCKET_NAME}/*"
            ]
        }
    ]
}
EOF

aws s3api put-bucket-policy \
    --bucket ${BUCKET_NAME} \
    --policy file://bucket-policy.json
    
# 启用版本控制
aws s3api put-bucket-versioning \
    --bucket ${BUCKET_NAME} \
    --versioning-configuration Status=Enabled
    
# 清理临时文件
rm etl_script.py bucket-policy.json

# 打印重要信息
echo "Bucket created: ${BUCKET_NAME}"
echo "Script location: s3://${BUCKET_NAME}/scripts/xxx_etl.py"

此刻准备资源和文件都已经好了,我们根据 AWS S3 的位置和名称创建 AWS Glue Job

aws glue create-job \
    --name "dynamodb-to-mysql" \
    --role "XXX-GlueJobRole" \
    --command '{
        "Name": "glueetl",
        "ScriptLocation": "s3://aws-glue-assets-xxx-ap-southeast-1/scripts/xxx_etl.py",
        "PythonVersion": "3"
    }' \
    --default-arguments '{
        "--enable-metrics": "true",
        "--enable-spark-ui": "true",
        "--spark-event-logs-path": "s3://aws-glue-assets-xxx-ap-southeast-1/sparkHistoryLogs/",
        "--enable-job-insights": "true",
        "--enable-observability-metrics": "true",
        "--enable-glue-datacatalog": "true",
        "--enable-continuous-cloudwatch-log": "true",
        "--job-bookmark-option": "job-bookmark-disable",
        "--job-language": "python",
        "--TempDir": "s3://aws-glue-assets-xxx-ap-southeast-1/temporary/",
        "--glue_database": "xxx-mysql",
        "--glue_table": "xxx_db_xxx_files",
        "--dynamodb_table_arn": "arn:aws:dynamodb:ap-southeast-1:xxx:table/xxx-files",
        "--s3_bucket": "aws-glue-assets-xxx-ap-southeast-1",
        "--s3_prefix": "temporary/ddbexport/"
    }' \
    --connections '{
        "Connections": ["mysql-connection"]
    }' \
    --max-retries 0 \
    --timeout 2880 \
    --worker-type "G.1X" \
    --number-of-workers 10 \
    --glue-version "4.0" \
    --execution-property '{
        "MaxConcurrentRuns": 1
    }' \
    --execution-class "STANDARD"

您可以直接在 AWS Web Console 启动 Job,也可以使用如下命令启动 Job

aws glue start-job-run \
    --job-name "dynamodb-to-mysql" \
    --arguments '{
        "--glue_database": "xxx-mysql",
        "--glue_table": "xxx_db_xxx_files",
        "--dynamodb_table_arn": "arn:aws:dynamodb:ap-southeast-1:xxx:table/xxx-files",
        "--s3_bucket": "aws-glue-assets-xxx-ap-southeast-1",
        "--s3_prefix": "temporary/ddbexport/"
    }'

四、总结

该方案通过 AWS Glue 实现了从 AWS DynamoDB 到 AWS Aurora MySQL 的数据迁移,利用 Crawler 自动探索 AWS Aurora MySQL 表结构并存储到 AWS Glue Data Catalog,再通过 ETL Job 完成数据的抽取、转换和加载过程,具有全托管、自动化程度高、可扩展性强等优势。

本篇作者

孙超群

亚马逊云科技解决方案架构师,负责基于 AWS 云计算方案的架构咨询,设计实现并以技术赋能助力企业成长,同时热衷于为客户设计和构建端到端的区块链解决方案。

杨迪

亚马逊云科技高级解决方案架构师,企业架构和云采用方面拥有超过 15 年的经验。