亚马逊AWS官方博客

从托管服务 Amazon Glue 访问客户 Amazon VPC 内部私有数据存储

专题摘要

亚马逊云科技托管服务访问用户 Amazon VPC 内部私有资源,通常发生在计算或接口类的托管服务访问用户 VPC 内部资源的情形下。本文主要讲述云上的 ETL 服务 Amazon Glue,如何访问位于客户 VPC 内部的 Amazon RDS 数据库或 Amazon MSK 中的数据。

Amazon Glue 访问 RDS 通常是通过建立 JDBC 或 Network 类型的 Glue Connection 连接以读取 Amazon RDS 数据库中的数据。它的原理也是在设定的 VPC 和子网中生成弹性网络接口 ENI。不过,相对于 Amazon Lambda,由于 Glue 是用来对数据进行 ETL 处理的,所以在生成弹性网卡 ENI 的机制上和 Lambda 相比,不尽相同。

本文是“在亚马逊云科技上围绕 Amazon VPC 打造内外兼修的合适架构”系列主题的第四部分。本系列专题由如下几部分组成:

场景介绍

在数据湖架构中,经常需要将数据从各个源系统中的不同数据源集成到数据湖上进行存储、处理和分析。在亚马逊云科技中,Amazon Glue 是进行数据 ETL(Extract-Transform-Load)的专业工具,对来自于应用的关系数据库 RDS 以及数据仓库 Redshift 中的数据进行抽取和处理。Glue 支持 Scala 或者 PySpark 脚本进行编程。

Glue 是无服务器的托管服务,在 Glue 中可以通过建立 Glue Connection 连接来访问客户 VPC 中 RDS 的数据。我们可以根据实际情况,建立不同的 Glue Connection 来访问不同的数据存储,包括 JDBC Connection,RDS Connection 甚至是 Network Connection。

如上图所示:

  • 无论是 JDBC Connection 还是 RDS Connection,都可以通过 Glue Crawler 从数据存储中抓取 Schema,生成 Glue Data Catalog,并在 Glue Job 中通过 Scala 或者 PySpark 脚本访问 Glue Data Catalog。这两种方式本质是一致的,只不过在建立 RDS Connection 的时候,会缺省为 Connection 配置和 RDS 同样的子网,更加简便;而 JDBC Connection 中,用户需要手动选择相应的 VPC 和子网信息。
  • 在 Glue 里面建立 Network Connection,可以打通网络本身,然后通过 Glue Job 中支持的语言,例如 Scala 调用 Java JDBC 来访问 RDS 中的数据。Network Connection 同 JDBC Connection 一样,也可以选择相应的 VPC 和子网配置。

实验环境配置

我们通过如下实验,创建不同的 Glue Connection,并在 Amazon Glue Job 中通过加入 Glue Connection 来访问 RDS。

不同的 Glue Connection 在创建的时候可以选择相应的网络配置。如下表所示,在我们的实验中,建立三种类型的访问连接,RDS Connection MySQL-RDS-Connection, JDBC Connection MySQL-JDBC-Connection, Network Connection MySQL-Network-Connection。特别需要注意的是,由于 Glue 会为每一个配置的 DPU 生成一个弹性网络接口 ENI,而这些 Glue 的组件需要相互通信,因此其所关联的安全组需要设置自引用的规则。

类型 名称 子网 安全组 备注说明
Amazon RDS MySQL-RDS-Connection 自动选择和 RDS 相同的子网
TEST Private Subnet (AZ1)
rds-sg Amazon RDS Connection 实际上也是 JDBC 类型的 Connection。只不过 RDS Connection 默认会选择 RDS 所在的子网和安全组,因此需要对安全组增加一个允许所有入站和出站流量的自引用规则。以保证 Glue 的相关组件能够正常运行,当然这个自引用也保证了 Glue 在网络层面上能够访问 RDS 服务。
JDBC MySQL-JDBC-Connection TEST Private Subnet (AZ1) glue-jdbc-sg JDBC Connection 可以选择 VPC,子网 Subnet 和安全组 Security Group。这里我们选择和 RDS 同样的 VPC 和子网,安全组 glue-jdbc-sg 中需要有一个允许所有入站和出站流量的自引用规则。而且安全组 rds-sg 允许来自于安全组 glue-jdbc-sg 中的 3306 端口的入站流量,来允许访问 RDS 数据库。
Network MySQL-Network-Connection TEST Private Subnet (AZ1) glue-network-sg 和 JDBC Connection 类似,只不过 Network Connection 中不需要设置数据等的访问凭据。Network Connection 由于没有实际的数据源,因而不能通过爬虫 Glue Crawler 进行 Schema 爬取,只能通过 Glue Job 中的代码直接访问数据源。

在实验之前,我们根据架构图创建或修改现有安全组。安全组 “rds-sg” 的规则配置如下表所示:

Security Group “rds-sg” Inbound Rule
Protocol Port range Source Description
TCP(ALL) 0–65535 rds-sg 允许来自于自身安全组的所有 TCP 入站流量(允许 Glue 的组件相互通信,允许 Glue 访问 RDS MySQL)。
TCP(MySQL) 3306 glue-jdbc-sg 允许来自于安全组 glue-jdbc-sg 发起的在端口 3306 访问 RDS MySQL 的入站流量。
TCP(MySQL) 3306 glue-network-sg 允许来自于安全组 glue-network-sg 发起的在端口 3306 访问 RDS MySQL 的入站流量。
Security Group “rds-sg” Outbound Rule
Protocol Port range Destination Description
TCP(ALL) 0–65535 rds-sg 允许访问自身安全组的所有 TCP 出站流量(允许 Glue 的组件相互通信,允许 Glue 访问 RDS MySQL)。
TCP(HTTPS) 443 s3-prefix-list-id 允许通过网关终端节点 Gateway Endpoint 访问 S3 的出站流量。

安全组 “glue-jdbc-sg” 的规则配置如下表所示:

Security Group “glue-jdbc-sg” Inbound Rule
Protocol Port range Source Description
TCP(ALL) 0–65535 glue-jdbc-sg 允许来自于自身安全组的所有 TCP 入站流量(允许 Glue 的组件相互通信)。
Security Group “glue-jdbc-sg” Outbound Rule
Protocol Port range Destination Description
TCP(ALL) 0–65535 glue-jdbc-sg 允许访问自身安全组的的所有 TCP 出站流量(允许 Glue 的组件相互通信)。
TCP(MySQL) 3306 rds-sg 允许在端口 3306 访问和安全组 rds-sg 相关联的 RDS MySQL 的出站流量。
TCP(HTTPS) 443 s3-prefix-list-id 允许通过网关终端节点 Gateway Endpoint 访问 S3 的出站流量。

安全组 “glue-network-sg” 的规则配置如下表所示:

Security Group “glue-network-sg” Inbound Rule
Protocol Port range Source Description
TCP(ALL) 0–65535 glue-network-sg 允许来自于自身安全组的所有 TCP 入站流量(允许 Glue 的组件相互通信)。
Security Group “glue-network-sg” Outbound Rule
Protocol Port range Destination Description
TCP(ALL) 0–65535 glue-network-sg 允许访问自身安全组的的所有 TCP 出站流量(允许 Glue 的组件相互通信)。
TCP(MySQL) 3306 rds-sg 允许在端口 3306 访问和安全组 rds-sg 相关联的 RDS My SQL 的出站流量。
TCP(HTTPS) 443 s3-prefix-list-id 允许通过网关终端节点 Gateway Endpoint 访问 S3 的出站流量。

这里需要注意,当 Glue 通过建立 Glue Connection 来访问私有子网中的 RDS MySQL 的同时,也意味着该 Connection 已经变成了“私有的”,而 Glue 在运行时,由于需要加载 Glue Job 的作业代码、依赖库或者访问其他的 S3 存储桶中的数据,因而需要创建相应的 S3 终端节点。

这里的 S3 终端节点必须为网关(Gateway)类型的终端节点,因而在设置 Glue 的安全组的时候,是通过 VPC 中的 “s3-prefix-list-id”  来设定出站规则,这和设置其他接口(Interface)类型终端节点方式不同,后者一般可以通过终端节点所在的安全组或者子网的 CIDR 的方式进行设置。

我们按照如下的步骤进行操作:

  1. 创建 S3 存储桶 glue-rds-test-001,并创建不同的 Prefix 来存放不同的数据。其中 Prefix scripts 用来存放 Glue Job 的脚本文件,Prefix jar 用来存放依赖 jar 包,Prefix output 用来存放数据在 Glue Job 处理变换后的结果。同时,下载 MySQL  JDBC 驱动 mysql-connector-java-8.0.30.jar 并上载到 Prefix jar/中。

在 IAM 服务中创建 Glue Job 运行时的角色 GlueAccessRole,并赋予托管权限策略 “AWSGlueConsoleFullAccess” 和 “AWSGlueServiceRole”,以及内联策略 “S3BucketAccessGlue”,如下图所示:

其中,内联策略 “S3BucketAccessGlue” 拥有访问创建的 S3 存储桶 “glue-rds-test-001” 的权限,如下所示:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:ListAllMyBuckets",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws-cn:s3:::glue-rds-test-001"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:GetObject",
                "s3:GetObjectAcl",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws-cn:s3:::glue-rds-test-001/*"
        }
    ]
}
  1. 按照如下图所示,在 Glue 服务控制台中,选择【Connections】,点击<Add Connection>,依次输入如下信息来创建连接 MySQL-RDS-Connection。创建时,需要根据前文的信息填入相应的 RDS 访问的用户名,密码和数据库等信息。



  1. 创建完成后可以通过创建好的角色 GlueAccessRole 对该连接进行测试。在没有创建 S3 终端节点的条件下,会显示如下错误:

  1. 我们需要创建访问 S3 的 VPC 网关终端节点,以保证 Glue 可以访问 S3。在 VPC 服务控制台中,选择导航栏中【Endpoints】,点击右上角的按钮<创建终端节点>后,按照下图所示创建 S3 Gateway 类型的终端节点 endpoint-S3


  1. 创建好访问 S3 服务的网关终端节点后,再次测试,会显示连接成功。在这里需要注意的是,连接测试成功,不代表 Glue 在运行作业时能够成功。因为,Glue 在测试 RDS 连接的时候只会检测是否有 Gateway 类型的 S3 终端节点配置,而不会检测是否 Glue Connection 所配置的网络环境能够访问 S3。因此在运行 Glue Job 时,还需要确保在 Glue 所配置的网络环境中的安全组和安全访问控制列表 Network ACL 中允许出站流量访问 Amazon S3 服务。


  1. 通过类似的方式,还可以创建连接 MySQL-JDBC-ConnectionMySQL-Network-Connection。这两个连接中,VPC 的子网可以手动选择前面的 “TEST Private Subnet(AZ1)”,安全组分别配置为 “glue-jdbc-sg” 和 “glue-network-sg”。
  2. 在 Glue 服务控制台中创建名为 rds_test 的数据库,并创建爬网程序 MySQL-JDBC-CrawlerMySQL-RDS-Crawler,目标数据库为 rds_test,数据表前缀分别为 rdsjdbc。创建完成后,点击<Run crawler>,来分别抓取数据表结构。

  1. Crawler 运行成功后,可以在【Glue Data Catalog】中的数据库 “rds_test” 看到抓取的数据表。“rdsmydatabase_employees” 为通过连接 “MySQL-RDS- Connection”- 抓取的数据表,而 “jdbcmydatabase_employees” 为通过连接 “MySQL-JDBC-Connection” 抓取的数据表。

  1. 通过 Glue Job 或者 Glue Studio 创建名为 RDS Connection Job 的作业,该 Job 运行存储在 s3://glue-rds-test-001/scripts/ 中的 RDS Connection Job.scala 脚本,完成从数据表 rdsmydatabase_employees 读取数据并转换保存到 S3 存储桶的作业。 通过 Glue Studio 创建的脚本如下所示:
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    // Script generated for node MySQL RDS
    val MySQLRDS_node1 = glueContext.getCatalogSource(database="rds_test", tableName="rdsmydatabase_employees", transformationContext="MySQLRDS_node1").getDynamicFrame()

    // Script generated for node ApplyMapping 1
    val ApplyMapping1_node2 = MySQLRDS_node1.applyMapping(mappings=Seq(("lastname", "string", "lastname", "string"), ("firstname", "string", "firstname", "string"), ("jobtitle", "string", "jobtitle", "string"), ("officecode", "string", "officecode", "string"), ("reportsto", "int", "reportsto", "int"), ("email", "string", "email", "string"), ("employeenumber", "int", "employeenumber", "int")), caseSensitive=false, transformationContext="ApplyMapping1_node2")

    // Script generated for node S3 bucket
    val S3bucket_node3 = glueContext.getSinkWithFormat(connectionType="s3", options=JsonOptions("""{"path": "s3://glue-rds-test-001/output/rds-conn/employees/", "partitionKeys": []}"""), transformationContext="S3bucket_node3", format="csv").writeDynamicFrame(ApplyMapping1_node2)

    Job.commit()
  }
}
  1. 当我们用 Glue Studio 添加相应的 Glue Data Catalog 中的数据源 DataSource 的时候,该数据源对应的 Glue Connection 会被自动加入到作业 Job 的配置中,可以在作业配置信息中的【Advanced Properties】当中的 【Connections】区域看到,如下图所示。这里需要注意一点,如果用 Glue Job 手动编写脚本,则需要手动配置所需要的 Connection,以保证该 Connection 加入到 Glue 作业运行的上下文当中。从这张图也可以看到 RDS Connection 实质上就是 JDBC 类型的 Connection。

  1. 成功运行该任务后,就可以看到,数据从 MySQL 数据库的 myDatabase 中进行抽取转换处理后,成功的保存到了 S3 对象存储中。

  1. 通过类似的 Job,也可以选择表 jdbcmydatabase_employees 作为源数据,通过 Glue 连接 “MySQL-JDBC-Connection” 获取 RDS 中的数据进行处理。
  2. 当然,我们也可以在 Glue 中创建 Job 任务,基于 Glue Connection 连接  “MySQL-Network-Connection” ,手动编写 Scala 脚本直接调用 Java JDBC 来获取 RDS 数据库中的。
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import java.sql.{DriverManager, Connection, ResultSet, Statement}

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
  
    val JDBC_DRIVER = "com.mysql.jdbc.Driver"
    val DB_URL = "jdbc:mysql://XXXXXXXXXX.csz5pwpxpwqe.rds.cn-northwest-1.amazonaws.com.cn:3306/myDatabase"
    
    // Username and password can be saved in Amazon Secret Manager.
    val USER = "dbuser"
    val PASSWORD = "dbpassword"
    Class.forName(JDBC_DRIVER)

    var conn:Connection = DriverManager.getConnection(DB_URL, USER, PASSWORD)
    var stmt:Statement = conn.createStatement()
    val sql = "SELECT * FROM employees"
    var rs:ResultSet = stmt.executeQuery(sql)
    
    while(rs.next()){
      
        val firstName = rs.getString("firstName")
        val title = rs.getString("jobTitle")
        println("employee: " + firstName + ", title: " + title)
    }
    rs.close()
    stmt.close()
    conn.close()

    Job.commit()
  }
}
  1. 由于 “MySQL-Network-Connection” 连接本身只是网络层面的连接,而且是手动编写的脚本,因此,需要在作业配置【Advanced Properties】当中的【Connections】区域中手动添加连接 MySQL-Network-Connection,而且需要指定访问数据的驱动包路径 s3://glue-rds-test-001/jar/mysql-connector-java-8.0.30.jar,如下图所示:

  1. 成功运行该任务后,就可以在日志中查看从数据库中查询出来的数据。

需要注意的是:RDS 类型和 JDBC 类型的连接,也保证了网络层面的联通,因此在本实验中的场景中,可以代替 network 类型的连接,但是在某些情况下,如果存储不是关系型数据库,因此无法建立 JDBC 连接,这种情况下我们可以直接建立 Network 类型的连接。

在上述的实验中,我们建立了三种类型的 Glue Connection 连接,并编写了相应的作业。通过引用创建的 Glue Connection 连接从 RDS 中获取数据。我们可以看到在使用三种 Glue Connection 时,有如下的相同点和区别:

Glue Connection  类型 是否可以指定 VPC/子网以及安全组 创建 Glue Connection 连接是否需要用户名和密码 是否可以通过 Glue Crawler 进行 Schema 爬取 是否支持通过 Scala JDBC 脚本访问 RDS 是否需要添加 JDBC Driver 依赖包
RDS 类型
JDBC 类型
Network 类型

结论和总结

在本实验中,我们得出如下的结论:

通过 Glue 访问 RDS 有多种方式,可以借助 Glue Connection 通过 Glue Crawler 爬取 Glue Data Catalog,从而通过数据目录 Glue Data Catalog,可以方便地操作 RDS 中的数据。Amazon RDS Connection 实际上也是 JDBC 的连接,会默认关联 RDS 所在的子网和安全组。对于 JDBC 的 Connection,在创建的时候可以指定所在的 VPC,子网和安全组信息,所选择的安全组除了自引用规则,还需要有允许访问 RDS 所在的安全组的出站流量。

特别提示

Glue 在运行时,在所配置的 VPC 的子网中,会为每一个 DPU 创建一个弹性网络接口 ENI,因此所在子网中的空闲 IP 必须大于所配置的 DPU。而且由于 Glue 运行中的各个组件需要通信,因此需要为其所在的安全组添加允许所有 TCP 协议的自引用规则。

对于 Network 的 Connection,在建立的时候,因为该 Connection 主要是在网络层面上,保证 Glue 能够访问到 RDS,因此不需要输入 RDS 的用户名和密码。连接的其他设置和 JDBC 类型的 Connection 类似。

在 Glue 的 Job 中,通过 Glue Studio 引入对应的 Glue Data Catalog 中的数据表时,会默认引入连接本身,以保证网络和连接的畅通。例如访问数据源 rds_test 中的数据表 rdsmydatabase_employees 会自动引入连接 Amazon RDS Connection。而在通过 Scala 脚本直接访问 RDS 的过程中,则需要手动加入相关的连接 Connection,我们在实验中的三种 Glue Connection 连接都支持通过脚本直接访问 RDS。在示例脚本中的用户名和密码可以借助 Amazon Secretes Manager 进行安全保存。

一般来说 Glue Connection 所配置的子网都为私有子网,而 Glue Job 在运行时,因为 Glue Job 的源代码,依赖的包等都保存在 S3 上,需要访问 S3 服务。因此,当该私有子网没有通过 NAT 网关访问互联网的路由时,则需要创建 S3 Gateway(而不是 Interface)类型的 VPC 终端节点,这样 Glue Connection 测试才能通过。同时还需要保证其所在的安全组有通向 S3 的出站规则,允许访问 S3 服务。

另外,一个 Glue Job 理论上一次只能访问一个 VPC,如果要访问多个 VPC,需要建立 VPC 对等链接(VPC peering)或者通过 S3 存储桶作为中间存储位置,将工作拆分成两个任务,将前一任务的输出保存到 S3 存储桶作为后一任务的输入。

参考连接

Setting up a VPC to connect to JDBC data stores for Amazon Glue

https://docs.aws.amazon.com/glue/latest/dg/setup-vpc-for-glue-access.html

Why does my Amazon Glue test connection fail?

https://aws.amazon.com/premiumsupport/knowledge-center/glue-test-connection-failed/

Programming Amazon Glue ETL scripts in Scala

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-scala.html

本篇作者

张亮

亚马逊云科技解决方案架构师,有近 17 年的 IT 从业经验,曾就职于 DXC,Misys 等公司。在多个行业的企业应用开发、架构设计及建设方面有丰富的实践经验。目前主要负责合作伙伴的架构咨询和方案设计,致力于亚马逊云科技云服务在国内的应用及推广。