亚马逊AWS官方博客

快速构建基于 AWS Glue 的抽取跨区域 MySQL 8 的数据管道

在这个互联网高速发展,信息爆炸的时代,数据对于业务决策是至关重要的一环。随着数据类型的日益复杂,除了传统的交易数据,各种日志信息,行为数据也蕴含丰富的信息。如何简单高效的构建一个复杂的数据处理流程至关重要。

AWS Glue 是一种完全托管的数据目录和 ETL工具,如果您是首次使用AWS Glue详细演示和概念讲解可参照此博客。当前AWS Glue原生的JDBC连接库不支持MySQL 8,本文展示如何利用自定义的JDBC库连接外部数据源。本文以MySQL 8 举例,但任何支持JDBC连接的数据源都适用于此方法。由于目前国内宁夏区域的成本更经济,所以生产系统的数据库在北京,但数据处理系统在宁夏的情况并不少见。

本文演示在宁夏区使用AWS Glue服务把位于北京区域私有子网中的RDS (Mysql 8)的数据定时抽取到位于宁夏的S3中,并做处理。本文假定您已经有配置AWS VPC,子网,安全组,角色的经验,如不清楚概念和如何配置请参照AWS官网

 

第一步:准备MySQL 8数据源

若已经有现成的数据源及数据可略过此部分。

注意:RDS数据库不能开公有访问(Public accessibility选择No),模拟私有子网中的RDS。

 

在AWS RDS中创建一个RDS数据库,如果不知道如何创建AWS RDS数据库请参照此文,数据库引擎版本选择8.0.11或以上。记住选择的VPC和安全组,下一步配置Glue的时候需要用到。

本次测试数据是由TPC-DS生成的,建表DDL如下:

CREATE TABLE `catalog_sales`(
  `cs_sold_date_sk` bigint, 
  `cs_sold_time_sk` bigint, 
  `cs_ship_date_sk` bigint, 
  `cs_bill_customer_sk` bigint, 
  `cs_bill_cdemo_sk` bigint, 
  `cs_bill_hdemo_sk` bigint, 
  `cs_bill_addr_sk` bigint, 
  `cs_ship_customer_sk` bigint, 
  `cs_ship_cdemo_sk` bigint, 
  `cs_ship_hdemo_sk` bigint, 
  `cs_ship_addr_sk` bigint, 
  `cs_call_center_sk` bigint, 
  `cs_catalog_page_sk` bigint, 
  `cs_ship_mode_sk` bigint, 
  `cs_warehouse_sk` bigint, 
  `cs_item_sk` bigint, 
  `cs_promo_sk` bigint, 
  `cs_order_number` bigint, 
  `cs_quantity` bigint, 
  `cs_wholesale_cost` double, 
  `cs_list_price` double, 
  `cs_sales_price` double, 
  `cs_ext_discount_amt` double, 
  `cs_ext_sales_price` double, 
  `cs_ext_wholesale_cost` double, 
  `cs_ext_list_price` double, 
  `cs_ext_tax double` double, 
  `cs_coupon_amt` double, 
  `cs_ext_ship_cost` double, 
  `cs_net_paid` double, 
  `cs_net_paid_inc_tax` double, 
  `cs_net_paid_inc_ship` double, 
  `cs_net_paid_inc_ship_tax` double, 
  `cs_net_profit double` double,
  PRIMARY KEY ( `cs_item_sk`,`cs_order_number` ));
CREATE TABLE `warehouse`(
  `w_warehouse_sk` bigint, 
  `w_warehouse_id` VARCHAR(100) , 
  `w_warehouse_name` VARCHAR(100) , 
  `w_warehouse_sq_ft` bigint, 
  `w_street_number` bigint, 
  `w_street_name` VARCHAR(255) , 
  `w_street_type` VARCHAR(255) , 
  `w_suite_number` VARCHAR(255) , 
  `w_city` VARCHAR(25) , 
  `w_county` VARCHAR(25) , 
  `w_state` VARCHAR(25) , 
  `w_zip` bigint, 
  `w_country` VARCHAR(100) , 
  `w_gmt_offset` bigint,
  PRIMARY KEY ( `w_warehouse_sk` ));

接下来插入一些测试数据

INSERT INTO warehouse VALUES
(1,"AAAAAAAABAAAAAAA","Conventional childr",977787,651,"6th" ,"Parkway","Suite 470","Shiloh","San Miguel County","NM",89275,"United States",-7),
(2,"AAAAAAAACAAAAAAA","Important issues liv",138504,600,"View First","Avenue","Suite P","Fairview","Ziebach County","SD",55709,"United States",-6),
(3,"AAAAAAAADAAAAAAA","Doors canno",294242,534,"Ash Laurel","Dr.","Suite 0","Five Points","Ziebach County","SD",56098,"United States",-6),
(4,"AAAAAAAAEAAAAAAA","Bad cards must make.",621234,368,"Wilson Elm","Drive","Suite 80","Five Points","Richland County","OH",46098,"United States",-5),
(5,"AAAAAAAAAAAAAAA","Plain,reluctant",514427,410,"3rd" ,"ST","Suite 370","Shiloh","Ziebach County","SD",59275,"United States",-6),
(6,"AAAAAAAAGAAAAAAA","Local,mass universi",838797,957,"Lincoln Adams","Dr.","Suite X","Five Points","Oglethorpe County","GA",36098,"United States",-5);

INSERT INTO catalog_sales VALUES
 (2452539 , 62417 , 2452614 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 20225 , 15 , 3 , 47071 , 1169 , 144033688 , 77 , 49.08 , 130.55 , 60.05 , 5428.5 , 4623.85 , 3779.16 , 10052.35 , 92.47 , 0.0 , 3417.26 , 4623.85 , 4716.32 , 8041.11 , 8133.58 , 844.69),
 (2452539 , 62417 , 2452595 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 20465 , 12 , 2 , 138789 , 122 , 144033688 , 25 , 76.99 , 147.82 , 0.0 , 3695.5 , 0.0 , 1924.75 , 3695.5 , 0.0 , 0.0 , 1330.25 , 0.0 , 0.0 , 1330.25 , 1330.25 , -1924.75),
 (2452539 , 62417 , 2452567 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 20354 , 18 , 5 , 75657 , 806 , 144033688 , 72 , 19.68 , 45.06 , 39.2 , 421.92 , 2822.4 , 1416.96 , 3244.32 , 169.34 , 0.0 , 875.52 , 2822.4 , 2991.74 , 3697.92 , 3867.26 , 1405.44),
 (2452539 , 62417 , 2452612 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 20606 , 9 , 1 , 279408 , 588 , 144033688 , 70 , 97.07 , 125.22 , 102.68 , 1577.8 , 7187.6 , 6794.9 , 8765.4 , 431.25 , 0.0 , 1489.6 , 7187.6 , 7618.85 , 8677.2 , 9108.45 , 392.7),
 (2452539 , 62417 , 2452624 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 22191 , 3 , 1 , 13807 , 811 , 144033688 , 75 , 78.43 , 112.93 , 60.98 , 3896.25 , 4573.5 , 5882.25 , 8469.75 , 411.61 , 0.0 , 3557.25 , 4573.5 , 4985.11 , 8130.75 , 8542.36 , -1308.75),
 (2452539 , 62417 , 2452580 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 22277 , 19 , 2 , 23745 , 1165 , 144033688 , 26 , 29.46 , 68.34 , 33.48 , 906.36 , 870.48 , 765.96 , 1776.84 , 0.0 , 0.0 , 124.28 , 870.48 , 870.48 , 994.76 , 994.76 , 104.52),
 (2452539 , 62417 , 2452548 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 20314 , 8 , 2 , 131695 , 7 , 144033688 , 34 , 85.14 , 136.22 , 19.07 , 3983.1 , 648.38 , 2894.76 , 4631.48 , 25.93 , 0.0 , 555.56 , 648.38 , 674.31 , 1203.94 , 1229.87 , -2246.38),
 (2452539 , 62417 , 2452570 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 20462 , 17 , 1 , 218911 , 1484 , 144033688 , 48 , 48.35 , 88.96 , 61.38 , 1323.84 , 2946.24 , 2320.8 , 4270.08 , 0.0 , 0.0 , 2135.04 , 2946.24 , 2946.24 , 5081.28 , 5081.28 , 625.44),
 (2452539 , 62417 , 2452603 , 4961658 , 370240 , 5575 , 2777357 , 4961658 , 370240 , 5575 , 2777357 , 30 , 22370 , 18 , 6 , 172341 , 622 , 144033688 , 74 , 43.45 , 122.96 , 33.19 , 6642.98 , 2456.06 , 3215.3 , 9099.04 , 49.12 , 0.0 , 2274.76 , 2456.06 , 2505.18 , 4730.82 , 4779.94 , -759.24),
 (2452539 , 16946 , 2452624 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20277 , 4 , 3 , 136947 , 641 , 144033689 , 93 , 34.76 , 74.73 , 17.18 , 5352.15 , 1597.74 , 3232.68 , 6949.89 , 63.9 , 0.0 , 764.46 , 1597.74 , 1661.64 , 2362.2 , 2426.1 , -1634.94),
 (2452539 , 16946 , 2452609 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20469 , 8 , 6 , 82428 , 2 , 144033689 , 58 , 9.84 , 10.13 , 6.17 , 229.68 , 357.86 , 570.72 , 587.54 , 14.31 , 0.0 , 158.34 , 357.86 , 372.17 , 516.2 , 530.51 , -212.86), 
 (2452539 , 16946 , 2452580 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20386 , 18 , 1 , 105888 , 135 , 144033689 , 100 , 62.86 , 173.49 , 32.96 , 14053.0 , 3296.0 , 6286.0 , 17349.0 , 71.19 , 1516.16 , 1561.0 , 1779.84 , 1851.03 , 3340.84 , 3412.03 , -4506.16),
 (2452539 , 16946 , 2452613 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20559 , 4 , 6 , 109273 , 1327 , 144033689 , 22 , 60.04 , 64.84 , 27.23 , 827.42 , 599.06 , 1320.88 , 1426.48 , 17.97 , 0.0 , 670.34 , 599.06 , 617.03 , 1269.4 , 1287.37 , -721.82),
 (2452539 , 16946 , 2452587 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20634 , 9 , 1 , 100591 , 332 , 144033689 , 8 , 61.81 , 100.75 , 83.62 , 137.04 , 668.96 , 494.48 , 806.0 , 33.44 , 0.0 , 394.88 , 668.96 , 702.4 , 1063.84 , 1097.28 , 174.48),
 (2452539 , 16946 , 2452586 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20471 , 14 , 2 , 150157 , 855 , 144033689 , 50 , 80.2 , 144.36 , 135.69 , 433.5 , 6784.5 , 4010.0 , 7218.0 , 542.76 , 0.0 , 3464.5 , 6784.5 , 7327.26 , 10249.0 , 10791.76 , 2774.5 ),
 (2452539 , 16946 , 2452569 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20436 , 7 , 1 , 204859 , 576 , 144033689 , 27 , 41.17 , 112.8 , 109.41 , 91.53 , 2954.07 , 1111.59 , 3045.6 , 206.78 , 0.0 , 91.26 , 2954.07 , 3160.85 , 3045.33 , 3252.11 , 1842.48 ),
 (2452539 , 16946 , 2452627 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20730 , 1 , 4 , 76770 , 142 , 144033689 , 57 , 72.81 , 104.84 , 13.62 , 5199.54 , 776.34 , 4150.17 , 5975.88 , 15.52 , 0.0 , 537.51 , 776.34 , 791.86 , 1313.85 , 1329.37 , -3373.83 ),
 (2452539 , 16946 , 2452579 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20264 , 5 , 5 , 3381 , 104 , 144033689 , 26 , 96.86 , 213.09 , 196.04 , 443.3 , 5097.04 , 2518.36 , 5540.34 , 50.97 , 0.0 , 110.76 , 5097.04 , 5148.01 , 5207.8 , 5258.77 , 2578.68),
 (2452539 , 16946 , 2452611 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20238 , 10 , 2 , 226405 , 281 , 144033689 , 21 , 50.34 , 123.33 , 45.63 , 1631.7 , 958.23 , 1057.14 , 2589.93 , 0.0 , 95.82 , 595.56 , 862.41 , 862.41 , 1457.97 , 1457.97 , -194.73),
 (2452539 , 16946 , 2452564 , 4096275 , 861512 , 6396 , 60433 , 4096275 , 861512 , 6396 , 60433 , 24 , 20400 , 17 , 4 , 242191 , 711 , 144033689 , 4 , 39.54 , 48.63 , 4.37 , 177.04 , 17.48 , 158.16 , 194.52 , 0.87 , 0.0 , 5.8 , 17.48 , 18.35 , 23.28 , 24.15 , -140.68 )

这两张表中的w_warehouse_sk 和cs_warehouse_sk可做关联,用于后续的数据处理。

 

第二步:跨区域网络配置

如无跨区域的资源需要此步骤可省略。

由于Glue工作于宁夏区而数据源RDS在北京区。RDS作为交易数据库通常位于私有子网,不连通互联网。所以需要调通Glue和RDS之间的网络配置。假设北京的RDS位于VPC 1 (vpc-1111)网段为10.0.0.0/16,AWS Glue任务会使用VPC 2(vpc-2222)网段为172.31.0.0/16。网段不能重叠否则无法建立对等连接。

在北京区域的VPC控制台创建一个VPC 对等连接,请求方为vpc-1111,接收方为vpc-2222。

 

创建完成后需要到宁夏区域vpc-2222的VPC控制台对等连接中接受此请求。等待状态变为绿色的“活动”,说明对等连接生效。

 

编辑VPC-1111的路由表,该路由表要关联给RDS所在的子网。加上VPC-2222的网段到对等连接的路由。

 

同理编辑VPC-2222的路由表,该路由表要关联给Glue指定的子网。加上VPC-1111的网段到对等连接的路由。

 

现在VPC-1111和VPC-2222之间的私有网络已经打通,如果对等连接配置有任何问题请参照官网

除了网络外,还需要配置安全组规则,对于Glue和RDS所在的网段开放。假设RDS使用安全组sg-1111,Glue将会使用安全组sg-2222(如没有请先创建)。

Sg-1111入站规则配置例子如下(出站全放开):

 

Sg-2222入站规则配置例子如下(加一条对sg-2222全开入站规则,出站全放开):

 

 

第三步:配置Glue工作流

配置AWS Glue 数据源连接

连接是为了指定VPC,子网和安全组,以便Glue知道用什么样的网络配置通信。如果是访问非VPC内资源如S3则不需要创建连接。

 

1.登陆AWS Glue控制台,选择宁夏区,选择连接,创建一个新MySQL 8连接。

2.连接类型选择JDBC,并把第一步中的RDS JDBC url 用户名和密码填写上去,VPC,子网和安全组选择上一步中的vpc-2222,绑定了路由表的任意子网和sg-2222。注意:Glue所在的子网需要有S3 Endpoint的路由。

创建成功连接后不要测试连接,因为Glue默认的MySQL JDBC驱动不支持MySQL 8 版本。

配置AWS Glue 作业

Glue的作业是定义单个的ETL操作,接下来我们会定义三个作业:从RDS读取catalog_sales,从RDS读取warehouse,和聚合两张表。

 

1.选择宁夏区域,并选择添加作业

2.作业属性中的IAM角色选择一个带有AmazonS3FullAccess策略的角色,如没有请创建。其他配置如图所示。

3.在“安全配置、脚本库和作业参数(可选)”设置中配置作业参数,参数值之后可以改

4.下一步的连接选择前一步创建的MySQL 8 连接。再下一步到脚本编辑页面,贴上如下脚本。点击保存并退出。同时下载mysql-connector-java-8.0.18.jar,并上传到jdbcS3path所指定的S3存储桶位置。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

args = getResolvedOptions(sys.argv,['tablename','dbuser','dbpassword','dburl','jdbcS3path','s3OutputPath'])
#改成自己的mysql8
connection_mysql8_options = {
    "url": args['dburl'],
    "dbtable": args['tablename'],
    "user": args['dbuser'],
    "password": args['dbpassword'],
    "customJdbcDriverS3Path": args['jdbcS3path']+"mysql-connector-java-8.0.18.jar", #先编译jdbc jar 传到S3
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
# 从MySQL中读取数据
df_catalog = glueContext.create_dynamic_frame.from_options(connection_type="mysql",connection_options=connection_mysql8_options)
#加上filter 一般增量加载可以按照更新时间来过滤
df_filter = Filter.apply(frame = df_catalog, f = lambda x: x["cs_sold_date_sk"] >=2452539)
#写入s3位置
writer = glueContext.write_dynamic_frame.from_options(frame = df_filter, connection_type = "s3", connection_options = {"path": args['s3OutputPath']+args['tablename']}, format = "parquet")

5.此任务把RDS里的catalog_sales表加载到S3中,同样的配置创建另一个作业加载warehouse表,更改作业配置和脚本。脚本如下:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

args = getResolvedOptions(sys.argv,['tablename','dbuser','dbpassword','dburl','jdbcS3path','s3OutputPath'])
connection_mysql8_options = {
    "url": args['dburl'],
    "dbtable": args['tablename'],
    "user": args['dbuser'],
    "password": args['dbpassword'],
    "customJdbcDriverS3Path": args['jdbcS3path']+"mysql-connector-java-8.0.18.jar", #先编译jdbc jar 传到S3
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
# 从MySQL中读取数据
df_warehouse = glueContext.create_dynamic_frame.from_options(connection_type="mysql",connection_options=connection_mysql8_options)
#写入s3位置
writer = glueContext.write_dynamic_frame.from_options(frame = df_warehouse, connection_type = "s3", connection_options = {"path": args['s3OutputPath']+args['tablename']}, format = "parquet")

6.接下来还需要创建一个聚合任务,聚合两张表生成聚合表可以供报表查询。创建流程还是同上(连接不是必须选的,S3是公网资源),传入作业参数为

脚本使用Glue中存储的S3数据结构,具体存储S3的数据结构将在爬网程序章节介绍。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

args = getResolvedOptions(sys.argv,['s3OutputPath'])

df=spark.sql("select w_warehouse_name, w_city,count(*) as cnt_sales, sum(cs_list_price) as total_revenue,sum(cs_net_profit_double) as total_net_profit,sum(cs_wholesale_cost) as total_cost from default.catalog_sales join default.warehouse on cs_warehouse_sk = w_warehouse_sk group by w_warehouse_name, w_city")
df.write.mode("overwrite").format("parquet").save(args['s3OutputPath']+"warehouse_report/")

创建爬网程序

数据在S3上之后失去了在数据库中的表结构,爬网程序则是爬取S3中数据的表结构并能使其他程序如Apache Presto或者Amazon Athena直接查询S3中的数据。下面我们将配置一个爬网程序爬取刚刚加载到S3中的catalog_sales 和warehouse的数据结构。

 

1.创建新的爬网程序

2.选择S3位置,如果两张表放在同一个文件夹路径下,可以提供共同的父路径。如果没有共享父路径可以逐一添加。

3.IAM角色可以选择和Glue 工作一样的(需要S3的读权限),计划选择按需,输出数据库可以选择default。其他可以保持默认。

4.如果Glue RDS工作完成后,运行爬网程序并成功后可以看到Glue的数据目录中可以看到两张表:

 

创建Glue工作流

如果需要编排复杂的数据处理流程则需要处理任务之间的依赖关系,Glue的工作流程功能可以解决上下游依赖及定时问题。

1.首先创建一个工作流程

2.创建好后在工作流程中添加一个按需的触发器作为起始点

3.再添加了之前创建的两个RDS摄取工作,Glue工作和工作之间需要触发器串联,接下来添加一个事件触发器

4.触发器选择事件触发,并且要选择All监视事件后出发,这个触发器会同时用于两个RDS工作。

5.接下来在触发器后添加爬网程序,后触发聚合Glue工作,完成的工作流程如下。

可根据需要在聚合任务后在添加一个爬网程序来爬取新生成表的结构。

 

终于我们完成一个完整的ETL处理流程的创建了。由于工作流程的起始触发器是按需,所以需要手动选择工作流程并在操作中选择运行。如果需要定时任务,则可以把起始触发器修改为定时类型。用Glue爬好的S3表可以直接被Amazon Athena服务查询哟,感兴趣的快去试一试吧!

 

 

本篇作者

贺浏璐

AWS解决方案架构师,负责AWS云计算方案的咨询和架构设计,同时致力于大数据方面的研究和应用。曾担任亚马逊大数据团队数据工程师,在大数据架构,数据管道业务处理,和Business Intelligence方面有丰富的实操经验。