Amazon AWS Official Blog

Sustainability Best-Practice Architecture: Deploying and Optimizing a Spot-Based Flink Job Cluster

1. Introduction

The AWS Well-Architected Framework describes key concepts, design principles, and architectural best practices for designing and running workloads in the cloud. The sustainability pillar, the newest addition to the Well-Architected Framework, focuses on reducing the environmental impact of running cloud workloads: provisioning the right resources, in the right quantity, at the right time, to meet clearly defined business requirements, and building the best architecture guided by the four principles of Reuse, Recycle, Reduce, and Rearchitect. Using Flink jobs on EKS as an example, this blog series follows these four principles and builds such an architecture from scratch with technologies including Karpenter, Spot, and Graviton.

In this post, we show how to set up an EKS cluster with the Karpenter autoscaler, add Spot worker nodes, and run a Flink job.

  • Following the Recycle principle, use Spot Instances to deliver and optimize the cloud application:
    • Amazon EC2 Spot Instances let you use spare EC2 capacity in the AWS Cloud at discounts of up to 90% compared with On-Demand prices. When Amazon EC2 needs the capacity back, Spot Instances receive a two-minute interruption warning. There are several graceful ways to handle interruptions, such as EC2 Instance Rebalance Recommendations, which proactively notify you when a Spot Instance is at elevated risk of interruption. Spot Instances are a great way to scale out and increase the throughput of big data workloads and have been adopted by many customers.
  • Following the Reuse principle, combine the Karpenter autoscaler to maximize utilization and resource efficiency:
    • Depending on the availability requirements of different applications, an EKS cluster may contain several groups of compute of different types, for example a relatively static On-Demand managed node group for system components such as the cluster autoscaler itself. Unless there are special isolation or stability requirements, we try to fill existing nodes first and only have Karpenter quickly launch additional nodes when capacity runs out.
    • Karpenter supports workload consolidation starting with version 0.15. When enabled, it watches workload changes in the cluster and automatically looks for opportunities to reschedule Pods onto underutilized nodes, reusing existing compute as much as possible, which helps raise utilization and reduce cluster compute cost.

The overall architecture is shown below:

Architecture overview:

1.   When creating the EKS cluster, add a managed On-Demand node group (one node by default) to host system components such as the EBS CSI driver.

2.   Use Karpenter to dynamically launch the compute resources that the Flink job needs; configure multiple Provisioners, each with a different weight, for fine-grained, coordinated control.

3.   Proactively taint the ARM nodes and use matching Tolerations so that Flink jobs are scheduled onto suitable nodes.

4.   Use docker buildx to build a Multi-Arch image in one step and push it to the image repository.

5.   The Flink Job Manager (Flink JM) uses a nodeSelector to be scheduled explicitly onto On-Demand nodes (either the managed On-Demand node group that hosts system components or On-Demand nodes launched by Karpenter).

6.   The Flink Task Manager (Flink TM) carries no placement constraints by default (no nodeSelector or nodeAffinity) and is configured with a CPU-based HPA (a sample manifest is sketched in section 5.4). When resources run short, the Provisioners launch suitable nodes according to their priorities.

7.   Use the Kinesis Data Generator to produce large volumes of simulated data and send it to the Kinesis Data Stream. As the data volume grows, the HPA automatically scales out more Task Manager Pods.

8.   The Flink job enables checkpointing and writes checkpoint data to S3, allowing Flink to persist state and remain fault tolerant.

9.   Use Fault Injection Simulator to simulate Spot interruption events.

10. The Node Termination Handler works together with Spot to keep the application running smoothly.

2. Building the Cluster

2.1 Prepare the base resources

Test Region: US East (us-east-1).

Create the VPC and supporting resources by deploying the following into the lab environment with CloudFormation:

  • A VPC containing public and private subnets
  • Public subnets: located in two different Availability Zones. Resources in these subnets are exposed to the internet and can be reached directly by users or clients; they host the NAT Gateway, bastion host, ELB load balancers, and so on.
  • Private subnets: located in two different Availability Zones. Resources in these subnets cannot be reached directly from the internet; they host web application servers, middleware, databases, and other services that do not need to be exposed.
  • Several security groups: control inbound and outbound traffic for the EKS cluster and EC2 instances.
  • A Cloud9 instance: used to manage and maintain the EKS cluster.
  • An EKS administration role: serves as the top-level administrator of the EKS cluster.

First, download the template:

https://raw.githubusercontent.com/BWCXME/cost-optimized-flink-on-kubernetes/main/cfn-infra.yaml

Then open the CloudFormation console: https://us-east-1.console.aws.amazon.com/cloudformation/home?region=us-east-1#/

Upload the template you just downloaded and enter a stack name, for example "eks-infra":

# Define the CloudFormation stack name, e.g. eks-infra
CFN_NAME="eks-infra"

Leave the other parameters at their defaults and click Create; the stack takes about 3-4 minutes to complete.

2.2 Configure Cloud9

Once the stack has been created, switch to the "Outputs" tab and open the Cloud9 link listed there.

Alternatively, go through the Cloud9 console:

https://us-east-1.console.aws.amazon.com/cloud9/home?region=us-east-1

After entering the environment, note that you need to attach the EKS administrator role. First, jump to the EC2 console:

Then modify the instance's role settings:

Search for "eks" and attach the role to the instance:

Then modify the security group settings, add the following three security groups, and save:

  • ControlPlaneSecurityGroup
  • SharedNodeSecurityGroup
  • ExternalSecurityGroup

Open a new terminal in Cloud9, upgrade the awscli command line, and then disable the temporary managed credentials:

sudo mv /bin/aws /bin/aws1
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

/usr/local/bin/aws cloud9 update-environment --environment-id $C9_PID --managed-credentials-action DISABLE
rm -vf ${HOME}/.aws/credentials

Define the stack name; it must match the name of the CloudFormation stack created earlier:

# Define the CloudFormation stack name
CFN_NAME="eks-infra"

Update the environment variables and install the helper tools:

# Update environment variables
wget https://raw.githubusercontent.com/BWCXME/cost-optimized-flink-on-kubernetes/main/prepareCFNENVs.sh
chmod u+x prepareCFNENVs.sh
./prepareCFNENVs.sh ${CFN_NAME}
source ~/.bashrc

# Install helper tools
wget https://raw.githubusercontent.com/BWCXME/cost-optimized-flink-on-kubernetes/main/prepareCloud9IDE.sh
chmod u+x prepareCloud9IDE.sh
./prepareCloud9IDE.sh
source ~/.bashrc

2.3 Generate an SSH key

Generate an SSH key so that you can log in to worker nodes directly for debugging if problems come up later:

# Press Enter three times in a row
ssh-keygen -t rsa -b 4096

After generating the key, import it:

aws ec2 import-key-pair --key-name "k8s" --public-key-material fileb://~/.ssh/id_rsa.pub

2.4 Prepare Flink node permissions

Generate the policy document:

cat <<EoF > flink-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:*",
                "firehose:*",
                "kinesisanalytics:*",
                "ec2:*",
                "ec2messages:*",
                "s3:*",
                "s3-object-lambda:*",
                "elasticfilesystem:*",
                "logs:*",
                "cloudwatch:*",
                "autoscaling:*",
                "sns:*",
                "ssm:*",
                "ssmmessages:*",
                "lambda:*"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:ListPolicyVersions",
                "iam:ListRoles",
                "iam:PassRole"
            ],
            "Resource": "*"
        }        
    ]
}
EoF

Create the policy:

aws iam create-policy   \
  --policy-name flink-policy \
  --policy-document file://flink-policy.json

2.5 Create the EKS cluster

Set the EKS version and cluster name as environment variables:

export EKS_VERSION=1.23
export EKS_CLUSTER_NAME=ekslab
export EKS_IP_FAMILY=IPv4

echo "export EKS_VERSION=\"$EKS_VERSION\"" >> ~/.bashrc
echo "export EKS_CLUSTER_NAME=\"$EKS_CLUSTER_NAME\"" >> ~/.bashrc
source ~/.bashrc

Create a cluster configuration file, flink-cluster.yaml:

cat > flink-cluster.yaml <<EOF
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: ${EKS_CLUSTER_NAME}
  region: ${AWS_REGION}
  version: "${EKS_VERSION}"

kubernetesNetworkConfig:
  ipFamily: ${EKS_IP_FAMILY}

vpc:
  id: "${EKS_VPC_ID}"
  securityGroup: ${EKS_CONTROLPLANE_SG} 
  sharedNodeSecurityGroup: ${EKS_SHAREDNODE_SG} 
  clusterEndpoints:
    publicAccess: true
    privateAccess: true
  subnets:
    public:
      public-${AWS_REGION}a:
          id: "${EKS_PUB_SUBNET_01}"
      public-${AWS_REGION}b:
          id: "${EKS_PUB_SUBNET_02}"
      public-${AWS_REGION}c:
          id: "${EKS_PUB_SUBNET_03}"          
    private:
      private-${AWS_REGION}a:
          id: "${EKS_PRI_SUBNET_01}"
      private-${AWS_REGION}b:
          id: "${EKS_PRI_SUBNET_02}"
      private-${AWS_REGION}c:
          id: "${EKS_PRI_SUBNET_03}"            

iam:
  withOIDC: true

cloudWatch:
  clusterLogging:
    enableTypes: ["*"]
    logRetentionInDays: 60

secretsEncryption:
  keyARN: ${EKS_KEY_ARN}

managedNodeGroups:
  - name: ondemand-x86-al2-cpu
    amiFamily: AmazonLinux2
    instanceTypes: [ "m5.xlarge" ]
    minSize: 1
    maxSize: 3
    desiredCapacity: 1
    volumeSize: 1000
    volumeType: gp3
    volumeEncrypted: true    
    ssh:
      allow: true
      publicKeyName: k8s
    subnets:
      - ${EKS_PRI_SUBNET_01}
      - ${EKS_PRI_SUBNET_02}
      - ${EKS_PRI_SUBNET_03}
    updateConfig:
      maxUnavailablePercentage: 33
    labels:
      os-distribution: amazon-linux-2
      cpu-architecture: x86
      network: private
      group: managed
    iam:
      withAddonPolicies:        
        imageBuilder: true # Write access to ECR
        cloudWatch: true  # Allow workers to send data to CloudWatch
        ebs: true
        fsx: true
        efs: true
        externalDNS: true
        certManager: true
      attachPolicyARNs:
        # Required by EKS workers      
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
        - arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy 
        # For our Application 
        - arn:aws:iam::${ACCOUNT_ID}:policy/flink-policy        

addons:
  - name: vpc-cni
    version: latest  
    attachPolicyARNs:
      - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy  
  - name: coredns
    version: latest # auto discovers the latest available
  - name: kube-proxy
    version: latest 
  - name: aws-ebs-csi-driver
EOF

Create the cluster:

eksctl create cluster -f flink-cluster.yaml

Creating the cluster and adding the first x86 node group takes roughly 20 minutes; in the meantime you can follow the progress in the EKS and CloudFormation consoles:

When it finishes, check the cluster status:

k cluster-info

3. Installing the Cluster Autoscaler (Karpenter)

Karpenter is an open-source node autoscaling project built for Kubernetes. It improves the availability of Kubernetes applications without requiring manual or excessive provisioning of compute resources. Karpenter observes the aggregate resource requests of unschedulable Pods and makes decisions to launch and terminate nodes, minimizing scheduling latency and infrastructure cost.

3.1 Add tags

First, add tags to the designated subnets and security groups.

Karpenter discovers subnets by tag. Add the karpenter.sh/discovery tag, with your cluster name as the value, to the subnets associated with your cluster (this matches the subnetSelector in the AWSNodeTemplate defined later): retrieve the subnet IDs and tag them with the cluster name.

Tag the subnets (this example uses the private subnets):

aws ec2 create-tags \
    --resources ${EKS_PRI_SUBNET_01} ${EKS_PRI_SUBNET_02} ${EKS_PRI_SUBNET_03} \
    --tags Key="karpenter.sh/discovery,Value=${EKS_CLUSTER_NAME}"

When Karpenter launches new nodes, it automatically attaches the security groups that carry the matching tag. In this example, we look up the existing node groups and tag the security group of the first node group in the cluster:

NODEGROUP=$(aws eks list-nodegroups --cluster-name ${EKS_CLUSTER_NAME} \
    --query 'nodegroups[0]' --output text)

LAUNCH_TEMPLATE=$(aws eks describe-nodegroup --cluster-name ${EKS_CLUSTER_NAME} \
    --nodegroup-name ${NODEGROUP} --query 'nodegroup.launchTemplate.{id:id,version:version}' \
    --output text | tr -s "\t" ",")

# If your EKS setup is configured to use only Cluster security group, then please execute -

SECURITY_GROUPS=$(aws eks describe-cluster \
    --name ${EKS_CLUSTER_NAME} --query "cluster.resourcesVpcConfig.clusterSecurityGroupId")

# If your setup uses the security groups in the Launch template of a managed node group, then :

SECURITY_GROUPS=$(aws ec2 describe-launch-template-versions \
    --launch-template-id ${LAUNCH_TEMPLATE%,*} --versions ${LAUNCH_TEMPLATE#*,} \
    --query 'LaunchTemplateVersions[0].LaunchTemplateData.[NetworkInterfaces[0].Groups||SecurityGroupIds]' \
    --output text)

aws ec2 create-tags \
    --tags "Key=karpenter.sh/discovery,Value=${EKS_CLUSTER_NAME}" \
    --resources ${SECURITY_GROUPS}

If you have multiple node groups or multiple security groups, you can specify them manually; for example, here we also tag the shared node security group:

aws ec2 create-tags \
    --tags "Key=karpenter.sh/discovery,Value=${EKS_CLUSTER_NAME}" \
    --resources ${EKS_SHAREDNODE_SG}

3.2 Specify the version

As of early September 2022, the latest version was 0.16.1:

export KARPENTER_VERSION=v0.16.1
echo "export KARPENTER_VERSION=\"$KARPENTER_VERSION\"" >> ~/.bashrc
source ~/.bashrc

3.3 Prepare Karpenter permissions

First, use the CloudFormation template to quickly create the KarpenterNode role and attach its baseline permissions:

TEMPOUT=$(mktemp)

curl -fsSL https://karpenter.sh/"${KARPENTER_VERSION}"/getting-started/getting-started-with-eksctl/cloudformation.yaml  > $TEMPOUT \
&& aws cloudformation deploy \
  --stack-name "Karpenter-${EKS_CLUSTER_NAME}" \
  --template-file "${TEMPOUT}" \
  --capabilities CAPABILITY_NAMED_IAM \
  --parameter-overrides "ClusterName=${EKS_CLUSTER_NAME}"

Attach the additional permissions required by the Flink job:

aws iam attach-role-policy --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/flink-policy --role-name KarpenterNodeRole-${EKS_CLUSTER_NAME}

Add the Karpenter node role to your aws-auth ConfigMap so that nodes using this role can join the cluster:

eksctl create iamidentitymapping \
  --username system:node:{{EC2PrivateDNSName}} \
  --cluster "${EKS_CLUSTER_NAME}" \
  --arn "arn:aws:iam::${ACCOUNT_ID}:role/KarpenterNodeRole-${EKS_CLUSTER_NAME}" \
  --group system:bootstrappers \
  --group system:nodes
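
Optionally, confirm that the mapping exists before moving on (a quick check that is not part of the original steps):

eksctl get iamidentitymapping --cluster "${EKS_CLUSTER_NAME}"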

Next, prepare the KarpenterController permissions:

eksctl create iamserviceaccount \
  --cluster "${EKS_CLUSTER_NAME}" --name karpenter --namespace karpenter \
  --role-name "${EKS_CLUSTER_NAME}-karpenter" \
  --attach-policy-arn "arn:aws:iam::${ACCOUNT_ID}:policy/KarpenterControllerPolicy-${EKS_CLUSTER_NAME}" \
  --role-only \
  --approve

export KARPENTER_IAM_ROLE_ARN="arn:aws:iam::${ACCOUNT_ID}:role/${EKS_CLUSTER_NAME}-karpenter"

3.4 Prepare the Spot service-linked role (optional)

This step is only required the first time you use EC2 Spot in the account:

aws iam create-service-linked-role --aws-service-name spot.amazonaws.com || true

3.5 Install Karpenter

Install quickly with Helm:

helm repo add karpenter https://charts.karpenter.sh
helm repo update

helm upgrade --install --namespace karpenter --create-namespace \
  karpenter karpenter/karpenter \
  --version ${KARPENTER_VERSION} \
  --set serviceAccount.annotations."eks\.amazonaws\.com/role-arn"=${KARPENTER_IAM_ROLE_ARN} \
  --set clusterName=${EKS_CLUSTER_NAME} \
  --set clusterEndpoint=$(aws eks describe-cluster --name ${EKS_CLUSTER_NAME} --query "cluster.endpoint" --output json) \
  --set aws.defaultInstanceProfile=KarpenterNodeInstanceProfile-${EKS_CLUSTER_NAME} \
  --set defaultProvisioner.create=false \
  --wait # for the defaulting webhook to install before creating a Provisioner

Check the deployment:

k get all -n karpenter

k logs -f deployment/karpenter -c controller -n karpenter

You can also enable debug logging to make troubleshooting easier (optional):

k patch configmap config-logging -n karpenter --patch '{"data":{"loglevel.controller":"debug"}}'

3.6 Deploy the Provisioners

Starting with version 0.16, Karpenter supports prioritizing multiple Provisioners, ordering them by the ".spec.weight" field. This allows finer-grained, coordinated control:

  • Configure multiple Provisioners
  • Assign them different priorities
  • Adjust priorities dynamically (for example by integrating the Spot Placement Score, not covered here)

For example, we can define separate Spot and On-Demand Provisioners: try the Spot Provisioner first, and fall back to the On-Demand Provisioner when Spot capacity is insufficient.

Note that Karpenter does not manage nodes through Auto Scaling groups, so the default labels on the nodes it launches differ from those of managed node groups; for example, the label key and values that distinguish On-Demand from Spot are different:

Here we follow the "Reuse" principle: fill the managed node group first, then the existing Karpenter nodes, and launch new nodes only when needed. To let applications identify nodes through a single, consistent set of labels, we manually set two labels in the Provisioners:

  • eks.amazonaws.com/capacityType, with the same values used by managed node groups
  • group, set to "NONE" so that Karpenter nodes are easy to identify

If you prefer to keep things simple, do not need to maximize utilization of the managed node group, or plan to tune the managed node group size and instance types manually later, the Flink job can run on Karpenter nodes only. Karpenter already prefers Spot capacity and falls back to On-Demand, so a single Provisioner is enough. In that case, do not set the eks.amazonaws.com/capacityType label, because a launched node may be either Spot or On-Demand; for workloads that must run on On-Demand instances, such as the Flink Job Manager, simply use a node selector with Karpenter's native label karpenter.sh/capacity-type, as sketched below.
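
For that single-Provisioner case, a minimal pod-spec fragment might look like this (illustrative only; karpenter.sh/capacity-type is the label Karpenter itself applies to the nodes it launches):

      nodeSelector:
        'karpenter.sh/capacity-type': 'on-demand'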

You can use the alias set up earlier when the helper tools were installed to view node information quickly; one of the columns shows eks.amazonaws.com/capacityType, which helps you identify each node's capacity type at a glance:

kgn
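
If you have not run the helper script, a roughly equivalent command (the actual alias definition may differ) is:

kubectl get nodes -L eks.amazonaws.com/capacityType,karpenter.sh/capacity-type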

Create the x86 Provisioner configuration file provisioner-x86.yaml:

cat > provisioner-x86.yaml <<EOF
apiVersion: karpenter.sh/v1alpha5
kind: Provisioner
metadata:
  name: x86-spot-provisioner
spec:
  consolidation:
    enabled: true
  ttlSecondsUntilExpired: 2592000 # 30 Days = 60 * 60 * 24 * 30 Seconds;
  weight: 50 # higher value = higher priority
  
  requirements:
    - key: karpenter.sh/capacity-type
      operator: In
      values: ["spot"]
    - key: "node.kubernetes.io/instance-type"
      operator: In
      values: ["m5.xlarge", "m5d.xlarge", "m5dn.xlarge"]
    - key: "topology.kubernetes.io/zone"
      operator: In
      values: ["${AWS_REGION}a", "${AWS_REGION}b", "${AWS_REGION}c"]
    - key: "kubernetes.io/arch"
      operator: In
      values: ["amd64"] # arm64 
  kubeletConfiguration:
    systemReserved:
      cpu: 1
      memory: 5Gi
      ephemeral-storage: 10Gi
    maxPods: 20
  limits:
    resources:
      cpu: 1000
      memory: 2000Gi      
  providerRef: # optional, recommended to use instead of provider
    name: flink      
  labels:
    eks.amazonaws.com/capacityType: 'SPOT'
    cpu-architecture: x86
    network: private      
    group: 'NONE'
---
apiVersion: karpenter.sh/v1alpha5
kind: Provisioner
metadata:
  name: x86-ondemand-provisioner
spec:
  consolidation:
    enabled: false
  ttlSecondsAfterEmpty: 60
  ttlSecondsUntilExpired: 2592000 # 30 Days = 60 * 60 * 24 * 30 Seconds;
  weight: 10 # higher value = higher priority
  
  requirements:
    - key: karpenter.sh/capacity-type
      operator: In
      values: ["on-demand"]
    - key: "node.kubernetes.io/instance-type"
      operator: In
      values: ["m5.xlarge", "m5d.xlarge", "m5dn.xlarge"]
    - key: "topology.kubernetes.io/zone"
      operator: In
      values: ["${AWS_REGION}a", "${AWS_REGION}b", "${AWS_REGION}c"]
    - key: "kubernetes.io/arch"
      operator: In
      values: ["amd64"] # arm64 
  kubeletConfiguration:
    systemReserved:
      cpu: 1
      memory: 5Gi
      ephemeral-storage: 10Gi
    maxPods: 20
  limits:
    resources:
      cpu: 1000
      memory: 2000Gi      
  providerRef:
    name: flink
  labels:
    eks.amazonaws.com/capacityType: 'ON_DEMAND'
    cpu-architecture: x86
    network: private      
    group: 'NONE'  
---
apiVersion: karpenter.k8s.aws/v1alpha1
kind: AWSNodeTemplate
metadata:
  name: flink
spec:
  subnetSelector:                            
    karpenter.sh/discovery: ${EKS_CLUSTER_NAME}
  securityGroupSelector:  # required, when not using launchTemplate
    karpenter.sh/discovery: ${EKS_CLUSTER_NAME}
  instanceProfile: KarpenterNodeInstanceProfile-${EKS_CLUSTER_NAME} # optional, if set in controller args
  blockDeviceMappings:
    - deviceName: /dev/xvda
      ebs:
        volumeSize: 1000Gi
        volumeType: gp3
        iops: 10000
        encrypted: true
        deleteOnTermination: true
        throughput: 125
EOF

Apply the configuration:

k apply -f provisioner-x86.yaml

Check the deployment:

k get provisioners
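
Optionally, you can verify that the Provisioners launch nodes as expected before deploying Flink. The sketch below follows the common Karpenter getting-started pattern; the pause image and the CPU request are only examples:

cat > inflate-test.yaml <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: inflate
spec:
  replicas: 0
  selector:
    matchLabels:
      app: inflate
  template:
    metadata:
      labels:
        app: inflate
    spec:
      containers:
        - name: inflate
          image: public.ecr.aws/eks-distro/kubernetes/pause:3.7
          resources:
            requests:
              cpu: 1
EOF

k apply -f inflate-test.yaml
k scale deployment inflate --replicas 5

# Watch Karpenter launch a node for the pending Pods, then clean up
k logs -f deployment/karpenter -c controller -n karpenter
k delete -f inflate-test.yaml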

4. Installing the Node Termination Handler

The Node Termination Handler is a great companion to Spot: it helps applications run more smoothly through Spot Instance interruptions, EC2 scheduled maintenance windows, and scaling events.

The Node Termination Handler has two modes:

  • Instance Metadata Service (IMDS) mode, which runs a pod on every node to monitor events and react to them.
  • Queue Processor mode, which uses Amazon SQS to receive Auto Scaling Group (ASG) lifecycle events, EC2 state-change events, Spot interruption notices, and Spot rebalance recommendations. These events can be configured to be published via Amazon EventBridge.

Here we use Queue Processor mode. Note that instances launched by Karpenter do not belong to any Auto Scaling group, so the Auto Scaling Group lifecycle events do not need to be configured.

4.1 Prepare the SQS queue

Create the SQS queue:

export SQS_QUEUE_NAME="nth"

QUEUE_POLICY=$(cat <<EOF
{
    "Version": "2012-10-17",
    "Id": "NTHQueuePolicy",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {
            "Service": ["events.amazonaws.com", "sqs.amazonaws.com"]
        },
        "Action": "sqs:SendMessage",
        "Resource": [
            "arn:aws:sqs:${AWS_REGION}:${ACCOUNT_ID}:${SQS_QUEUE_NAME}"
        ]
    }]
}
EOF
)

## make sure the queue policy is valid JSON
echo "$QUEUE_POLICY" | jq .

## Save queue attributes to a temp file
cat << EOF > /tmp/queue-attributes.json
{
  "MessageRetentionPeriod": "300",
  "Policy": "$(echo $QUEUE_POLICY | sed 's/\"/\\"/g' | tr -d -s '\n' " ")"
}
EOF

aws sqs create-queue --queue-name "${SQS_QUEUE_NAME}" --attributes file:///tmp/queue-attributes.json

Get the queue URL and ARN and export them as variables:

export SQS_QUEUE_URL=$(aws sqs get-queue-url --queue-name ${SQS_QUEUE_NAME} | jq -r '.QueueUrl')

export SQS_QUEUE_ARN=$(aws sqs get-queue-attributes --queue-url ${SQS_QUEUE_URL} --attribute-names QueueArn | jq -r '.Attributes.QueueArn')

4.2 Define the events

Create the rules quickly with the following commands (you can also create them in the EventBridge console):

aws events put-rule \
--name NTHASGTermRule \
--event-pattern "{\"source\":[\"aws.autoscaling\"],\"detail-type\":[\"EC2 Instance-terminate Lifecycle Action\"]}"

aws events put-targets --rule NTHASGTermRule \
--targets "Id"="1","Arn"=${SQS_QUEUE_ARN}

aws events put-rule \
--name NTHSpotTermRule \
--event-pattern "{\"source\": [\"aws.ec2\"],\"detail-type\": [\"EC2 Spot Instance Interruption Warning\"]}"

aws events put-targets --rule NTHSpotTermRule \
--targets "Id"="1","Arn"=${SQS_QUEUE_ARN}

aws events put-rule \
--name NTHRebalanceRule \
--event-pattern "{\"source\": [\"aws.ec2\"],\"detail-type\": [\"EC2 Instance Rebalance Recommendation\"]}"

aws events put-targets --rule NTHRebalanceRule \
--targets "Id"="1","Arn"=${SQS_QUEUE_ARN}

aws events put-rule \
--name NTHInstanceStateChangeRule \
--event-pattern "{\"source\": [\"aws.ec2\"],\"detail-type\": [\"EC2 Instance State-change Notification\"]}"

aws events put-targets --rule NTHInstanceStateChangeRule \
--targets "Id"="1","Arn"=${SQS_QUEUE_ARN}

aws events put-rule \
--name NTHScheduledChangeRule \
--event-pattern "{\"source\": [\"aws.health\"],\"detail-type\": [\"AWS Health Event\"],\"detail\": {\"service\": [\"EC2\"],\"eventTypeCategory\": [\"scheduledChange\"]}}"

aws events put-targets --rule NTHScheduledChangeRule \
  --targets "Id"="1","Arn"=${SQS_QUEUE_ARN}
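
You can optionally confirm that each rule targets the queue (a quick check, not required), for example:

aws events list-targets-by-rule --rule NTHSpotTermRule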

4.3 Prepare NTH permissions

Create the policy and associate it:

# Configure the IAM policy
cat <<EoF > /tmp/nth-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "autoscaling:CompleteLifecycleAction",
                "autoscaling:DescribeAutoScalingInstances",
                "autoscaling:DescribeTags",
                "ec2:DescribeInstances",
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage"
            ],
            "Resource": "*"
        }
    ]
}
EoF

aws iam create-policy   \
  --policy-name nth-policy \
  --policy-document file:///tmp/nth-policy.json


# Associate the IAM role with the service account (SA)
eksctl create iamserviceaccount \
    --name nth \
    --namespace kube-system \
    --cluster ${EKS_CLUSTER_NAME} \
    --attach-policy-arn "arn:aws:iam::${ACCOUNT_ID}:policy/nth-policy" \
    --approve \
    --override-existing-serviceaccounts

# View and confirm
k -n kube-system describe sa nth

4.4 Deploy NTH

Generate the values file:

cat > nth-values.yaml<<EOF
enableSqsTerminationDraining: true
queueURL: "${SQS_QUEUE_URL}"
awsRegion: "${AWS_REGION}"
serviceAccount:
  create: false
  name: nth

checkASGTagBeforeDraining: false # <-- set to false as instances do not belong to any ASG
enableSpotInterruptionDraining: true
enableRebalanceMonitoring: true
enableRebalanceDraining: true
enableScheduledEventDraining: true
EOF

Deploy with Helm:

helm repo add eks https://aws.github.io/eks-charts

helm upgrade --install aws-node-termination-handler \
  --namespace kube-system \
  -f nth-values.yaml \
  eks/aws-node-termination-handler

Check the deployment:

k get po -n kube-system -l k8s-app=aws-node-termination-handler

5. Deploying Flink

Flink has built-in fault tolerance and can use checkpoints to restore state and positions in the stream, provided that:

  • A durable data source (Apache Kafka, Amazon Kinesis) can replay the data
  • Durable distributed storage (Amazon S3, EFS, HDFS) is available to store state

The Job Manager and Task Manager are Flink's key components:

  • The Task Manager is compute-intensive and can run on Spot Instances;
  • The Job Manager is the central coordinator and is best run on On-Demand Instances.

The data flow is shown below:

The code repository for this lab:

https://github.com/BWCXME/cost-optimized-flink-on-kubernetes

5.1 Prepare the image

Clone the code:

cd ~/environment
git clone https://github.com/BWCXME/cost-optimized-flink-on-kubernetes flink-demo

cd flink-demo

Create the repository:

aws ecr create-repository --repository-name flink-demo --image-scanning-configuration scanOnPush=true

Log in to the repository:

aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com

Build the x86 image and push it to the image repository:

docker build --tag ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest .

docker push ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest
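
Step 4 of the architecture overview mentions building a Multi-Arch image with docker buildx. The arm64 image is covered later in this series, but a minimal sketch might look like the following (it assumes buildx and arm64 emulation are available in your build environment and that the Dockerfile's base images support both architectures):

# Create and select a buildx builder (one-time setup)
docker buildx create --name multiarch --use

# Build for both architectures and push directly to the repository
docker buildx build \
  --platform linux/amd64,linux/arm64 \
  --tag ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest \
  --push .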

5.2 Prepare the Flink Service Account

Create the service account for the Flink job and grant it the in-cluster permissions it needs:

export FLINK_SA=flink-service-account

kubectl create serviceaccount $FLINK_SA

kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:$FLINK_SA

echo "export FLINK_SA=\"$FLINK_SA\"" >> ~/.bashrc
source ~/.bashrc

5.3 Prepare the input and output

Create the Kinesis input stream and save its name as an environment variable:

export FLINK_INPUT_STREAM=FlinkDemoInputStream

aws kinesis create-stream \
    --stream-name ${FLINK_INPUT_STREAM} \
    --shard-count 3

echo "export FLINK_INPUT_STREAM=\"${FLINK_INPUT_STREAM}\"" >> ~/.bashrc
source ~/.bashrc

Create the output S3 bucket and save its name as an environment variable:

export FLINK_S3_BUCKET=flink-${ACCOUNT_ID}

aws s3 mb s3://${FLINK_S3_BUCKET}

echo "export FLINK_S3_BUCKET=\"${FLINK_S3_BUCKET}\"" >> ~/.bashrc
source ~/.bashrc

5.4 Submit the job (YAML)

You can deploy either with explicitly defined YAML files or quickly from the command line. This walkthrough uses YAML files.

Prepare the directory:

mkdir x86
cd x86

Generate the configuration file:

cat > flink-configuration-configmap.yaml <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    kubernetes.cluster-id: flink-demo
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3a://${FLINK_S3_BUCKET}/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 100000
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 2048m
    scheduler-mode: reactive
    parallelism.default: 4    
    rest.flamegraph.enabled: true 
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = DEBUG

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = \${sys:log.file}
    appender.rolling.filePattern = \${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
EOF

Following the Flink Kubernetes high-availability deployment documentation, we use a nodeSelector here to force scheduling onto On-Demand nodes, while still honoring the "Reuse" principle: either the managed node group or an On-Demand node launched by Karpenter will do.

If you want to restrict placement to nodes that do not belong to any Auto Scaling group (i.e. nodes launched by Karpenter), manually add the following to the jobmanager-application-ha.yaml generated below:

      nodeSelector:
        'group': 'NONE' 

Generate the jobmanager deployment file:

cat > jobmanager-application-ha.yaml <<EOF
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  parallelism: 1 # Set the value to greater than 1 to start standby JobManagers
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      serviceAccountName: ${FLINK_SA} # Service account with permissions to create, edit, and delete ConfigMaps
      nodeSelector:
        'eks.amazonaws.com/capacityType': 'ON_DEMAND'
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest
          imagePullPolicy: Always
          env:
          - name: POD_IP
            valueFrom:
              fieldRef:
                apiVersion: v1
                fieldPath: status.podIP
          args: ["standalone-job", "--host", "\$(POD_IP)","--job-classname", "com.amazonaws.services.kinesisanalytics.S3StreamingSinkJob","--inputStreamName", "${FLINK_INPUT_STREAM}", "--region", "${AWS_REGION}", "--s3SinkPath", "s3a://${FLINK_S3_BUCKET}/data",  "--checkpoint-dir", "s3a://${FLINK_S3_BUCKET}/recovery"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
EOF

For the Task Manager, this walkthrough follows the "Reuse" principle: no nodeSelector or nodeAffinity is added, so existing compute resources are used as fully as possible.

If you would rather reserve the managed On-Demand node group for cluster-management components or for applications with high stability requirements, you can set a nodeAffinity so that the Task Manager prefers instances launched by Karpenter (if needed, manually add the following to the taskmanager-job-deployment.yaml generated below):

      affinity:
        nodeAffinity:
           preferredDuringSchedulingIgnoredDuringExecution:
           - weight: 50
             preference:
               matchExpressions:
               - key: group
                 operator: In
                 values:
                 - NONE

Generate the taskmanager deployment file:

cat > taskmanager-job-deployment.yaml <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      serviceAccountName: ${FLINK_SA}
      containers:
      - name: taskmanager
        image: ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest
        imagePullPolicy: Always
        resources:
          requests:
            cpu: 250m
            memory: "4096Mi"
          limits:
            cpu: 500m
            memory: "8192Mi"
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
EOF
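
The architecture overview (item 6) calls for a CPU-based HPA on the Task Manager. The manifest below is a minimal sketch of what that could look like; the utilization target and replica bounds are only examples, and it assumes the Kubernetes metrics-server is installed in the cluster. Saving it in the same directory lets it be applied together with the other files:

cat > taskmanager-hpa.yaml <<EOF
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-taskmanager
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-taskmanager
  minReplicas: 1
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60
EOF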

Prepare the service deployment file:

cat > jobmanager-svc.yaml <<EOF
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-web
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-security-groups: "${EKS_EXTERNAL_SG}"
spec:
  type: LoadBalancer
  ports:
  - name: web
    port: 80
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
EOF

Note that, for ease of testing, the service is exposed through a LoadBalancer bound to the ExternalSecurityGroup security group; please make sure:

  • This security group allows your local IP to access port 80.
  • If you change the exposed port from 80, for example to 8081, open port 8081 in the security group accordingly.

Apply the manifests (make sure you are in the x86 directory first):

k apply -f .

Check the deployment:

kgp

At first, both the Job Manager and Task Manager Pods are Pending, waiting for Karpenter to launch machines.

Provisioning is fast; containers start being created in about a minute.

Get the service address:

k get svc flink-jobmanager-web

Open the address in a browser; it looks similar to the following:

5.5 Submit the job (command line)

Flink 1.13+ supports pod templates, which let us customize how the JM and TM Pods are launched. This enables advanced features that the Flink Kubernetes configuration options do not cover directly.

Define the job name:

export kubernetes_cluster_id=your-flink-job-name

Use the kubernetes.pod-template-file parameter to point to a local file containing the pod definition; it is used to initialize the JobManager and TaskManager. If you are building a platform, this can be exposed to users as an advanced feature.

Pin the Job Manager to On-Demand nodes:

cat > jobmanager-pod-template.yaml <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  nodeSelector:
    'eks.amazonaws.com/capacityType': 'ON_DEMAND'
EOF

Submit the job from the command line, making sure to pass the kubernetes.pod-template-file.jobmanager parameter:

flink run-application -p 2 -t kubernetes-application \
  -Dkubernetes.cluster-id=${kubernetes_cluster_id} \
  -Dkubernetes.container.image=${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/flink-demo:latest \
  -Dkubernetes.container.image.pull-policy=Always \
  -Dkubernetes.jobmanager.service-account=flink-service-account \
  -Dkubernetes.pod-template-file.jobmanager=./jobmanager-pod-template.yaml \
  -Dkubernetes.rest-service.exposed.type=LoadBalancer \
  -Dkubernetes.rest-service.annotations=service.beta.kubernetes.io/aws-load-balancer-security-groups:${EKS_EXTERNAL_SG} \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.cluster-id=${kubernetes_cluster_id} \
  -Dhigh-availability.storageDir=s3://${FLINK_S3_BUCKET}/recovery \
  -Dstate.savepoints.dir=s3://${FLINK_S3_BUCKET}/savepoints/${kubernetes_cluster_id} \
  -Dkubernetes.taskmanager.service-account=flink-service-account \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=2 \
  local:///opt/flink/usrlib/aws-kinesis-analytics-java-apps-1.0.jar \
  --inputStreamName ${FLINK_INPUT_STREAM} --region ${AWS_REGION} --s3SinkPath s3://${FLINK_S3_BUCKET}/data --checkpoint-dir s3://${FLINK_S3_BUCKET}/recovery

Note: when setting command-line parameters, check them against the configuration in the code:

https://github.com/BWCXME/cost-optimized-flink-on-kubernetes/blob/main/src/main/java/com/amazonaws/services/kinesisanalytics/S3StreamingSinkJob.java

For example, the code already sets the checkpoint interval to 10 seconds, so there is no need to set "-Dexecution.checkpointing.interval" again on the command line.
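
If your own job does not configure checkpointing in code, you could pass the corresponding options on the command line instead, for example (values are illustrative):

-Dexecution.checkpointing.interval=10s
-Dexecution.checkpointing.mode=EXACTLY_ONCE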

Likewise, we can set a nodeAffinity so that the Task Manager prefers instances launched by Karpenter.

Prepare the pod template for the Task Manager (the main container must be named flink-main-container; this name cannot be changed):

cat > taskmanager-pod-template.yaml <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  affinity:
    nodeAffinity:
       preferredDuringSchedulingIgnoredDuringExecution:
       - weight: 50
         preference:
           matchExpressions:
           - key: group
             operator: In
             values:
             - NONE
  containers:
    # Do not change the main container name
    - name: flink-main-container
      env:
        - name: HADOOP_USER_NAME
          value: "hdfs"
EOF

Then, when submitting the job, simply point to it with -Dkubernetes.pod-template-file.taskmanager=/path/taskmanager-pod-template.yaml:

-Dkubernetes.pod-template-file.taskmanager=./taskmanager-pod-template.yaml

Summary

In this post, following the sustainability pillar of the AWS Well-Architected Framework, we deployed a containerized Flink application on EKS and, from the Reuse angle, combined it with the Karpenter autoscaler to maximize utilization and resource efficiency. From the Recycle angle, we used Spot Instances to deliver and optimize the cloud application.

References

Optimizing Apache Flink on Amazon EKS using Amazon EC2 Spot Instances:

https://aws.amazon.com/blogs/compute/optimizing-apache-flink-on-amazon-eks-using-amazon-ec2-spot-instances/

About the Authors

蒋龙(John)

Senior R&D engineer on OPPO's real-time computing platform and an Apache Flink Contributor, with a long-term focus on big data. He previously worked at internet companies including Kingsoft, Meituan, and 360, and has extensive hands-on experience with big data engines, scheduling, and architecture.

龙斌

Solutions Architect at Amazon Web Services, responsible for solution architecture design and consulting that helps customers move their business systems to the cloud; currently focused on containers and machine learning.

王子豪

Elastic Compute Solutions Architect at Amazon Web Services, mainly responsible for technical consulting and solution design for AWS elastic compute products.

翁建清

Senior Solutions Architect at Amazon Web Services with many years of IT experience across mobile internet, enterprise, finance, and government, having served as consulting director, CIO, and enterprise architect. He has rich project experience, especially in data warehousing, big data, and data application scenarios, and currently focuses on architecture planning, design, and implementation for enterprise-wide cloud adoption.