亚马逊AWS官方博客

手把手教你使用Amazon EMR进行交互式数据查询

本文将带您一步步完成一个利用Amazon EMR进行交互式数据查询的实例,过程包括数据的注入、数据的分析、结果的转存、以及将整个过程自动化的方法。其中涉及的EMR组件主要包括: Hive, Hadoop, presto, sqoop。除EMR外,涉及到的其他服务包括:S3, RDS. 本文所使用的数据源是cloudfront产生的日志。

在按照本文档进行操作之前,读者需了解S3,RDS并能够进行基本的S3,RDS的操作,读者需了解EMR的基本概念。以下是参考资料:

什么是EMR:

Amazon Elastic MapReduce (Amazon EMR) 是一种托管数据分析服务的框架,提升企业、研究人员、数据分析师和开发人员轻松、经济高效掌控海量数据的能力。其当前版本中托管的服务包括:Hadoop, Zeppelin, Tez, Ganglia, HBase, Pig, Hive, Presto, ZooKeeper, Sqoop, Mahout, Hue, Phoenix, Oozie, Spark, Hcatalog. EMR让您专注于数据分析,无需担心费时的集群设置、管理或调整,也无需担心所需要的计算能力。

具体参考: https://aws.amazon.com/cn/documentation/elastic-mapreduce/

什么是S3:

Amazon Simple Storage Service (Amazon S3) 为开发人员和 IT 团队提供安全、耐用且高度可扩展的对象存储。S3 可为EMR提供文件存储服务。

具体参考:https://aws.amazon.com/cn/documentation/s3/

什么是RDS:

Amazon Relational Database Service (Amazon RDS) 是一种可让用户在云中轻松设置、操作和扩展关系数据库的 Web 服务。 它在承担耗时的数据库管理任务的同时,又可提供经济高效的可调容量,使您能够腾出时间专注于应用程序开发。Amazon RDS 让您能够访问非常熟悉的 MySQL、PostgreSQL、Oracle 或 Microsoft SQL Server 等数据库引擎的功能。

具体参考:https://aws.amazon.com/cn/documentation/rds/

准备工作

1.   Cloudfront生成的日志已经存储在s3桶中,并在不断更新, 存储目录是:s3://testcloudfrontlog/log

其中testcloudfrontlog是s3存储桶的名字,在实际操作的时候,需要换一个名字,因为s3存储桶的名字是全局唯一的, 而其他人也有可能使用了这个名字。

可以从以下链接下载本例中使用的示例文件,并上传到s3://testcloudfrontlog/log目录下。

https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz

https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz

2.   创建一个目录来存储按照日期划分的日志数据。 目录是s3://testcloudfrontlog/logbydate

3.   创建一个目录用来做hive表的数据存储,目录是s3://testcloudfrontlog/logpart

4.   注意:直接copy本文的代码有可能会由于字符原因出现错误,建议先copy到纯文本编辑器中再执行。

手动方式完成交互式数据查询

第一步,创建一个EMR集群,方法如下:

1.   进入到AWS的控制台,选择EMR服务,点击创建集群。

2.   点击转到高级选项

3.   软件配置

选择EMR发行版本以及所需要的软件, 在本例中,我们选择emr-5.0.0版本,所需要的工具选择hadoop, hive, presto, sqoop。 本步骤中的其余选项使用默认值,然后点击下一步。如果是使用北京Region,在输入配置中输入以下内容,使得presto可访问s3, 如果使用其他Region,不必输入。

[{"classification":"presto-connector-hive", "properties":{"hive.s3.pin-client-to-current-region":"true"}, "configurations":[]}]

4.   硬件配置

进入到硬件配置界面,默认配置如下,直接使用默认配置。然后点击下一步。

5.   一般选项

在一般选项的集群名称后面输入一个名字,作为集群的名字。其余的可按照默认配置。然后点击下一步。

6.   安全选项

在EC2键对后面的框中选择一个已有的键对,该键对用来在集群创建成功后,从SSH客户端登录到集群中的任意一台服务器。如果选择“在没有EC2键对的情况下继续”,则后续不能登录到集群中的机器。其余选项均可默认。然后点击创建集群。7.   修改安全组规则,并登录EMR的主节点

进入到刚刚创建的集群的信息界面,点击主节点安全组,进入到该安全组的配置界面,在入规则中增加SSH的访问规则,这样才可以通过SSH的方式从外部机器登录到主节点。然后通过任意一个SSH客户端登录到主节点,目标地址是图中所示的主节点共有DNS, 用户名是hadoop, 通过私钥登录,私钥与前面所提到的键对对应。

第二步,创建数据表并进行查询

1.   SSH到主节点后,执行hive命令,进入到hive命令行界面

2.   创建一个用日期作为分区的hive表,用来作为最终被查询的表

将以下脚本copy到hive>提示符下执行,注意LOCATION的参数需要改成你自己的目录。

CREATE TABLE IF NOT EXISTS cloudfrontlogpart (

time STRING, xedgelocation STRING, scbytes  INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING

)

PARTITIONED BY (datee Date)

STORED AS PARQUET

LOCATION 's3://testcloudfrontlog/logpart';

3.   输入quit命令,退出hive。用aws s3命令将s3://testcloudfrontlog/log中日志copy到s3://testcloudfrontlog/logbydate中按照时间划分的目录下

以2016-07-11这一天的文件为例,命令如下,注意,s3目录需要改成你自己的。

aws s3 cp s3://testcloudfrontlog/log/  s3://testcloudfrontlog/logbydate/2016-07-11/  --exclude "*" --include "*.2016-07-11*" --recursive

这里用到了aws s3命令行工具。你可以在EMR主节点中退出hive命令行程序,然后执行以上命令。或者在任意一个安装了aws cli工具并配置了s3访问权限的机器中执行。本例中直接在EMR的主节点中执行。aws s3 cp不支持通配符,所以用–exclude 和 –include 参数来代替。

4.   针对s3://testcloudfrontlog/logbydate/2016-07-11/ 中的数据,创建一个HIVE表。

假设表的名字是cloudfrontlog20160711, 输入hive, 重新进入到hive命令行工具,并输入以下语句,注意LOCATION的参数需要改成你自己的目录。

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog20160711 (

date1 Date, time STRING, xedgelocation STRING, scbytes  INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING

)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

LOCATION 's3://testcloudfrontlog/logbydate/2016-07-11/'

tblproperties("skip.header.line.count"="2");

5.   数据注入

至此,已经创建了两个hive表,通过在hive命令行工具中执行 show tables命令,可以查看到两个hive表。

向带分区的hive表中注入7月11日的数据,在hive命令行界面中分别执行以下命令:

set hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)

SELECT  time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1

FROM cloudfrontlog20160711;

以下是INSERT语句执行过程的显示信息。

完成后,如果进入到s3://testcloudfrontlog/logpart目录下,可以查看到已经生成了按日期划分的目录,目录下存储了文件。

第三步,数据查询

可以继续使用hive做查询,也可以进入presto做查询。这里,我们使用presto, 由于presto完全使用内存进行计算, 速度更快。进入presto的方式如下:

执行quit, 退出hive命令行程序。

然后执行以下语句进入presto命令行界面:

presto-cli --catalog hive --schema default

该语句表示presto使用hive数据源,并且使用hive数据源中的default数据库。细节请参考presto的社区文档。

执行一个简单的查询:

SELECT time, scbytes, cip FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('2016-07-11' AS varchar) LIMIT 5;

然后退出presto命令行工具,输入 quit;

第四步,删除EMR集群

在集群列表中,选中刚刚创建的集群,点击终止,以终止该集群。如果开启的终止保护,需要变更一下终止保护的状态,然后再终止。

至此, 我们通过手动的方式完成了一个简单的数据查询。但在实际生产环境中,使用手动的方式会耗费很长时间做重复性的工作,并难免出错。EMR更大优势是能够通过程序的方式去控制集群的创建以及任务的执行,这使得EMR的使用者能够将集群创建以及数据分析的过程自动化。

接下来的部分将以相同的示例指导读者一步步的实现自动化的创建集群、分析数据、转存结果、关闭集群。

自动方式完成交互式数据查询

首先概括几点自动化执行数据分析的需求:

1.   集群在每天的固定时间被创建,然后对数据进行分析,然后集群被自动删除。这显然不能通过图形界面进行一步步的操作了。

2.   在手动操作中执行的每个步骤,需要按顺序自动化执行。这些步骤包括:

–       从/log目录向/logbydate目录中copy特定日期的文件。

–       创建对应特定日期的hive表,例如表名为cloudfrontlog20160711。

–       从cloudfrontlog20160711向cloudfrontlogpart表中注入数据。

–       使用presto从cloudfrontlogpart表中查询出需要的数据。

–       此外,在手动执行的最后一步,我们可以直接看到查询结果,但在自动化执行的过程中,我们需要将查询结果存储到一个长期运行的数据库中,供随时查询。

3.   将hive元数据放在集群的外部。

在手动执行的流程中,创建hive表后,hive的元数据存储在了主节点。而在自动化执行的过程中,当所有任务执行完毕后,集群被删除,存储在主节点的元数据也会被删除,因此要在外部数据库中存储hive的元数据。

针对以上的几个需求,在EMR中对应的解决方法如下.

1.   使用EMR的命令行进行各种集群的操作,例如集群的创建,参数的设置等。

2.   使用EMR的“step”来组织各个任务的执行。

3.   创建一个外部的数据库用来存储hive元数据,并在集群创建的时候指定元数据的存储位置。

接下来详细描述操作步骤和脚本

第一步,准备工作

1.   准备两个mysql数据库分别用来存储hive元数据和查询结果,可以使用AWS的RDS服务来创建。这两个数据库都需要能被EMR访问到, 这两个数据库也可以使用同一个物理服务器或虚拟机。

2.   假设存储查询结果的数据库名字是loganalydb,我们在该数据库中创建一个表,用来存储结果数据,表的名字是loganalytb。根据后面所进行的查询,使用如下语句创建数据库以及与查询结果匹配的表:

CREATE DATABASE loganalydb;

USE loganalydb;

CREATE TABLE `loganalytb` ( `id` int(11) NOT NULL AUTO_INCREMENT, `filepath` varchar(300) DEFAULT NULL, `totalbyte` bigint(20) DEFAULT NULL, `tdate` varchar(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58147 DEFAULT CHARSET=utf8;

对于存储hive元数据的数据库,暂时不必创建,从后面提到的emrconf.json文件可以看到,只需要给出数据库服务器的地址,以及用户名和密码即可,数据库不存在的话,会自动创建。

3.   由于自动化的流程与前面提到的手动流程使用相同的示例,因此,请事先删除手动流程示例中产生的数据。包括:

1)   s3://testcloudfrontlog/logbydate目录下的数据

2)   s3://testcloudfrontlog/logpart目录下的数据

第二步,编写脚本

下面给出各个脚本,并配以注释解释

1.   主程序脚本loganaly.sh

#!/bin/bash

# 变量KEYNAMES是你的SSH key的名字, 用来通过SSH方式访问EC2。

KEYNAME=yourkeyname

# 变量CONFIGFILE是emrconf.json的url地址,emrconf.json这个文件中包含了存储hive元数据的外部数据库的信息。将该文件存在s3中并让这个文件能够从外部访问到。emrconf.json中的内容在下文中会给出。

CONFIGFILE=https://s3-us-west-2.amazonaws.com/testcloudfrontlog/conf/emrconf.json

# EMR集群产生的日志所存放的s3目录。

LOGURI=s3://testcloudfrontlog/emrlog/

# 脚本、配置文件、jar包所在的目录

CODEDIR=s3://testcloudfrontlog/conf

# cloudfront日志的存放位置, 结尾别加 /

LOGSOURCEDIR=s3://testcloudfrontlog/log

# 当天日志的中转目录, 结尾别加 /

STAGINGDIR=s3://testcloudfrontlog/logbydate

# 昨天的日期, 形如:20160711

DATE=$(date -d "yesterday" +%Y%m%d)

# 昨天的日期,另外一种格式, 形如:2016-07-11

DATEE=$(date -d "yesterday" +%Y-%m-%d)

# 被查询的hive表的文件存储位置, 结尾别加 /

PARTDIR=s3://testcloudfrontlog/logpart

# 用来存储结果文件的目录, 结果文件将被sqoop使用,数据会被传到mysql.

SQOOPFILE=s3://testcloudfrontlog/sqoopfile

#用来存储结果数据的mysql的信息。

DBHOST=rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com

JDBCURL=jdbc:mysql://rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com/loganalydb

DBUSER=username

DBPASS=password

# EMR集群的配置信息, 这里是以Oregan region为例。如果使用的是北京region,AWSREGION用cn-north-1。

AWSREGION=us-west-2

MASTERTYPE=m3.xlarge

CORETYPE=r3.xlarge

TASKTYPE=r3.xlarge

MASTERNUM=1

CORENUM=1

TASKNUM=1

# 删除当天的数据,目的是防止脚本在同一天被多次执行而造成数据冗余,

aws s3 rm $PARTDIR/datee=$DATEE --recursive

aws s3 rm $SQOOPFILE/$DATE --recursive

mysql -h$DBHOST -u$DBUSER -p$DBPASS --execute "DELETE FROM loganalydb.loganalytb WHERE tdate='$DATEE'"

 

# 创建emr集群,名字是loganaly, 其中:

# --auto-terminate参数表示该集群在执行完所有的任务后自动删除。

# --configurations参数的文件中的参数配置覆盖了该集群运行起来后的默认参数配置。在本例中用来修改Hive元数据的存储位置。

# --step参数规定了该集群在创建后要执行的几个任务,其中Type=Hive的step, 需要给出包含hive语句的文件作为参数。而Type=CUSTOM_JAR的step,需要给出一个JAR包,这里我们使用EMR提供scrip-runner.jar,它的作用是执行其第一个参数中指定的脚本文件,并将其余的参数作为脚本文件的输入参数。

# --instance groups参数规定了集群的中各节点的机型和数量

aws --region $AWSREGION emr create-cluster --name "loganaly" --release-label emr-5.0.0 \

--applications Name=Hadoop Name=Hive Name=Presto Name=Sqoop \

--use-default-roles \

--ec2-attributes KeyName=$KEYNAME \

--termination-protected \

--auto-terminate \

--configurations $CONFIGFILE \

--enable-debugging \

--log-uri $LOGURI \

--steps \

Type=CUSTOM_JAR,Name="cpjar",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/cpjar.sh"," $CODEDIR"] \

Type=CUSTOM_JAR,Name="log2staging",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/log2logbydate.sh","$LOGSOURCEDIR","$STAGINGDIR","$DATE","$DATEE"] \

Type=Hive,Name="HiveStep",Args=[-f,$CODEDIR/hivetables.q,-d,PARTDIRh=$PARTDIR,-d,STAGINGDIRh=$STAGINGDIR,-d,DATEh=$DATE] \

Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/presto2s3.sh","$SQOOPFILE","$DATE","$DATEE"] \

Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/s3tomysql.sh","$JDBCURL","$DBUSER","$DBPASS","$SQOOPFILE","$DATE"] \

--instance-groups \

Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,InstanceCount=$MASTERNUM \

Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,InstanceCount=$CORENUM \

Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,InstanceCount=$TASKNUM

2.   准备cpjar.sh脚本

作用是将mysql的JDBC驱动包下载到本地的Sqoop目录下,sqoop在将文件转存到数据库的时候会用到JDBC驱动。

在国外region, 使用以下脚本:

#!/bin/bash

CODEDIR=$1

sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/

在北京Region, 使用以下脚本:

#!/bin/bash

CODEDIR=$1

sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/ --region cn-north-1

3.   准备log2logbydate.sh脚本

做用是将原始日志文件copy到按天划分的目录下。

#!/bin/bash

LOGSOURCEDIR=$1

STAGINGDIR=$2

DATE=$3

DATEE=$4

aws s3 cp $LOGSOURCEDIR/ $STAGINGDIR/$DATE/ --exclude "*" --include "*.$DATEE*" --recursive

4.   准备hivetables.q脚本

作用包括:创建待查询的表和按天命名的临时表,将临时表中数据注入到待查询的表中,并删除临时表。

CREATE TABLE IF NOT EXISTS cloudfrontlogpart (

time STRING, xedgelocation STRING, scbytes  INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING

)

PARTITIONED BY (datee Date)

LOCATION '${PARTDIRh}';

 

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog${DATEh} (

date1 Date, time STRING, xedgelocation STRING, scbytes  INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING

)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

LOCATION '${STAGINGDIRh}/${DATEh}/'

tblproperties("skip.header.line.count"="2");

 

--make dynamic insert available

set hive.exec.dynamic.partition.mode=nonstrict;

 

--insert data.

INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)

SELECT  time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1

FROM cloudfrontlog${DATEh};

 

--delete the staging table

DROP TABLE cloudfrontlogstaging${DATEh};

5.   准备presto2s3.sh脚本

作用包括:查询hive表,并将结果存成.csv格式的文件,将该文件上传到S3中相应的目录下。

#!/bin/bash

TIME=$(date +%H%M%S)

SQOOPFILE=$1

DATE=$2

DATEE=$3

sudo presto-cli --catalog hive --schema default --execute "SELECT NULL as id, csuristem, SUM(scbytes) as totalbyte, CAST('$DATEE' AS varchar) as date FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('$DATEE' AS varchar) GROUP BY csuristem order by totalbyte desc" --output-format CSV > ~/cdnfilestat.csv

aws s3 cp ~/cdnfilestat.csv $SQOOPFILE/$DATE/cdnfilestat$TIME.csv

6.   准备s3tomysql.sh脚本

作用是利用sqoop将s3中的.csv文件中的内容转存到数据库中。注意,如果单次转存的数据量大,你可能需要调大数据库的max_allowed_packet参数。

#!/bin/bash

JDBCURL=$1

DBUSER=$2

DBPASS=$3

SQOOPFILE=$4

DATE=$5

sqoop export --connect $JDBCURL --username $DBUSER --password $DBPASS --table loganalytb --fields-terminated-by ',' --enclosed-by '\"' --export-dir $SQOOPFILE/$DATE/

7.   准备emrconf.json脚本

作用是使得创建起来的EMR集群的hive元数据存储在外部数据库中。注意:根据准备工作中创建的数据库来修改文件中的ConnectionUserName、ConnectionPassword、ConnectionURL三个参数。

[

{"Classification":"hive-site",

"Properties":{

"javax.jdo.option.ConnectionUserName":"username",

"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",

"javax.jdo.option.ConnectionPassword":"password",

"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"

},

"Configurations":[]

}

]

注意:如果是在北京Region, emrconf.json使用以下脚本

[

{"Classification":"hive-site",

"Properties":{

"javax.jdo.option.ConnectionUserName":"username",

"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",

"javax.jdo.option.ConnectionPassword":"password",

"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"

},

"Configurations":[]

},

{"Classification":"presto-connector-hive",

"Properties":{

"hive.s3.pin-client-to-current-region":"true"

},

"Configurations":[]

}

]

8.   准备jar包

需要两个jar,一个是script-runner.jar,下载地址: http://s3.amazonaws.com/elasticmapreduce/libs/script-runner/script-runner.jar

另一个是mysql的JDBC驱动,下载地址: https://s3-us-west-2.amazonaws.com/hxyshare/mysql-connector-java-5.1.38-bin.jar

9.   上传文件

在s3中创建s3://testcloudfrontlog/conf/目录,并将第8步中的两个jar包,以及cpjar.sh,log2staging.sh,hivetables.q,presto2s3.sh,s3tomysql.sh,emrconf.json,上传到该目录。

由于emrconf.json需要通过http的方式访问到,在s3中将emrconf.json的访问权限增加“所有人可下载”。

cloudfront日志文件copy到s3://testcloudfrontlog/log目录下, 两个示例文件下载地址:

https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz

https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz

如果在手动流程中已经上传了日志文件,则不必再上传。

第三步,执行程序

暂时将主程序loganaly.sh中的DATE和DATEE参数修改为示例数据的时间,例如,分别写成20160711和2016-07-11,在任意一个Linux系统中运行主程序脚本loganaly.sh(例如,可以使用EC2实例)。但需注意:

1) 该机器需要安装了AWS命令行工具,并具有s3和EMR的操作权限。

2) 该机器能访问到存储数据结果的数据库,因为loganaly.sh中有对该数据库的操作。

集群创建成功后,自动执行每个step中的任务,所有任务执行完成后自动关闭,从下图中可以看到每个Step的执行:

所有任务执行完成后,进入到存储查询结果的数据库,查看输入的结果:

如果要想每天执行loganaly.sh脚本并对前一天的数据进行处理和分析,将loganaly.sh中的DATE和DATEE分别赋值为 $(date -d “yesterday” +%Y%m%d) 和 $(date -d “yesterday” +%Y-%m-%d),然后创建一个crontab定时任务,每天定时执行loganaly.sh.

如果是在AWS中国以外的region执行,还可以利用竞价实例来大幅的降低成本,使用竞价实例的方法也非常简单,只需要在将loganaly.sh中对创建EMR集群的脚本稍做修改,增加BidPrice参数:

--instance-groups \

Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,BidPrice=0.2,InstanceCount=$MASTERNUM \

Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,BidPrice=0.2,InstanceCount=$CORENUM \

Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,BidPrice=0.2,InstanceCount=$TASKNUM

第四步,删除资源

为避免额外花费,删除本实验过程中(包括手动过程以及自动过程中)创建的资源,S3, EC2等资源。

总结

本文中使用Cloudfront日志进行分析,但本文中使用的方法稍作修改便适用于其他类型的日志类文件的分析。本文中主要使用了EMR中的Hive,Presto,Sqoop工具,但EMR还有更多的工具(例如Spark)可供用户使用,用户在创建集群的时候增加相应的服务即可实现丰富的功能。

作者介绍:

韩小勇

AWS解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,实施和推广,在加入AWS之前,从事电信核心网系统上云的方案设计及标准化推广 。