混合事务分析处理(Hybrid Transactional and Analytical Processing, HTAP)是一种在没有复杂 ETL 解决方案的情况下,进行近实时数据分析的技术,它提供了一种将 OLTP 和 OLAP 相结合的混合方法,用于实时数据处理和分析。
越来越多的客户有 HTAP 相关的需求,例如电商网站中常用的实时的大屏展示,上游的交易数据采用了传统数据库来保存,大量的统计和分析工作,需要在数仓中完成,同时有多个客户端的对统计结果进行查询,传统的数仓 ETL 链路长,延迟大,很难满足客户端对统计结果的高并发、低延时的需求。
在这篇博客中,我们将使用纽约出租车的测试数据,用来模拟一个实时交易数据,将实时数据写入 Amazon Aurora,并通过 Zero-ETL 技术将数据同步到 Amazon Redshift,在 Redshift 中对数据进行聚合,然后将聚合后的数据,再回写到 Aurora,使用 Aurora 来进行实时的汇总和查询。
1. 概述
本文在测试过程中,使用纽约出租车测试数据集,它是由纽约市出租车和豪华轿车委员会(Taxi & Limousine Commission, TLC)公布的黄色奖章出租车和上亿次出行记录。通过开放平台公开行程记录数据,可以实时和批量的访问的记录。TLC 不收集任何乘客信息。我们这里只采用了黄色出租车的出行记录。
来看一下本文用到的两个主要的产品。
Amazon Aurora 是一个与 MySQL 和 PostgreSQL 兼容的完全托管的关系数据库引擎。不仅具有高端商用数据库的速度和可靠性,同时还具有开源数据库的简单性和成本效益。您目前用于现有 MySQL 和 PostgreSQL 数据库的代码、工具和应用程序都可用于 Aurora。
Amazon Redshift 是一种快速、完全托管式 PB 级数据仓库服务,开发人员、数据科学家和数据分析师可以跨数据仓库和数据湖进行操作,从而构建报告和数据展示面板的应用程序、执行实时分析、协作处理数据以及构建和训练机器学习(ML)模型。我们在本文的实验中将使用 Redshift Serverless。
在整个实验的过程中,我们会使用 Cloud9 来执行脚本,可以参考文档部署 Cloud9。
我们会使用如下的架构,来实现实现 HTAP 解决方案。
解释一下上面的架构图,实时的交易数据会写入 Aurora,配置 Aurora 到 Redshift 的 Zero-ETL,将实时交易数据同步到 Redshift,在 Redshift 里实现对实时数据的聚合,例如我们以小时为间隔,由 Amazon Managed Workflows for Apache Airflow (MWAA) 调度和触发,对数据进行聚合,并将聚合后的数据,写入聚合数据表。
对 Redshift 中聚合好的数据,再由 MWAA 来调度和触发,回写到 Aurora 中,当客户需要读取实时的汇总数据时,可以结合实时交易数据,和聚合数据,来汇总结果。例如:当要查询 2023 年 1 月 1 日从 0 点到 6 点 30 分的出租车费用,可以从聚合表中查询 0 点到 6 点前的聚合数据,再从明细表中汇总 6 点 0 分到 6 点 30 分的明细数据,将二者相加,即得到 6 点 30 分之前的实时汇总数据。如下图所示:
这里只列出来交易的时间戳,具体的表字段请参考后面章节中的建表语句。
2. 实时交易数据写入 Aurora
我们会将实时交易的数据,写入 Aurora 的表中,先来准备 Aurora 的参数组和集群。
2.1 配置 Aurora 参数
为了实现 Zero-ETL,需要使用特定的参数来创建 Aurora 集群,我们来创建一个 Aurora 集群使用的参数组,具体步骤请参考 Zero-ETL 的配置文档的 “Configure the Aurora MySQL source with a customized DB cluster parameter group” 章节。
需要修改的参数包括:
- aurora_enhanced_binlog=1
- binlog_backup=0
- binlog_format=ROW
- binlog_replication_globaldb=0
- binlog_row_image=full
- binlog_row_metadata=full
另外,确认参数 binlog_transaction_compression parameter 没有设置为 ON,参数 binlog_row_value_options 没有设置为 PARTIAL_JSON。
2.2 创建 Aurora 集群
使用上一步创建的参数组,创建名称为 htap-demo 的 Aurora 集群,具体步骤请参考 Zero-ETL 的配置文档 的 “Configure the Aurora MySQL source with a customized DB cluster parameter group” 章节,这里不再重复赘述。
2.3 在 Aurora 中创建表
可以从 Cloud9 或者其他的数据库客户端,登录上一步创建的 Aurora 集群。
在 Aurora 中创建实时交易明细表,该表用来保存实时产生的出租车的出行记录。使用如下脚本创建数据库 taxi_trips 和表 nyc_source。
CREATE DATABASE taxi_trips;
CREATE TABLE `taxi_trips`.`nyc_source` (
`VendorID` bigint DEFAULT NULL,
`tpep_pickup_datetime` timestamp NULL DEFAULT NULL,
`tpep_dropoff_datetime` timestamp NULL DEFAULT NULL,
`passenger_count` double DEFAULT NULL,
`trip_distance` double DEFAULT NULL,
`RatecodeID` double DEFAULT NULL,
`store_and_fwd_flag` text,
`PULocationID` bigint DEFAULT NULL,
`DOLocationID` bigint DEFAULT NULL,
`payment_type` bigint DEFAULT NULL,
`fare_amount` double DEFAULT NULL,
`extra` double DEFAULT NULL,
`mta_tax` double DEFAULT NULL,
`tip_amount` double DEFAULT NULL,
`tolls_amount` double DEFAULT NULL,
`improvement_surcharge` double DEFAULT NULL,
`total_amount` double DEFAULT NULL,
`congestion_surcharge` double DEFAULT NULL,
`airport_fee` double DEFAULT NULL,
`pk_id` bigint NOT NULL AUTO_INCREMENT,
PRIMARY KEY (pk_id),
INDEX `index_1` (`VendorID`, `tpep_pickup_datetime`, `PULocationID`,
`DOLocationID`, `payment_type`)
);
创建表的时候,必须要包含主键,否则 Zero-ETL 会失败,另外要根据将来使用的查询创建索引。
2.4 Glue connection
导入数据时,需要使用连接到 Aurora 的 Glue connection, 我们可以在 Cloud9 上执行如下命令,创建名称为 htap-demo 的 Glue connection:
aws glue create-connection \
--connection-input '{"Name": "htap-demo", "ConnectionType": "JDBC",
"ConnectionProperties":{"JDBC_CONNECTION_URL":"jdbc:mysql://<Aurora-Endpoint>:3306/taxi_trips","USERNAME":"<Aurora-User-Name>","PASSWORD":"<Aurora-Password>", "JDBC_ENFORCE_SSL":"false"},
"PhysicalConnectionRequirements":{"SubnetId":"<Your-subnet>","SecurityGroupIdList":["<Your-Security-Group>"],"AvailabilityZone":"us-east-1a"}}' \
--region us-east-1
替换上面命令中的参数,我们实验用到的服务都位于美东 1。
2.5 准备写入测试数据
我们用一段 Python 代码来导入 NYC 的数据,可以在 Cloud9 或者 EC2 上执行这段代码,先来安装依赖包:
sudo yum install python3-pip
python3 -m pip install awswrangler pymysql pandas boto3 fsspec s3fs
Python 代码如下:
import awswrangler as wr
import pymysql
import pandas as pd
import boto3
bucket_name = 'nyc-tlc'
file_prefix = 'trip data'
schema_name = 'taxi_trips'
table_name = 'nyc_source'
rds_connection_name = 'htap-demo'
s3 = boto3.resource('s3', region_name='us-east-1')
bucket = s3.Bucket(bucket_name)
con_mysql = wr.mysql.connect(rds_connection_name)
for obj in bucket.objects.filter(Prefix=file_prefix):
if obj.key.endswith('.parquet') and (obj.key).rsplit('/')[-1] >= 'yellow_tripdata_2023-01.parquet' :
df = pd.read_parquet(f's3://{bucket_name}/{obj.key}')
wr.mysql.to_sql(df, con_mysql, schema=schema_name, table=table_name, mode="append", use_column_names=True)
con_mysql.commit()
con_mysql.close()
执行代码,导入 2023 年前 6 个月的 NYC 数据。
请注意:我们这里只导入了 2023 年前 6 个月的数据,总共 6 个数据文件,导入的数据总量是大约 2 千万条,由于导入的数据文件较大,创建 Cloud9 或者 EC2 的时候,尽量选用配置高一些的实例,例如 8xlarge。另外需要为 Cloud9 或 EC2 添加访问 S3 和 Glue 的权限。
3. Redshift Serverless 和 Zero-ETL
我们可以参考 Zero-ETL 的配置文档,配置 Redshift Serverless,Workgroup 的名称为 htap-demo-wg,并部署从 Aurora 到 Redshift Serverless 的 Zero-ETL。部署完成后,Aurora 中的数据库 taxi_trips 会同步到 Redshift 中。
我们可以在 Redshift 的查询页面中,看到同步过来的数据库 taxi_trips,以及里面包含的同名的 Schema,和 Schema 下的 nyc_source 表。如下图:
可以查询数据表,数据同步的延迟大概在 1 分钟左右。
4. Redshift 数据聚合
在 Redshift 中对 Zero-ETL 同步过来的数据,进行聚合,这步操作是由 MWAA 来调度和触发的。
4.1 在 Redshift 中创建聚合表
先在 Redshift 中默认的 dev 数据库和 public Schema 下,创建聚合表,它用来保存每个小时的聚合数据。
CREATE TABLE "nyc_hour_aggregation" (
"VendorID" bigint,
"agg_date" date,
"agg_hour" int,
"passenger_count" float8,
"trip_distance" float8,
"PULocationID" bigint,
"DOLocationID" bigint,
"payment_type" bigint,
"fare_amount" float8,
"tip_amount" float8,
"tolls_amount" float8,
"improvement_surcharge" float8,
"total_amount" float8,
"congestion_surcharge" float8,
"airport_fee" float8,
"pk_id" bigint IDENTITY NOT NULL
) diststyle auto sortkey("agg_date");
4.2 聚合
创建 Redshift 存储过程,它以每小时为间隔执行 SQL,对上一个小时内发生的实时交易数据进行聚合,并保存结果到聚合表:
CREATE OR REPLACE PROCEDURE nyc_hour_aggregation()
LANGUAGE plpgsql
AS $$
BEGIN
insert into dev.public.nyc_hour_aggregation
select
"VendorID",
to_date(tpep_pickup_datetime, 'YYYY-MM-DD') as "agg_date",
extract(hour from tpep_pickup_datetime) as "agg_hour",
sum("passenger_count") as "passenger_count" ,
sum("trip_distance") as "trip_distance",
"PULocationID" ,
"DOLocationID" ,
"payment_type" ,
sum("fare_amount") as "fare_amount" ,
sum("tip_amount") as "tip_amount",
sum("tolls_amount") as "tolls_amount" ,
sum("improvement_surcharge") as "improvement_surcharge",
sum("total_amount") as "total_amount" ,
sum("congestion_surcharge") as "congestion_surcharge" ,
sum("airport_fee") as "airport_fee"
from taxi_trips.taxi_trips.nyc_source
where to_date(tpep_pickup_datetime, 'YYYY-MM-DD') = to_date(dateadd(hour, -1, getdate()), 'YYYY-MM-DD')
and extract(hour from tpep_pickup_datetime) = extract(hour from dateadd(hour, -1, getdate()))
group by "VendorID", "agg_date", "agg_hour", "PULocationID", "DOLocationID", "payment_type";
END;
$$;
我们将使用 MWAA 来触发和调用这个 Redshift 存储过程,具体的 DAG 的代码请参考后面的章节。
5. 聚合数据回写 Aurora
我们通过 Glue ETL 将 Redshift 中的聚合数据,回写到 Aurora,并在 MWAA 中调度这一步操作。
5.1 在 Aurora 中创建聚合表
在 Aurora 中创建数据库 taxi_trips_agg 和 聚合表 nyc_hour_aggregation,它用来保存 Redshift 回写的聚合数据。
CREATE DATABASE taxi_trips_agg;
CREATE TABLE `taxi_trips_agg`.`nyc_hour_aggregation`(
`VendorID` bigint DEFAULT NULL,
`agg_date` date,
`agg_hour` int,
`passenger_count` double DEFAULT NULL,
`trip_distance` double DEFAULT NULL,
`PULocationID` bigint DEFAULT NULL,
`DOLocationID` bigint DEFAULT NULL,
`payment_type` bigint DEFAULT NULL,
`fare_amount` double DEFAULT NULL,
`tip_amount` double DEFAULT NULL,
`tolls_amount` double DEFAULT NULL,
`improvement_surcharge` double DEFAULT NULL,
`total_amount` double DEFAULT NULL,
`congestion_surcharge` double DEFAULT NULL,
`airport_fee` double DEFAULT NULL,
`pk_id` bigint NOT NULL,
PRIMARY KEY (pk_id),
INDEX `index_1` (`VendorID`, `agg_date`, `PULocationID`,
`DOLocationID`, `payment_type`, `agg_hour`)
);
我们也根据将来使用的查询创建索引。
5.2 使用 Glue ETL Job 回写聚合数据
在 Cloud9 使用 AWS CLI 命令创建 Glue ETL Job,实现聚合数据从 Redshift 到 Aurora 的回写。先创建一个 Python 脚本 redshift-2-aurora.py,内容如下:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import *
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
print(args)
table_name = 'table_hour_aggregation'
rds_connection_url = 'jdbc:mysql://<Aurora-Endpoint>:3306/taxi_trips_agg'
db_user = '<Aurora-User-Name>'
db_password = '<Aurora-Password>'
redshift_connection_url = "jdbc:redshift://<Redshift-Endpoint>?user=<Redshift-User-Name>&password=<Redshift-Password>"
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sql_context = SQLContext(sc)
df = sql_context.read \
.format("io.github.spark_redshift_community.spark.redshift") \
.option("aws_iam_role", "<Redshift-IAM-Role-ARN>") \
.option("url", redshift_connection_url) \
.option("query", "select * from public.nyc_hour_aggregation where agg_date=to_date(dateadd(hour, -1, getdate()), 'YYYY-MM-DD') and agg_hour=extract(hour from dateadd(hour, -1, getdate()))") \
.option("tempdir", "<S3-folder>") \
.load()
df.write \
.mode("append") \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", rds_connection_url) \
.option("dbtable", table_name) \
.option("user", db_user) \
.option("password", db_password) \
.save()
job.commit()
替换上面 Python 脚本中的参数,将 Python 脚本上传到 S3:
aws s3 cp redshift-2-aurora.py s3://<S3-Bucket>/scripts
准备 Glue ETL job 的配置文件 job_configuration.json,内容如下:
{
"Name": "Redshift-2-Aurora",
"Role": "<Glue-IAM-Role-ARN>",
"ExecutionProperty": {
"MaxConcurrentRuns": 1
},
"Command": {
"Name": "glueetl",
"ScriptLocation": "s3://<S3-Bucket>/scripts/redshift-2-aurora.py",
"PythonVersion": "3",
},
"DefaultArguments": {
"--TempDir": "s3://<S3-Bucket>/temp-dir/",
"--job-bookmark-option": "job-bookmark-disable",
"--enable-continuous-cloudwatch-log": "true",
"--connections": ["htap-demo-network"]
},
"MaxRetries": 0,
"Timeout": 2880,
"GlueVersion": "4.0",
"WorkerType": "",
"NumberOfWorkers": 2
}
因为对 Aurora 和 Redshift 都是用的内网访问,需要使用 AWS CLI 创建一个名称为 htap-demo-network 的 Network 类型的 Glue connection:
aws glue create-connection \
--connection-input '{"Name":"htap-demo-network", "ConnectionType":"NETWORK", "ConnectionProperties": {},
"PhysicalConnectionRequirements":{"SubnetId":"<Your-Subnet>", "SecurityGroupIdList":["<Your-Security-Group-ID>"],"AvailabilityZone":"us-east-1a"}}' \
--region us-east-1
使用 AWS CLI 创建 Glue ETL job:
aws glue create-job --cli-input-json file://job_configuration.json --connections '{"Connections": ["htap-demo-network"]}'
我们将使用 MWAA 来触发和调用这个 Glue ETL job,具体的 DAG 的代码请参考后面的章节。
6. 使用 MWAA 来调度流程
我们使用 MWAA 来调度前面章节中的数据聚合和回写操作。
6.1 部署 MWAA
MWAA(Amazon Managed Workflows for Apache Airflow)是 AWS 提供的托管的 Airflow。使用 MWAA,我们可以用 Airflow 和 Python 创建工作流程,而无需管理底层基础设施以实现可扩展性、可用性和安全性。
我们可以参考 MWAA 的部署文档,来部署 MWAA 的环境。部署完成后,将 S3,CloudWatch,Redshift 和 Glue 的访问权限赋予 MWAA 的执行角色。
6.2 部署 DAG
因为 DAG 需要使用 AWS Secret Manager 提供访问 Redshift 的认证信息,我们用 AWS CLI 来创建一条名称为 htap-demo 的 Secret:
aws secretsmanager create-secret --name htap-demo --secret-string '{"username":"<Redshift-User-Name>","password":"<Redshift-Password>","dbname":"dev","host":"<Redshift-Endpoint>","port":5439,"namespaceName":"htap-demo","engine":"redshift"}'
准备要执行的 DAG 代码,先创建一个 Python 脚本 nyc_hour_aggregation.py,内容如下:
from airflow import DAG
from datetime import datetime, timedelta
import redshift_connector
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
# Define DAG
with DAG(
dag_id = 'nyc_hour_aggregation',
schedule_interval = '05 * * * *',
start_date = datetime(2024, 5, 1),
max_active_runs = 1,
catchup = True
) as dag:
begin = EmptyOperator(task_id="begin")
end = EmptyOperator(task_id="end")
aggregation_in_redshift = RedshiftDataOperator(
task_id="aggregation_in_redshift",
database="dev",
workgroup_name="htap-demo-wg",
secret_arn="<Redshift-Secret-ARN>",
sql="call nyc_hour_aggregation();",
poll_interval=10,
wait_for_completion=True
)
glue_etl_job = GlueJobOperator(
task_id="glue_etl_job",
job_name="Redshift-2-Aurora",
region_name="us-east-1",
verbose=True
)
begin >> aggregation_in_redshift >> glue_etl_job >> end
将上面的 Python 脚本上传到 S3 上 DAG 指定的目录:
aws s3 cp nyc_hour_aggregation.py s3://<S3-Bucket>/dags/
DAG 会在每小时后的第 5 分钟自动触发执行,这时 Zero-ETL 已经完成了上一个小时的数据同步,这样我们就把 Redshift 中聚合好的数据,回写到了 Aurora 里。
7. 性能测试
我们将对 NYC 的数据进行一个实时汇总查询的测试,测试范围是 2023 年 1 月 1 日,从 0 点到 6 点 30 分左右的实时汇总。我们在测试中对比 Redshift 和 Aurora 的并发读操作的性能。
先将测试代码 redshift-read.py 保存到本地,然后使用 GNU Parallel 工具来实现压测,我们可以参考 GNU Parallel 部署文档来部署 Parallel 工具。执行 Redshift 的测试脚本 redshift-read.py 如下:
CONF_FILE=parallel.red
rm -rf $CONF_FILE
for i in $(seq 1 <并发线程数>)
do
echo "python redshift-read.py 1 <单个线程查询的次数> > result.log" >> $CONF_FILE
done
wc -l $CONF_FILE
start_time=`date +'%Y-%m-%d %H:%M:%S'`
start_seconds=$(date --date="$start_time" +%s)
parallel -j <并发线程数> < $CONF_FILE
end_time=`date +'%Y-%m-%d %H:%M:%S'`
end_seconds=$(date --date="$end_time" +%s)
echo "本次运行时间: "$((end_seconds-start_seconds))"s"
替换上面脚本中的 <并发线程数> 和 <单个线程查询的次数>,然后在 Shell 中执行脚本。
再来测试 Aurora 并发读的性能,将 测试代码 aurora-read.py 保存到本地,执行 Aurora 的测试脚本 aurora-read.py 如下:
CONF_FILE=parallel.aurora
rm -rf $CONF_FILE
for i in $(seq 1 <并发线程数>)
do
echo "python aurora-read.py 1 <单个线程查询的次数> > result.log" >> $CONF_FILE
done
wc -l $CONF_FILE
start_time=`date +'%Y-%m-%d %H:%M:%S'`
start_seconds=$(date --date="$start_time" +%s)
parallel -j <并发线程数> < $CONF_FILE
end_time=`date +'%Y-%m-%d %H:%M:%S'`
end_seconds=$(date --date="$end_time" +%s)
echo "本次运行时间: "$((end_seconds-start_seconds))"s"
替换上面脚本中的 <并发线程数> 和 <单个线程查询的次数>,然后在 Shell 中执行脚本。
Redshift 我们使用了 128 RPU,Aurora我们使用 2 台 r6g.2xlarge 读实例,分别采用 50 个,100 个,150 个,200 个并发,每个并发查询 100 次,测试结果如下图:
可以看出,在 Aurora 中读取 Redshift 聚合后的结果,并结合 Aurora 中的实时数据,来实现 HTAP 的实时聚合的功能,查询性能是可以满足客户需求的。
8. 资源释放
测试结束后,需要释放的资源包括:
- Redshift Serverless 的 Namespace 和 Workgroup
- RDS
- Cloud9
- MWAA
本篇作者