1.前言
 
       从2020年10月开始,基于亚马逊云科技 Graviton2 的 Amazon EMR 实例逐步推出,现在已经有很多客户在不同的场景开始使用这种类型的实例。
 
       Graviton2 处理器由亚马逊云科技使用 64 位 ARM Neoverse 内核定制,对第一代 Graviton 处理器进行了多种性能优化。这包括 7 倍的性能、4 倍的计算核心数量、每个内核 2 倍的私有内存、5 倍的内存速度和每个核心 2 倍的浮点性能。此外,Graviton2 处理器还具有全天候运行的全加密 DDR4 内存功能,且每核加密性能提高 50%。这些性能提升使得用了Graviton2 的实例成为大数据工作负载的上佳之选。
 
       本文将向您展示在Amazon EMR大数据集群中的Graviton2 R6g实例对R5实例的性能对比测试。我们可以清楚地看到,在标准的TPC-DS基准测试环境(99个典型的SQL场景),R6g实例比之同等资源配置的R5实例性能获得提升,而且每小时单价有所下降,在这双重因素的叠加之下,在EMR大数据负载场景,选择新一代的R6g实例,具有更好的性价比。
 
       2.环境准备
 
       总体说明:
 
       1.测试环境(含操作客户端,R5实例,R6g实例):均位于新加坡区域。
 
       2.测试EMR组件为Spark 3.1.2(对应EMR版本为6.4.0);
 
       3.所有EMR集群均使用Amazon Glue Data Catalog作为MDS;
 
       4.提交测试任务方式:手工登录集群的Master节点使用Spark Shell提交;
 
       5.测试过程中,每种场景测试多次,确保CPU(集群),内存(集群),网络(每一个Core节点),磁盘(每一个Core节点)等资源层面使用率不高于70%;
 
       6.我们以100G的TPC-DS基准数据为测试数据,取对应查询结果做对比。
 
       2.1环境信息
 
       A.操作客户端(此客户端也是造数据的集群)
 
        
         
          
          | 实例类型 | vCPU | 内存(GiB) | 磁盘(GiB) | 角色 | 
 
          
          | r5.4xlarge | 16 | 128 | 1500 | 1个Master | 
 
         
       
 
       B.测试EMR集群:emr640-ic(基于Intel架构的R5机型)
 
        
         
          
          | 实例类型 | vCPU | 内存(GiB) | 磁盘(GiB) | 角色 | 
 
          
          | r5.4xlarge | 16 | 128 | 500 | 1个Master | 
 
          
          | r5.8xlarge | 32 | 256 | 1500 | 3个Core | 
 
         
       
 
       C.测试EMR集群:emr640-ac(基于Graviton2架构的R6g机型)
 
        
         
          
          | 实例类型 | vCPU | 内存(GiB) | 磁盘(GiB) | 角色 | 
 
          
          | r6g.4xlarge | 16 | 128 | 500 | 1个Master | 
 
          
          | r6g.8xlarge | 32 | 256 | 1500 | 3个Core | 
 
         
       
 
       注意:集群名称里面的ic表示intel cluster,ac表示Graviton2(arm) cluster的简称。
 
       可能很多读者会问,为什么测试的时候只选择这一个机型?因为我们经过多轮测试以后,发现8xlarge的机型可以满足各项资源使用率不高于70%的标准。
 
       2.2测试工具
 
       我们使用GitHub上的TPC-DS源代码编译测试用的代码。
 
       2.2.1 准备代码环境
 
       登录测试客户端实例,编译如下代码(先使用sudo su – 到root用户):
 
        
        yum install gcc make flex bison byacc git jq -y 
mkdir -p /opt/tpcds && cd /opt/tpcds
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make
 
         
       准备造数据的对应代码:
 
        
        cd /opt/tpcds
git clone https://github.com/databricks/spark-sql-perf.git
cd spark-sql-perf
wget https://github.com/sbt/sbt/releases/download/v0.13.18/sbt-0.13.18.tgz
tar zxvf sbt-0.13.18.tgz
cd build
mv sbt sbt.back
ln -s ../sbt/bin/sbt sbt
cd ..
 
         
       编辑插件文件:
 
        
        vi project/plugins.sbt
# 添加如下一行
# resolvers += "spark-packages" at "https://repos.spark-packages.org/"
 
         
       可选操作(免得后续每一个环境都需要编译代码,我们把代码上传到S3):
 
        
        cd /opt/tpcds/spark-sql-perf/
./build/sbt +package
cd /opt
tar zcvf tpcds.tgz tpcds
aws s3 cp tpcds.tgz s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz
 
         
       2.2.2 准备测试数据
 
       进入测试客户端机器
 
        
        spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
 
         
       用如下的方式造100g数据
 
        
        import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val rootDir = "s3://dse-710469168206-sin/tpcds/data100g"
val dsdgenDir = "/opt/tpcds/tpcds-kit/tools"
val scaleFactor = "100"
val format = "parquet"
val databaseName = "tpcds100g"
val sqlContext = spark.sqlContext
val tables = new TPCDSTables(sqlContext,
dsdgenDir = dsdgenDir, 
scaleFactor = scaleFactor,
useDoubleForDecimal = true, 
useStringForDate = true)
tables.genData(
location = rootDir,
format = format,
overwrite = true,
partitionTables = true, 
clusterByPartitionColumns = true, 
filterOutNullPartitionValues = false, 
tableFilter = "", 
numPartitions = 1024)
sql(s"create database $databaseName") 
tables.createExternalTables(rootDir, 
format, 
databaseName, 
overwrite = true, 
discoverPartitions = true)
tables.analyzeTables(databaseName, analyzeColumns = true)
 
         
       2.3验证数据
 
       2.3.1 控制台验证
 
       登陆S3控制台,打开对应存储桶,查看对应数据目录内容(我们造了多个数据库,有10g的,有1000g的,但是此处我们只测试100g的场景)。
 
       
 
       登录Glue控制台,找到对应数据库,查看对应数据库和表。
 
       
 
       2.3.2 代码验证
 
       使用如下代码执行查询并输出结果
 
        
        spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
import com.databricks.spark.sql.perf.tpcds.TPCDS
val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)
// name of database with TPCDS data.
val databaseName = "tpcds10g" 
sql(s"use $databaseName")
// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ic"
// how many iterations of queries to run.
val iterations = 1 
// queries to run.
val queries = tpcds.tpcds2_4Queries
// timeout per query, in seconds.
val timeout = 7200
 
//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)
// wait for finish
experiment.waitForFinish(timeout)
 
         
       验证运行99个查询后的结果输出(记得在这个master上执行aws configure配置ak/sk并设置profile为default)
 
        
        pyspark
# 然后执行
from pyspark.sql import Row
resultLocation = "s3a://dse-710469168206-sin/result/tpcds100g/ic"
result = spark.read.format("json").load(resultLocation + "/timestamp=1631844109600").select("results.name","results.executionTime")
def getRowList(row):
            lst = []
            for i in range(0, len(row.name)):
                        lst.append(Row(name=row.name[i],executionTime=row.executionTime[i]))
            return lst
resultDF = result.rdd.flatMap(lambda row: getRowList(row)).toDF().show(104)
 
         
       3.对比测试过程
 
       我们在部署Intel和Graviton2集群时均做了如下所示的Spark优化配置(在部署EMR集群时作为参数带入):
 
        
        [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.cores": "5",
            "spark.driver.cores": "16",
            "spark.executor.memory": "32g",
            "spark.driver.memory": "32g",
            "spark.executor.instances": "16",
            "spark.default.parallelism": "144",
            "spark.sql.shuffle.partitions": "1024",
            "spark.network.timeout": "900s",
            "spark.driver.maxResultSize": "0",
            "spark.executor.heartbeatInterval": "60s",
            "spark.dynamicAllocation.enabled": "true",
            "spark.yarn.scheduler.reporterThread.maxFailures": "2000",
            "spark.sql.debug.maxToStringFields": "2000",
            "spark.sql.autoBroadcastJoinThreshold": "-1"
        }
    },
    {
        "Classification": "yarn-site",
        "Properties": {
            "yarn.nodemanager.vmem-check-enabled": "false",
            "yarn.nodemanager.pmem-check-enabled": "false"
        }
    }
]
 
         
       并使用之前单节点测试集群的Glue Data Catalog作为MDS。
 
       3.1Intel集群测试
 
       我们测试的过程中发现,不管测试多少轮,其实结果都类似,所以我们取3次正常的数据作为结果。
 
       3.1.1准备测试代码
 
       启动集群后,登录Master和Core节点复制代码(sudo su -)
 
        
        cd /opt
yum install gcc make flex bison byacc git jq -y
aws s3 cp s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz tpcds.tgz
tar zxvf tpcds.tgz
 
         
       3.1.2执行查询测试
 
       登录集群Master,通过如下方式提交查询测试任务(此为100g数据源的代码,其他的请酌情修改)
 
        
        spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
import com.databricks.spark.sql.perf.tpcds.TPCDS
val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)
val databaseName = "tpcds100g" 
sql(s"use $databaseName")
// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ic"
// how many iterations of queries to run.
val iterations = 1
// queries to run.
val queries = tpcds.tpcds2_4Queries
// timeout per query, in seconds.
val timeout = 7200
//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)
// wait for finish
experiment.waitForFinish(timeout)
 
         
       注意查询结束后输出的类似如下的结果地址
 
       s3://dse-710469168206-sin/result/tpcds100g/ic/timestamp=1632029414066
 
       多执行几次,并打开Ganglia查看对应监控信息,以99个SQL无任何错误,使用率不超过70%为判断依据。
 
       3.1.3监控情况
 
       从监控数据显示,系统CPU和内存使用几乎接近如下所示(截图仅供参考),我们尽可能保证在没有任何异常的情况下,CPU和内存使用率均保持在50%左右,避免因某些因素导致测试结果失去公平性。
 
       CPU使用情况截图:
 
       
 
       内存使用情况截图:
 
       
 
       网络使用情况截图:
 
       
 
       十五分钟集群负载:
 
       
 
       十五分钟单机负载:
 
       
 
       只要碰到监控异常或者99个SQL执行结果异常的数据就剔除不要。
 
       3.2Graviton2集群测试
 
       3.2.1准备测试代码
 
       启动集群后,登录Master和Core节点复制代码(sudo su -)
 
        
        cd /opt
yum install gcc make flex bison byacc git jq -y
aws s3 cp s3://dse-710469168206-sin/tpcds/codes/tpcds.tgz tpcds.tgz
tar zxvf tpcds.tgz
 
         
       3.2.2执行查询测试
 
       登录集群Master,通过如下方式提交查询测试任务(此为100g数据源的代码,其他的请酌情修改)
 
        
        spark-shell --jars /opt/tpcds/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
import com.databricks.spark.sql.perf.tpcds.TPCDS
val sqlContext = spark.sqlContext
val tpcds = new TPCDS(sqlContext)
val databaseName = "tpcds100g" 
sql(s"use $databaseName")
// place to write results
val resultLocation = "s3://dse-710469168206-sin/result/tpcds100g/ac"
// how many iterations of queries to run.
val iterations = 1
// queries to run.
val queries = tpcds.tpcds2_4Queries
// timeout per query, in seconds.
val timeout = 7200
//run experiment
val experiment = tpcds.runExperiment(queries, iterations = iterations,resultLocation = resultLocation,forkThread = true)
// wait for finish
experiment.waitForFinish(timeout)
 
         
       注意查询结束后输出的类似如下的结果地址
 
       s3://dse-710469168206-sin/result/tpcds100g/ac/timestamp=1632029414066
 
       多执行几次,并打开Ganglia查看对应监控信息,以99个SQL无任何错误,使用率不超过70%为判断依据。
 
       3.2.3监控情况
 
       从监控数据显示,系统CPU和内存使用几乎接近如下所示(截图仅供参考),我们尽可能保证在没有任何异常的情况下,CPU和内存使用率均保持在50%左右,避免因某些因素导致测试结果失去公平性。
 
       CPU使用情况截图:
 
       
 
       内存使用情况截图:
 
       
 
       网络使用情况截图:
 
       
 
       十五分钟集群负载:
 
       
 
       十五分钟单机负载:
 
       
 
       只要碰到监控异常或者99个SQL执行结果异常的数据就剔除不要。
 
       3.3测试结果
 
       3.3.1取结果的方式
 
       登录单Master的集群的Master节点,查询对应结果
 
        
        pyspark
# 然后执行
from pyspark.sql import Row
resultLocation = "s3a://dse-710469168206-sin/result/tpcds100g/ac"
result = spark.read.format("json").load(resultLocation + "/timestamp=1632029414066").select("results.name","results.executionTime")
def getRowList(row):
            lst = []
            for i in range(0, len(row.name)):
                        lst.append(Row(name=row.name[i],executionTime=row.executionTime[i]))
            return lst
resultDF = result.rdd.flatMap(lambda row: getRowList(row)).toDF().show(104)
 
         
       3.3.2结果汇总
 
       Intel集群(emr640-ic)测试数据:
 
       
 
       Graviton2集群(emr640-ac)测试数据
 
       
 
       4.对比分析
 
       因为一共有99个SQL,所以没办法简单的通过单个SQL(不管是执行时间最长的还是最短的或者中位线)的执行时间来评判性能,所以我们先把多次(此处为3次)的执行结果取平均值,然后根据99个SQL的执行结果的评价值算出来百分比,根据平均延时(秒)和百分比的乘积算出总数,并各自基于总数折算单成本(此处的成本不是指钱,是指计算成本),最后加和得出一个总值作为测试对比结果。
 
       4.1性能对比
 
       按照上述计算方式我们算出的性能成本如下:
 
        
         
          
          | 平台 | 总时间成本 | 单位成本 | 比例 | 
 
          
          | Intel | 1102.26 | 30.5217 |  | 
 
          
          | Graviton2 | 1067.36 | 26.2709 | 16.18% | 
 
         
       
 
       从结果看,基于TPC-DS的100G数据和99个SQL查询,Graviton2(ARM)架构的EMR集群性能同比约优胜16%。
 
       4.2性价比对比
 
       根据官网的报价(https://aws.amazon.com/cn/emr/pricing/),我们选择新加坡区域,列表价如下:
 
        
         
          
          | 类型 | EC2列表价($) | EMR列表价($) | 合计 | 比例 | 
 
          
          | r5.4xlarge | 1.2160 | 0.2520 | 1.4680 |  | 
 
          
          | r6g.4xlarge | 0.9728 | 0.2016 | 1.1744 | 25.00% | 
 
          
          |  | 
 
          
          | r5.8xlarge | 2.4320 | 0.2700 | 2.7020 |  | 
 
          
          | r6g.8xlarge | 1.9456 | 0.4032 | 2.3488 | 15.03% | 
 
         
       
 
       此处我们不考虑预留实例,企业折扣等优惠方式,纯从列表价看,合并后(包括EC2实例和EMR)的总单价,Graviton2架构明显优于Intel架构。
 
       4.3结论
 
       细心的读者可能会留意到,我们并没有额外再针对Graviton2(ARM)架构重新编译代码,同样的代码在两套架构上完全通用。当然,因为我们全部用的是Spark SQL方式所以架构差异较少,如果是客户的实际业务环境还是可能会有差异的,建议读者朋友们以测试结果为准。
 
       从我们的测试结果看,从Intel架构迁移到Graviton2(ARM)架构,不仅可以获得性能的提升,还可以获得更好的性价比。
 
       本篇作者