1.前言
从2020年10月开始,基于亚马逊云科技 Graviton2 的 Amazon EMR 实例逐步推出,现在已经有很多客户在不同的场景开始使用这种类型的实例。
Graviton2 处理器由亚马逊云科技使用 64 位 ARM Neoverse 内核定制,对第一代 Graviton 处理器进行了多种性能优化。这包括 7 倍的性能、4 倍的计算核心数量、每个内核 2 倍的私有内存、5 倍的内存速度和每个核心 2 倍的浮点性能。此外,Graviton2 处理器还具有全天候运行的全加密 DDR4 内存功能,且每核加密性能提高 50%。这些性能提升使得用了Graviton2 的实例成为大数据工作负载的上佳之选。
本文将向您展示在Amazon EMR大数据集群中的Graviton2 R6g实例对R5实例的性能对比测试。我们可以清楚地看到,在标准的TPC-DS基准测试环境(99个典型的SQL场景),R6g实例比之同等资源配置的R5实例性能获得提升,而且每小时单价有所下降,在这双重因素的叠加之下,在EMR大数据负载场景,选择新一代的R6g实例,具有更好的性价比。
2.环境准备
总体说明:
1.测试环境(含操作客户端,R5实例,R6g实例):均位于新加坡区域。
2.测试EMR组件为Spark 3.1.2(对应EMR版本为6.4.0);
3.所有EMR集群均使用Amazon Glue Data Catalog作为MDS;
4.提交测试任务方式:手工登录集群的Master节点使用Spark Shell提交;
5.测试过程中,每种场景测试多次,确保CPU(集群),内存(集群),网络(每一个Core节点),磁盘(每一个Core节点)等资源层面使用率不高于70%;
6.我们以100G的TPC-DS基准数据为测试数据,取对应查询结果做对比。
2.1环境信息
A.操作客户端(此客户端也是造数据的集群)
实例类型 |
vCPU |
内存(GiB) |
磁盘(GiB) |
角色 |
r5.4xlarge |
16 |
128 |
1500 |
1个Master |
B.测试EMR集群:emr640-ic(基于Intel架构的R5机型)
实例类型 |
vCPU |
内存(GiB) |
磁盘(GiB) |
角色 |
r5.4xlarge |
16 |
128 |
500 |
1个Master |
r5.8xlarge |
32 |
256 |
1500 |
3个Core |
C.测试EMR集群:emr640-ac(基于Graviton2架构的R6g机型)
实例类型 |
vCPU |
内存(GiB) |
磁盘(GiB) |
角色 |
r6g.4xlarge |
16 |
128 |
500 |
1个Master |
r6g.8xlarge |
32 |
256 |
1500 |
3个Core |
注意:集群名称里面的ic表示intel cluster,ac表示Graviton2(arm) cluster的简称。
可能很多读者会问,为什么测试的时候只选择这一个机型?因为我们经过多轮测试以后,发现8xlarge的机型可以满足各项资源使用率不高于70%的标准。
2.2测试工具
我们使用GitHub上的TPC-DS源代码编译测试用的代码。
2.2.1 准备代码环境
登录测试客户端实例,编译如下代码(先使用sudo su – 到root用户):
yum install gcc make flex bison byacc git jq -y
mkdir -p /opt/tpcds && cd /opt/tpcds
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make
准备造数据的对应代码:
cd /opt/tpcds
git clone https://github.com/databricks/spark-sql-perf.git
cd spark-sql-perf
wget https://github.com/sbt/sbt/releases/download/v0.13.18/sbt-0.13.18.tgz
tar zxvf sbt-0.13.18.tgz
cd build
mv sbt sbt.back
ln -s ../sbt/bin/sbt sbt
cd ..
编辑插件文件:
vi project/plugins.sbt
# 添加如下一行
# resolvers += "spark-packages" at "https://repos.spark-packages.org/"
可选操作(免得后续每一个环境都需要编译代码,我们把代码上传到S3):
cd /opt/tpcds/spark-sql-perf/
./build/sbt +package
cd /opt
tar zcvf tpcds.tgz tpcds
aws s3 cp tpcds.tgz s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz
2.2.2 准备测试数据
进入测试客户端机器
spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
用如下的方式造100g数据
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val rootDir = "s3://dse-710469168206-sin/tpcds/data100g"
val dsdgenDir = "/opt/tpcds/tpcds-kit/tools"
val scaleFactor = "100"
val format = "parquet"
val databaseName = "tpcds100g"
val sqlContext = spark.sqlContext
val tables = new TPCDSTables(sqlContext,
dsdgenDir = dsdgenDir,
scaleFactor = scaleFactor,
useDoubleForDecimal = true,
useStringForDate = true)
tables.genData(
location = rootDir,
format = format,
overwrite = true,
partitionTables = true,
clusterByPartitionColumns = true,
filterOutNullPartitionValues = false,
tableFilter = "",
numPartitions = 1024)
sql(s"create database $databaseName")
tables.createExternalTables(rootDir,
format,
databaseName,
overwrite = true,
discoverPartitions = true)
tables.analyzeTables(databaseName, analyzeColumns = true)
2.3验证数据
2.3.1 控制台验证
登陆S3控制台,打开对应存储桶,查看对应数据目录内容(我们造了多个数据库,有10g的,有1000g的,但是此处我们只测试100g的场景)。
登录Glue控制台,找到对应数据库,查看对应数据库和表。
2.3.2 代码验证
使用如下代码执行查询并输出结果
spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
import com.databricks.spark.sql.perf.tpcds.TPCDS
val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)
// name of database with TPCDS data.
val databaseName = "tpcds10g"
sql(s"use $databaseName")
// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ic"
// how many iterations of queries to run.
val iterations = 1
// queries to run.
val queries = tpcds.tpcds2_4Queries
// timeout per query, in seconds.
val timeout = 7200
//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)
// wait for finish
experiment.waitForFinish(timeout)
验证运行99个查询后的结果输出(记得在这个master上执行aws configure配置ak/sk并设置profile为default)
pyspark
# 然后执行
from pyspark.sql import Row
resultLocation = "s3a://dse-710469168206-sin/result/tpcds100g/ic"
result = spark.read.format("json").load(resultLocation + "/timestamp=1631844109600").select("results.name","results.executionTime")
def getRowList(row):
lst = []
for i in range(0, len(row.name)):
lst.append(Row(name=row.name[i],executionTime=row.executionTime[i]))
return lst
resultDF = result.rdd.flatMap(lambda row: getRowList(row)).toDF().show(104)
3.对比测试过程
我们在部署Intel和Graviton2集群时均做了如下所示的Spark优化配置(在部署EMR集群时作为参数带入):
[
{
"Classification": "spark-defaults",
"Properties": {
"spark.executor.cores": "5",
"spark.driver.cores": "16",
"spark.executor.memory": "32g",
"spark.driver.memory": "32g",
"spark.executor.instances": "16",
"spark.default.parallelism": "144",
"spark.sql.shuffle.partitions": "1024",
"spark.network.timeout": "900s",
"spark.driver.maxResultSize": "0",
"spark.executor.heartbeatInterval": "60s",
"spark.dynamicAllocation.enabled": "true",
"spark.yarn.scheduler.reporterThread.maxFailures": "2000",
"spark.sql.debug.maxToStringFields": "2000",
"spark.sql.autoBroadcastJoinThreshold": "-1"
}
},
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
}
}
]
并使用之前单节点测试集群的Glue Data Catalog作为MDS。
3.1Intel集群测试
我们测试的过程中发现,不管测试多少轮,其实结果都类似,所以我们取3次正常的数据作为结果。
3.1.1准备测试代码
启动集群后,登录Master和Core节点复制代码(sudo su -)
cd /opt
yum install gcc make flex bison byacc git jq -y
aws s3 cp s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz tpcds.tgz
tar zxvf tpcds.tgz
3.1.2执行查询测试
登录集群Master,通过如下方式提交查询测试任务(此为100g数据源的代码,其他的请酌情修改)
spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
import com.databricks.spark.sql.perf.tpcds.TPCDS
val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)
val databaseName = "tpcds100g"
sql(s"use $databaseName")
// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ic"
// how many iterations of queries to run.
val iterations = 1
// queries to run.
val queries = tpcds.tpcds2_4Queries
// timeout per query, in seconds.
val timeout = 7200
//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)
// wait for finish
experiment.waitForFinish(timeout)
注意查询结束后输出的类似如下的结果地址
s3://dse-710469168206-sin/result/tpcds100g/ic/timestamp=1632029414066
多执行几次,并打开Ganglia查看对应监控信息,以99个SQL无任何错误,使用率不超过70%为判断依据。
3.1.3监控情况
从监控数据显示,系统CPU和内存使用几乎接近如下所示(截图仅供参考),我们尽可能保证在没有任何异常的情况下,CPU和内存使用率均保持在50%左右,避免因某些因素导致测试结果失去公平性。
CPU使用情况截图:
内存使用情况截图:
网络使用情况截图:
十五分钟集群负载:
十五分钟单机负载:
只要碰到监控异常或者99个SQL执行结果异常的数据就剔除不要。
3.2Graviton2集群测试
3.2.1准备测试代码
启动集群后,登录Master和Core节点复制代码(sudo su -)
cd /opt
yum install gcc make flex bison byacc git jq -y
aws s3 cp s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz tpcds.tgz
tar zxvf tpcds.tgz
3.2.2执行查询测试
登录集群Master,通过如下方式提交查询测试任务(此为100g数据源的代码,其他的请酌情修改)
spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
import com.databricks.spark.sql.perf.tpcds.TPCDS
val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)
val databaseName = "tpcds100g"
sql(s"use $databaseName")
// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ac"
// how many iterations of queries to run.
val iterations = 1
// queries to run.
val queries = tpcds.tpcds2_4Queries
// timeout per query, in seconds.
val timeout = 7200
//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)
// wait for finish
experiment.waitForFinish(timeout)
注意查询结束后输出的类似如下的结果地址
s3://dse-710469168206-sin/result/tpcds100g/ac/timestamp=1632029414066
多执行几次,并打开Ganglia查看对应监控信息,以99个SQL无任何错误,使用率不超过70%为判断依据。
3.2.3监控情况
从监控数据显示,系统CPU和内存使用几乎接近如下所示(截图仅供参考),我们尽可能保证在没有任何异常的情况下,CPU和内存使用率均保持在50%左右,避免因某些因素导致测试结果失去公平性。
CPU使用情况截图:
内存使用情况截图:
网络使用情况截图:
十五分钟集群负载:
十五分钟单机负载:
只要碰到监控异常或者99个SQL执行结果异常的数据就剔除不要。
3.3测试结果
3.3.1取结果的方式
登录单Master的集群的Master节点,查询对应结果
pyspark
# 然后执行
from pyspark.sql import Row
resultLocation = "s3a://dse-710469168206-sin/result/tpcds100g/ac"
result = spark.read.format("json").load(resultLocation + "/timestamp=1632029414066").select("results.name","results.executionTime")
def getRowList(row):
lst = []
for i in range(0, len(row.name)):
lst.append(Row(name=row.name[i],executionTime=row.executionTime[i]))
return lst
resultDF = result.rdd.flatMap(lambda row: getRowList(row)).toDF().show(104)
3.3.2结果汇总
Intel集群(emr640-ic)测试数据:
Graviton2集群(emr640-ac)测试数据
4.对比分析
因为一共有99个SQL,所以没办法简单的通过单个SQL(不管是执行时间最长的还是最短的或者中位线)的执行时间来评判性能,所以我们先把多次(此处为3次)的执行结果取平均值,然后根据99个SQL的执行结果的评价值算出来百分比,根据平均延时(秒)和百分比的乘积算出总数,并各自基于总数折算单成本(此处的成本不是指钱,是指计算成本),最后加和得出一个总值作为测试对比结果。
4.1性能对比
按照上述计算方式我们算出的性能成本如下:
平台 |
总时间成本 |
单位成本 |
比例 |
Intel |
1102.26 |
30.5217 |
|
Graviton2 |
1067.36 |
26.2709 |
16.18% |
从结果看,基于TPC-DS的100G数据和99个SQL查询,Graviton2(ARM)架构的EMR集群性能同比约优胜16%。
4.2性价比对比
根据官网的报价(https://aws.amazon.com/cn/emr/pricing/),我们选择新加坡区域,列表价如下:
类型 |
EC2列表价($) |
EMR列表价($) |
合计 |
比例 |
r5.4xlarge |
1.2160 |
0.2520 |
1.4680 |
|
r6g.4xlarge |
0.9728 |
0.2016 |
1.1744 |
25.00% |
|
r5.8xlarge |
2.4320 |
0.2700 |
2.7020 |
|
r6g.8xlarge |
1.9456 |
0.4032 |
2.3488 |
15.03% |
此处我们不考虑预留实例,企业折扣等优惠方式,纯从列表价看,合并后(包括EC2实例和EMR)的总单价,Graviton2架构明显优于Intel架构。
4.3结论
细心的读者可能会留意到,我们并没有额外再针对Graviton2(ARM)架构重新编译代码,同样的代码在两套架构上完全通用。当然,因为我们全部用的是Spark SQL方式所以架构差异较少,如果是客户的实际业务环境还是可能会有差异的,建议读者朋友们以测试结果为准。
从我们的测试结果看,从Intel架构迁移到Graviton2(ARM)架构,不仅可以获得性能的提升,还可以获得更好的性价比。
本篇作者