Tag: Amazon Kinesis


如何在1个小时之内轻松构建一个Serverless 实时数据分析平台

数据分析平台,特别是实时数据分析,正在被越来越广泛的应用于各个行业。 举例来说,游戏公司在发布新游戏之后,需要实时定位用户的留存、增长等情况;快销公司需要精确地记录每一笔订单的情详情,并结合社交媒体,实时分析促销活动引起的用户购买行为与销量等等。基于这些需求, AWS提供了一整套成熟的解决方案与服务,并且得到了广泛的应用。

图1 AWS大数据参考架构示例

上图中,Amazon Kinesis 是实时的流式分析服务,而Amazon S3是AWS的海量数据存储服务。利用Kinesis与S3,我们可以十分方便的构建一个实时流式信息数据的采集与存储。 值得注意的是,作为Serverless计算服务的代表 , 用户只需要编写实现对应的ETL逻辑,Amazon Lambda就可以非常方便地对Kinesis流式数据进行抽取与分析而不需要部署任何服务器。另外,用户也可以使用Kinesis Firehose(Kinsis服务之一)实现原始数据的直接注入与收集。

随着Amazon Athena在AWS re:Invent 2016的重磅发布,AWS的大数据平台又增添了重要的一员!Amazon Athena 是一种交互式查询服务,用户可以使用标准SQL 分析 Amazon S3 中的数据。因为Athena底层是基于Serverless(无服务器)架构,用户不需要运维底层的服务器,并且查询处理能力会随着用户的数据将进行自适应与扩展,实现秒级别的数据查询与处理。

闲话少说,我们将利用AWS提供的三个重要服务——Amazon Kinesis Firehose,、Lambda和Athena在1个小时之内实现一套实时分析的Serverless数据分析平台!

准备好了吗?Let’s rock

1.数据源。作为测试,我们将对AWS VPC Flow Logs进行分析。您可以使用Kinesis Agent/Flume/Fluentd或者Amazon Kinesis SDK对前端的实时日志进行分析。Amazon VPC Flow Logs将实时记录VPC监控的网络端口的流量与通信日志,并将日志发布于AWS CloudWatch Logs。详细的配置请参见 https://aws.amazon.com/cn/blogs/aws/vpc-flow-logs-log-and-view-network-traffic-flows/

2.数据ETL。VPC Flow Logs进入CloudWatch Logs之后,可以利用Lambda对实时日志进行订阅处理。订阅之后,Lambda会在CloudWatch Logs更新之后,自动调用执行,进行数据ETL。

首先,在控制台创建一个Lambda函数(利用Python实现).为了确保Lambda有对应的执行权限,需要赋予Lambda函数相应的Permission Role.在这个示例中,我们只需要服务Lambda对应的CloudWatch Logs以及Kinesis Firehose的权限即可。

其次,Lambda 代码会对进入的CloudWatch日志的第一个Base64编码的转码并进行gzip解压(因为Cloudwatch Logs会对送往Lambda首先进行Base64编码并进行gzip压缩)。之后,Lambda会对具体的日志进行汇聚,以batch的方式发送给Kinesis Firehose。具体的代码如下:

代码中,利用环境变量 DELIVER_STREAM_NAME 传递Kinesis Firehose Stream,详见步骤3)。

最后,利用AWS CloudWatch logs的订阅功能,就可以实时地把日志发布到Lambda函数中了。

aws logs put-subscription-filter \

    --log-group-name myLogGroup \

    --filter-name demo \

    --filter-pattern "" \

    --destination-arn arn:aws:lambda:us-east-1:123456789123:function:helloworld\

具体的配置过程可以参考 http://docs.aws.amazon.com/zh_cn/AmazonCloudWatch/latest/logs/Subscriptions.html

3.创建Kinesis Fireshose实现到S3的数据自动存储与汇聚。Kinesis Firehose提供了自动对数据进行汇聚,目前支持S3和Redshift, ElastiSearh。这里,我们利用控制台,十分简单地创建了如下一个Firehose Stream:

图2 Kinesis Firehose 配置过程

4.利用Amazon Athena进行数据查询。因为Athena底层是基于Hive Catalog对S3数据进行管理,上层基于Presto的方式进行SQL查询。因此我们首先需要使用Hive对S3的VPC Flow Logs进行外表DDL操作。具体代码如下:

我们在创建表的过程中,创建了 Year,Month, Day 与Hour的分区,是因为我们在实现Firehose的时候自动进行了时间和日期的前缀设置。同时,利用分区也可以大大提高hive的数据查询性能。

到这里,整个Serverless 处理能力自适应的架构已经构建完成,来测试一下Athena的查询结果吧。 Athena提供了Web Console让BI用户可以直接对S3数据湖进行查询,同时,用户也可以利用JDBC直接与第三方的BI工具集成实现自动化查询。查询结果也可以利用CSV的文件下载的方式直接分享给其他用户。

图3 利用Web Console对Athena进行数据分析

作者介绍:

肖凌

AWS解决方案架构师,负责基于AWS的云计算方案架构的咨询和设计,同时致力于AWS云服务在国内和全球的应用和推广,在大规模并发后台架构、跨境电商应用、社交媒体分享 、Hadoop大数据架构以及数据仓库等方面有着广泛的设计和实践经验。在加入AWS之前曾长期从事移动端嵌入式系统开发,IBM服务器开发工程师。并负责IBM亚太地区企业级高端存储产品支持团队,对基于企业存储应用的高可用存储架构和方案有深入的研究。

 

 

 

AWS Kinesis的Javascript交互方法

一.介绍

Amazon Kinesis 是一种托管的弹性可扩展服务,能实时处理大规模的流数据。在该服务收集大数据记录流后,可用多种数据处理应用程序实时处理该数据流。Amazon Kinesis Streams 每小时可从数十万种来源中连续捕获和存储数 TB 数据,如网站点击流、财务交易、社交媒体源、IT 日志和定位追踪事件。Amazon Kinesis Streams 数据流的吞吐量每小时可从数 MB 扩展到数 TB,PUT 记录每秒钟可从数千次扩展到数百万。您可以随时根据您的输入数据量动态调节数据流的吞吐量。

AWS为旗下的服务提供了多种开发工具包,支持包括Java、PHP、Python、Ruby、浏览器端等语言或平台。对于Amazon Kinesis,我们除了使用上述的Stream API进行开发外,AWS还提供了Amazon Kinesis Client Library (KCL) 开发适用于 Amazon Kinesis Streams 的使用器应用程序。在本文当中,我们展示如何使用Javascript在浏览器端与Amazon Kinesis进行交互,包括把记录Put到Kinesis,和从Kinesis读取记录。

二.基本概念与限制

在阐述如何AWS Kinesis的Javascript交互方法前,我们有必要对Kinesis Stream当中的关键概念——“分片”和“数据记录”作初步的了解。

分片

分片Share是流中数据记录的唯一标识组。一个流由一个或多个分片组成,每个分片提供一个固定的容量单位。流的总容量是其分片容量的总和。每个分片对应提供 1 MB/s 的写入容量和 2 MB/s 的读取容量。需要注意的是,每个分片可支持最多1000条记录/s的写入,和5个事务/s的读取。用户需要根据上述的容量和数目的限制,为流添加足够多的分片数目,以满足自身需求。

数据记录

数据记录是存储在 Amazon Kinesis Stream中的数据单位。数据记录由序列号、分区键和数据 Blob 组成。

每个数据记录都有一个唯一的序列号。当应用程序对Amazon Kinesis Stream进行写入记录时,Streams将自动为其分配序列号。同一分区键的序列号通常会随时间变化增加;写入请求之间的时间段越长,序列号则越大。但需要注意的是,序列号不能用作相同流中的数据集的索引。用户如果需要在逻辑上分隔数据集,请使用分区键或为每个数据集创建单独的流。

分区键Partition Key用于按分片对流中的数据进行分组。Streams 服务使用与每条数据记录关联的分区键将属于流的数据记录分为多个分片,以便确定给定的数据记录所属的分片。分区键是最大长度限制为 256 个字节的 Unicode 字符串。MD5 哈希函数用于将分区键映射到 128 位整数值并将关联的数据记录映射到分片。分区键由将数据放入流的应用程序指定。

Blob是不可变的字节序列,也就是用户添加到Kinesis Stream中真正存储的数据。Streams 不以任何方式检查、解释或更改 Blob 中的数据。数据 Blob 可以是任何类型的数据,例如可以为日志文件的一个分段、地理位置数据、网页点击流数据等等。一个Blob 数据最多为 1 MB。

关于Amazon Kinesis Streams更详细概念信息,请查看 以下AWS官方文档:http://docs.aws.amazon.com/zh_cn/kinesis/latest/dev/service-sizes-and-limits.html

三.安全性问题

要对AWS Kinesis发送或接收消息,我们需要在客户端中设置安全证书,才能与Kinesis进行交互。用户可以选择在客户端中使用固定的IAM证书或者临时证书,但如果您选择使用固定的IAM证书,请注意当中涉及到的重大安全性问题!客户能够轻易地从前端页面获取IAM用户的AWS Access Key ID和AWS Secret Access Key,而且固定的IAM证书长期有效,如果大部分用户共用同一个IAM证书,极容易发生恶意攻击的情况。因此,尽管技术上可行,但我们不建议用户使用固定的IAM证书,强烈建议用户使用临时证书!用户能够给临时证书设定较短的有效期,而且每次申请所获得临时证书都是独一无二的。用户能够在其自身的恶意攻击检测中直接让攻击源的临时证书失效,而不影响其他客户的正常使用。

在本文档中,我们将会介绍使用TVM服务器获取临时证书以及使用AWS的托管服务Cognito获取临时证书两种方法。但无论使用TVM还是Cognito,共同的目的都是获取临时证书的accessKeyId、secretAccessKey、sessionToken三个参数。因此获取临时证书是本文相对独立的一部分,我们会在第八部分和第九部分分别进行介绍。而在第七部分,我们假设前端已经获取上述临时证书三个参数的情况下,对Kinesis的Javascript交互方法进行讲述。

四.准备资源文件

要使用Javascript访问AWS Kinesis,需要准备AWS的Javascript开发工具包。AWS的Javascript开发工具包可从AWS官网上下载,或在前端页面的head部分直接引用该js包。在本篇文章撰写时,最新js包的地址为https://sdk.amazonaws.com/js/aws-sdk-2.4.13.min.js。您可以关注该工具包的github项目地址(https://github.com/aws/aws-sdk-js),随时获取最新的开发工具包。

五.准备AWS账户及创建AWS Kinesis Stream

要使用Javascript与AWS Kinesis进行交互,我们需要在云端创建一个可用的AWS Kinesis Stream。如今,包括中国北京在内的AWS Region都支持使用Kinesis服务,因此,用户创建AWS中国区账户或AWS标准账户都能够使用AWS Kinesis服务。

进入AWS账户后,我们需要在AWS Kinesis服务中创建流。具体的创建方法为:

  1. 登陆AWS账户终端界面,点击Kinesis操作模块 https://console.aws.amazon.com/kinesis
  2. 在操作模块中点击“创建流”。自定义流名称后,分区数量我们填写最低配置1,然后点击“创建”按钮即可。(为了节省成本,我们这里只为流添加一个分片。用户可以根据自己的实际需要设定分片的数量)

六.权限设置

要对AWS Kinesis Stream生产和消费数据,我们需要为临时证书添加相应的权限。一般情况下,我们建议用户为临时证书提供最低的AWS Kinesis使用权限。对资源的严格限制,是我们AWS一直推崇的做法。生产者和消费者的最低权限设置分别如下所示:

生产者Producer的最低权限设置:

{

    "Version": "2012-10-17",

    "Statement": [

        {

            "Effect": "Allow",

            "Action": [

                "kinesis:DescribeStream",

                "kinesis:PutRecord",

                "kinesis:PutRecords"

            ],

            "Resource": [

                "arn:aws-cn:kinesis:xxxxxxxxxx:xxxxxxxxxxxx:stream/Vincent_danmu_BG"

            ]

        }

    ]

}

消费者Consumer的最低权限设置:

{

    "Version": "2012-10-17",

    "Statement": [

        {

            "Effect": "Allow",

            "Action": [

                "kinesis:DescribeStream",

                "kinesis:GetShardIterator",

                "kinesis:GetRecords"

            ],

            "Resource": [

                "arn:aws-cn:kinesis:xxxxxxxxxx:xxxxxxxxxxxx:stream/Vincent_danmu_BG"

            ]

        }

    ]

}

另外,如果您使用 Amazon Kinesis Client Library (KCL) 开发应用程序,您的策略必须包含对 Amazon DynamoDB 和 Amazon CloudWatch 的权限。因为KCL需要使用 DynamoDB 跟踪应用程序的状态信息,并使用 CloudWatch 代表您将 KCL 指标发送到 CloudWatch。如果用户并非使用KCL 开发应用程序,包括本次的Kinesis与Javascript交互演示,无需使用DynamoDB和CloudWatch资源,在此仅作提示。在KCL的场景下需要增加的权限如下所示:

  {

            "Effect": "Allow",

            "Action": [

                "dynamodb:CreateTable",

                "dynamodb:DeleteItem",

                "dynamodb:DescribeTable",

                "dynamodb:GetItem",

                "dynamodb:PutItem",

                "dynamodb:Scan",

                "dynamodb:UpdateItem"

            ],

            "Resource": [

                "arn:aws-cn:dynamodb:xxxxxxxxxx:xxxxxxxxxxxx:table/amazon-kinesis-learning"

            ]

        },

        {

            "Effect": "Allow",

            "Action": [

                "cloudwatch:PutMetricData"

            ],

            "Resource": [

                "*"

            ]

        }

需要注意的是,上述红色标注的地方为本人所使用的AWS Kinesis Stream和 DynamoDB表格的ARN资源名称(Stream对应的名称为Vincent_danmu_BG,DynamoDB table对应的名称为amazon-kinesis-learning),用户需要把它替换成自己个人所对应的ARN资源。关于如何寻找各AWS服务中对应的资源名称,请参考AWS官方网站说明。另外需要注意的是,如果ARN资源在中国区内,一般需要以“arn:aws-cn:”开头标注资源;如果ARN资源在中国区外的标准AWS区域,则只需要使用一般的“arn:aws:”开头标注资源。

更多关于配置AWS Kinesis的权限问题,请参考AWS官方文档:http://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-iam.html。文档当中详细叙述了在AWS Console的IAM模块中如何创建User和Policy,为初学者提供很好的帮助。

七.在Javascript中对AWS Kinesis的访问方法

经过上述的准备工作,我们在这部分中,将会对AWS Kinesis的Javascript交互方法的核心部分进行介绍。

基本参数配置

无论是推送消息到AWS Kinesis还是从AWS Kinesis接收消息,我们都需要在Javascript中传入基本参数配置信息,包括临时证书的accessKeyId、secretAccessKey、sessionToken以及Kinesis Stream所在的Region、Kinesis Stream名称、分区键等等。然后我们需要用这些基本参数配置信息初始化这个过程中最重要的操作对象AWS.Kinesis。为了方便起见,我们建议用户把这些公用配置操作写在同一个文件当中,代码如下。

// constants of AWS Kinesis

var accessKeyId = '';//在这里传入临时证书的accessKeyId

var secretAccessKey = '';//在这里传入临时证书的secretAccessKey

var sessionToken = ''; //在这里传入临时证书的sessionToken

var region = '';//在这里输入AWS Region

var stream_name = '';//在这里输入Kinesis的流名称

var partition_key = '';//请在这里输入分区键

//初始化kinesis对象

var kinesis = new AWS.Kinesis({accessKeyId: accessKeyId, secretAccessKey: secretAccessKey, sessionToken: sessionToken, region: region });

在这里需要注意的是,我们初始化AWS.Kinesis对象时只需要传入accessKeyId、secretAccessKey、sessionToken、region四个参数。stream_name和partition_key会在后续推送消息或接收消息调用相关函数的时候作为参数传入。

推送消息到AWS Kinesis

要向Kinesis中推送消息,我们需要调用aws-sdk-2.4.13.min.js中的putRecord方法。putRecord方法每次向Kinesis Stream中推送一条record,AWS同时也支持API putRecords方法,用于每次向Kinesis Stream中推送多条record,读者可自行查看AWS API文档尝试。作为一种良好的代码写作习惯,我们把这部分的功能封装在函数function putStream(text)中。具体代码如下所示:

//该方法用于推送单条信息到Kinesis

function putStream(text) {

  var write_simple_params = {

    Data: text,

    PartitionKey: partition_key,

    StreamName: stream_name

  };

  kinesis.putRecord(write_simple_params, function(err, data) {//推动单条消息到Kinesis并定义返回函数

    if (err){// an error occurred

      console.log(err, err.stack);

    }else{

      alert('成功发送消息!');

    }

  });

}

需要注意的是,调用putRecord方法时需要传入配置参数组(这里为write_simple_params)和设置好回调函数。参数组的组成包括Blob的内容Data、分区键PartitionKey、流名称StreamName。通过回调函数,我们能够检测是否成功地把数据发送到Kinesis,并根据该结果进行下一步的操作。开发人员可以通过err判断是否产生错误,并通过err.stack获取错误产生的原因。

从AWS Kinesis接收消息

相比于向Kinesis推送消息,从Kinesis接收消息的流程和步骤相对复杂一点点。首先我们需要用getShardIterator方法获取初始分片迭代器ShardIterator。然后用ShardIterator从Kinesis获取记录。在这个过程当中,为了确保更新得以持续,我们循环自调用该方法,以保证ShardIterator可用。同样地,我们把这部分的功能封装在函数function readStream()中。具体关键代码如下所示:

//在第一次getShardIterator中获取ShardIterator读取数据

 function readStream(){

    var shardIterator_params = {

      ShardId: 'shardId-000000000000',

      ShardIteratorType: 'LATEST',

      StreamName: stream_name,

    }

    kinesis.getShardIterator(shardIterator_params, function(err, data) {

        if (!err) {

            readStreamLoop(data.ShardIterator);//第一次成功获取ShardIterator后进入循环更新以不断获取新的ShardIterator

        }else{

            console.log(err, err.stack);

            alert("ShardIterator Error");

        }

    });

 }

需要注意的是,调用getShardIterator方法时需要传入配置参数组(这里为shardIterator_params)和设置好回调函数。配置参数组需要我们输入分片Id号ShardId、ShardIterator的排序类型ShardIteratorType和流名称StreamName三项。同样地,通过回调函数我们能够检查存在的错误,并根据结果进行下一步的操作。在回调函数中,如果没有出现错误,我们可以通过data.ShardIterator获取初始分片迭代器的Id,然后凭借该Id从Kinesis获取记录。获取记录的具体步骤我们封装在函数function readStreamLoop(shardIteratorId)当中,具体代码如下所示。

//不断重复地获取下一个ShardIterator从而达到实时获取数据的效果。但该循环会不断消耗CPU性能

 function readStreamLoop(shardIteratorId) {

    var self = this;

    var read_params = {

        ShardIterator: shardIteratorId,

        Limit: 10

    };

    kinesis.getRecords(read_params, function(err, data) {

        if (!err){

            if (data['Records'].length > 0) {

                for (var i in data['Records']) {

                    msg = data['Records'][i]['Data'].toString('utf8');

                }

            }

            self.readStreamLoop(data['NextShardIterator']);//获取下一个ShardIterator的Id后不断自调用

        }

    });

}

需要注意的是,由于AWS Kinesis在接收数据后并不会主动向应用程序推送数据,因此我们需要在客户端不断地发出请求以达到实时获取数据的目的。为了达到这个效果,我们在上述代码中通过self.readStreamLoop(data[‘NextShardIterator’])不断地自调用readStreamLoop(shardIteratorId)方法进行实现。当然,如果用户对获取数据的实时性要求不高,完全可以通过一些定时的方法(如setInterval())来实现同样的效果。无论如何,保持这段代码轻量级对于减少CPU的负荷是百利而无一害的。

调用getRecords方法时需要传入配置参数组(这里为read_params)和设置好回调函数。配置参数组需要我们输入分片迭代器Id号ShardIterator和一次读取的限制个数Limit。在回调函数中,如果没有出现错误,我们可以通过数组data[‘Records’]得到这次获取的所有记录。另外,我们能够通过data[‘NextShardIterator’]以获取下一个分片迭代器的Id,从而做到不断的持续更新。

更多AWS-SDK中关于Kinesis的API调用方法与实例,请参考以下AWS文档:http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html

八.自建TVM服务器使用AWS STS服务获取临时证书

在上述第三部分的安全性问题中提到,把AWS Access Key ID和AWS Secret Access Key存放在html文件或js文件中会存在较大的安全问题,而且考虑到固定的AWS IAM安全证书的定期更新、防止黑客的窃取和攻击等问题,我们需要使用AWS 安全令牌服务Security Token Service(简称STS)颁发临时证书,通过限制这些临时安全证书的AWS服务访问权限和有效时间,从而解决上述存在的安全问题。

具体如何在AWS EC2上构建TVM服务器,请参阅亚马逊AWS官方博客中的《Token Vending Machine:移动应用客户端安全访问AWS服务的解决方案》一文。

在与自建TVM服务器交互以获取临时证书的过程中,您可以在TVM程序中加入自行维护的验证保护机制,例如要求用户提供在您所维护的验证机构中的验证信息。加入验证保护机制能够使用户有条件地获取临时证书,从而更好地保护您的资源。当然,个别的应用场景,例如对于临时访客,匿名地获取临时证书也是一种客户需求。

在这里,我们将重点讲述如何用Javascript获取证书,并把该证书缓存到cookies等操作。

Javascript从TVM服务器上获取临时证书

jquery的$.ajax()是一种与服务器交换数据的技术,它可以在不刷新整个页面的情况下向服务器发出请求并获取返回的数据。我们使用$.ajax()完成客户端向TVM服务器请求临时证书的操作,整个过程做到对用户透明。我们把这一部分封装在函数function getCertification()当中,代码如下所示:

//从TVM获取临时证书

function getCertification(){

    $.ajax({

        url:'getCertification.action',//默认在当前页地址发送请求

        type:'post',

        async: false,//使用同步请求

        data:"{}",//客户端发送到服务器端的数据

        dataType:'json',//预期服务器相应的数据类型

        success:function (data) {//成功返回的回调函数

            setCertificationFromCookies(data.accessKeyId, data.secretAccessKey, data.sessionToken, data.expiration);

        }

    });

}

需要注意的是,由于我们的请求页面恰好放置在TVM服务器中,所以我们只需要默认在当前页地址发送请求即可。对于TVM服务器和当前页面请求域名不相同的情况,则需要补全完整的url地址。除此之外,我们部署的TVM服务器返回的是json格式的证书数据,返回的数据格式为标准的json格式,如下所示:

{“accessKeyId”:”(20位长字符串)“,”secretAccessKey”:”(40位长字符串)“,”sessionToken”:”(712位长字符串)“,”expiration”:(从1970年1月1日到现在的毫秒数)}

因此,在$.ajax()成功返回的回调函数中,我们只需要用“data.关键字”就能够分别把上述四个证书属性提取出来。当这四个属性提取出来后,我们把它们分别作为函数function setCertificationFromCookies(accessKeyId, secretAccessKey, sessionToken, expiration)的四个参数,把它们保存到cookies当中。该函数具体代码如下所示:

//把证书写到Cookies当中

function setCertificationFromCookies(accessKeyId, secretAccessKey, sessionToken, expiration){

    var date = new Date(expiration);

    $.cookie('accessKeyId', accessKeyId, { expires: date, path: '/' });

    $.cookie('secretAccessKey', secretAccessKey, { expires: date, path: '/' });

    $.cookie('sessionToken', sessionToken, { expires: date, path: '/' });

    $.cookie('expiration', expiration, { expires: date, path: '/' });

}

可以看到,我们把证书的过期时间作为cookies的过期时间。

另外,我们在运行的过程中还需要检查该证书是否已经过期,具体代码如下:

//检查证书是否过期

function checkExpiration(){

    if($.cookie('expiration') == null || new Date() > new Date(parseInt($.cookie('expiration')))){

        return false;

    }else{

        return true;

    }

}

需要注意的是,由于临时证书的有效期限较短,因此建议在每次使用kinesis对象前,都需要把临时证书的expiration和当前的时间做对比,以防止临时证书的过期而导致的kinesis对象方法调用的失败。具体的代码如下:

//检查当前的Kinesis是否可用。原则上在每次使用Kinesis之前都应该检查一遍。为了加快相应速度,应该在页面加载的时候运行之以达到预加载证书的效果

function checkKinesis(){

    if(!checkExpiration()){

        getCertification();

        initKinesis();

    }else if(sessionToken == null){

        initKinesis();

    }

}

另外,为了减轻TVM服务器的负载,我们建议从TVM获取临时安全证书后,把临时安全证书保存在客户端当中,在有效期内重复使用。您可以选在Javascript变量或者cookies等方式进行保存。在上述说明中,我们使用cookies的方式对之进行保存。

九.使用AWS的托管服务Cognito获取临时证书

Amazon Cognito 是专为想要将用户管理和同步功能添加到其移动和 Web 应用程序的开发人员设计的。Cognito分为Federated Identities和User Pools两部分。Federated Identities能够让您创建一个用户身份认证服务,通过认证可以给客户颁发临时的、自定义权限的证书。而User Pools是一个可以安全存储用户资料属性的用户目录,您可以把包括登录和注册信息在内的用户信息保存在User Pools中统一管理。

类似于在自建TVM服务器中自行选择提供验证保护机制,Federated Identities允许用户提供认证和未认证两种方式,有差异地提供不同权限的临时证书。

在本节中,我们假设在某一个User Pool中保存了用户信息,并把该User Pool作为Identity的Authentication  provider。另外,我们在该Identity的Authenticated role中赋予上述第六部分的访问Kinesis的最低权限。这样从该Identity所获取到的临时证书就拥有访问Kinesis的权限。关于如何创建及配置identity pools和user pools,请参考AWS的官方文档(http://docs.aws.amazon.com/zh_cn/cognito/latest/developerguide/what-is-amazon-cognito.html)和AWS的官方微博(https://aws.amazon.com/cn/blogs/aws/category/amazon-cognito/)。

另外,当我们使用Javascript向User Pool认证身份的时候,要正常调用该API,需要使用jsbn.js、jsbn2.js、sjcl.js、moment.min.js、aws-cognito-sdk.min.js、amazon-cognito-identity.min.js等第三方或AWS额外的js包,具体的js包依赖及下载地址请查看AWS官方文档 http://docs.aws.amazon.com/zh_cn/cognito/latest/developerguide/setting-up-the-javascript-sdk.html

一切就绪后,我们就能够在前端通过User Pools认证身份获取用户的IdToken,然后用该IdToken向Identity Pools获取临时证书的accessKeyId、secretAccessKey、sessionToken。凭借该临时证书,我们就能够通过上述的第七部分访问Kinesis。具体从Cognito获取临时证书的javascript方法封装在下面的getAuthenticatedIdentity(username, password)方法当中:

//获取登录认证的临时证书

function getAuthenticatedIdentity(username, password){//传入参数为User Pools中的用户名和密码

    AWSCognito.config.region = 'us-west-2'; //所使用的Cognito服务的Region,这里假设为us-west-2

    var poolData = {

        UserPoolId : 'us-west-2_xxxxxxxxx',//填写User Pool的id

        ClientId : 'xxxxxxxxxxxxxxxxxxxxxxxxxx'//填写User Pool中对应App的client id

    };

    var userPool = new AWSCognito.CognitoIdentityServiceProvider.CognitoUserPool(poolData);

    var authenticationData = {

        Username : username,

        Password : password

    };

    var authenticationDetails = new AWSCognito.CognitoIdentityServiceProvider.AuthenticationDetails(authenticationData);

    var userData = {

        Username : username,

        Pool : userPool

    };

    var cognitoUser = new AWSCognito.CognitoIdentityServiceProvider.CognitoUser(userData);

    cognitoUser.authenticateUser(authenticationDetails, {

        onSuccess: function (result) {

            // Set the region where your identity pool exists

            AWS.config.region = 'us-west-2'; //所使用的Cognito服务的Region,这里假设为us-west-2

            // Configure the credentials provider to use your identity pool

            AWS.config.credentials = new AWS.CognitoIdentityCredentials({

                IdentityPoolId: 'us-west-2:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', //填写Identity Pool的id

                Logins: {

                    'cognito-idp.us-west-2.amazonaws.com/us-west-2_r12hRvjCr': result.getIdToken().getJwtToken()

                }

            });

            // Make the call to obtain credentials

            AWS.config.credentials.get(function(){

                // Credentials will be available when this function is called.

                accessKeyId = AWS.config.credentials.accessKeyId;

                secretAccessKey = AWS.config.credentials.secretAccessKey;

                sessionToken = AWS.config.credentials.sessionToken;

            });

        },

        onFailure: function(err) {

            alert(err);

        }

    });

}

需要注意的是,我们需要从前端把客户的用户名和密码传入到该js方法当中。cognitoUser根据该用户名、密码等参数从User Pool获取该客户的IdToken。特别地,该IdToken的有效期为一小时。当IdToken失效后,用户可通过RefreshToken获取新的IdToken。用户可以在创建User Pool的app的时候自定义RefreshToken的有效期(1~3600天,默认30天)。另外,从Identity Pool获取的临时证书有效期为一小时。同样地,用户可以通过Javascript变量或者cookies等方式保存临时证书,以达到复用的效果。

另外,在上述的红色代码中,我们特别需要注意是Provider Name的格式为:cognito-idp.<region>.amazonaws.com/<YOUR_USER_POOL_ID>,而Value值为User Pool ID。具体如何把Identity Pool和User Pool集成使用,可参考AWS的官方文档 http://docs.aws.amazon.com/zh_cn/cognito/latest/developerguide/amazon-cognito-integrating-user-pools-with-identity-pools.html

十.总结

AWS Kinesis服务能够在短时间内实现弹性扩展,帮助您实时处理大规模的流数据。而且Kinesis服务还极其灵活,能够从任何可以调用Web服务的地方收集数据。本文介绍了如何通过Javascript对Kinesis进行交互,除此之外,用户还能够通过AWS所提供的丰富的开发工具包实现兼容各种语言和平台的Kinesis交互客户端,使用户能够从多个渠道收集数据。

既然一切都准备就绪,那就出发吧,详细AWS Kinesis服务会给你带来不一样的惊喜。Good Luck!

作者介绍:

邓明轩

亚 马逊AWS解决方案架构师;拥有15年IT 领域的工作经验,先后在IBM,RIM,Apple 等企业担任工程师、架构师等职位;目前就职于AWS,担任解决方案架构师一职。喜欢编程,喜欢各种编程语言,尤其喜欢Lisp。喜欢新技术,喜欢各种技术 挑战,目前在集中精力学习分布式计算环境下的机器学习算法。

邱越俊

亚马逊AWS解决方案架构师实习生,擅长Web开发,熟悉使用Java、Javascript、Html5、Mysql数据库,曾在多个互联网公司从事软件平台开发工作,对计算机网络架构、云平台的开发和部署有一定的经验。