亚马逊AWS官方博客

Amazon DynamoDB Flink SQL 实时计算实践

1.背景

用户行为特征对于风控、精准营销等各个领域都有着重要的意义。通过对用户行为数据进行统计、分析,可以发现用户使用产品的规律,并将这些规律与网站的营销策略、产品功能、运营策略相结合,发现营销、产品和运营中可能存在的问题,解决这些问题就能优化用户体验、实现更精细和精准的运营与营销,让企业更好地服务于用户。

在风控领域,通过对用户行为数据进行分析,可以识别出异常行为,从而及时发现并防范风险。例如,在金融领域,可以通过对用户的交易行为进行分析,识别出异常交易行为,从而防范欺诈等风险。

在精准营销领域,通过对用户行为数据进行分析,可以了解用户的兴趣爱好、购买习惯等信息,从而实现更精准的广告投放和个性化推荐。例如,在电商领域,可以通过对用户的浏览、搜索、购买等行为进行分析,了解用户的需求和偏好,并根据这些信息推荐相关商品。实时特征计算和离线特征都有其各自的意义。离线特征是通过批处理计算出的特征,主要供离线程序使用。由于其性质,计算出这些类型的特征可能需要花费很多时间。而实时特征计算则是具有非常强时效性的,其查询计算的端到端延迟一般设定在几十毫秒的量级。实时特征的常见计算模式,是当事件发生时,基于从当前时间点往前推移的一个时间点,形成一个时间窗口,进行窗口内的相关聚合计算。相比之下,实时特征计算更加适合需要快速响应的场景,例如在线广告、推荐系统等。通过实时特征计算,可以更加精准地对用户进行定向广告投放和个性化推荐。而离线特征则更适合需要对历史数据进行分析和挖掘的场景,例如数据挖掘、机器学习等。

Apache Flink 是为分布式、高性能的流处理应用程序打造的开源流处理框架。Flink 不仅能提供同时支持高吞吐和 exactly-once 语义的实时计算,还能提供批量数据处理。Flink 主要由 Java 代码实现,支持实时流处理和批处理,批数据只是流数据的一个极限案例。Flink 支持了迭代计算,内存管理和程序优化。

在 Flink 中,可以使用 HBase 作为维表存储。HBase 是一个分布式的、面向列的开源数据库系统,可以在大规模集群上运行。此外,Flink 还可以使用 Redis 作为维表存储。Redis 是一个开源的内存数据结构存储系统,通常用作数据库、缓存和消息中间件。

在实时计算场景下,使用 Flink 结合 HBase 或 Redis 做维表关联计算可以提高计算效率和准确性。例如,在广告投放场景下,可以使用 Flink 结合 HBase 或 Redis 做维表关联计算,从而更加精准地对用户进行定向广告投放和个性化推荐。

Amazon DynamoDB 是一种全托管的 NoSQL 数据库服务,提供快速而可预测的性能,能够实现无缝扩展。DynamoDB 可以免除操作和扩展分布式数据库的管理工作负担,因而无需担心硬件预置、设置和配置、复制、软件修补或集群扩展等问题。DynamoDB 提供了加密静态,这可以消除在保护敏感数据时涉及的操作负担和复杂性。

2.整体架构介绍

2.1 Amazon DynamoDB 介绍

Amazon DynamoDB 是一个键-值和文档数据库,可利用水平扩展支持几乎任何大小的表。这使 DynamoDB 能够扩展到请求数超过 10 万亿条/天,峰值高于 2000 万条请求/秒,存储空间大于数 PB。交付具有一致的个位数毫秒性能、几乎无限的吞吐量和存储以及自动多区域复制的应用程序。通过静态加密、自动备份和恢复,以及高达 99.999% 可用性的 SLA 保证可靠性。

DynamoDB Accelerator(DAX)是一个内存内缓存,通过支持您使用完全托管的内存中的缓存来大规模地为您的表提供快速读取性能。借助 DAX,您的 DynamoDB 表可实现高达 10 倍的性能提升(读取时间从数毫秒缩短到数微秒),甚至在每秒处理的请求数达到数百万的情况下也是如此。利用 DynamoDB Accelerator 可以实现微秒级延迟。

通常情况下,DAX 集群运行时,复制该集群中所有节点之间的数据(假定已预置多个节点)。考虑一个使用 DAX 成功执行 UpdateItem 的应用程序。此操作会导致使用新值修改主节点中的项目缓存。然后,该值复制到集群中的所有其他节点。此复制具有最终一致性,并且通常只需不到一秒即可完成。DAX 无法自行处理强一致性读取,因为它未紧密耦合到 DynamoDB。因此,任何从 DAX 后续读取必须为最终一致性读取。任何后续强一致性读取将传递到 DynamoDB。

2.2 典型的 Flink 实时计算场景

使用 HBASE 作为维表的分布式存储库,底层存储依赖于 HDFS,挂载 Redis 提高查询性能。Flink 实时双写维表数据到 HBASE 及 Redis,并通过异步同步保证数据一致性。另一条实时计算流触发实时数据流于维表的计算,并将最终计算结果写入下游。整体上需要维护的组件较多,扩展及维护都需要投入资源。

本文实践内容旨在通过 Amazon DynamoDB 替换 HBASE 与 Redis,提出一种基于无服务化数据库实时计算的架构。接下来将重点介绍 Flink SQL 在 Amazon DynamoDB Connector 的实现。

2.3 Flink SQL Connector 实现

当前版本主要实现以下内容:

1)支持定义 primary key,Table Name,代码 hard code 到 AP_SOUTHEAST_1 区域;

2)支持 Source/Sink,Source 暂时只实现了 scan 模式。Sink 端支持了实时流式 append 写入,并支持 kafka、S3 等 Stream/Batch 数据源写入;

3)当前仅支持定义 String 类型数据格式。

2.4 DynamoDB Flink SQL Connector 使用说明

-- 创建表
CREATE TABLE IF NOT EXISTS test_database.test_dynamodb_table(   
field1 STRING,    
field2 STRING,    
field3 STRING)
WITH ( 'connector' = 'dynamodb-v2',  -- 使用Amazon v2版本sdk 
'table_name' = 'tableName',   -- 对应dynamodb 表名 
'primary_key' = 'primaryKey'  -- 主键必须定义的partition key)
-- 查询
select * from test_database.test_dynamodb_table;
-- 写入
insert into     test_database.test_dynamodb_table select * from     test_database_source.test_source_table   

2.5 DynamoDB Flink SQL Connector 实现说明

示例代码参考:https://github.com/SEZ9/flink-connector-dynamodb-1.13.2

项目整体结构

定义 DDL 接受参数

@Internal
public class DynamodbOptions {
    public static final ConfigOption<String> TABLE_NAME =
            ConfigOptions.key("table_name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The name of Dynamodb table to connect.");
    public static final ConfigOption<String> PRIMARY_KEY =
            ConfigOptions.key("primary_key")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The name of Dynamodb table primary_key.");
}

实现 Sink 写数据

public class DynamodbDynamicTableSink implements DynamicTableSink {
    private final String tableName;
    private DynamodbTableSchema tableSchema;

    public DynamodbDynamicTableSink(
            String tableName,
            DynamodbTableSchema tableSchema
    ) {
        this.tableName = tableName;
        this.tableSchema = tableSchema;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        for (RowKind kind : changelogMode.getContainedKinds()) {
            // UPSERT mode
            if (kind != RowKind.UPDATE_BEFORE) {
                builder.addContainedKind(kind);
            }
        }
        return builder.build();
    }

    @Override
    public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
        DynamodbSinkFunction<RowData> sinkFunction =
                new DynamodbSinkFunction<> (
                        tableName,
                        tableSchema,
                        new RowDataToItemConverter(tableSchema)
                );
        return SinkFunctionProvider.of(sinkFunction);
}

实现 Source 读取数据

public class DynamodbSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
    private final String tableName;
    private DynamodbTableSchema tableSchema;
    private DeserializationSchema<RowData> deserializer;
    private DynamoDB dynamoDB;
    private Table table;
    private final int fieldLength;
    private volatile boolean isRunning = true;

    public DynamodbSourceFunction(String tableName, DynamodbTableSchema tableSchema) {
        this.tableName = tableName;
        this.tableSchema = tableSchema;
        this.fieldLength = tableSchema.getColumnNames().length;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializer.getProducedType();
    }

    @Override
    public void run(SourceContext<RowData> sourceContext) throws Exception {
        while (isRunning) {
            AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
                    .withRegion(Regions.AP_SOUTHEAST_1)
                    .build();
            dynamoDB = new DynamoDB(client);
            table = dynamoDB.getTable(tableName);
            ItemCollection<ScanOutcome> items = table.scan();
            Iterator<Item> iter = items.iterator();
            while (iter.hasNext()) {
                Item item = iter.next();
                sourceContext.collect(convertToNewRow(item));
            }
        }

3.业务开发

3.1 创建维度表-dynamodb-写

DROP TABLE IF  EXISTS dynamodb_orders;
CREATE TABLE dynamodb_orders (
  order_id INT,            -- 	订单编号
  order_amount FLOAT,      -- 	订单金额
  coupon_code VARCHAR(255), -- 优惠券编码
  coupon_discount_amount FLOAT,  -- 优惠券优惠金额
  user_id INT,                   -- 用户ID
  order_status VARCHAR(255),  -- 订单状态
  created_at TIMESTAMP(3), -- 	创建时间
  updated_at TIMESTAMP(3),  --	更新时间
  product_id INT        --   商品编号
) PARTITIONED BY ( user_id )
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'order_detail_latest_month',
  'aws.region' = 'us-east-1',
  'aws.credentials.basic.accesskeyid' =,
  'aws.credentials.basic.secretkey' = ''
);

3.2 创建实时事件表-kafka-读/写

%ssql

DROP TABLE IF  EXISTS kafka_orders;
CREATE TABLE IF NOT EXISTS kafka_orders (
  order_id INT,            -- 	订单编号
  order_amount FLOAT,      -- 	订单金额
  coupon_code VARCHAR(255), -- 优惠券编码
  coupon_discount_amount FLOAT,  -- 优惠券优惠金额
  user_id INT,                   -- 用户ID
  order_status VARCHAR(255),  -- 订单状态 1 创建 2 完成
  created_at TIMESTAMP(3), -- 	创建时间
  updated_at TIMESTAMP(3),  --	更新时间
  product_id INT        --   商品编号
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'order-group',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'scan.startup.mode' = 'earliest-offset'
);
UDF 业务逻辑

%flink.pyflink
import boto3
from boto3.dynamodb.conditions import Key
class LatestMonthOrderSumFunction(ScalarFunction):

    def eval(user_id):
        dynamodb = boto3.resource('dynamodb',region_name='us-east-1')
        table_name = "order_detail_latest_month"
        table = dynamodb.Table(table_name)
       
        
        response = self.table.query(
            KeyConditionExpression=Key('user_id').eq(user_id)
        )

        month_order_amount = 0
        month_discount_amount = 0
        count = 0
        for item in response['Items']:
            count += 1
            month_order_amount += item['order_amount']
            month_discount_amount += item['coupon_discount_amount']

        return (str(count) + '_' +  str(month_order_amount) + '_' + str(month_discount_amount))


class LatestYearOrderSumFunction(ScalarFunction):

    def eval(user_id):
        dynamodb = boto3.resource('dynamodb',region_name='us-east-1')
        table_name = "order_sum_monthly"
        table = dynamodb.Table(table_name)
       
        
        response = self.table.query(
            KeyConditionExpression=Key('user_id').eq(user_id)
        )
        
        order_total = 0
        order_amount = 0
        discount_amount = 0

        for item in response['Items']:
            order_total += item['order_total']
            order_amount += item['total_amount']
            discount_amount += item['discount_amount']

        return (str(order_amount) + '_' + str(discount_amount))

# udf 最近一个月订单数_订单金额_订单折扣金额
st_env.register_function("latest_month_order_sum", udf(LatestMonthOrderSumFunction(), DataTypes.STRING(), DataTypes.STRING()))

# udf 最近一年(不含当月)订单总金额_订单总折扣金额
st_env.register_function("latest_year_order_sum", udf(LatestMonthOrderSumFunction(), DataTypes.STRING(), DataTypes.STRING()))

3.3 特征计算作业

%ssql

INSERT INTO user_features (
    user_id,
    total_orders_lastest_month,
    total_order_amount_latest_month,
    total_discount_amount_latest_month,
    total_orders_latest_year,
    total_order_amount_latest_year,
    total_discount_amount_latest_year
)
SELECT
    user_id,
    SPLIT_INDEX(latest_month_order_sum(user_id), '_' ,0) as total_orders_lastest_month,
    SPLIT_INDEX(latest_month_order_sum(user_id), '_' ,1)  as total_order_amount_latest_month,
    SPLIT_INDEX(latest_month_order_sum(user_id), '_' ,2)   as total_discount_amount_latest_month,
    SPLIT_INDEX(latest_month_order_sum(user_id), '_' ,0) + SPLIT_INDEX(latest_year_order_sum(user_id), '_' ,0)  as total_orders_latest_year,
    SPLIT_INDEX(latest_month_order_sum(user_id), '_' ,1) + SPLIT_INDEX(latest_year_order_sum(user_id), '_' ,1)  as total_order_amount_latest_year,
    SPLIT_INDEX(latest_month_order_sum(user_id), '_' ,2) + SPLIT_INDEX(latest_year_order_sum(user_id), '_' ,2)  as total_discount_amount_latest_year
    
FROM kafka_orders

4 总结

使用 Dynamodb 结合 Flink 进行实时计算的任务开发,可以极大的降低传统架构中需要维护重型组件如 Hbase 类的数据库,并且提供一致的请求性能。利用 Flink SQL 统一 connector 框架,可以快速构建自己的 connector 实现灵活的业务扩展。

参考文档

DynamoDB 数据一致性模型:https://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/DAX.consistency.html

DyanmoDB 功能特性:https://aws.amazon.com/cn/dynamodb/features/#Performance_at_scale

分布式键值存储 DynamoDB 实现原理:https://draveness.me/dynamo

分布式系统弱一致性模型和 DynamoDB 的设计思想:https://www.jianshu.com/p/4b6bc3b84cdc

本篇作者

张鑫

AWS 解决方案架构师,负责基于 AWS 云平台的解决方案咨询和设计,在系统架构、数仓和实时离线计算领域有丰富的研发和架构实践经验。