亚马逊AWS官方博客

Amazon S3 对象的 Amazon Kinesis Data Firehose 自定义前缀

Original URL:https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/

2019 2月,Amazon Web Services (AWS) 宣布了 Amazon Kinesis Data Firehose 的一项称为“Amazon S3 对象自定义前缀新功能。它允许客户为传输数据记录的 Amazon S3 对象的前缀指定自定义表达式。 之前,Kinesis Data Firehose 仅允许指定部分文字前缀。新支持的前缀可以与静态日期格式的前缀结合使用,以创建固定格式的输出文件夹。 客户要求具有灵活性,AWS 聆听客户意见并进行改进和交付此项服务。Kinesis Data Firehose 最常用于消费来自流处理源(如应用程序或物联网设备)的流式事件数据它消费了数据后,数据通常存储在数据湖中,因此可以处理并最终查询数据。  Amazon S3 上存储数据时,最佳实践是对相关数据进行分区或分组,并将它们存储在同一文件夹中。  这样就可以筛选分区数据并控制每个查询扫描的数据量,从而提高性能并降低成本。

分组数据的一个常用方法是按日期分组。  Kinesis Data Firehose 会根据日期自动对数据进行分组,并将其存储到 Amazon S3 上的相应文件夹中。  但是,Amazon S3 中的文件夹命名与 Apache Hive 命名规则不兼容。这使得使用 AWS Glue 爬网程序进行编目并使用大数据工具进行分析时变得比较复杂。

本博文讨论了一项新功能,该功能允许自定义 Kinesis Data Firehose 命名 Amazon S3 中输出文件夹的方式。它介绍了自定义前缀的工作原理、预期用例,并附带按步就搬的操作说明。

需要 Amazon S3 对象的自定义前缀

在以前,Kinesis Data Firehose 采用 YYYY/MM/DD/HH 格式创建基于UTC格式 的静态文件夹结构。然后,在将对象写入 Amazon S3 之前,将其添加到前缀。 例如,如果您提供的前缀为“mydatalake/”,则生成的文件夹层次结构将为“mydatalake/2019/02/09/13”  但是,为了与 Hive 命名规则兼容,文件夹结构应遵循“/partitionkey=partitionvalue”格式。 如果能使用此命名规则,可以通过 AWS Glue 爬网程序轻松编目数据,从而生成正确的分区名称。

也可以使用其他管理分区的方法,例如在 Amazon Athena 上运行 MSCK REPAIR TABLE 或在 Amazon EMR 上运行 Apache Hive,这样可以通过单个语句添加所有分区。 此外,您可以使用其他基于日期的分区模式,如“/dt=2019-02-09-13/”,而不是将日期扩展到文件夹中。  这将有助于减少随着表项目的增加而需要维护的S3分区总数。通过这种方式还可以简化范围查询。 提供指定自定义前缀的功能,无需额外的 ETL 步骤即可将数据放入正确的文件夹结构中,从而缩短洞察时间。

 

Amazon S3 对象的自定义前缀如何工作

注意:此新功能不允许使用来自事件数据中的任何日期或时间戳值,也不能在事件中使用任何其他任意值。Kinesis Data Firehose 使用名为 ApproximateArrivalTimestamp 的内部时间戳字段。 每个数据记录包括在流成功接收并存储记录时设置的 ApproximateArrivalTimestamp(采用 UTC)。这通常称为服务器端时间戳。Kinesis Data Firehose 根据配置的缓冲提示缓冲传入记录,并将它们传输到 Amazon S3 目标的 Amazon S3 对象。Amazon S3 中产生的对象可能包含多个记录,每个记录具有不同的 ApproximateArrivalTimestamp 在评估时间戳时,Kinesis Data Firehose 使用对象中最老的 ApproximateArrivalTimestamp记录。

借助 Kinesis Data Firehose,还可以在传输、AWS Lambda 转换或格式转换失败时将记录传送到不同错误输出位置。在以前,错误输出位置无法配置,根据传输失败的类型系统自动决定。在此版本中可以配置错误输出位置 (ErrorOutputPrefix)。这项新功能的一个好处是,可以将失败的记录分成日期分区文件夹,以便于再处理。

那么如何指定自定义前缀和 ErrorOutputPrefix? 可使用 : !{namespace:value} 形式的表达式,其中命名空间可以是 firehosetimestamp。可以是“random-string”或“error-output-type”(对于 firehose 命名空间),也可以是日期模式(对于 Java DateTimeFormatter 格式的时间戳命名空间)。在单个表达式中,您可以使用两个名称空间的组合,但 !{firehose: error-output-type} 只能在 ErrorOutputPrefix 中使用。有关更多信息和示例,请参阅 Amazon S3 对象的自定义前缀

使用 Kinesis Data Firehose 将流数据写入Amazon S3

本分步演示描述了如何使用 Hive 兼容文件夹结构,借助 Kinesis Data Firehose 将流数据写入Amazon S3  接着,它显示了 AWS Glue 爬网程序如何推断架构和提取我们在 Kinesis Data Firehose 中指定的正确分区名称,并在 AWS Glue 数据目录中对其进行编目。  最后,我们运行示例查询,确认能正确识别分区。

为了演示这一点,我们使用 python 代码生成示例数据。  我们还在 Kinesis Data Firehose 上使用 Lambda 转换来强制创建故障。这会演示数据如何被保存到错误输出位置。本分步演示所需的代码包含在 GitHub 中

对于本分步演示内容,这是我们正在构建的架构:

步骤 1:创建 Amazon S3 日志存储桶

创建一个 Kinesis Data Firehose 用于传输事件记录的 S3 存储桶。我们使用 AWS 命令行界面 (AWS CLI) 在美国东部(弗吉尼亚北部)区域创建 Amazon S3 存储桶。 请记得将示例中存储桶名称(以及您想使用的区域名字)替换为您自己的。

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

步骤 2:Lambda 转换(可选)

传入事件在事件负载中具有 ApproximateArrivalTimestamp 字段。  这足以在 Amazon S3 上创建适当的文件夹结构。  但是,在查询数据时,将此时间戳值公开为顶级列以便于过滤和验证可能更有利。  为此,我们创建了一个 Lambda 函数,它将 ApproximateArrivalTimestamp 添加为数据负载中的顶级字段。数据负载是 Kinesis Data Firehose 在 Amazon S3 中作为对象写入的内容。此外,Lambda 代码还会人为生成一些处理错误,这些错误会被传输到为传输目标指定的“ErrorOutputPrefix”位置,以说明“ErrorOutputPrefix”中表达式的使用。

为 Lambda 转换函数创建 IAM 角色

首先,为名为 LambdaBasicRole 的 Lambda 函数创建一个角色。 TrustPolicyForLambda.json 文件包含在 GitHub 存储库中。

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json

创建角色后,将托管的 Lambda 基本执行策略附加到该角色。

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda 函数

要创建 Lambda 函数,请从 Python Kinesis Data Firehose 蓝图“General Firehose Processing”开始,然后对其进行修改。有关记录结构和必须返回的内容的更多信息,请参阅 Amazon Kinesis Data Firehose 数据转换

压缩 Python 文件,然后使用 AWS CLI 创建 Lambda 函数。CreateLambdaFunctionS3CustomPrefixes.json 文件包含在 GitHub 存储库中。

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

步骤 3.传输流

接下来,创建 Kinesis Data Firehose 传输流。 createdeliverystream.json 文件包含在 GitHub 存储库中。

 aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

在先前的配置中,我们在“ExtendedS3DestinationConfiguration”元素下定义了 Prefix 和 ErrorOutputPrefix。我们对“S3BackupConfiguration”元素进行了相同的定义。请注意,当“ProcessingConfiguration”元素设置为“禁用”时,“ExtendedS3DestinationConfiguration”元素的 ErrorOutputPrefix 参数仅出于一致性原因存在。否则没有意义。

我们选择了一个可使文件夹结构与 hive 风格的分区兼容前缀。这是我们使用的前缀:

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose 首先在 Amazon S3 存储桶下直接创建一个名为“fhbase”的基础文件夹。其次,它使用 Java DateTimeFormatter 格式计算表达式 !{timestamp:YYYY}、!{timestamp:MM}、!{timestamp:dd} 和 !{timestamp:HH} 的年、月、日和小时结果。例如,采用 UNIX 纪元时间的 1549754078390 的 ApproximateArrivalTimestamp 为 2019-02-09T16:13:01.000000Z(采用 UTC),则其计算结果为“year=2019”、“month=02”、“day=09”和“hour=16”。  因此,Amazon S3 中传输数据记录的位置的计算结果为“fhbase/year=2019/month=02/day=09/hour=16/”。

同样,ErrorOutputPrefix“fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/”会使用名为为“fherroroutputbase”的文件夹直接在 S3 存储桶下。表达式 !{firehose:random-string} 的计算结果为 11 个字符的随机字符串,如“ztWxkdg3Thg”  如果在同一表达式中多次使用它,则每个实例都会评估为一个新的随机字符串。表达式 !{firehose:error-output-type} 的计算结果为以下之一:

  1. “processing-failed”(对应 Lambda 转换传输失败)
  2. “elasticsearch-failed”(对应 Amazon ES 目标传输失败)
  3. “splunk-failed”(对应 Splunk 目标交付失败)
  4. “format-conversion-failed”(对应数据格式转换失败)

因此,包含 Lambda 转换的传输失败记录的Amazon S3 对象的位置的计算结果为:fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/。

您可以运行 aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample 来描述创建的传输流。

接下来,为传输流启用静态加密:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

使用 AWS 控制台创建传输流

  1. 选择来源。对于此示例,我使用 Direct PUT
  2. 选择是否要使用 Lambda 转换转换传入记录。选择启用,并选择了之前创建的 Lambda 函数的名称。

  1. 选择目标。选择Amazon S3 目标。

  1. 选择 Amazon S3 存储桶。选择之前创建的 Amazon S3 存储桶。

  1. 指定 Amazon S3 前缀和 Amazon S3 错误前缀。这对应于之前在 AWS CLI 输入 JSON 的上下文中解释的“Prefix”和“ErrorOutputPrefix”。

  1. 选择是否要将原始(转换前)记录备份到另一个 Amazon S3 位置。我选择了启用并指定了相同的存储桶(您可以选择不同的存储桶)。我还从转换后记录中指定了不同的前缀 – 基础文件夹不同,但下面的文件夹结构相同。这样可以更有效地使用AWS Glue 爬网程序对此位置进行爬网,或者在 Athena 或 Redshift Spectrum 中创建指向此位置的外部表。

  1. 指定 Amazon S3 目标的缓冲提示。我选择了1 MB 和 240 秒。
  2. 选择 S3 压缩和加密设置。我没有为转换后记录的位置选择压缩。我选择使用服务托管的 AWS KMS 客户主密钥 (CMK) 加密静态的 Amazon S3 位置。
  3. 选择是否要启用 Error Logging in Cloudwatch(Cloudwatch 中的错误记录)。我选择了 启用
  4. 指定您希望 Kinesis Data Firehose 承担的代表您访问资源的 IAM 角色。选择新建选择以显示新屏幕。选择创建新的 IAM 角色,为该角色命名,然后选择允许
  5. 选择创建传输流

现已创建并激活传输流。您可以向其发送事件。

 用示例数据测试

我使用 Python 代码生成示例数据。生成数据的结构如下:

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}
{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}
{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

用于生成数据并将其推送到 Kinesis Data Firehose 的示例代码包含在 GitHub 存储库中。

在您开始向 Kinesis Data Firehose 传输流发送事件后,对象应该开始出现在 Amazon S3 中指定的前缀下。

我希望说明 Lambda 调用错误以及 Lambda 转换错误的 ErrorOutputPrefix 位置中的文件外观。因此,我没有授予“firehose_delivery_role”调用我的 Lambda 函数的权限。以下文件显示在 ErrorOutputPrefix 指定的位置。

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

这是我之前提到的错误文件内容的片段。

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied.Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

在我给予“firehose_delivery_role”相应的权限后,数据对象显示在为 Amazon S3 目标指定的“前缀”位置。

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

此外,由于我的 Lambda 转换中的Lambda 代码将 10% 的记录设置为失败状态,因此以下内容显示在 Lambo 转换错误的 ErrorOutputPrefix 位置。

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

以下是错误文件内容的片段:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

您现在可以创建 AWS Glue 爬网程序。有关使用 AWS Glue 数据目录的更多信息,请参阅填充 AWS Glue 数据目录

  1. 在 AWS Glue 控制台中,转到爬网程序,然后选择添加爬网程序

  1. 添加有关爬网程序的信息,然后选择下一步
  2. 在“包含路径”中,指定您在 Amazon S3 目标下输入的 Amazon S3 存储桶名称。同时包括创建 Kinesis Data Firehose 传输流时使用的静态前缀。不包括自定义前缀表达式。
  3. 选择下一步

  1. 选择下一步、否、下一步
  2. 指定 AWS Glue 将使用的 IAM 角色。我选择创建一个新的 IAM 角色。选择下一步
  3. 指定运行爬网程序的日程。我选择按需运行。选择下一步
  4. 指定爬网程序添加已爬网和已发现表的位置。我选择了默认数据库。选择下一步

  1. 选择完成
  1. 已创建爬网程序并准备运行。选择运行爬网程序

  1. 在 AWS Glue 控制台中,转到。您可以看到,已使用基础文件夹的名称创建了一个表。选择 fhbase

爬网程序已发现并填充了表及其属性。

您可以看到发现的架构。爬网程序已根据前缀表达式指定的文件夹结构标识并创建了分区。

打开 Amazon Athena 控制台,从下拉菜单中选择默认数据库。在 New query1 窗口中编写以下查询,然后选择 运行查询

SELECT * FROM “default”.”fhbase”

where year = ‘2019’ and day = ’12’ and hour = ’17’

order by approxarrtimestamputcfh desc

请注意,Amazon Athena 将 fhbase 表识别为分区表。查询可以利用查询中的分区来筛选结果。

小结

如本文所示,Amazon S3 对象的自定义前缀为自定义文件夹结构提供了很大的灵活性,其中 Kinesis Data Firehose传输 Amazon S3 中的数据记录和失败记录。控制 Amazon S3 中的文件夹结构和命名可简化数据发现、编目和访问。因此,它有助于更方便地进行深入了解,并帮助您更好地管理查询成本。

 


关于作者

Rajeev Chakrabarti 是 Kinesis 专业解决方案架构师