Tag: Amazon EMR


使用Apache Kylin和Amazon EMR进行云上大数据OLAP分析

作者:史少锋 Kyligence资深架构师,Apache Kylin committer & PMC

 

公司简介

上海跬智信息技术有限公司 (Kyligence) 是由Apache Kylin (首个来自中国的 Apache 软件基金会顶级开源项目) 核心团队组建,专注于大数据分析领域创新的数据科技公司。Apache Kylin是近两年迅速崛起的开源分布式大数据分析引擎,它充分利用Hadoop MapReduce,HBase,Spark等成熟技术,对超大数据集进行预计算(构建Cube),从而当在线查询请求到来时,通过检索Cube以亚秒级低延迟返回结果,实现真正的大数据上的交互式分析。对于用户来说,Kylin屏蔽了底层平台的技术细节,用户只需要掌握多维模型、数据仓库、SQL等知识,就可以通过Kylin的Web界面进行建模,Kylin将自动生成任务对数据进行计算。计算完成后用户即可通过各类可视化工具连入Kylin进行分析,易用性非常高。今天,Apache Kylin已经在众多企业得到广泛应用,如今日头条等。

解决方案

Kyligence为各行各业的客户提供基于AWS公有云平台的Hadoop数据仓库解决方案。Elastic MapReduce (Amazon EMR) 是AWS推出的云上Hadoop方案,这一方案使得Hadoop的部署、监控、扩容变的非常灵活方便。Amazon EMR将计算和存储分离,可以使用S3做数据存储,用户可随需启停Hadoop而不用担心数据丢失,用户只需为运行时使用的资源付费,从而大大减少运维成本。以最近发布的Apache Kylin v2.0和Amazon EMR 5.5(海外版)为例,在AWS云上使用Kylin是非常简单快速的。

1. 启动EMR集群

如果您已经有在运行的,包含了HBase 1.x服务的EMR集群,那么这一步可以跳过,您可以使用现有集群进行此实验。

EMR的启动非常简单,登录AWS控制台,选择Amazon EMR服务,点击“Create Cluster”,选择最新的5.5版本,类型为HBase:

这里您可以选择合适的硬件配置;默认是m3.xlarge 3个节点,其中1个节点为master,另外两个为core节点。选择合适的EC2 key pair,随后点击“Create cluster”,AWS便会开始自动安装和配置Hadoop/HBase集群。

大约20分钟后,集群状态显示为“Waiting Cluster ready”,这意味着集群准备就绪可以使用了。

 

2. 安装Apache Kylin 2.0

Apache Kylin以Hadoop client的方式运行,使用标准协议/API与集群交互。您可以将它安装在集群的任意节点上,通常建议安装在一个单独的client节点上。在这里我们为了简单,就把Kylin安装在master节点上。

在AWS控制台上,您可以获取SSH到Amazon EMR的方法;点击“Master public DNS”旁边的SSH链接,即可获得,如下图所示:

SSH登录到master节点后,创建一个Kylin安装目录,下载并解压Apache Kylin 2.0的二进制包:

sudo mkdir /usr/local/kylin

sudo chown hadoop /usr/local/kylin

cd /usr/local/kylin

wget http://www-us.apache.org/dist/kylin/apache-kylin-2.0.0/apache-kylin-2.0.0-bin-hbase1x.tar.gz 

tar –zxvf apache-kylin-2.0.0-bin-hbase1x.tar.gz

如果下载速度较慢,可以至Kylin官网寻找并使用更接近的下载镜像。

由于一个已知的问题KYLIN-2587,您需要手动在Kylin里设置一个参数:用编辑器打开/etc/hbase/conf/hbase-site.xml,在其中寻找到“hbase.zookeeper.quorum”这个参数,然后将它以及它的值,拷贝到Kylin目录下的conf/kylin_job_conf.xml文件中。如下所示:

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>ip-nn-nn-nn-nn.ap-northeast-2.compute.internal</value>
</property>

(注意请使用在您环境中真实获得的zookeeper地址)

3. 创建sample cube并启动Kylin

Kylin自带了一个小的数据集以及定义好的cube,只需要运行bin/sample.sh就可以将数据导入到Hive和HBase中:

export KYLIN_HOME=/usr/local/kylin/apache-kylin-2.0.0-bin

$KYLIN_HOME/bin/sample.sh

随后,就可以启动Kylin了:

$KYLIN_HOME/bin/kylin.sh start

大约若干秒以后,Kylin服务就会完成启动,在7070端口等待用户请求。

4. 修改安全组以允许访问Kylin

Amazon EMR默认创建了两个安全组,分别给Amazon EMR master和Amazon EMR core,要想从外网访问Kylin,需要设置相应规则;这里我们是将Kylin部署到了master节点,所以需要编辑master的安全组:

添加规则,允许7070端口从外面地址访问,为安全起见,建议只开放给最小的IP群,例如仅自己的地址:

接下来,在浏览器中输入http://<master-public-address>:7070/kylin,就会显示Kylin的登录页,使用默认账号ADMIN加密码KYLIN完成登录:

5. 构建Sample Cube

登录Kylin后,选择“learn_kylin”的项目,就会看到“kylin_sales”的样例Cube。此Cube模拟一个电商对其销售记录从多维度进行分析的场景,维度包括了时间(天,周,年)、区域、卖家、买家、商品分类等。

此时Cube只有定义,还没有加载数据,状态是“disabled”,需要触发一次Build。点击“Actions”,“Build”,然后选择一个时间范围,Kylin会以此条件从Hive加载数据进行一系列计算(样例数据已经导入到Hive):

所有的MapReduce, Spark, HBase操作,Kylin会自动生成并依次执行。大约七八分钟后,任务进度到100%构建完成,接下来此Cube就可以使用了:

6. 查询Cube

Cube构建完成后状态变更为“Ready”,可以使用SQL对其进行查询。虽然Kylin将原始数据构建成了多维Cube,但是对外的查询接口依旧是标准SQL,且表名、字段名依然是用原始的名称。这意味着用户一方面可以不用学习新的工具和语法,另一方面以前在Hive中执行的查询语句,基本上可以直接在Kylin中执行。

在Kylin主页点击“Insight”,切换到查询视图。此时页面的左导航处显示可以通过Cube进行查询的表和列。这里您可以尝试手写一条SQL,如下面的这条语句,按年计算交易总金额和记录数:

select YEAR_BEG_DT, sum(price), count(*) from kylin_sales inner join kylin_cal_dt on part_dt = cal_dt group by YEAR_BEG_DT;

点击“Submit”,Kylin随即执行并显示结果,如下图所示:

至此您已经完成了在AWS上运行Hadoop + Kylin的任务!从上图可以看出,Kylin的执行只耗费了0.61秒,接下来您使用可视化工具如Tableau, Excel, Saiku, Zeppelin, SmartBI等通过Kylin的ODBC/JDBC驱动连接Kylin server,体验交互式OLAP分析。

使用完以后,记得关闭Amazon EMR集群以节省费用。

成功案例

Strikingly基于AWS公有云和Kylin搭建了大数据解决方案。Strikingly (https://strikingly.com/)是一个简单、易用、美观的Web建站平台,产品中有一个面向用户的 Web Analytics Dashboard,它从各个维度以及不同时间尺度上展现了用户网站包括 Unique View, Geo Distribution 等数据,如下图所示。

用户数据查询的类型主要集中于对于各个维度数据的 group by 和 count distinct,都是比较耗时的查询操作。随着数据规模不断增长,以往的解决办法面临性能瓶颈。研究之后,Strikingly采用了Kylin + Amazon EMR的方案:每隔5分钟将原始数据备份到指定的S3 bucket上。通过预先定义好的Lambda函数对原始数据进行处理,转换成Kylin + Amazon EMR可以处理的数据格式,然后交由Kylin + Amazon EMR计算。通过引入Kylin,成功将数据查询的延迟从5~10秒降低到了1秒以内。当数据量增加的时候,可以通过Amazon EMR控制台动态添加core machine横向扩展服务,提高吞吐能力,这保证了可以对未来一段时间内可以预见的数据规模增加提供足够的技术支撑。

总结

借助于独有的分布式预计算技术,Apache Kylin比其它各类OLAP引擎能提供更高的查询性能和并发能力,并且随着数据量增加,查询延迟依旧可以保持在亚秒级(参考Kylin的SSB测试:https://github.com/Kyligence/ssb-kylin)。近些年云计算技术日趋成熟,越来越多的企业正在将大数据分析迁移到云上,AWS无疑是很热门的选择,而在选择大数据OLAP方案时,低延迟、高并发、可扩展是重要的考虑因素,Kylin结合Amazon EMR可以使云上的大数据分析变得简单而强大。
想要了解Apache Kylin的更多信息,您可以参考Apache Kylin官网(https://kylin.apache.org);如需要了解具体解决方案、商业版及专业服务,请联系Kyligence。

 

作者介绍

史少锋,Kyligence 技术合伙人&资深架构师。高级软件架构师,Apache Kylin核心开发者和项目管理委员会成员(PMC),专注于大数据分析和云计算技术。曾任eBay全球分析基础架构部大数据高级工程师,IBM云计算部门软件架构师;曾是IBM公有云Bluemix dev&ops团队核心成员,负责平台的规划、开发和运营。

巧用Amazon EMR节省数据分析成本

Amazon EMR是云上的数据分析平台,通过Amazon EMR的图形化或命令行接口,用户可以快速搭建和部署基于Amazon EC2实例的数据分析系统,并能动态扩展集群。Amazon EMR也可以读写其他AWS数据存储服务,例如Amazon S3和Amazon DynamoDB。

最新版本的Amazon EMR涵盖的服务包括:Hadoop、Zeppelin、Tez、Ganglia、HBase、Pig、Hive、Presto、ZooKeeper、Sqoop、Mahout、Hue、Phoenix、Oozie、Spark、Flink、Hcatalog。利用以上Amazon EMR包含的众多服务,用户能够实现日志文件分析、流式数据分析、机器学习、工作流管理等任务。本文就用户最常用的日志文件分析任务,巧用Amazon EMR以节省数据分析成本。

三个关键点

第一,使用Amazon S3存储待分析数据。使用Amazon S3存储数据主要有以下几方面优势。

  • 节约成本:相比使用HDFS集群,Amazon S3是单纯的存储服务,用户在存储数据文件的时候,只需要为使用的存储容量付费,无需为服务器及硬盘付费,而用机器搭建HDFS集群,这部分投入是必须的。
  • 数据持久性高:旨在提供99.999999999%的数据持久性,最大化地降低了数据丢失的可能。
  • 计算和存储分离:使用Amazon S3存储数据,实际上是实现了计算和存储的分离,这一点很关键,它使得Amazon EMR集群能够随时扩容、缩容、删除,降低了数据丢失的可能。
  • 无需修改程序代码:与HDFS存储相比,用户并不需要修改程序的代码。以Hive建表语句为例,只需要将Location的位置改为Amazon S3的目录即可,即LOCATION‘s3://sampledata/userrecord/’

第二,定时运行Amazon EMR集群。

日志文件分析这种批量数据处理的任务,并不是每时每刻都需要运行任务,可以每天定时运行Amazon EMR集群进行分析,分析完成后再将集群删除。与用户自己搭建数据分析集群相比,Amazon EMR让集群的创建非常容易,只需要一条命令即可搭建所需集群,这也让随时删除、随时创建集群具备可行性,如果没有Amazon EMR,相信用户不会将自己辛苦搭建起来的集群随便地删除。

定时运行集群与云计算的按需计费模式相结合,带来的最大优势就是节省成本。如果某个任务只需要1个小时就能得出分析结果,那么,与全天候运行的集群相比,定时运行的集群所节约的成本是非常可观的。

第三,利用外部存储。

  • 元数据:本文所提供的方案中使用的是Hive表结构数据,将其存储在外部的数据库中是因为Amazon EMR集群是定时运行的,因此,其元数据不要存储在本地,否则Amazon EMR集群关闭后,元数据也将被删除。
  • 计算结果:Amazon EMR定时运行产生的结果需要存储在外部,如Amazon S3中,或者存储在数据库中。

系统的架构

图1为一个典型架构,这里主要使用了Amazon EMR中的Hive、Presto及Sqoop服务,并利用Amazon RDS MySQL存储Hive元数据和查询结果数据。

在此架构中,Hive用来创建表,并维护表结构元数据,这些元数据被存储在Amazon RDS MySQL中。Presto用来执行查询,Presto利用Hive已经定义的表结构。Sqoop用来将Presto产生的结果数据转存到Amazon RDS的MySQL中。

以上这些步骤需要按照顺序在Amazon EMR每次启动的时候执行,实现这个顺序执行的功能,需要用到Amazon EMR的Step。在每一个Step中,用户可以自定义需要运行的任务,例如以上提到的Hive任务、Presto查询、数据转存等都可以放在Step中运行。

用户可以创建Amazon EMR集群,以及Amazon EMR集群创建成功后需要执行的Step,在一条AWS命令中事先写好,例如下面是一条简化过的命令,它创建了一个名为Loganaly的Amazon EMR集群,包含了Hive、Presto和Sqoop服务。Auto-terminate参数说明这个集群中所有的Step执行完成后,集群自动被删除。Configuration参数的内容指定了存储Hive元数据的数据库信息,例如IP、用户名等。Steps参数定义了所需要执行的任务。而在Instance-groups中定义了集群中机器的数量和配置。

aws--region cn-north-1 emr create-cluster --name "loganaly" --release-label emr-5.0.0 \

--applications Name=Hive Name=Presto Name=Sqoop \

--auto-terminate \

--configurations s://bucketname/cfgfile \

--steps \

Type=Hive,Name="HiveStep",Args=[……] \

Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=[……] \

Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=[……] \

--instance-groups \

Name=Master,InstanceGroupType=MASTER,InstanceType=m3.xlarge,InstanceCount=1 \

Name=Core,InstanceGroupType=CORE,InstanceType=r3.xlarge,InstanceCount=2 \

Name=Task,InstanceGroupType=TASK,InstanceType=r3.xlarge,InstanceCount=3

最后,用户需要让这个命令定时执行,那么,可以把这个命令放在/bin/bash脚本中,然后利用Linux crontab实现定期执行。或者利用Windows的定时任务实现定期执行。如果是在国外AWS区域,还可以利用竞价实例进一步节省成本。方法也很简单,仅需在Instance-group的参数中增加竞价的美元数即可。

--instance-groups \

Name=Master,InstanceGroupType=MASTER,InstanceType=m3.xlarge,BidPrice=0.2, InstanceCount=1 \

Name=Core,InstanceGroupType=CORE,InstanceType=r3.xlarge, BidPrice=0.2,InstanceCount=2 \

Name=Task,InstanceGroupType=TASK,InstanceType=r3.xlarge, BidPrice=0.2,InstanceCount=3

使用国外AWS区域的某个客户利用该方法分析每天产生的CDN日志,采用竞价实例的情况下,每天用于数据分析的投入不足1美元。

本文介绍的内容使用了Hive和Presto,实际上Amazon EMR中的Spark、Tez等服务都可以实现类似的功能,同样可以利用Amazon EMR的灵活创建和删除,以及使用Amazon S3作为存储的特性。

表1中列出了这些主要用到的数据分析型服务现在的特征,供读者参考选用。需要注意的是,随着社区产品的升级,这些特征并不是一成不变的,需要用户关注这些产品的变化,做出正确的选择。

针对本文中描述的方案,以下blog中有更详细的操作方法和代码,感兴趣的用户可参考:

https://aws.amazon.com/cn/blogs/china/amazon-emr/

作者介绍

韩小勇,AWS解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,实施和推广,在加入AWS之前,从事电信核心网系统上云的方案设计及标准化推广。

手把手教你使用Amazon EMR进行交互式数据查询

本文将带您一步步完成一个利用Amazon EMR进行交互式数据查询的实例,过程包括数据的注入、数据的分析、结果的转存、以及将整个过程自动化的方法。其中涉及的EMR组件主要包括: Hive, Hadoop, presto, sqoop。除EMR外,涉及到的其他服务包括:S3, RDS. 本文所使用的数据源是cloudfront产生的日志。

在按照本文档进行操作之前,读者需了解S3,RDS并能够进行基本的S3,RDS的操作,读者需了解EMR的基本概念。以下是参考资料:

什么是EMR:

Amazon Elastic MapReduce (Amazon EMR) 是一种托管数据分析服务的框架,提升企业、研究人员、数据分析师和开发人员轻松、经济高效掌控海量数据的能力。其当前版本中托管的服务包括:Hadoop, Zeppelin, Tez, Ganglia, HBase, Pig, Hive, Presto, ZooKeeper, Sqoop, Mahout, Hue, Phoenix, Oozie, Spark, Hcatalog. EMR让您专注于数据分析,无需担心费时的集群设置、管理或调整,也无需担心所需要的计算能力。

具体参考: https://aws.amazon.com/cn/documentation/elastic-mapreduce/

什么是S3:

Amazon Simple Storage Service (Amazon S3) 为开发人员和 IT 团队提供安全、耐用且高度可扩展的对象存储。S3 可为EMR提供文件存储服务。

具体参考:https://aws.amazon.com/cn/documentation/s3/

什么是RDS:

Amazon Relational Database Service (Amazon RDS) 是一种可让用户在云中轻松设置、操作和扩展关系数据库的 Web 服务。 它在承担耗时的数据库管理任务的同时,又可提供经济高效的可调容量,使您能够腾出时间专注于应用程序开发。Amazon RDS 让您能够访问非常熟悉的 MySQL、PostgreSQL、Oracle 或 Microsoft SQL Server 等数据库引擎的功能。

具体参考:https://aws.amazon.com/cn/documentation/rds/

准备工作

1.   Cloudfront生成的日志已经存储在s3桶中,并在不断更新, 存储目录是:s3://testcloudfrontlog/log

其中testcloudfrontlog是s3存储桶的名字,在实际操作的时候,需要换一个名字,因为s3存储桶的名字是全局唯一的, 而其他人也有可能使用了这个名字。

可以从以下链接下载本例中使用的示例文件,并上传到s3://testcloudfrontlog/log目录下。

https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz

https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz

2.   创建一个目录来存储按照日期划分的日志数据。 目录是s3://testcloudfrontlog/logbydate

3.   创建一个目录用来做hive表的数据存储,目录是s3://testcloudfrontlog/logpart

4.   注意:直接copy本文的代码有可能会由于字符原因出现错误,建议先copy到纯文本编辑器中再执行。

手动方式完成交互式数据查询

第一步,创建一个EMR集群,方法如下:

1.   进入到AWS的控制台,选择EMR服务,点击创建集群。

2.   点击转到高级选项

3.   软件配置

选择EMR发行版本以及所需要的软件, 在本例中,我们选择emr-5.0.0版本,所需要的工具选择hadoop, hive, presto, sqoop。 本步骤中的其余选项使用默认值,然后点击下一步。如果是使用北京Region,在输入配置中输入以下内容,使得presto可访问s3, 如果使用其他Region,不必输入。

[{"classification":"presto-connector-hive", "properties":{"hive.s3.pin-client-to-current-region":"true"}, "configurations":[]}]

4.   硬件配置

进入到硬件配置界面,默认配置如下,直接使用默认配置。然后点击下一步。

5.   一般选项

在一般选项的集群名称后面输入一个名字,作为集群的名字。其余的可按照默认配置。然后点击下一步。

6.   安全选项

在EC2键对后面的框中选择一个已有的键对,该键对用来在集群创建成功后,从SSH客户端登录到集群中的任意一台服务器。如果选择“在没有EC2键对的情况下继续”,则后续不能登录到集群中的机器。其余选项均可默认。然后点击创建集群。7.   修改安全组规则,并登录EMR的主节点

进入到刚刚创建的集群的信息界面,点击主节点安全组,进入到该安全组的配置界面,在入规则中增加SSH的访问规则,这样才可以通过SSH的方式从外部机器登录到主节点。然后通过任意一个SSH客户端登录到主节点,目标地址是图中所示的主节点共有DNS, 用户名是hadoop, 通过私钥登录,私钥与前面所提到的键对对应。

第二步,创建数据表并进行查询

1.   SSH到主节点后,执行hive命令,进入到hive命令行界面

2.   创建一个用日期作为分区的hive表,用来作为最终被查询的表

将以下脚本copy到hive>提示符下执行,注意LOCATION的参数需要改成你自己的目录。

CREATE TABLE IF NOT EXISTS cloudfrontlogpart (

time STRING, xedgelocation STRING, scbytes  INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING

)

PARTITIONED BY (datee Date)

STORED AS PARQUET

LOCATION 's3://testcloudfrontlog/logpart';

3.   输入quit命令,退出hive。用aws s3命令将s3://testcloudfrontlog/log中日志copy到s3://testcloudfrontlog/logbydate中按照时间划分的目录下

以2016-07-11这一天的文件为例,命令如下,注意,s3目录需要改成你自己的。

aws s3 cp s3://testcloudfrontlog/log/  s3://testcloudfrontlog/logbydate/2016-07-11/  --exclude "*" --include "*.2016-07-11*" --recursive

这里用到了aws s3命令行工具。你可以在EMR主节点中退出hive命令行程序,然后执行以上命令。或者在任意一个安装了aws cli工具并配置了s3访问权限的机器中执行。本例中直接在EMR的主节点中执行。aws s3 cp不支持通配符,所以用–exclude 和 –include 参数来代替。

4.   针对s3://testcloudfrontlog/logbydate/2016-07-11/ 中的数据,创建一个HIVE表。

假设表的名字是cloudfrontlog20160711, 输入hive, 重新进入到hive命令行工具,并输入以下语句,注意LOCATION的参数需要改成你自己的目录。

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog20160711 (

date1 Date, time STRING, xedgelocation STRING, scbytes  INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING

)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

LOCATION 's3://testcloudfrontlog/logbydate/2016-07-11/'

tblproperties("skip.header.line.count"="2");

5.   数据注入

至此,已经创建了两个hive表,通过在hive命令行工具中执行 show tables命令,可以查看到两个hive表。

向带分区的hive表中注入7月11日的数据,在hive命令行界面中分别执行以下命令:

set hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)

SELECT  time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1

FROM cloudfrontlog20160711;

以下是INSERT语句执行过程的显示信息。

完成后,如果进入到s3://testcloudfrontlog/logpart目录下,可以查看到已经生成了按日期划分的目录,目录下存储了文件。

第三步,数据查询

可以继续使用hive做查询,也可以进入presto做查询。这里,我们使用presto, 由于presto完全使用内存进行计算, 速度更快。进入presto的方式如下:

执行quit, 退出hive命令行程序。

然后执行以下语句进入presto命令行界面:

presto-cli --catalog hive --schema default

该语句表示presto使用hive数据源,并且使用hive数据源中的default数据库。细节请参考presto的社区文档。

执行一个简单的查询:

SELECT time, scbytes, cip FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('2016-07-11' AS varchar) LIMIT 5;

然后退出presto命令行工具,输入 quit;

第四步,删除EMR集群

在集群列表中,选中刚刚创建的集群,点击终止,以终止该集群。如果开启的终止保护,需要变更一下终止保护的状态,然后再终止。

至此, 我们通过手动的方式完成了一个简单的数据查询。但在实际生产环境中,使用手动的方式会耗费很长时间做重复性的工作,并难免出错。EMR更大优势是能够通过程序的方式去控制集群的创建以及任务的执行,这使得EMR的使用者能够将集群创建以及数据分析的过程自动化。

接下来的部分将以相同的示例指导读者一步步的实现自动化的创建集群、分析数据、转存结果、关闭集群。

自动方式完成交互式数据查询

首先概括几点自动化执行数据分析的需求:

1.   集群在每天的固定时间被创建,然后对数据进行分析,然后集群被自动删除。这显然不能通过图形界面进行一步步的操作了。

2.   在手动操作中执行的每个步骤,需要按顺序自动化执行。这些步骤包括:

–       从/log目录向/logbydate目录中copy特定日期的文件。

–       创建对应特定日期的hive表,例如表名为cloudfrontlog20160711。

–       从cloudfrontlog20160711向cloudfrontlogpart表中注入数据。

–       使用presto从cloudfrontlogpart表中查询出需要的数据。

–       此外,在手动执行的最后一步,我们可以直接看到查询结果,但在自动化执行的过程中,我们需要将查询结果存储到一个长期运行的数据库中,供随时查询。

3.   将hive元数据放在集群的外部。

在手动执行的流程中,创建hive表后,hive的元数据存储在了主节点。而在自动化执行的过程中,当所有任务执行完毕后,集群被删除,存储在主节点的元数据也会被删除,因此要在外部数据库中存储hive的元数据。

针对以上的几个需求,在EMR中对应的解决方法如下.

1.   使用EMR的命令行进行各种集群的操作,例如集群的创建,参数的设置等。

2.   使用EMR的“step”来组织各个任务的执行。

3.   创建一个外部的数据库用来存储hive元数据,并在集群创建的时候指定元数据的存储位置。

接下来详细描述操作步骤和脚本

第一步,准备工作

1.   准备两个mysql数据库分别用来存储hive元数据和查询结果,可以使用AWS的RDS服务来创建。这两个数据库都需要能被EMR访问到, 这两个数据库也可以使用同一个物理服务器或虚拟机。

2.   假设存储查询结果的数据库名字是loganalydb,我们在该数据库中创建一个表,用来存储结果数据,表的名字是loganalytb。根据后面所进行的查询,使用如下语句创建数据库以及与查询结果匹配的表:

CREATE DATABASE loganalydb;

USE loganalydb;

CREATE TABLE `loganalytb` ( `id` int(11) NOT NULL AUTO_INCREMENT, `filepath` varchar(300) DEFAULT NULL, `totalbyte` bigint(20) DEFAULT NULL, `tdate` varchar(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58147 DEFAULT CHARSET=utf8;

对于存储hive元数据的数据库,暂时不必创建,从后面提到的emrconf.json文件可以看到,只需要给出数据库服务器的地址,以及用户名和密码即可,数据库不存在的话,会自动创建。

3.   由于自动化的流程与前面提到的手动流程使用相同的示例,因此,请事先删除手动流程示例中产生的数据。包括:

1)   s3://testcloudfrontlog/logbydate目录下的数据

2)   s3://testcloudfrontlog/logpart目录下的数据

第二步,编写脚本

下面给出各个脚本,并配以注释解释

1.   主程序脚本loganaly.sh

#!/bin/bash

# 变量KEYNAMES是你的SSH key的名字, 用来通过SSH方式访问EC2。

KEYNAME=yourkeyname

# 变量CONFIGFILE是emrconf.json的url地址,emrconf.json这个文件中包含了存储hive元数据的外部数据库的信息。将该文件存在s3中并让这个文件能够从外部访问到。emrconf.json中的内容在下文中会给出。

CONFIGFILE=https://s3-us-west-2.amazonaws.com/testcloudfrontlog/conf/emrconf.json

# EMR集群产生的日志所存放的s3目录。

LOGURI=s3://testcloudfrontlog/emrlog/

# 脚本、配置文件、jar包所在的目录

CODEDIR=s3://testcloudfrontlog/conf

# cloudfront日志的存放位置, 结尾别加 /

LOGSOURCEDIR=s3://testcloudfrontlog/log

# 当天日志的中转目录, 结尾别加 /

STAGINGDIR=s3://testcloudfrontlog/logbydate

# 昨天的日期, 形如:20160711

DATE=$(date -d "yesterday" +%Y%m%d)

# 昨天的日期,另外一种格式, 形如:2016-07-11

DATEE=$(date -d "yesterday" +%Y-%m-%d)

# 被查询的hive表的文件存储位置, 结尾别加 /

PARTDIR=s3://testcloudfrontlog/logpart

# 用来存储结果文件的目录, 结果文件将被sqoop使用,数据会被传到mysql.

SQOOPFILE=s3://testcloudfrontlog/sqoopfile

#用来存储结果数据的mysql的信息。

DBHOST=rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com

JDBCURL=jdbc:mysql://rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com/loganalydb

DBUSER=username

DBPASS=password

# EMR集群的配置信息, 这里是以Oregan region为例。如果使用的是北京region,AWSREGION用cn-north-1。

AWSREGION=us-west-2

MASTERTYPE=m3.xlarge

CORETYPE=r3.xlarge

TASKTYPE=r3.xlarge

MASTERNUM=1

CORENUM=1

TASKNUM=1

# 删除当天的数据,目的是防止脚本在同一天被多次执行而造成数据冗余,

aws s3 rm $PARTDIR/datee=$DATEE --recursive

aws s3 rm $SQOOPFILE/$DATE --recursive

mysql -h$DBHOST -u$DBUSER -p$DBPASS --execute "DELETE FROM loganalydb.loganalytb WHERE tdate='$DATEE'"

 

# 创建emr集群,名字是loganaly, 其中:

# --auto-terminate参数表示该集群在执行完所有的任务后自动删除。

# --configurations参数的文件中的参数配置覆盖了该集群运行起来后的默认参数配置。在本例中用来修改Hive元数据的存储位置。

# --step参数规定了该集群在创建后要执行的几个任务,其中Type=Hive的step, 需要给出包含hive语句的文件作为参数。而Type=CUSTOM_JAR的step,需要给出一个JAR包,这里我们使用EMR提供scrip-runner.jar,它的作用是执行其第一个参数中指定的脚本文件,并将其余的参数作为脚本文件的输入参数。

# --instance groups参数规定了集群的中各节点的机型和数量

aws --region $AWSREGION emr create-cluster --name "loganaly" --release-label emr-5.0.0 \

--applications Name=Hadoop Name=Hive Name=Presto Name=Sqoop \

--use-default-roles \

--ec2-attributes KeyName=$KEYNAME \

--termination-protected \

--auto-terminate \

--configurations $CONFIGFILE \

--enable-debugging \

--log-uri $LOGURI \

--steps \

Type=CUSTOM_JAR,Name="cpjar",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/cpjar.sh"," $CODEDIR"] \

Type=CUSTOM_JAR,Name="log2staging",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/log2logbydate.sh","$LOGSOURCEDIR","$STAGINGDIR","$DATE","$DATEE"] \

Type=Hive,Name="HiveStep",Args=[-f,$CODEDIR/hivetables.q,-d,PARTDIRh=$PARTDIR,-d,STAGINGDIRh=$STAGINGDIR,-d,DATEh=$DATE] \

Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/presto2s3.sh","$SQOOPFILE","$DATE","$DATEE"] \

Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/s3tomysql.sh","$JDBCURL","$DBUSER","$DBPASS","$SQOOPFILE","$DATE"] \

--instance-groups \

Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,InstanceCount=$MASTERNUM \

Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,InstanceCount=$CORENUM \

Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,InstanceCount=$TASKNUM

2.   准备cpjar.sh脚本

作用是将mysql的JDBC驱动包下载到本地的Sqoop目录下,sqoop在将文件转存到数据库的时候会用到JDBC驱动。

在国外region, 使用以下脚本:

#!/bin/bash

CODEDIR=$1

sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/

在北京Region, 使用以下脚本:

#!/bin/bash

CODEDIR=$1

sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/ --region cn-north-1

3.   准备log2logbydate.sh脚本

做用是将原始日志文件copy到按天划分的目录下。

#!/bin/bash

LOGSOURCEDIR=$1

STAGINGDIR=$2

DATE=$3

DATEE=$4

aws s3 cp $LOGSOURCEDIR/ $STAGINGDIR/$DATE/ --exclude "*" --include "*.$DATEE*" --recursive

4.   准备hivetables.q脚本

作用包括:创建待查询的表和按天命名的临时表,将临时表中数据注入到待查询的表中,并删除临时表。

CREATE TABLE IF NOT EXISTS cloudfrontlogpart (

time STRING, xedgelocation STRING, scbytes  INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING

)

PARTITIONED BY (datee Date)

LOCATION '${PARTDIRh}';

 

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog${DATEh} (

date1 Date, time STRING, xedgelocation STRING, scbytes  INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING

)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

LOCATION '${STAGINGDIRh}/${DATEh}/'

tblproperties("skip.header.line.count"="2");

 

--make dynamic insert available

set hive.exec.dynamic.partition.mode=nonstrict;

 

--insert data.

INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)

SELECT  time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1

FROM cloudfrontlog${DATEh};

 

--delete the staging table

DROP TABLE cloudfrontlogstaging${DATEh};

5.   准备presto2s3.sh脚本

作用包括:查询hive表,并将结果存成.csv格式的文件,将该文件上传到S3中相应的目录下。

#!/bin/bash

TIME=$(date +%H%M%S)

SQOOPFILE=$1

DATE=$2

DATEE=$3

sudo presto-cli --catalog hive --schema default --execute "SELECT NULL as id, csuristem, SUM(scbytes) as totalbyte, CAST('$DATEE' AS varchar) as date FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('$DATEE' AS varchar) GROUP BY csuristem order by totalbyte desc" --output-format CSV > ~/cdnfilestat.csv

aws s3 cp ~/cdnfilestat.csv $SQOOPFILE/$DATE/cdnfilestat$TIME.csv

6.   准备s3tomysql.sh脚本

作用是利用sqoop将s3中的.csv文件中的内容转存到数据库中。注意,如果单次转存的数据量大,你可能需要调大数据库的max_allowed_packet参数。

#!/bin/bash

JDBCURL=$1

DBUSER=$2

DBPASS=$3

SQOOPFILE=$4

DATE=$5

sqoop export --connect $JDBCURL --username $DBUSER --password $DBPASS --table loganalytb --fields-terminated-by ',' --enclosed-by '\"' --export-dir $SQOOPFILE/$DATE/

7.   准备emrconf.json脚本

作用是使得创建起来的EMR集群的hive元数据存储在外部数据库中。注意:根据准备工作中创建的数据库来修改文件中的ConnectionUserName、ConnectionPassword、ConnectionURL三个参数。

[

{"Classification":"hive-site",

"Properties":{

"javax.jdo.option.ConnectionUserName":"username",

"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",

"javax.jdo.option.ConnectionPassword":"password",

"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"

},

"Configurations":[]

}

]

注意:如果是在北京Region, emrconf.json使用以下脚本

[

{"Classification":"hive-site",

"Properties":{

"javax.jdo.option.ConnectionUserName":"username",

"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",

"javax.jdo.option.ConnectionPassword":"password",

"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"

},

"Configurations":[]

},

{"Classification":"presto-connector-hive",

"Properties":{

"hive.s3.pin-client-to-current-region":"true"

},

"Configurations":[]

}

]

8.   准备jar包

需要两个jar,一个是script-runner.jar,下载地址: http://s3.amazonaws.com/elasticmapreduce/libs/script-runner/script-runner.jar

另一个是mysql的JDBC驱动,下载地址: https://s3-us-west-2.amazonaws.com/hxyshare/mysql-connector-java-5.1.38-bin.jar

9.   上传文件

在s3中创建s3://testcloudfrontlog/conf/目录,并将第8步中的两个jar包,以及cpjar.sh,log2staging.sh,hivetables.q,presto2s3.sh,s3tomysql.sh,emrconf.json,上传到该目录。

由于emrconf.json需要通过http的方式访问到,在s3中将emrconf.json的访问权限增加“所有人可下载”。

cloudfront日志文件copy到s3://testcloudfrontlog/log目录下, 两个示例文件下载地址:

https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz

https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz

如果在手动流程中已经上传了日志文件,则不必再上传。

第三步,执行程序

暂时将主程序loganaly.sh中的DATE和DATEE参数修改为示例数据的时间,例如,分别写成20160711和2016-07-11,在任意一个Linux系统中运行主程序脚本loganaly.sh(例如,可以使用EC2实例)。但需注意:

1) 该机器需要安装了AWS命令行工具,并具有s3和EMR的操作权限。

2) 该机器能访问到存储数据结果的数据库,因为loganaly.sh中有对该数据库的操作。

集群创建成功后,自动执行每个step中的任务,所有任务执行完成后自动关闭,从下图中可以看到每个Step的执行:

所有任务执行完成后,进入到存储查询结果的数据库,查看输入的结果:

如果要想每天执行loganaly.sh脚本并对前一天的数据进行处理和分析,将loganaly.sh中的DATE和DATEE分别赋值为 $(date -d “yesterday” +%Y%m%d) 和 $(date -d “yesterday” +%Y-%m-%d),然后创建一个crontab定时任务,每天定时执行loganaly.sh.

如果是在AWS中国以外的region执行,还可以利用竞价实例来大幅的降低成本,使用竞价实例的方法也非常简单,只需要在将loganaly.sh中对创建EMR集群的脚本稍做修改,增加BidPrice参数:

--instance-groups \

Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,BidPrice=0.2,InstanceCount=$MASTERNUM \

Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,BidPrice=0.2,InstanceCount=$CORENUM \

Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,BidPrice=0.2,InstanceCount=$TASKNUM

第四步,删除资源

为避免额外花费,删除本实验过程中(包括手动过程以及自动过程中)创建的资源,S3, EC2等资源。

总结

本文中使用Cloudfront日志进行分析,但本文中使用的方法稍作修改便适用于其他类型的日志类文件的分析。本文中主要使用了EMR中的Hive,Presto,Sqoop工具,但EMR还有更多的工具(例如Spark)可供用户使用,用户在创建集群的时候增加相应的服务即可实现丰富的功能。

作者介绍:

韩小勇

AWS解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,实施和推广,在加入AWS之前,从事电信核心网系统上云的方案设计及标准化推广 。