亚马逊AWS官方博客

使用 Amazon Aurora + Redshift 实现 HTAP 解决方案

混合事务分析处理(Hybrid Transactional and Analytical Processing, HTAP)是一种在没有复杂 ETL 解决方案的情况下,进行近实时数据分析的技术,它提供了一种将 OLTP 和 OLAP 相结合的混合方法,用于实时数据处理和分析。

越来越多的客户有 HTAP 相关的需求,例如电商网站中常用的实时的大屏展示,上游的交易数据采用了传统数据库来保存,大量的统计和分析工作,需要在数仓中完成,同时有多个客户端的对统计结果进行查询,传统的数仓 ETL 链路长,延迟大,很难满足客户端对统计结果的高并发、低延时的需求。

在这篇博客中,我们将使用纽约出租车的测试数据,用来模拟一个实时交易数据,将实时数据写入 Amazon Aurora,并通过 Zero-ETL 技术将数据同步到 Amazon Redshift,在 Redshift 中对数据进行聚合,然后将聚合后的数据,再回写到 Aurora,使用 Aurora 来进行实时的汇总和查询。

1. 概述

本文在测试过程中,使用纽约出租车测试数据集,它是由纽约市出租车和豪华轿车委员会(Taxi & Limousine Commission, TLC)公布的黄色奖章出租车和上亿次出行记录。通过开放平台公开行程记录数据,可以实时和批量的访问的记录。TLC 不收集任何乘客信息。我们这里只采用了黄色出租车的出行记录。

来看一下本文用到的两个主要的产品。

  • Amazon Aurora

Amazon Aurora 是一个与 MySQL 和 PostgreSQL 兼容的完全托管的关系数据库引擎。不仅具有高端商用数据库的速度和可靠性,同时还具有开源数据库的简单性和成本效益。您目前用于现有 MySQL 和 PostgreSQL 数据库的代码、工具和应用程序都可用于 Aurora。

  • Amazon Redshift

Amazon Redshift 是一种快速、完全托管式 PB 级数据仓库服务,开发人员、数据科学家和数据分析师可以跨数据仓库和数据湖进行操作,从而构建报告和数据展示面板的应用程序、执行实时分析、协作处理数据以及构建和训练机器学习(ML)模型。我们在本文的实验中将使用 Redshift Serverless

在整个实验的过程中,我们会使用 Cloud9 来执行脚本,可以参考文档部署 Cloud9。

我们会使用如下的架构,来实现实现 HTAP 解决方案。

解释一下上面的架构图,实时的交易数据会写入 Aurora,配置 Aurora 到 Redshift 的 Zero-ETL,将实时交易数据同步到 Redshift,在 Redshift 里实现对实时数据的聚合,例如我们以小时为间隔,由 Amazon Managed Workflows for Apache Airflow (MWAA) 调度和触发,对数据进行聚合,并将聚合后的数据,写入聚合数据表。

对 Redshift 中聚合好的数据,再由 MWAA 来调度和触发,回写到 Aurora 中,当客户需要读取实时的汇总数据时,可以结合实时交易数据,和聚合数据,来汇总结果。例如:当要查询 2023 年 1 月 1 日从 0 点到 6 点 30 分的出租车费用,可以从聚合表中查询 0 点到 6 点前的聚合数据,再从明细表中汇总 6 点 0 分到 6 点 30 分的明细数据,将二者相加,即得到 6 点 30 分之前的实时汇总数据。如下图所示:

这里只列出来交易的时间戳,具体的表字段请参考后面章节中的建表语句。

2. 实时交易数据写入 Aurora

我们会将实时交易的数据,写入 Aurora 的表中,先来准备 Aurora 的参数组和集群。

2.1 配置 Aurora 参数

为了实现 Zero-ETL,需要使用特定的参数来创建 Aurora 集群,我们来创建一个 Aurora 集群使用的参数组,具体步骤请参考 Zero-ETL 的配置文档的 “Configure the Aurora MySQL source with a customized DB cluster parameter group” 章节。

需要修改的参数包括:

  • aurora_enhanced_binlog=1
  • binlog_backup=0
  • binlog_format=ROW
  • binlog_replication_globaldb=0
  • binlog_row_image=full
  • binlog_row_metadata=full

另外,确认参数 binlog_transaction_compression parameter 没有设置为 ON,参数 binlog_row_value_options 没有设置为 PARTIAL_JSON。

2.2 创建 Aurora 集群

使用上一步创建的参数组,创建名称为 htap-demo 的 Aurora 集群,具体步骤请参考 Zero-ETL 的配置文档 的 “Configure the Aurora MySQL source with a customized DB cluster parameter group” 章节,这里不再重复赘述。

2.3 在 Aurora 中创建表

可以从 Cloud9 或者其他的数据库客户端,登录上一步创建的 Aurora 集群。

在 Aurora 中创建实时交易明细表,该表用来保存实时产生的出租车的出行记录。使用如下脚本创建数据库 taxi_trips 和表 nyc_source。

CREATE DATABASE taxi_trips;

CREATE TABLE `taxi_trips`.`nyc_source` (
  `VendorID` bigint DEFAULT NULL,
  `tpep_pickup_datetime` timestamp NULL DEFAULT NULL,
  `tpep_dropoff_datetime` timestamp NULL DEFAULT NULL,
  `passenger_count` double DEFAULT NULL,
  `trip_distance` double DEFAULT NULL,
  `RatecodeID` double DEFAULT NULL,
  `store_and_fwd_flag` text,
  `PULocationID` bigint DEFAULT NULL,
  `DOLocationID` bigint DEFAULT NULL,
  `payment_type` bigint DEFAULT NULL,
  `fare_amount` double DEFAULT NULL,
  `extra` double DEFAULT NULL,
  `mta_tax` double DEFAULT NULL,
  `tip_amount` double DEFAULT NULL,
  `tolls_amount` double DEFAULT NULL,
  `improvement_surcharge` double DEFAULT NULL,
  `total_amount` double DEFAULT NULL,
  `congestion_surcharge` double DEFAULT NULL,
  `airport_fee` double DEFAULT NULL,
  `pk_id` bigint NOT NULL AUTO_INCREMENT,
   PRIMARY KEY (pk_id),
   INDEX `index_1` (`VendorID`, `tpep_pickup_datetime`, `PULocationID`, 
                    `DOLocationID`, `payment_type`)
);

创建表的时候,必须要包含主键,否则 Zero-ETL 会失败,另外要根据将来使用的查询创建索引。

2.4 Glue connection

导入数据时,需要使用连接到 Aurora 的 Glue connection, 我们可以在 Cloud9 上执行如下命令,创建名称为 htap-demo 的 Glue connection:

aws glue create-connection \
  --connection-input '{"Name": "htap-demo", "ConnectionType": "JDBC",
    "ConnectionProperties":{"JDBC_CONNECTION_URL":"jdbc:mysql://<Aurora-Endpoint>:3306/taxi_trips","USERNAME":"<Aurora-User-Name>","PASSWORD":"<Aurora-Password>", "JDBC_ENFORCE_SSL":"false"},
    "PhysicalConnectionRequirements":{"SubnetId":"<Your-subnet>","SecurityGroupIdList":["<Your-Security-Group>"],"AvailabilityZone":"us-east-1a"}}' \
  --region us-east-1

替换上面命令中的参数,我们实验用到的服务都位于美东 1。

2.5 准备写入测试数据

我们用一段 Python 代码来导入 NYC 的数据,可以在 Cloud9 或者 EC2 上执行这段代码,先来安装依赖包:

sudo yum install python3-pip
python3 -m pip install awswrangler pymysql pandas boto3 fsspec s3fs

Python 代码如下:

import awswrangler as wr
import pymysql
import pandas as pd
import boto3

bucket_name = 'nyc-tlc'
file_prefix = 'trip data'
schema_name = 'taxi_trips'
table_name = 'nyc_source'
rds_connection_name = 'htap-demo'
    
s3 = boto3.resource('s3', region_name='us-east-1')
    
bucket = s3.Bucket(bucket_name)
con_mysql = wr.mysql.connect(rds_connection_name)
for obj in bucket.objects.filter(Prefix=file_prefix):
    if obj.key.endswith('.parquet') and (obj.key).rsplit('/')[-1] >= 'yellow_tripdata_2023-01.parquet' :
        df = pd.read_parquet(f's3://{bucket_name}/{obj.key}')
        wr.mysql.to_sql(df, con_mysql, schema=schema_name, table=table_name, mode="append", use_column_names=True)
        con_mysql.commit()
        
con_mysql.close()

执行代码,导入 2023 年前 6 个月的 NYC 数据。

请注意:我们这里只导入了 2023 年前 6 个月的数据,总共 6 个数据文件,导入的数据总量是大约 2 千万条,由于导入的数据文件较大,创建 Cloud9 或者 EC2 的时候,尽量选用配置高一些的实例,例如 8xlarge。另外需要为 Cloud9 或 EC2 添加访问 S3 和 Glue 的权限。

3. Redshift Serverless 和 Zero-ETL

我们可以参考 Zero-ETL 的配置文档,配置 Redshift Serverless,Workgroup 的名称为 htap-demo-wg,并部署从 Aurora 到 Redshift Serverless 的 Zero-ETL。部署完成后,Aurora 中的数据库 taxi_trips 会同步到 Redshift 中。

我们可以在 Redshift 的查询页面中,看到同步过来的数据库 taxi_trips,以及里面包含的同名的 Schema,和 Schema 下的 nyc_source 表。如下图:

可以查询数据表,数据同步的延迟大概在 1 分钟左右。

4. Redshift 数据聚合

在 Redshift 中对 Zero-ETL 同步过来的数据,进行聚合,这步操作是由 MWAA 来调度和触发的。

4.1 在 Redshift 中创建聚合表

先在 Redshift 中默认的 dev 数据库和 public Schema 下,创建聚合表,它用来保存每个小时的聚合数据。

CREATE TABLE "nyc_hour_aggregation" (
  "VendorID" bigint,
  "agg_date" date,
  "agg_hour" int,
  "passenger_count" float8,
  "trip_distance" float8,
  "PULocationID" bigint,
  "DOLocationID" bigint,
  "payment_type" bigint,
  "fare_amount" float8,
  "tip_amount" float8,
  "tolls_amount" float8,
  "improvement_surcharge" float8,
  "total_amount" float8,
  "congestion_surcharge" float8,
  "airport_fee" float8,
  "pk_id" bigint IDENTITY NOT NULL
) diststyle auto sortkey("agg_date");

4.2 聚合

创建 Redshift 存储过程,它以每小时为间隔执行 SQL,对上一个小时内发生的实时交易数据进行聚合,并保存结果到聚合表:

CREATE OR REPLACE PROCEDURE nyc_hour_aggregation()
LANGUAGE plpgsql
AS $$
BEGIN
    insert into dev.public.nyc_hour_aggregation
      select
      "VendorID",
      to_date(tpep_pickup_datetime, 'YYYY-MM-DD') as "agg_date",
      extract(hour from tpep_pickup_datetime) as "agg_hour",
      sum("passenger_count") as "passenger_count" ,
      sum("trip_distance") as "trip_distance",
      "PULocationID" ,
      "DOLocationID" ,
      "payment_type" ,
      sum("fare_amount") as "fare_amount" ,
      sum("tip_amount") as "tip_amount",
      sum("tolls_amount") as "tolls_amount" ,
      sum("improvement_surcharge") as "improvement_surcharge",
      sum("total_amount") as "total_amount" ,
      sum("congestion_surcharge") as "congestion_surcharge" ,
      sum("airport_fee") as "airport_fee"
    from taxi_trips.taxi_trips.nyc_source
    where to_date(tpep_pickup_datetime, 'YYYY-MM-DD') = to_date(dateadd(hour, -1, getdate()), 'YYYY-MM-DD')
    and extract(hour from tpep_pickup_datetime) = extract(hour from dateadd(hour, -1, getdate()))
    group by "VendorID", "agg_date", "agg_hour", "PULocationID", "DOLocationID", "payment_type";
END;
$$;

我们将使用 MWAA 来触发和调用这个 Redshift 存储过程,具体的 DAG 的代码请参考后面的章节。

5. 聚合数据回写 Aurora

我们通过 Glue ETL 将 Redshift 中的聚合数据,回写到 Aurora,并在 MWAA 中调度这一步操作。

5.1 在 Aurora 中创建聚合表

在 Aurora 中创建数据库 taxi_trips_agg 和 聚合表 nyc_hour_aggregation,它用来保存 Redshift 回写的聚合数据。

CREATE DATABASE taxi_trips_agg;

CREATE TABLE `taxi_trips_agg`.`nyc_hour_aggregation`(
  `VendorID` bigint DEFAULT NULL,
  `agg_date` date,
  `agg_hour` int,
  `passenger_count` double DEFAULT NULL,
  `trip_distance` double DEFAULT NULL,
  `PULocationID` bigint DEFAULT NULL,
  `DOLocationID` bigint DEFAULT NULL,
  `payment_type` bigint DEFAULT NULL,
  `fare_amount` double DEFAULT NULL,
  `tip_amount` double DEFAULT NULL,
  `tolls_amount` double DEFAULT NULL,
  `improvement_surcharge` double DEFAULT NULL,
  `total_amount` double DEFAULT NULL,
  `congestion_surcharge` double DEFAULT NULL,
  `airport_fee` double DEFAULT NULL,
  `pk_id` bigint NOT NULL,
   PRIMARY KEY (pk_id),
   INDEX `index_1` (`VendorID`, `agg_date`, `PULocationID`, 
                    `DOLocationID`, `payment_type`, `agg_hour`) 
);

我们也根据将来使用的查询创建索引。

5.2 使用 Glue ETL Job 回写聚合数据

在 Cloud9 使用 AWS CLI 命令创建 Glue ETL Job,实现聚合数据从 Redshift 到 Aurora 的回写。先创建一个 Python 脚本 redshift-2-aurora.py,内容如下:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import *
import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
print(args)

table_name = 'table_hour_aggregation'
rds_connection_url = 'jdbc:mysql://<Aurora-Endpoint>:3306/taxi_trips_agg'
db_user = '<Aurora-User-Name>'
db_password = '<Aurora-Password>'
redshift_connection_url = "jdbc:redshift://<Redshift-Endpoint>?user=<Redshift-User-Name>&password=<Redshift-Password>"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

sql_context = SQLContext(sc)

df = sql_context.read \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option("aws_iam_role", "<Redshift-IAM-Role-ARN>") \
    .option("url", redshift_connection_url) \
    .option("query", "select * from public.nyc_hour_aggregation where agg_date=to_date(dateadd(hour, -1, getdate()), 'YYYY-MM-DD') and agg_hour=extract(hour from dateadd(hour, -1, getdate()))") \
    .option("tempdir", "<S3-folder>") \
    .load()

df.write \
  .mode("append") \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", rds_connection_url) \
  .option("dbtable", table_name) \
  .option("user", db_user) \
  .option("password", db_password) \
  .save()

job.commit()

替换上面 Python 脚本中的参数,将 Python 脚本上传到 S3:

aws s3 cp redshift-2-aurora.py s3://<S3-Bucket>/scripts

准备 Glue ETL job 的配置文件 job_configuration.json,内容如下:

{
  "Name": "Redshift-2-Aurora",
  "Role": "<Glue-IAM-Role-ARN>",
  "ExecutionProperty": {
    "MaxConcurrentRuns": 1
  },
  "Command": {
    "Name": "glueetl",
    "ScriptLocation": "s3://<S3-Bucket>/scripts/redshift-2-aurora.py",
    "PythonVersion": "3",
  },
  "DefaultArguments": {
    "--TempDir": "s3://<S3-Bucket>/temp-dir/",
    "--job-bookmark-option": "job-bookmark-disable",
    "--enable-continuous-cloudwatch-log": "true",
    "--connections": ["htap-demo-network"] 
  },
  "MaxRetries": 0,
  "Timeout": 2880,
  "GlueVersion": "4.0",
  "WorkerType": "",
  "NumberOfWorkers": 2
}

因为对 Aurora 和 Redshift 都是用的内网访问,需要使用 AWS CLI 创建一个名称为 htap-demo-network 的 Network 类型的 Glue connection:

aws glue create-connection \
    --connection-input '{"Name":"htap-demo-network", "ConnectionType":"NETWORK", "ConnectionProperties": {},
      "PhysicalConnectionRequirements":{"SubnetId":"<Your-Subnet>", "SecurityGroupIdList":["<Your-Security-Group-ID>"],"AvailabilityZone":"us-east-1a"}}' \
    --region us-east-1

使用 AWS CLI 创建 Glue ETL job:

aws glue create-job --cli-input-json file://job_configuration.json --connections '{"Connections": ["htap-demo-network"]}'

我们将使用 MWAA 来触发和调用这个 Glue ETL job,具体的 DAG 的代码请参考后面的章节。

6. 使用 MWAA 来调度流程

我们使用 MWAA 来调度前面章节中的数据聚合和回写操作。

6.1 部署 MWAA

MWAA(Amazon Managed Workflows for Apache Airflow)是 AWS 提供的托管的 Airflow。使用 MWAA,我们可以用 Airflow 和 Python 创建工作流程,而无需管理底层基础设施以实现可扩展性、可用性和安全性。

我们可以参考 MWAA 的部署文档,来部署 MWAA 的环境。部署完成后,将 S3,CloudWatch,Redshift 和 Glue 的访问权限赋予 MWAA 的执行角色。

6.2 部署 DAG

因为 DAG 需要使用 AWS Secret Manager 提供访问 Redshift 的认证信息,我们用 AWS CLI 来创建一条名称为 htap-demo 的 Secret:

aws secretsmanager create-secret --name htap-demo --secret-string '{"username":"<Redshift-User-Name>","password":"<Redshift-Password>","dbname":"dev","host":"<Redshift-Endpoint>","port":5439,"namespaceName":"htap-demo","engine":"redshift"}'

准备要执行的 DAG 代码,先创建一个 Python 脚本 nyc_hour_aggregation.py,内容如下:

from airflow import DAG
from datetime import datetime, timedelta
import redshift_connector
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

# Define DAG
with DAG(
    dag_id = 'nyc_hour_aggregation',
    schedule_interval = '05 * * * *',
    start_date = datetime(2024, 5, 1),
    max_active_runs = 1,
    catchup = True
) as dag:

    begin = EmptyOperator(task_id="begin")
    end = EmptyOperator(task_id="end")

    aggregation_in_redshift = RedshiftDataOperator(
        task_id="aggregation_in_redshift",
        database="dev",
        workgroup_name="htap-demo-wg",
        secret_arn="<Redshift-Secret-ARN>",
        sql="call nyc_hour_aggregation();",
        poll_interval=10,
        wait_for_completion=True
    )

    glue_etl_job = GlueJobOperator(
        task_id="glue_etl_job",
        job_name="Redshift-2-Aurora",
        region_name="us-east-1",
        verbose=True
    )

    begin >> aggregation_in_redshift >> glue_etl_job >> end

将上面的 Python 脚本上传到 S3 上 DAG 指定的目录:

aws s3 cp nyc_hour_aggregation.py s3://<S3-Bucket>/dags/

DAG 会在每小时后的第 5 分钟自动触发执行,这时 Zero-ETL 已经完成了上一个小时的数据同步,这样我们就把 Redshift 中聚合好的数据,回写到了 Aurora 里。

7. 性能测试

我们将对 NYC 的数据进行一个实时汇总查询的测试,测试范围是 2023 年 1 月 1 日,从 0 点到 6 点 30 分左右的实时汇总。我们在测试中对比 Redshift 和 Aurora 的并发读操作的性能。

先将测试代码 redshift-read.py 保存到本地,然后使用 GNU Parallel 工具来实现压测,我们可以参考 GNU Parallel 部署文档来部署 Parallel 工具。执行 Redshift 的测试脚本 redshift-read.py 如下:

CONF_FILE=parallel.red
rm -rf $CONF_FILE
for i in $(seq 1 <并发线程数>)
do
   echo "python redshift-read.py 1 <单个线程查询的次数> > result.log" >> $CONF_FILE
done
wc -l $CONF_FILE

start_time=`date +'%Y-%m-%d %H:%M:%S'`
start_seconds=$(date --date="$start_time" +%s)
parallel -j <并发线程数> < $CONF_FILE
end_time=`date +'%Y-%m-%d %H:%M:%S'`
end_seconds=$(date --date="$end_time" +%s)
echo "本次运行时间: "$((end_seconds-start_seconds))"s"

替换上面脚本中的 <并发线程数> 和 <单个线程查询的次数>,然后在 Shell 中执行脚本。

再来测试 Aurora 并发读的性能,将 测试代码 aurora-read.py 保存到本地,执行 Aurora 的测试脚本 aurora-read.py 如下:

CONF_FILE=parallel.aurora
rm -rf $CONF_FILE
for i in $(seq 1 <并发线程数>)
do
   echo "python aurora-read.py 1 <单个线程查询的次数> > result.log" >> $CONF_FILE
done
wc -l $CONF_FILE

start_time=`date +'%Y-%m-%d %H:%M:%S'`
start_seconds=$(date --date="$start_time" +%s)
parallel -j <并发线程数> < $CONF_FILE
end_time=`date +'%Y-%m-%d %H:%M:%S'`
end_seconds=$(date --date="$end_time" +%s)
echo "本次运行时间: "$((end_seconds-start_seconds))"s"

替换上面脚本中的 <并发线程数> 和 <单个线程查询的次数>,然后在 Shell 中执行脚本。

Redshift 我们使用了 128 RPU,Aurora我们使用 2 台 r6g.2xlarge 读实例,分别采用 50 个,100 个,150 个,200 个并发,每个并发查询 100 次,测试结果如下图:

可以看出,在 Aurora 中读取 Redshift 聚合后的结果,并结合 Aurora 中的实时数据,来实现 HTAP 的实时聚合的功能,查询性能是可以满足客户需求的。

8. 资源释放

测试结束后,需要释放的资源包括:

  • Redshift Serverless 的 Namespace 和 Workgroup
  • RDS
  • Cloud9
  • MWAA

本篇作者

Dalei Xu

亚马逊云科技解决方案架构师,负责 AWS 数据分析的解决方案的咨询和架构设计。多年从事一线开发,在数据开发、架构设计和组件管理方面积累了丰富的经验,希望能将 AWS 优秀的服务组件,推广给更多的企业用户,实现与客户的双赢和共同成长。

Zhiyong Su

亚马逊云科技解决方案架构师,主要负责企业级客户的上云或跨云迁移工作,同时致力于亚马逊云服务在国内的应用及推广。

冯源

亚马逊云科技解决方案架构师,曾服务于南洋理工大学、惠普和新智云。工作涉及企业混合云环境运维管理、运营管理、混合云平台和云服务解决方案。有十余年企业基础设施咨询及实施、项目管理和交付以及混合云平台研发管理经验。2022 年加入 AWS,负责零售、餐饮和制造等行业客户。