在 Amazon EKS 中使用 SQS 和 EFS 持久存储管理异步任务

使用作业队列运行后台任务,使用可扩展的多可用区存储。
发布时间:2023 年 9 月 29 日
EKS 集群设置
EKS
Kubernetes
eksctl
教程
亚马逊云科技
Olawale Olaleye
亚马逊云科技使用经验
200 - 中级
完成所需时间
30 分钟
前提条件

注册 / 登录 亚马逊云科技账户

上次更新时间
2023 年 9 月 29 日

在 Kubernetes 环境中管理后台工作的使用场景并不局限于大规模数据处理或实时分析。更多情况下,您需要关注细致的任务,如数据同步、文件上传或其他在后台默默进行的异步活动。Amazon SQS 支持的最大消息为 256 KB,特别适合用来队列化处理元数据或表示作业是否已完成还是仍在进行中的状态标志数据。Amazon EFS 可以为大型数据对象提供多可用区安全存储。当 Amazon SQS 与 Amazon EFS 结合使用时,可以使 Amazon SQS 专注于任务调度。这种组合带来两个关键优势:可以通过隔离后台任务的不同方面实现模块化操作;利用 EFS 的可扩展的多可用区架构来满足不断变化的存储需求,且无需中断服务。

本教程以本课程系列的第一部分中创建的 Amazon EKS 集群为基础,详细介绍如何部署批处理作业和作业队列。该集群配置中,包括安装 EFS CSI 驱动程序插件、创建 EFS CSI 驱动程序的服务账户 IAM 角色 (IRSA)和 OpenID Connect (OIDC) 端点。有关详细信息,请参阅 为运行异步批处理任务构建预配置 Amazon EKS 集群。现在,您需要在集群上设置 EFS CSI 驱动程序插件。有关指南,请参阅在 Amazon EKS 上使用 Amazon EFS CSI 实现可扩展的多功能存储解决方案

此外,还需将 Amazon Simple Queue Service (SQS) 集成到您的 Amazon Elastic Container Registry (ECR) 集群中,构建一个批处理应用程序,将应用程序容器化并部署到 Amazon ECR,然后使用 Amazon SQS 作业队列运行您的批处理任务。教程的第二部分介绍 EFS CSI 驱动程序。EFS CSI 可以帮助我们在运行批处理工作负载时保证多个节点上的数据完整性。

前提条件

开始本教程学习之前,您需要:

  • 安装最新版本的 kubectl。运行以下命令检查您的 kubectl 版本:kubectl version --short。
  • 安装最新版本的 eksctl。运行以下命令检查您的 eksctl 版本:eksctl info。
  • 安装 Python 3.9+。运行以下命令检查您的 Python 版本:python3 --version。
  • 安装 Docker 或任何其他等效的容器引擎来构建容器。

步骤 1:配置集群环境变量

在使用 Helm 或其他命令行工具与 Amazon EKS 集群互动之前,必须定义封装集群详细信息的特定环境变量。这些变量将在后续命令中使用,确保命令的作用目标是正确的集群和资源。

  • 首先,确认集群环境。因此,可以确保后续命令都能发送到正确的 Kubernetes 集群。执行以下命令来验证当前环境:
kubectl config current-context
  • 定义 CLUSTER_NAME 环境变量,指定您的集群。请务必将 region 值替换为实际值。
export CLUSTER_NAME=$(aws eks describe-cluster --region us-east-1 --name batch-quickstart --query "cluster.name" --output text)
  • 定义 CLUSTER_REGION 环境变量,指定您的集群所属区域。请务必将 region 值替换为实际值。
export CLUSTER_REGION=$(aws eks describe-cluster --name ${CLUSTER_NAME} --region us-east-1 --query "cluster.arn" --output text | cut -d: -f4)
  • 定义 ACCOUNT_ID 环境变量,指定与 EKS 集群关联的账户。
export ACCOUNT_ID=$(aws eks describe-cluster --name ${CLUSTER_NAME} --region ${CLUSTER_REGION} --query "cluster.arn" --output text | cut -d':' -f5)

步骤 2:验证或创建服务账户的 IAM 角色

在本节中,我们将验证是否已在 Amazon EKS 集群中正确设置所需的服务账户 IAM 角色。这些 IAM 角色用于实现亚马逊云科技服务与 Kubernetes 的顺畅交互,允许在您的 Pod 中使用亚马逊云科技功能。由于批处理工作负载通常存储在私有容器注册表中,我们将为 Amazon ECR 创建一个专用服务账户。

验证集群中已正确设置所需的服务账户:

kubectl get sa -A | egrep "efs-csi-controller|ecr"

输出结果应如下所示:

default ecr-sa 0 31m
kube-system efs-csi-controller-sa 0 30m

(可选)如果您还未设置服务帐户,或者您收到错误提示,使用以下命令创建服务帐户。请注意,在运行这些命令之前,您的集群必须已经关联了一个 OpenID Connect (OIDC) 端点

  • 为 Amazon ECR 创建一个 Kubernetes 服务账户:
eksctl create iamserviceaccount \ 
 --region ${CLUSTER_REGION} \ 
 --name ecr-sa \ 
 --namespace default \ 
 --cluster batch-quickstart \ 
 --attach-policy-arn arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly \
 --approve

亚马逊云科技上没有 EFS CSI 驱动程序对应的策略,因此需要执行一些额外步骤才能创建服务账户。有关指南,请参阅在 Amazon EKS 上使用 Amazon EFS CSI 实现可扩展的多功能存储解决方案

步骤 3:验证是否已安装 EFS CSI 驱动程序插件

验证 EFS CSI 驱动程序管理的插件是否已正确安装并在您的 Amazon EKS 集群上处于激活状态。EFS CSI 驱动程序对于 Amazon EFS 与 Kubernetes 间的无缝协作至关重要。有了 EFS CSI 驱动程序,您可以将 EFS 文件系统作为批量工作负载的持久卷。

  • 检查 EFS CSI 驱动程序是否已安装:
eksctl get addon --cluster ${CLUSTER_NAME} --region ${CLUSTER_REGION} | grep efs

输出结果应如下所示:

aws-efs-csi-driver v1.5.8-eksbuild.1 ACTIVE 0

如果集群上还未安装EFS CSI 驱动程序插件,请参阅在 Amazon EKS 上使用 Amazon EFS CSI 实现可扩展的多功能存储解决方案

步骤 4:运行示例批处理应用程序

本小节将详细介绍示例批处理应用程序。我们使用一个基于 Python 的批处理应用程序作为一个示例,演示如何批量读取、处理和写入数据。它从 input.csv 文件中读取数据,完成随机化数据操作,并将处理后的数据写回到 output.csv 文件中。我们将介绍如何将此应用程序部署到 Amazon ECR 和 Amazon EKS。

  • 创建一个名为 batch_processing.py 的 Python 脚本文件并粘贴以下内容:
import csv
import time
import random

def read_csv(file_path):
 with open(file_path, 'r') as f:
 reader = csv.reader(f)
 data = [row for row in reader]
 return data

def write_csv(file_path, data):
 with open(file_path, 'w', newline='') as f:
 writer = csv.writer(f)
 writer.writerows(data)

def process_data(data):
 processed_data = [["ID", "Value", "ProcessedValue"]]
 for row in data[1:]:
 id, value = row
 processed_value = float(value) * random.uniform(0.8, 1.2)
 processed_data.append([id, value, processed_value])
 return processed_data

def batch_task():
 print("Starting batch task...")
 
 # Read data from CSV
 input_data = read_csv('input.csv')
 
 # Process data
 processed_data = process_data(input_data)
 
 # Write processed data to CSV
 write_csv('output.csv', processed_data)
 
 print("Batch task completed.")

if __name__ == "__main__":
 batch_task()
  • 在 Python 脚本所在的目录中,创建一个名为 input.csv 的文件并粘贴以下内容:
ID,Value
1,100.5
2,200.3
3,150.2
4,400.6
5,300.1
6,250.4
7,350.7
8,450.9
9,500.0
10,600.8
  • 运行该 Python 脚本:
`python3 batch_processing.py`

输出结果应如下所示:

`Starting batch task...`
`Batch task completed.`

此外,将生成一个包含处理后数据的文件 output.csv,其中包含一个增加的列,存储处理后的值:

ID,Value,ProcessedValue
1,100.5,101.40789448456849
2,200.3,202.2013222517103
3,150.2,139.82822974457673
4,400.6,470.8262553815611
5,300.1,253.4504054915937
6,250.4,219.48492376021267
7,350.7,419.3203869922816
8,450.9,495.56898757853986
9,500.0,579.256459785631
10,600.8,630.4063443182313

步骤 5:准备并部署批处理容器

本小节介绍如何构建一个容器并将其存储在私有 ECR 存储库中。本节将引导您完成将批处理应用程序打包到容器中,并上传到 Amazon Elastic Container Registry (ECR) 的过程。完成后,您将在一个私有 ECR 存储库中安全地存储一个 Docker 容器镜像,为在 EKS 上部署做好准备。

  • 在您创建其他文件的同一个目录中,创建一个 Dockerfile 文件并粘贴以下内容:
FROM python:3.8-slim

COPY batch_processing.py /
COPY input.csv /

CMD ["python", "/batch_processing.py"]
  • 构建 Docker 镜像:
docker build -t batch-processing-image .
  • 创建一个私有 Amazon ECR 存储库:
aws ecr create-repository --repository-name batch-processing-repo --region ${CLUSTER_REGION}
  • 在您的 Amazon ECR 注册表中认证 Docker CLI:
aws ecr get-login-password --region ${CLUSTER_REGION} | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${CLUSTER_REGION}.amazonaws.com
  • 标记容器镜像:
docker tag batch-processing-image:latest ${ACCOUNT_ID}.dkr.ecr.${CLUSTER_REGION}.amazonaws.com/batch-processing-repo:latest
  • 将已标记的镜像推送到 ECR 存储库:
docker push ${ACCOUNT_ID}.dkr.ecr.${CLUSTER_REGION}.amazonaws.com/batch-processing-repo:latest

步骤 6:创建多架构镜像

为了确保批处理应用程序可以部署在多种硬件架构中,例如 Kubernetes 集群内,需要创建一个多架构容器镜像。您可以使用 Docker 的 buildx 工具来构建镜像。完成以下操作后,您将成功构建一个多架构容器镜像,并将其到 Amazon ECR。然后,您可以使用该镜像在 Amazon EKS 集群上部署。

  • 为批处理服务创建并启动新的构建器实例:
docker buildx create --name batchBuilder
docker buildx use batchBuilder
docker buildx inspect --bootstrap
  • 构建批处理服务的镜像,并推送到 Amazon ECR:
docker buildx build --platform linux/amd64,linux/arm64 -t ${ACCOUNT_ID}.dkr.ecr.${CLUSTER_REGION}.amazonaws.com/batch-processing-repo:latest . --push
  • 确认该多架构镜像是否存在于 ECR 存储库中:
aws ecr list-images --repository-name batch-processing-repo --region ${CLUSTER_REGION}

步骤 7:部署 Kubernetes 作业

本小节介绍如何将容器化的批处理应用程序作为 Kubernetes 作业部署到 Amazon EKS 集群上。批处理任务已封装在容器中,并存储在私有 ECR 仓库中。现在,将部署到可管理、可扩缩的 EKS 环境中。

  • 获取您的 ECR 中的镜像 URL:
echo ${ACCOUNT_ID}.dkr.ecr.${CLUSTER_REGION}.amazonaws.com/batch-processing-repo:latest
  • 创建一个名为 batch-job.yaml 的 Kubernetes 作业清单文件,并粘贴以下内容。请务必将 image 值替换为您的镜像在 ECR 中的 URL。
apiVersion: batch/v1
kind: Job
metadata:
 name: my-batch-processing-job
spec:
 template:
 spec:
 serviceAccountName: ecr-sa 
 containers:
 - name: batch-processor
 image: 123456789012.dkr.ecr.us-west-1.amazonaws.com/batch-processing-repo:latest
 restartPolicy: Never
  • 将作业清单应用到您的 EKS 集群:
kubectl apply -f batch-job.yaml

输出结果应如下所示:

job.batch/batch-processing-job created
  • 查询作业执行情况:
kubectl get jobs
响应输出应显示作业已完成:
NAME COMPLETIONS DURATION AGE
my-batch-processing-job 1/1 8s 11s

步骤 8:授予批处理作业 Amazon SQS 访问权限

本小节介绍 Kubernetes 集群中批处理作业的调度工作,并使用 Amazon SQS 作为作业队列。此外,还将介绍如何扩展现有 Kubernetes 服务账户的权限。我们将注解 Amazon ECR 服务账户,授予其 Amazon SQS 访问权限,从而为批处理任务创建一个更加多功能和安全的环境。

  • 创建一个作为作业队列的 Amazon SQS 队列:
aws sqs create-queue --queue-name eks-batch-job-queue

输出结果应如下所示:保存队列的 URL 以供后续步骤使用。

{
 "QueueUrl": "https://sqs.us-west-1.amazonaws.com/123456789012/eks-batch-job-queue"
}
  • 为现有的 Amazon ECR 服务账户添加注解以授予 Amazon SQS 访问权限:
eksctl create iamserviceaccount \ 
 --region ${CLUSTER_REGION} \ 
 --cluster batch-quickstart \ 
 --namespace default \ 
 --name ecr-sa \ 
 --attach-policy-arn arn:aws:iam::aws:policy/AmazonSQSFullAccess \ 
 --override-existing-serviceaccounts \ 
 --approve

步骤 9:创建 Kubernetes 密钥

创建一个 Kubernetes 密钥。Pod 访问私有 Amazon ECR 仓库时需要使用该密钥。这样才能确保 Kubernetes 集群能从私有 ECR 仓库拉取所需的容器镜像。ECR 令牌有效期仅为 12 小时,因此还需确认这个 ECR 密钥是否能在 Pod 重启后仍然有效。Kubernetes 将在密钥过期前自动刷新密钥,确保能持续访问该私有 ECR 仓库。

  • 生成一个 Amazon ECR 授权令牌:
ECR_TOKEN=$(aws ecr get-login-password --region ${CLUSTER_REGION})
  • 在 default 命名空间创建名为 regcred 的 Kubernetes 密钥:
kubectl create secret docker-registry regcred \ 
--docker-server=${ACCOUNT_ID}.dkr.ecr.${CLUSTER_REGION}.amazonaws.com \ 
--docker-username=AWS \ 
--docker-password="${ECR_TOKEN}" \ 
-n default

输出结果应如下所示:

secret/regcred created

步骤 10:部署集成了 SOS 队列的 Kubernetes 作业

本小节介绍如何调度一个集成了 Amazon SQS 队列的 Kubernetes 作业。这个集成作业可以以分布式和可扩展的方式处理批处理任务。使用 Amazon SQS,可以解耦云应用程序的组件,从而提高可扩缩性和可靠性。首先,创建一个包含 SQS 队列 URL 环境变量的 Kubernetes 作业清单。这使批处理应用程序能够与 SQS 队列交互,消费消息并可能触发更复杂的工作流程。

  • 创建一个名为 batch-job-queue.yaml 的 Kubernetes 作业清单文件,并粘贴以下内容。将 image 值替换为您的镜像在 ECR 中的 URL,并将 value 替换为您的 SQS 队列 URL。
apiVersion: batch/v1
kind: Job
metadata:
 name: batch-processing-job-with-queue
spec:
 template:
 spec:
 containers:
 - name: batch-processor
 image: 123456789012.dkr.ecr.us-west-1.amazonaws.com/batch-processing-repo:latest
 env:
 - name: SQS_QUEUE_URL
 value: "https://sqs.us-west-1.amazonaws.com/123456789012/eks-batch-job-queue"
 restartPolicy: Never
 serviceAccountName: ecr-sa
 imagePullSecrets:
 - name: regcred
  • 将任务清单应用到您的 EKS 集群:
kubectl apply -f batch-job-queue.yaml

输出结果应如下所示:

job.batch/batch-processing-job-with-queue created
  • 查询作业执行情况:
kubectl get jobs

当作业完成后,输出中显示为完成状态:

NAME COMPLETIONS DURATION AGE
batch-processing-job-with-queue 1/1 8s 13s
my-batch-processing-job 1/1 8s 16m

恭喜您!您已成功地部署了一个集成 Amazon SQS 任务队列的批处理任务到您的 Amazon EKS 集群。这样的设置让您能够更有效地管理和扩展您的批处理任务,充分利用 Amazon EKS 和亚马逊云科技服务的全部能力。

步骤 11:为 EFS 创建持久存储卷和持久存储卷声明

本小节介绍如何创建使用 EFS 存储类的持久存储卷 (PV) 和持久存储声明 (PVC),用于为 Kubernetes 作业提供一个持久的存储层。这需要在之前的教程 在 Amazon EKS 上使用 Amazon EFS CSI 实现可扩展的多功能存储解决方案的基础上进行的。在这个解决方案中,我们设置了 EFS URL 的环境变量。

  • 回显并保存您的 EFS URL 以供下一步使用:
echo $FILE_SYSTEM_ID.efs.$CLUSTER_REGION.amazonaws.com
  • 创建一个名为 batch-pv-pvc.yaml 的 YAML 文件,并粘贴以下内容。将 server 值替换为您的 EFS URL。
apiVersion: v1
kind: PersistentVolume
metadata:
 name: efs-pv
spec:
 capacity:
 storage: 5Gi
 volumeMode: Filesystem
 accessModes:
 - ReadWriteMany
 persistentVolumeReclaimPolicy: Retain
 storageClassName: efs-sc
 nfs:
 path: /
 server: fs-0ff53d77cb74d6474.efs.us-east-1.amazonaws.com
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
 name: efs-claim
spec:
 storageClassName: efs-sc
 accessModes:
 - ReadWriteMany
 resources:
 requests:
 storage: 5Gi
  • 将 PV 和 PVC 应用到您的 Kubernetes 集群:
kubectl apply -f batch-pv-pvc.yaml

输出结果应如下所示:

persistentvolume/efs-pv created
persistentvolumeclaim/efs-claim created

步骤 12:使用 Amazon EFS 实现持久存储

本小节介绍如何优化 Kubernetes 作业,使用 Amazon EFS 进行持久化存储。在 Amazon EKS 上使用 Amazon EFS CSI 实现可扩展的多功能存储解决方案中,您已经设置了基于 EFS 的存储类。现在,在现有的作业清单中添加一个持久卷声明。由于作业的不可变特性,还需要采用版本控制策略。需要创建不同名称但规格相似的新作业,而不是直接更新现有作业。这样,可以通过标签和注解跟踪历史版本和管理版本。

  • 创建一个名为 update-batch-job.yaml 的 Kubernetes 作业清单文件,并粘贴以下内容。请务必将 image 值替换为您的镜像在 ECR 中的 URL。
apiVersion: batch/v1
kind: Job
metadata:
 name: new-batch-job
 namespace: default
spec:
 template:
 spec:
 serviceAccountName: ecr-sa
 containers:
 - name: batch-processor
 image: 123456789012.dkr.ecr.us-west-1.amazonaws.com/batch-processing-repo:latest
 volumeMounts:
 - name: efs-volume
 mountPath: /efs
 volumes:
 - name: efs-volume
 persistentVolumeClaim:
 claimName: efs-claim
 restartPolicy: OnFailure
  • 将作业清单应用到您的 EKS 集群:
kubectl apply -f update-batch-job.yaml
  • 创建一个名为 update-batch-job-queue.yaml 的 Kubernetes 作业队列清单文件,并粘贴以下内容。将 image 值替换为您的镜像在 ECR 中的 URL,并将 value 替换为您的 SQS 队列 URL。
apiVersion: batch/v1
kind: Job
metadata:
 name: new-batch-processing-job-queue
 namespace: default
spec:
 template:
 spec:
 containers:
 - name: batch-processor
 image: 123456789012.dkr.ecr.us-west-1.amazonaws.com/batch-processing-repo:latest
 env:
 - name: SQS_QUEUE_URL
 value: "https://sqs.us-west-1.amazonaws.com/123456789012/eks-batch-job-queue"
 volumeMounts:
 - name: efs-volume
 mountPath: /efs
 volumes:
 - name: efs-volume
 persistentVolumeClaim:
 claimName: efs-claim
 restartPolicy: OnFailure
 serviceAccountName: ecr-sa
 imagePullSecrets:
 - name: regcred
  • 将作业队列清单应用到您的 EKS 集群:
kubectl apply -f update-batch-job-queue.yaml

查看作业日志,了解批处理任务执行情况:

kubectl logs -f new-batch-job-k267b

输出结果应如下所示:

Starting batch task...
Batch task completed.

清理资源

为更好的管理资源,建议您完成试验后及时删除试验过程中创建的资源。

# Delete the SQS Queue
aws sqs delete-queue --queue-url YOUR_SQS_QUEUE_URL

# Delete the ECR Repository
aws ecr delete-repository --repository-name YOUR_ECR_REPO_NAME --force

如果您喜欢本教程、发现任何问题或者想给我们提供意见反馈,请发送反馈意见我们

总结

您已成功地使用 Amazon SQS 和 Amazon EFS 在 Amazon EKS 集群中调度批处理任务!您不仅集成了一个强大的 SQS 作业队列,还利用 EFS CSI 驱动程序在多个节点间实现了持久存储。您完成了 Amazon EKS 集群设置,部署了基于 Python 的批处理应用程序,以及在 Amazon ECR 中实现该应用的容器化和存储。您还创建了多架构镜像,并基于多架构镜像部署 Kubernetes 作业。此外,通过将 Kubernetes 作业与 Amazon SQS 集成,并通过 Amazon EFS 提供持久存储来拓展了作业的能力。

要继续您的学习旅程,请设置 Cluster Autoscaler 实现动态扩缩,或启用 EFS Lifecycle Management 自动切换文件性能类别,或启用 EFS Intelligent-Tiering 来优化成本。