亚马逊AWS官方博客

使用 Amazon EKS 轻松运行 Flink 作业

Kubernetes 是目前最流行的用于自动部署,扩展和管理容器化应用程序的开源系统。我们看到越来越多的用户开始使用 Kubernetes 来作为企业容器应用部署平台。 Amazon Elastic Kubernetes Service (Amazon EKS) 是一项完全托管的 Kubernetes 服务。相比基于 EC2 自建 Kubernetes ,EKS有高可用、安全、支持AWS Fargate 无服务器计算资源等优势。

目前有大量的客户在使用 Kubernetes 部署Web服务。同时,我们也看到越来越多的客户使用Kubernetes 部署大数据、机器学习等应用。在大数据计算领域,Spark分布式框架以其优秀的处理性能,经过多年的迭代和进化已被大众广泛接受和应用。最近两年Apache的另一个分布式处理框架 Flink,在社区中的关注度持续上升,它集众多优点于一身,包括快速、可靠、完全兼容Hadoop、使用简便、表现卓越。Flink 与Spark 一样也可以部署在 Kubernetes 平台中运行,我们就可以得到一个健壮和高可扩的数据处理应用,享受容器化的优势。

本文将向大家介绍如何将 Flink 运行在AWS EKS之上,并与Amazon S3结合使用体验计算与存储分离,为您构建数据湖迈出第一步。本文使用AWS Cloud9 作为开发环境完成本次实验,您也可以使用个人电脑或者AWS EC2来完成本文内容。简要内容如下:

  • 构建 Flink 容器镜像
  • 使用 AWS ECR 管理镜像
  • 整理 Flink 集群部署文件
  • 启动集群并运行任务
  • 监控任务运行过程

一、准备工作

如果使用 cloud9 进行实验,可以忽略1 和2 过程。

二、 构建 Flink 容器镜像

Apache Flink 官方已经提供了构建好的标准的 Flink 镜像。由于我们需要使用Amazon S3 来作为数据输入源,需要自定义容器镜像。

1. 下载并编辑Dockerfile

从以下地址下载官方标准版 Dockerfile 和 entrypoint 文件:

https://github.com/apache/flink-docker/tree/8aafb8413a9675ebbe74fe3e5d22141f26922977/1.11/scala_2.12-java8-debian

下载 Dokerfile 和 docker-edntrypoint.sh 文件

mkdir flink; cd flink

默认 Flink 不支持读写 Amazon S3,为了让 Flink 能正常访问Amazon S3,需要简单修改Dockerfile文件。修改后的内容如下:

 

# Grab gosu for easy step-down from root
ENV GOSU_VERSION 1.11
RUN set -ex; \
wget -nv -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture)"; \
wget -nv -O /usr/local/bin/gosu.asc "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture).asc"; \
export GNUPGHOME="$(mktemp -d)"; \
for server in ha.pool.sks-keyservers.net $(shuf -e \
hkp://p80.pool.sks-keyservers.net:80 \
keyserver.ubuntu.com \
hkp://keyserver.ubuntu.com:80 \
pgp.mit.edu) ; do \
gpg --batch --keyserver "$server" --recv-keys B42F6819007F00F88E364FD4036A9C25BF357DD4 && break || : ; \
done && \
gpg --batch --verify /usr/local/bin/gosu.asc /usr/local/bin/gosu; \
gpgconf --kill all; \
rm -rf "$GNUPGHOME" /usr/local/bin/gosu.asc; \
chmod +x /usr/local/bin/gosu; \
gosu nobody true


# Configure Flink version
ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz \
FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz.asc \
GPG_KEY=6B6291A8502BA8F0913AE04DDEB95B05BF075300 \
CHECK_GPG=true


# Prepare environment
ENV FLINK_HOME=/opt/flink
ENV PATH=$FLINK_HOME/bin:$PATH
RUN groupadd --system --gid=9999 flink && \
useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink
WORKDIR $FLINK_HOME


# Install Flink
RUN set -ex; \
wget -nv -O flink.tgz "$FLINK_TGZ_URL"; \
\
if [ "$CHECK_GPG" = "true" ]; then \
wget -nv -O flink.tgz.asc "$FLINK_ASC_URL"; \
export GNUPGHOME="$(mktemp -d)"; \
for server in ha.pool.sks-keyservers.net $(shuf -e \
hkp://p80.pool.sks-keyservers.net:80 \
keyserver.ubuntu.com \
hkp://keyserver.ubuntu.com:80 \
pgp.mit.edu) ; do \
gpg --batch --keyserver "$server" --recv-keys "$GPG_KEY" && break || : ; \
done && \
gpg --batch --verify flink.tgz.asc flink.tgz; \
gpgconf --kill all; \
rm -rf "$GNUPGHOME" flink.tgz.asc; \
fi; \
\
tar -xf flink.tgz --strip-components=1; \
rm flink.tgz; \
mkdir -p ${FLINK_HOME}/plugins/flink-s3-fs-hadoop/; \
cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.11.1.jar $FLINK_HOME/plugins/flink-s3-fs-hadoop/; \
chown -R flink:flink .;


# Configure container
COPY docker-entrypoint.sh /
RUN chmod +x /docker-entrypoint.sh
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["help"]

 

2. 构建Flink 镜像并将镜像上传到镜像仓库

本文使用 AWS ECR 存储镜像,您也可以使用 Docker hub 等其他镜像仓库。
登录 AWS 控制台,打开 ECR 服务,创建 repository 名字为 Flink,复制 URI ,执行以下命令创建并推送镜像到 ECR。

 

可以从控制台获取到登录 AWS ECR 并把容器镜推送到 ECR 的操作命令:

  1. 检索身份验证令牌并向注册表验证 Docker 客户端身份。

aws ecr get-login-password --region <Your Region> | docker login --username AWS --password-stdin <Your ECR URI>

  1. 使用以下命令生成 Docker 映像。

docker build -t flink:v1.11.1 .

  1. 生成完成后,标记您的映像,以便将映像推送到此存储库:

docker tag flink:v1.11.1 <Your ECR URI>flink:v1.11.1

  1. 运行以下命令将此映像推送到您新创建的 AWS 存储库:

docker push <Your ECR URI>/flink:v1.11.1

三、 准备集群启动文件

在 Kubernetes上部署 Flink 有两种运行模式:Session Cluster 和 Job Cluster。Session Cluster 模式适合运行那些短时作业和即席查询。Job Cluster 则是为单个脚本部署一整套服务,包括 JobManager 和 TaskManager,运行结束后这些资源也随即释放。本实验以 Session Cluster 为例介绍。Job Cluster 与之相似。

从 Flink 官网下载 Flink 集群启动 yaml 文件,并做适当修改。
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html

flink-configuration-configmap.yaml
jobmanager-service.yaml
jobmanager-session-deployment.yaml
taskmanager-session-deployment.yaml

修改yaml文件:修改文件中image字段为ECR镜像地址。

jobmanager-deployment.yaml
taskmanager-deployment.yaml

为模拟从客户端提交任务到 Flink 集群,新建 pod 模拟客户端。也可以通过在 cloud9 或 EC2中安装Flink 的模式来模拟客户端。如果使用 AWS cloud9 安装 Flink 客户端提交任务,需要将 cloud9 环境中默认 JDK 版本由 1.7 升级到 1.8+。
新建 Flink 客户端 yaml 文件,文件名为client.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: client
spec:
replicas: 1
template:
metadata:
labels:
app: client
component: client
spec:
containers:
- name: client
image: <Image>
workingDir: /opt/flink
command: ["/bin/bash"]
stdin: true
tty: true

修改 yaml 文件:修改文件中 image 字段为 ECR 镜像地址

四、 启动 Flink 环境

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-session-deployment.yaml
kubectl create -f taskmanager-session-deployment.yaml
kubectl create -f client.yaml

确认 Flink 集群环境已经成功运行:

五、 为 Flink 集群增加读写 Amazon S3 权限

Flink 程序读写S3中数据文件,需要有对 S3 中对应 bucket 的读写权限。有几种为 Flink 授权的方法:

  1. 使用 Identity and Access Management (IAM) 为 EKS Worker Node 授权

为 EKS 集群中 Worker Node 增加S3读写权限,此方法操作简单,但是权限控制粒度粗。为 Worker Node 授权后,运行在该 Node 上的所有应用,都有对S3上响应内容的读写权限。



注:由于 flink-s3-fs-hadoop 原因,目前还无法在 pod 级别使用 ServiceAccount 进行权限控制 。

  1. 使用 Access Keys 方式授权

将 AccessKey 和 SecretKey 写入到 Flink 配置文件中。具体操作:修改 flink-configuration-configmap.yaml 文件,修改完成后需要重新部署集群。

s3.access-key: your-access-key
s3.secret-key: your-secret-key

六、 配置监控和 Dashboard

设置端口转发,通过 cloud9 浏览 Flink 监控页面:

kubectl proxy --port=8081 --address='0.0.0.0' --disable-filter=true &

在 cloud9 或者浏览器中打开以下地址,<>中内容替换为您当前 cloud9 地址。

https:// <6c2c5d2fa8a247468da8900c82ebd287.vfs.cloud9.ap-southeast-1.amazonaws.com>/ api/v1/namespaces/default/services/flink-jobmanager:webui/proxy

七、 测试运行 Flink 作业读写 S3文件

获取 Flink 集群 service 地址

kubectl get svc | grep flink-jobmanager

登录Flink 客户端

  • 获取 client names: kubectl get pods
  • 登录客户端 kubectl exec <pod name> -it /bin/sh

提交任务

  • 登录 client pod 提交任务

kubectl get pods | grep client

kubectl exec -it <client-pod-name> /bin/sh

./bin/flink run -m <serviceIP>:8081 ./examples/batch/WordCount.jar --input s3://<bucket-name>/input.txt --output s3://<bucket-name>/output

查看任务结果和执行过程

  1. 到 S3 存储桶对应 <bucket-name>/output目录下查看确认执行结果


 

  1. 从 Flink Dashboard 中查看该任务执行过程

 

总结:

Flink on EKS 可以充分利用 Kubernetes 的优势,kubernetes可以很好地集成其他集群维护工具,如监控工具普罗米修斯,同时在资源弹性方面,kubernetes可以很方便地进行扩缩容。同时 Flink 结合S3计算和存储分离的模式,充分利用Amazon S3高持久性、低成本的特点,进一步降低大数据分析使用成本。

本篇作者

马卫军

AWS中国团队的解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,同时致力于AWS云服务在国内教育行业的应用和推广。有丰富的数据仓库以及大数据开发和架构设计经验。马卫军平时热爱爬山和足球,同时也乐于和他人分享自己的各种经历。