#!/bin/bash
# Set variables
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION="ap-southeast-1"
BUCKET_NAME="aws-glue-assets-${ACCOUNT_ID}-${REGION}"
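# Optional sanity check (not part of the original walkthrough): fail fast if the
# account ID could not be resolved, e.g. when AWS credentials are missing.
if [ -z "${ACCOUNT_ID}" ]; then
  echo "Unable to resolve AWS account ID -- check your credentials" >&2
  exit 1
fi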
# Create the S3 bucket
aws s3api create-bucket \
  --bucket ${BUCKET_NAME} \
  --region ${REGION} \
  --create-bucket-configuration LocationConstraint=${REGION}
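# Optional: confirm the bucket is reachable before continuing (an extra step beyond
# the original script). Note that for us-east-1 the LocationConstraint argument
# above must be omitted.
aws s3api head-bucket --bucket ${BUCKET_NAME}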
# Create the required folder structure
aws s3api put-object --bucket ${BUCKET_NAME} --key scripts/
aws s3api put-object --bucket ${BUCKET_NAME} --key temporary/
aws s3api put-object --bucket ${BUCKET_NAME} --key sparkHistoryLogs/
cat << 'EOF' > etl_script.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, udf, lit, when
from pyspark.sql.types import *
import uuid
# Get job parameters
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'glue_database',       # Glue Data Catalog database name
    'glue_table',          # Glue Data Catalog table name
    'dynamodb_table_arn',  # DynamoDB table ARN
    's3_bucket',           # S3 bucket name
    's3_prefix'            # S3 prefix path
])
# Create a UUID generator UDF
def generate_uuid():
    return str(uuid.uuid4())

generate_uuid_udf = udf(generate_uuid, StringType())
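# Note (optional alternative, not from the original script): Spark 3.x also ships a
# built-in uuid() SQL function, which avoids Python UDF serialization overhead.
# A minimal sketch, assuming Glue 3.0+ / Spark 3.x:
#   from pyspark.sql.functions import expr
#   df_with_id = df.withColumn("id", expr("uuid()"))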
# Initialize Glue and Spark contexts
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Read data from DynamoDB (export connector)
dyf_dynamodb = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.s3.bucket": args['s3_bucket'],
        "dynamodb.s3.prefix": args['s3_prefix'],
        "dynamodb.tableArn": args['dynamodb_table_arn'],
        "dynamodb.unnestDDBJson": True
    },
    transformation_ctx="AmazonDynamoDB_node1729663842557"
)
# Convert to a DataFrame
df = dyf_dynamodb.toDF()
# Print the existing column names
print("Existing columns:", df.columns)
# Add a UUID as the id column
df_with_id = df.withColumn("id", generate_uuid_udf())
# Default values for columns that may be missing
columns_with_defaults = {
    "fileId": ("string", ""),
    "recipientEmail": ("string", ""),
    "dateAdded": ("timestamp", None),
    "expiryDate": ("timestamp", None),
    "dateShared": ("timestamp", None),
    "downloadCount": ("int", 0),
    "downloadLimit": ("int", 0),
    "downloads": ("string", ""),
    "fileName": ("string", ""),
    "folderId": ("string", ""),
    "notify": ("boolean", False),
    "ownerEmail": ("string", ""),
    "ownerId": ("string", ""),
    "ownerName": ("string", ""),
    "classification": ("string", ""),
    "ipLimit": ("string", ""),
    "documentId": ("string", ""),
    "stampId": ("string", ""),
    "stampDate": ("timestamp", None),
    "pwKey": ("string", ""),
    "pwErrorDetail": ("string", ""),
    "pwStatus": ("string", ""),
    "size": ("long", 0),
    "type": ("string", ""),
    "userType": ("string", ""),
    "downloadType": ("string", "")
}
# Add missing columns and set default values
df_with_defaults = df_with_id
for col_name, (col_type, default_value) in columns_with_defaults.items():
    if col_name not in df_with_id.columns:
        if col_type == "timestamp" and default_value is None:
            df_with_defaults = df_with_defaults.withColumn(col_name, lit(None).cast("timestamp"))
        else:
            df_with_defaults = df_with_defaults.withColumn(col_name, lit(default_value))
# Cast columns to their target data types
select_expressions = []
for col_name, (col_type, _) in columns_with_defaults.items():
    select_expressions.append(col(col_name).cast(col_type))
# Prepend the id column to the select expressions
select_expressions.insert(0, col("id").cast("string"))
# Apply the selection and type casts
df_processed = df_with_defaults.select(select_expressions)
# Data validation
print("Total record count:", df_processed.count())
print("Schema:")
df_processed.printSchema()
# Show sample data
print("Sample data:")
df_processed.show(5)
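# Optional data-quality check (a sketch, not in the original job): count rows with
# a null fileId before writing, assuming fileId is expected to always be present.
#   null_file_ids = df_processed.filter(col("fileId").isNull()).count()
#   print("Rows with null fileId:", null_file_ids)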
# Convert back to a DynamicFrame
dyf_processed = DynamicFrame.fromDF(df_processed, glueContext, "processed_data")
# Write to Aurora MySQL via the Glue Data Catalog table
glueContext.write_dynamic_frame.from_catalog(
    frame=dyf_processed,
    database=args['glue_database'],
    table_name=args['glue_table'],
    transformation_ctx="MySQL_node1729671565717",
    additional_options={
        "writeMode": "append"
    }
)
job.commit()
EOF
# Upload the script to S3
aws s3 cp etl_script.py s3://${BUCKET_NAME}/scripts/xxx_etl.py
# Verify the upload
aws s3 ls s3://${BUCKET_NAME}/scripts/
# Set the bucket policy
cat << EOF > bucket-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "GlueAccess",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::${ACCOUNT_ID}:role/XXX-GlueJobRole"
      },
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::${BUCKET_NAME}",
        "arn:aws:s3:::${BUCKET_NAME}/*"
      ]
    }
  ]
}
EOF
aws s3api put-bucket-policy \
  --bucket ${BUCKET_NAME} \
  --policy file://bucket-policy.json
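# Optional verification (not in the original walkthrough): read the policy back to
# confirm it was applied.
aws s3api get-bucket-policy --bucket ${BUCKET_NAME}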
# Enable versioning
aws s3api put-bucket-versioning \
  --bucket ${BUCKET_NAME} \
  --versioning-configuration Status=Enabled
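# Optional verification (again, an extra step beyond the original script).
aws s3api get-bucket-versioning --bucket ${BUCKET_NAME}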
# Clean up temporary files
rm etl_script.py bucket-policy.json
# Print key information
echo "Bucket created: ${BUCKET_NAME}"
echo "Script location: s3://${BUCKET_NAME}/scripts/xxx_etl.py"