亚马逊AWS官方博客

亚马逊云科技Flink计算引擎使用指南

一、前言

亚马逊云科技对Flink计算引擎在产品形态上提供了全面的支持。Amazon EMR on EC2,Amazon EMR on EKS,Amazon Managed Service for Apache Flink这三个产品都支持Flink计算引擎,客户可以根据自己的场景选择最适合的服务来运行Flink任务。本文内容会着重介绍Amazon EMR on EC2和Amazon Managed Service for Apache Flink的使用指南,包括作业的提交、监控方案、Autoscaler、Iceberg集成。目的是帮助客户快速上手使用这两个服务。对于Amazon EMR on EKS Flink我们提供了详细的Workshop,可以点击这里访问Flink on EKS动手实验

二、EMR on EC2 Flink使用指南

2.1 AutoScaler说明

Apache Flink在1.18版本之后对AutoScaler做了增强支持in-place scaling support, 在EKS上可以直接集成使用,但在ON YARN上只提供了一个Standalone的包,并不能满足生产要求。EMR on EC2的Flink对AutoScaler做了产品级别的集成,方便客户直接配置使用。对于使用AutoScaler这里做几点说明:

  • 应该使用EMR 7.x+, Flink 1.18+版本,因为1.18可以做in-place作业重启升级不用执行完整的升级流程(不用先savepoint再启动,直接从checkpoint restore),缩短作业调整并行度后的重启时间。
  • EMR的AutoScaler是专门优化过的,且集成到flink的runtime中(flink-dist,org/apache/flink/runtime/scheduler/autoscaler/),这是开源Flink不具备的.
  • 如果要尽快在缩容底层EC2资源,将yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs减少,默认是3600秒,如果节点进入decommission状态但是有container在EC2上运行,3600秒后才强制终止。注意调整这个值对作业稳定性有影响,请在在快速缩减资源和作业SLA之间做权衡。

2.2 创建Session

  • 下面以Flink session+Flink-CLI模式为列做使用说明,注意Flink生产使用模式建议使用application模式提交作业,session和per-job模式已经不再是推荐的使用模式。 这里使用flink cli只是为了方便说明,使用其它模式时下面的配置AutoScaler参数是一致的。EMR AutoScaler支持的参数配置在这里AutoScaler Configurations
  • 我们为Flink作业设定的目标利用率job.autoscaler.target.utilization, AutoScaler会尽可能保证作业在没有背压延迟的条件下,通过调整并行度,来满足设定的目标利用率。
checkpoints=s3://your-buckt-name/flink/checkpoints/datagen/
flink-yarn-session -jm 1024 -tm 4096 -s 2  \
-D state.backend=rocksdb \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=${checkpoints} \
-D execution.checkpointing.interval=60000 \
-D state.checkpoints.num-retained=5 \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D state.backend.incremental=true \
-D execution.checkpointing.max-concurrent-checkpoints=1 \
-D rest.flamegraph.enabled=true \
-D job.autoscaler.metrics.window=60s  \
-D job.autoscaler.decision.interval=10s  \
-D job.autoscaler.enabled=true  \
-D job.autoscaler.stabilization.interval=60s  \
-D jobmanager.scheduler=adaptive  \
-D job.autoscaler.debug.logs.interval=60s  \
-D job.autoscaler.vertex.max-parallelism=200 \
-D pipeline.max-parallelism=300 \
-D job.autoscaler.target.utilization.boundary=0.1 \
-D job.autoscaler.target.utilization=0.5 \
-d \
-t /etc/hive/conf/hive-site.xml

2.3 提交作业

  • 下面是启动Flink sql-client 指定到之前创建的flink session集群,同时关闭operator-chaining方便查看每个算子的并行度,注意不要在生产环境关闭,会影响性能。
# 这是使用flink sql client写SQL提交作业  
# 启动client, 这里的application_xxxx,指的是上一步flink session启动后yarn的application id
  
/usr/lib/flink/bin/sql-client.sh -s application_xxxx

# 关闭operator-chaining,方便查看,生产不要关闭
set pipeline.operator-chaining.enabled=false;
set sql-client.execution.result-mode=tableau;


CREATE TABLE datagen_01 (
  `distinct_id` INT,
  `address` VARCHAR(2147483647),
  `region` VARCHAR(2147483647),
  `city` VARCHAR(2147483647),
  `info` VARCHAR(2147483647),
  `name` VARCHAR(2147483647),
  `age` INT,
  `model` VARCHAR(2147483647),
  `price` INT,
  `recv_time` TIMESTAMP(3)
) WITH (
  'fields.distinct_id.max' = '100000000',
  'connector' = 'datagen',
  'fields.distinct_id.min' = '1',
  'number-of-rows' = '100000000'
);



# 注意替换S3路径
CREATE TABLE  flink_s3_sink(
  distinct_id int,
  address string,
  region string,
  city string,
  info string,
  name string,
  age int,
  model string,
  price int,
  recv_time   TIMESTAMP(3),
  logday VARCHAR(255),
  hh VARCHAR(255)
)PARTITIONED BY (`logday`,`hh`)
WITH (
  'connector' = 'filesystem',
  'path' = 's3://your-s3-bucket/flink_s3_write/',
  'format'='csv'
);

insert into  flink_s3_sink select  * ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd') as logday, DATE_FORMAT(CURRENT_TIMESTAMP, 'hh') as hh from  datagen_01;

2.4 观察AutoScaler

  • 可以通过观察jobmanager日志和webui来看到AutoScaler的过程
  • 在EMR on EC2上使用Iceberg,只需在集群开启Iceberg配置即可, 启用方式在这里EMR Iceberg
  • Flink 写Iceberg目前只有MOR模式,没有COW模式,即便显示配置COW也不会生效,这一点要注意。
  • Iceberg表如果使用upsert模式,分区键必须包含主键,Iceberg的compaction操作原则上是必须要做的,合并小文件提升查询性能。 尤其在upsert模式下,如果没有compaction流程,在大量写入upsert数据的场景下,查询性能表现会比较差,因为写入是MOR所以写入并不会有明显的瓶颈。
  • 我们使用Glue Catalog结合Flink写入Iceberg表做个例子, Flink Session和Flink Client创建
checkpoints=s3://your-bucket/flink/checkpoints/datagen/

flink-yarn-session -jm 1024 -tm 4096 -s 2  \
-D state.backend=rocksdb \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=${checkpoints} \
-D execution.checkpointing.interval=60000 \
-D state.checkpoints.num-retained=5 \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D state.backend.incremental=true \
-D execution.checkpointing.max-concurrent-checkpoints=1 \
-D rest.flamegraph.enabled=true \
-d \
-t /etc/hive/conf/hive-site.xml 

# 这是使用flink sql client写SQL提交作业  
# 启动client, 这里的application_xxxx,指的是上一步flink session启动后yarn的application id
/usr/lib/flink/bin/sql-client.sh -s application_xxxx
# result-mode
set sql-client.execution.result-mode=tableau;
# set default parallesim 
set 'parallelism.default' = '32';
  • 创建glue catalog,在glue catalog中创建flink iceberg database
CREATE CATALOG glue_catalog WITH (
   'type'='iceberg',
   'warehouse'='s3://you-bucket/flink-iceberg/',
   'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
 );

USE CATALOG  glue_catalog;

CREATE DATABASE IF NOT EXISTS flink_iceberg_db;

USE flink_iceberg_db;
  • Datagen生产测试数据写入Iceberg表
drop table if exists default_catalog.default_database.datagen_tb_01;
CREATE TABLE default_catalog.default_database.datagen_tb_01 (
    distinct_id bigint,
    address string,
    region string,
    city string,
    info string,
    name string,
    age int,
    model_type string,
    price int,
    recv_time   TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'fields.distinct_id.min' = '1',
  'fields.distinct_id.max'= '1000000000',
  'rows-per-second'='10000'
);

CREATE TABLE  iceberg_tb_01(
  distinct_id bigint,
  address string,
  region string,
  city string,
  info string,
  name string,
  age int,
  model_type string,
  price int,
  recv_time   TIMESTAMP(3),
  logday VARCHAR(255),
  hh VARCHAR(255),
  PRIMARY KEY (distinct_id) NOT ENFORCED
  PARTITION BY (logday, hh)
)WITH (
  'write.format.default' = 'parquet',
  'write.target-file-size-bytes' = '536870912',
  'write.upsert.enabled' = 'false',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '3',
  'write.parquet.compression-codec' = 'zstd'
);


insert into  iceberg_tb_05 select  * ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd') as logday, DATE_FORMAT(CURRENT_TIMESTAMP, 'hh') as hh from  default_catalog.default_database.datagen_tb_01 ;
  • 下图是Flink写入的Iceberg,append模式下的截图。当前配置下写入速度35w/s. 在Upsert模式下的写入速度也很快因为是MOR写,但是如果没有compaction,直接查询表速度会比较慢。

2.7 Glue Catalog Iceberg Auto Optimization

  • Glue提供了对于Iceberg表自动维护管理的功能,包括compaction,Snapshot retention,Orphan file deletion. 当使用Glue Catalog作为元数据管理时,对于格式是Iceberg的表类型,可以在Glue中通过启用Iceberg表优化,自动帮您完成Iceberg表运维管理工作。

2.8 EMR on EC2 Flink作业监控

  • 在EMR on EC2上对于系统指标的监控,比如CPU,内存,网络,磁盘等,EMR 7.0版本之后集成了cloudwatch agent,开启之后这些指标会自动发送到cloudwatch. 此外cloudwatch agent 还支持配置hdfs,yarn,hbase相关服务的JMX的监控指标。 但是对于Flink作业本身的metrics的监控,比如Flink作业的状态,restart次数,背压等,因为是在作业层面,EMR并不提供作业级别的Metrics. 因此如果果要对每个Flink作业做监控,我们可以使用如下两种方式,1. YARN Flink Rest API 2. Flink Prometheus Exporter。Rest API 方式更加简单,yarn 应用指标通过http://master_ip:8088/ws/v1/cluster/apps/application_id 获取,flink job 指标通过yarn代理过去的flink restapi 获取http://application_master_id:20888/proxy/application_id/jobs/job_id 这里的application_master_id 通过yarn的rest api可以获取到ApplicationMaster地址。 相关截图如下:
  • 通过这两个restapi可以获取到所有flink rest api 支持的全部指标查看所有Rest指标。有了这两个获取指标的URL,接下来如果要开发一个根据这两个URL展示的指标转换为Prometheus接口的数据的程序,这种开发操作,完全不用人工从头开发,只需要借助AI变成工具Kiro就可以帮你完成开发,您也可以直接在EC2或者EMR Master节点上安装Kiro CLI ,通过CLI开发,这样因为网络环境是通的,可以让他帮你直接开发调试,指导程序符合逾期。您可以去尝试一下,一定会有意外的惊喜,注意请在测试环境中调试开发。
  • 对于使用Prometheus Exporter收集指标,这也是Flink官方提供的一种方式,需要安装Prometheus Pushgateway,启动作业配置发送或者全局配置发送Metrics. 下面代码是一个例子,供参考。
    # 安装pushgateway
    sudo mkdir -p /opt/app/prometheus-push-gateway
    cd /tmp && wget https://github.com/prometheus/pushgateway/releases/download/v1.11.1/pushgateway-1.11.1.linux-amd64.tar.gz
    cd /tmp && tar -xzf pushgateway-1.11.1.linux-amd64.tar.gz && sudo mv pushgateway-1.11.1.linux-amd64/* /opt/app/prometheus-push-gateway/
    sudo tee /etc/systemd/system/pushgateway.service > /dev/null << 'EOF'
    [Unit]
    Description=Prometheus Push Gateway
    After=network.target
    
    [Service]
    Type=simple
    User=root
    ExecStart=/opt/app/prometheus-push-gateway/pushgateway --web.listen-address=:9091
    Restart=always
    RestartSec=5
    
    [Install]
    WantedBy=multi-user.target
    EOF
    sudo systemctl daemon-reload && sudo systemctl enable pushgateway && sudo systemctl start pushgateway
    sudo systemctl status pushgateway
    curl -s http://localhost:9091/metrics | head -5
    rm -f /tmp/pushgateway-1.11.1.linux-amd64.tar.gz && rm -rf /tmp/pushgateway-1.11.1.linux-amd64
    • flink prometheus metric reporter是不能获取到yarn的application id的,只能获取到flink job id. 所以无法自爱grafana上直接跳转到webui, 有一个折中的办法,可以在启动作业的时候指定webui端口,然后这个配置也传递到metric,这样就可以通过job manager 地址和port直接访问的flink webui了,如下配置。
    • 但这个方法需要注意每个作业的port需要不一样,默认这个port是随机的,我们手动指定后每个作业的port,每个作业需要不同,因为不同作业的job manager会调度到同一台机器,所以端口要不同,这个对么个作业配置就可以。 同时咱们网络要能访问emr各个节点,就可以直接访问了,本地可以可以ssh 动态端口转发
     

    # 配置了rest.bind-port 33089, metrics.reporter.promgateway.groupingKey="webui_port=33089;xxx"
    flink run-application \
    -Dyarn.application-attempts=2 \
    -Dyarn.application.name="TopSpeedWindowing_WITH_UI" \
    -Dmetrics.reporter.promgateway.factory.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory \
    -Dmetrics.reporter.promgateway.hostUrl="http://172.31.23.80:9091" \
    -Dmetrics.reporter.promgateway.jobName=TopSpeedWindowing_WITH_UI \
    -Dmetrics.reporter.promgateway.randomJobNameSuffix=true \
    -Dmetrics.reporter.promgateway.deleteOnShutdown=false \
    -Drest.bind-port="33089" \
    -Dmetrics.reporter.promgateway.groupingKey="webui_port=33089;my_env=product_ui;my_cluster=emr_flink" \
    -Dmetrics.reporter.promgateway.interval="30  SECONDS" \
    -Dpipeline.name="TopSpeedWindowing_WITH_UI" \
    -t yarn-application /usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
    
    # 在prometheus中配置relable config就可以把host:port拼接在一起, 之后就可以点击跳转了
      - job_name: 'pushgateway'
        static_configs:
          - targets: ['localhost:9091']
        metric_relabel_configs:
          - source_labels: [host, webui_port]
            regex: 'ip_(\d+)_(\d+)_(\d+)_(\d+)_.*;(\d+)'
            target_label: flink_web_ui
            replacement: '${1}.${2}.${3}.${4}:${5}'
        honor_labels: true
        
    
    
                      
    注意修改完配置,restart prometheus就可以,或者动态加载,这个是Prometheus的配置,不是pushgateway
    • 再提示您一下,指标到了Prometheus,你想要开发Dashboard, 也不用自己开发Kiro CLI可以直接帮你生成你想要的dashbaord创建好,你给到Kiro CLI Grafnan的地址和访问权限,他就可以帮你自动创建了。

三、MSF Flink使用指南

3.1 MSF Flink说明

关于MSF的基础知识本文不再赘述,本文会以一个python flink使用MSF写Iceberg的例子来做一个使用说明。对于MSF这里说明两个重要点。1. MSF的运维相比EMR on EC2 Flink的运维是要轻很多的。比如Metrics指标,日志这些都会自动发送到Cloudwatch,也不需要管理底层资源。2. MSF的成本相比EMR on EC2 Flink在大部分场景下从列表价看是要高的,但如果您的Flink作业较少,在EMR on EC2上开启高可用需要3个Master节点这会带来成本的增加,而在MSF上本身作业就是支持高可用且支持跨AZ容灾的,EMR on EC2只支持单AZ,同时是安装作业指定的KPU(1C/4GB)来计费的,因此这种情况下,MSF的成本并不一定比EMR on EC2高。

  • 下面是python flink消费MSK(Kafka)数据Sink Iceberg的程序,可以看到和我们平时开发flink作业没有区别,对于使用MSF而言,最主要的是开发和调试相对不是特别遍历,可以使用python flink local模式调试,调试完毕后再部署到MSF。也有客户会选择EMR on EC2作为开发环境调试环境,MSF作为生产环境。
  • 使用MSF时, pyflink相关的依赖jar,比如iceberg,kafka 等,都需要maven 编译打包到zip中使用,使用udf需要python的额外库,可以在添加requirenments.txt。可以参考这里
  • 对于下面例子完整的MSF的作业API提交,local调试的相关代码和说明可以点击这里查看MSF-PYTHON-Flink
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
import sys

def main():
    """
    Flink MSK to Iceberg Job
    从Kafka MSK读取数据并进行窗口聚合,输出到Iceberg表
    """
    # Iceberg配置
    iceberg_catalog_name = "iceberg_catalog"
    iceberg_database_name = "default_db"
    iceberg_table_name = "msk_stats"
    iceberg_warehouse_path = "s3://your-bucket/tmp/msf-test/"
    kafka_server="b-1.xxxx.kafka.ap-southeast-1.amazonaws.com:9092"
    kafka_topic="test"

    # 创建表环境
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    table_env = TableEnvironment.create(env_settings)

    # 启用检查点,间隔60秒
    table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "60s")

    # 创建Kafka源表SQL
    create_source_table = f"""
        CREATE TABLE msk_source (
            id BIGINT,
            username STRING,
            proc_time AS PROCTIME()
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{kafka_topic}',
            'properties.bootstrap.servers' = '{kafka_server}',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'scan.startup.mode' = 'latest-offset'
        )
    """

    # 创建Iceberg catalog
    create_catalog_sql = f"""
        CREATE CATALOG {iceberg_catalog_name} WITH (
            'type' = 'iceberg',
            'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
            'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
            'warehouse' = '{iceberg_warehouse_path}'
            )
    """

    # 创建数据库SQL
    create_database_sql = f"CREATE DATABASE IF NOT EXISTS `{iceberg_database_name}`"

    # 创建Iceberg表SQL
    create_iceberg_table_sql = f"""
        CREATE TABLE IF NOT EXISTS `{iceberg_catalog_name}`.`{iceberg_database_name}`.`{iceberg_table_name}` (
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            record_count BIGINT,
            unique_users BIGINT,
            created_at TIMESTAMP(3))
        PARTITIONED BY (window_start)
        WITH (
            'write.format.default' = 'parquet',
            'write.target-file-size-bytes' = '536870912',
            'write.upsert.enabled' = 'false',
            'write.metadata.delete-after-commit.enabled' = 'true',
            'write.metadata.previous-versions-max' = '3',
            'write.parquet.compression-codec' = 'zstd'
        )
    """

    # 数据处理和聚合SQL
    insert_sql = f"""
        INSERT INTO `{iceberg_catalog_name}`.`{iceberg_database_name}`.`{iceberg_table_name}`
        SELECT
            TUMBLE_START(proc_time, INTERVAL '1' MINUTE) as window_start,
            TUMBLE_END(proc_time, INTERVAL '1' MINUTE) as window_end,
            COUNT(*) as record_count,
            COUNT(DISTINCT username) as unique_users,
            CURRENT_TIMESTAMP as created_at
        FROM default_catalog.default_database.msk_source
        GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE)
    """

    try:
        # 执行SQL语句
        print("创建Kafka源表...")
        table_env.execute_sql(create_source_table)
        print("创建Iceberg catalog...")
        table_env.execute_sql(create_catalog_sql)
        print("使用catalog...")
        table_env.execute_sql(f"USE CATALOG `{iceberg_catalog_name}`;")
        print("创建数据库...")
        table_env.execute_sql(create_database_sql)
        print("使用数据库...")
        table_env.execute_sql(f"USE `{iceberg_database_name}`;")
        print("创建Iceberg表...")
        table_env.execute_sql(create_iceberg_table_sql)
        print("开始数据处理...")
        table_result = table_env.execute_sql(insert_sql)
        print("Flink作业已启动,正在处理数据...")
        # 等待作业完成,保持程序运行
        table_result.wait()
    except Exception as e:
        print(f"作业执行失败: {e}", file=sys.stderr)
        raise e

if __name__ == "__main__":
    main()

四、总结

本文内容说明了Flink引擎在亚马逊云科技上使用的最佳实践。亚马逊云科技提供了对Flink引擎的全面支持,可以满足您在不同场景的需求。Amazon EMR on EC2 Flink提供了最灵活可控的Flink运行时,Amazon Managed Service for Apache Flink提供了Serverless运行时,可以大幅度减少对Flink作业的运维,同时可以做到资源的弹性扩缩。而EMR on EKS Flink对于K8S技术栈的客户提供了便利支持。需要强调的是亚马逊云科技的Flink相比开源的Flink在AutoScaler的能力上做了扩展和增强,无论在EMR on EC2,EMR on EKS你都可以体验AutoScaler的功能带来的优势和成本节省。

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

潘超

亚马逊云科技数据分析解决方案架构师。负责客户大数据解决方案的咨询与架构设计,在开源大数据方面拥有丰富的经验。工作之外喜欢爬山。

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用