亚马逊AWS官方博客

使用 AWS Batch 多容器作业运行大规模模拟

汽车、机器人和金融等行业越来越多地实施模拟、机器学习(ML)模型训练和大数据分析等计算工作负载,以改进其产品。例如,汽车制造商依靠模拟来测试自动驾驶功能,机器人公司训练机器学习算法以增强机器人感知能力,金融公司进行深入分析以更好地管理风险、处理交易和检测欺诈行为。

其中一些工作负载(包括模拟),由于其组件的多样性和密集的计算需求,运行起来特别复杂。例如,驾驶模拟涉及生成三维虚拟环境、车辆传感器数据、控制汽车行为的车辆动力学等。机器人模拟可能会测试数百个自主交付机器人,这些机器人在大型仓库环境中相互交互并与其他系统交互。

AWS Batch 是一项完全托管的服务,可以帮助您在一系列 AWS 计算产品中运行批量工作负载,包括 Amazon Elastic Container Service(Amazon ECS)Amazon Elastic Kubernetes Service(Amazon EKS)AWS FargateAmazon EC2 竞价型按需型实例。传统上,AWS Batch 仅允许单容器任务,并且需要执行额外的步骤才能将所有组件合并到一个单一容器中。它还不允许使用单独的“边车”容器。这些容器是辅助容器,通过提供数据记录等额外服务来补充主应用程序。这项额外工作需要多个团队之间协作,例如软件开发、IT 运营和质量保证(QA),因为任何代码更改都意味着需要重建整个容器。

现在,AWS Batch 提供多容器任务,使在自动驾驶汽车和机器人等领域运行大规模模拟变得更加轻松快捷。这些工作负载通常分为模拟本身,以及与模拟交互的被测系统(又称为代理)。这两个组件通常由不同的团队开发和优化。由于能够为每项作业运行多个容器,因此,您可以获得 AWS Batch 提供的高级扩展、计划和成本优化,还可以使用代表不同组件(如 3D 环境、机器人传感器或监控边车)的模块化容器。实际上,IPG AutomotiveMORAIRobotec.ai 等客户已经在使用 AWS Batch 多容器任务在云端运行他们的模拟软件。

我们用一个简单的示例来了解它在实践中的工作原理,并在解开这个迷宫的过程中找到一些乐趣。

构建在容器上运行的模拟
在生产中,您可能会使用现有的模拟软件。在这篇博文中,我构建了一个简化版本的模拟代理/模型。如果您对代码详情不感兴趣,可以跳过这部分,直接转到有关如何配置 AWS Batch 的内容。

在这个模拟中,要探索的世界是一个随机生成的二维迷宫。代理的任务是探索迷宫找到钥匙,然后到达出口。从某种意义上说,这是包含三个位置的寻路问题的典型示例。

这是迷宫的地图示例,我在地图中突出显示了起点(S)、终点(E)和关键(K)位置。

ASCII 迷宫地图示例。

将代理和模型分别放在两个单独的容器中,这样可以让不同的团队分别处理每个容器。每个团队都可以专注于改进自己的工作,例如,为模拟添加细节,或为代理探索迷宫找到更好的策略。

这是迷宫模型的代码(app.py)。我在这两个示例中都使用了 Python。此模型展示了一个 REST API,代理可以使用它在迷宫中移动,并知道是否找到了密钥并到达了出口。迷宫模型使用 Flask 来实现此 REST API。

import json
import random
from flask import Flask, request, Response

ready = False

# How map data is stored inside a maze
# with size (width x height) = (4 x 3)
#
#    012345678
# 0: +-+-+ +-+
# 1: | |   | |
# 2: +-+ +-+-+
# 3: | |   | |
# 4: +-+-+ +-+
# 5: | | | | |
# 6: +-+-+-+-+
# 7: Not used

class WrongDirection(Exception):
    pass

class Maze:
    UP, RIGHT, DOWN, LEFT = 0, 1, 2, 3
    OPEN, WALL = 0, 1
    

    @staticmethod
    def distance(p1, p2):
        (x1, y1) = p1
        (x2, y2) = p2
        return abs(y2-y1) + abs(x2-x1)


    @staticmethod
    def random_dir():
        return random.randrange(4)


    @staticmethod
    def go_dir(x, y, d):
        if d == Maze.UP:
            return (x, y - 1)
        elif d == Maze.RIGHT:
            return (x + 1, y)
        elif d == Maze.DOWN:
            return (x, y + 1)
        elif d == Maze.LEFT:
            return (x - 1, y)
        else:
            raise WrongDirection(f"Direction: {d}")


    def __init__(self, width, height):
        self.width = width
        self.height = height        
        self.generate()
        

    def area(self):
        return self.width * self.height
        

    def min_lenght(self):
        return self.area() / 5
    

    def min_distance(self):
        return (self.width + self.height) / 5
    

    def get_pos_dir(self, x, y, d):
        if d == Maze.UP:
            return self.maze[y][2 * x + 1]
        elif d == Maze.RIGHT:
            return self.maze[y][2 * x + 2]
        elif d == Maze.DOWN:
            return self.maze[y + 1][2 * x + 1]
        elif d ==  Maze.LEFT:
            return self.maze[y][2 * x]
        else:
            raise WrongDirection(f"Direction: {d}")


    def set_pos_dir(self, x, y, d, v):
        if d == Maze.UP:
            self.maze[y][2 * x + 1] = v
        elif d == Maze.RIGHT:
            self.maze[y][2 * x + 2] = v
        elif d == Maze.DOWN:
            self.maze[y + 1][2 * x + 1] = v
        elif d ==  Maze.LEFT:
            self.maze[y][2 * x] = v
        else:
            WrongDirection(f"Direction: {d}  Value: {v}")


    def is_inside(self, x, y):
        return 0 <= y < self.height and 0 <= x < self.width


    def generate(self):
        self.maze = []
        # Close all borders
        for y in range(0, self.height + 1):
            self.maze.append([Maze.WALL] * (2 * self.width + 1))
        # Get a random starting point on one of the borders
        if random.random() < 0.5:
            sx = random.randrange(self.width)
            if random.random() < 0.5:
                sy = 0
                self.set_pos_dir(sx, sy, Maze.UP, Maze.OPEN)
            else:
                sy = self.height - 1
                self.set_pos_dir(sx, sy, Maze.DOWN, Maze.OPEN)
        else:
            sy = random.randrange(self.height)
            if random.random() < 0.5:
                sx = 0
                self.set_pos_dir(sx, sy, Maze.LEFT, Maze.OPEN)
            else:
                sx = self.width - 1
                self.set_pos_dir(sx, sy, Maze.RIGHT, Maze.OPEN)
        self.start = (sx, sy)
        been = [self.start]
        pos = -1
        solved = False
        generate_status = 0
        old_generate_status = 0                    
        while len(been) < self.area():
            (x, y) = been[pos]
            sd = Maze.random_dir()
            for nd in range(4):
                d = (sd + nd) % 4
                if self.get_pos_dir(x, y, d) != Maze.WALL:
                    continue
                (nx, ny) = Maze.go_dir(x, y, d)
                if (nx, ny) in been:
                    continue
                if self.is_inside(nx, ny):
                    self.set_pos_dir(x, y, d, Maze.OPEN)
                    been.append((nx, ny))
                    pos = -1
                    generate_status = len(been) / self.area()
                    if generate_status - old_generate_status > 0.1:
                        old_generate_status = generate_status
                        print(f"{generate_status * 100:.2f}%")
                    break
                elif solved or len(been) < self.min_lenght():
                    continue
                else:
                    self.set_pos_dir(x, y, d, Maze.OPEN)
                    self.end = (x, y)
                    solved = True
                    pos = -1 - random.randrange(len(been))
                    break
            else:
                pos -= 1
                if pos < -len(been):
                    pos = -1
                    
        self.key = None
        while(self.key == None):
            kx = random.randrange(self.width)
            ky = random.randrange(self.height)
            if (Maze.distance(self.start, (kx,ky)) > self.min_distance()
                and Maze.distance(self.end, (kx,ky)) > self.min_distance()):
                self.key = (kx, ky)


    def get_label(self, x, y):
        if (x, y) == self.start:
            c = 'S'
        elif (x, y) == self.end:
            c = 'E'
        elif (x, y) == self.key:
            c = 'K'
        else:
            c = ' '
        return c

                    
    def map(self, moves=[]):
        map = ''
        for py in range(self.height * 2 + 1):
            row = ''
            for px in range(self.width * 2 + 1):
                x = int(px / 2)
                y = int(py / 2)
                if py % 2 == 0: #Even rows
                    if px % 2 == 0:
                        c = '+'
                    else:
                        v = self.get_pos_dir(x, y, self.UP)
                        if v == Maze.OPEN:
                            c = ' '
                        elif v == Maze.WALL:
                            c = '-'
                else: # Odd rows
                    if px % 2 == 0:
                        v = self.get_pos_dir(x, y, self.LEFT)
                        if v == Maze.OPEN:
                            c = ' '
                        elif v == Maze.WALL:
                            c = '|'
                    else:
                        c = self.get_label(x, y)
                        if c == ' ' and [x, y] in moves:
                            c = '*'
                row += c
            map += row + '\n'
        return map


app = Flask(__name__)

@app.route('/')
def hello_maze():
    return "<p>Hello, Maze!</p>"

@app.route('/maze/map', methods=['GET', 'POST'])
def maze_map():
    if not ready:
        return Response(status=503, retry_after=10)
    if request.method == 'GET':
        return '<pre>' + maze.map() + '</pre>'
    else:
        moves = request.get_json()
        return maze.map(moves)

@app.route('/maze/start')
def maze_start():
    if not ready:
        return Response(status=503, retry_after=10)
    start = { 'x': maze.start[0], 'y': maze.start[1] }
    return json.dumps(start)

@app.route('/maze/size')
def maze_size():
    if not ready:
        return Response(status=503, retry_after=10)
    size = { 'width': maze.width, 'height': maze.height }
    return json.dumps(size)

@app.route('/maze/pos/<int:y>/<int:x>')
def maze_pos(y, x):
    if not ready:
        return Response(status=503, retry_after=10)
    pos = {
        'here': maze.get_label(x, y),
        'up': maze.get_pos_dir(x, y, Maze.UP),
        'down': maze.get_pos_dir(x, y, Maze.DOWN),
        'left': maze.get_pos_dir(x, y, Maze.LEFT),
        'right': maze.get_pos_dir(x, y, Maze.RIGHT),

    }
    return json.dumps(pos)


WIDTH = 80
HEIGHT = 20
maze = Maze(WIDTH, HEIGHT)
ready = True

迷宫模型的唯一要求(在 requirements.txt 中)是具有 Flask 模块。

为了创建运行迷宫模型的容器镜像,我使用以下 Dockerfile

FROM --platform=linux/amd64 public.ecr.aws/docker/library/python:3.12-alpine

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY . .

CMD [ "python3", "-m" , "flask", "run", "--host=0.0.0.0", "--port=5555"]

以下是代理的代码(agent.py)。首先,代理向模型询问迷宫的大小和起始位置。然后,它运用自己的策略来探索和解开迷宫。在此实现中,代理随机选择路径,尽量避免多次遵循相同的路径。

import random
import requests
from requests.adapters import HTTPAdapter, Retry

HOST = '127.0.0.1'
PORT = 5555

BASE_URL = f"http://{HOST}:{PORT}/maze"

UP, RIGHT, DOWN, LEFT = 0, 1, 2, 3
OPEN, WALL = 0, 1

s = requests.Session()

retries = Retry(total=10,
                backoff_factor=1)

s.mount('http://', HTTPAdapter(max_retries=retries))

r = s.get(f"{BASE_URL}/size")
size = r.json()
print('SIZE', size)

r = s.get(f"{BASE_URL}/start")
start = r.json()
print('START', start)

y = start['y']
x = start['x']

found_key = False
been = set((x, y))
moves = [(x, y)]
moves_stack = [(x, y)]

while True:
    r = s.get(f"{BASE_URL}/pos/{y}/{x}")
    pos = r.json()
    if pos['here'] == 'K' and not found_key:
        print(f"({x}, {y}) key found")
        found_key = True
        been = set((x, y))
        moves_stack = [(x, y)]
    if pos['here'] == 'E' and found_key:
        print(f"({x}, {y}) exit")
        break
    dirs = list(range(4))
    random.shuffle(dirs)
    for d in dirs:
        nx, ny = x, y
        if d == UP and pos['up'] == 0:
            ny -= 1
        if d == RIGHT and pos['right'] == 0:
            nx += 1
        if d == DOWN and pos['down'] == 0:
            ny += 1
        if d == LEFT and pos['left'] == 0:
            nx -= 1 

        if nx < 0 or nx >= size['width'] or ny < 0 or ny >= size['height']:
            continue

        if (nx, ny) in been:
            continue

        x, y = nx, ny
        been.add((x, y))
        moves.append((x, y))
        moves_stack.append((x, y))
        break
    else:
        if len(moves_stack) > 0:
            x, y = moves_stack.pop()
        else:
            print("No moves left")
            break

print(f"Solution length: {len(moves)}")
print(moves)

r = s.post(f'{BASE_URL}/map', json=moves)

print(r.text)

s.close()

代理的唯一依赖项(位于 requirements.txt 中)是 requests 模块。

我使用下面的 Dockerfile 来为代理创建容器镜像。

FROM --platform=linux/amd64 public.ecr.aws/docker/library/python:3.12-alpine

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY . .

CMD [ "python3", "agent.py"]

您可以在本地轻松运行这个简化版本的模拟,但在云端,您可以更大规模地运行(例如,使用更大、更详细的迷宫),并测试多个代理以找到最佳的使用策略。在现实场景中,对代理的改进将实施到物理设备中,例如自动驾驶汽车或机器人吸尘器。

使用多容器作业运行模拟
要使用 AWS Batch 运行作业,我需要配置三个资源:

  • 运行作业的计算环境
  • 提交作业的作业队列
  • 描述如何运行作业的作业定义,包括要使用的容器镜像

AWS Batch 控制台中,我从导航窗格中选择计算环境,然后选择创建。目前,我可以选择使用 Fargate、Amazon EC2 或 Amazon EKS。Fargate 需要严格满足在作业定义中指定的资源要求。然而,模拟通常需要访问大量静态的资源,并使用 GPU 加速计算。为此,我选择使用 Amazon EC2

控制台屏幕截图。

我选择托管编排类型,以便 AWS Batch 可以扩展和配置 EC2 实例。然后,我输入计算环境的名称,选择服务相关角色(AWS Batch 之前给我创建的角色)和 ECS 容器代理(在 EC2 实例上运行)使用的实例角色,以便替我调用 AWS API。我选择下一步

控制台屏幕截图。

实例配置设置中,我选择 EC2 实例的大小和类型。例如,我可以选择具有 GPU 或使用 Graviton 处理器的实例类型。我没有具体要求,将所有设置保留为默认值。对于网络配置,控制台已经选择了我的默认 VPC默认安全组。最后,我检查所有配置并完成计算环境的创建。

现在,我从导航窗格中选择作业队列,然后选择创建。然后,选择用于计算环境的相同编排类型 (Amazon EC2)。在作业队列配置中,我输入作业队列的名称。在连接的计算环境下拉列表中,我选择刚刚创建的计算环境并完成队列的创建。

控制台屏幕截图。

我从导航窗格中选择作业定义,然后选择创建。和之前一样,我选择 Amazon EC2 作为编排类型。

为了使用多个容器,我禁用了使用旧版 containerProperties 结构选项,然后继续执行下一步。默认情况下,如果账户中已经有旧版作业定义,则控制台会创建旧版单容器作业定义。我的情况就是这样。对于没有旧版作业定义的账户,控制台会禁用此选项。

控制台屏幕截图。

我输入作业定义的名称。然后,我必须考虑此作业所需的权限。我要用于此作业的容器镜像存储在 Amazon ECR 私有存储库中。为了让 AWS Batch 将这些镜像下载到计算环境中,在任务属性部分中,我选择了执行角色,以便可以只读访问 ECR 存储库。我不需要配置任务角色,因为模拟代码不调用 AWS API。例如,如果我的代码要将结果上传到 Amazon Simple Storage Service(Amazon S3)存储桶,我可以在此处选择一个授予相应权限的角色。

在下一步中,我将配置此作业使用的两个容器。第一个容器是 maze-model。我输入容器名称和镜像位置。在这里,我可以指定容器在 vCPU、内存和 GPU 方面的资源要求。这类似于为 ECS 任务配置容器。

控制台屏幕截图。

我为代理添加第二个容器,然后和之前一样输入容器名称、镜像位置和资源要求。由于代理需要在启动后立即访问迷宫,因此我在依赖项部分中添加容器依赖项。我在容器名称下选择 maze-model,并在条件下选择 START。如果我不添加此依赖项,agent 容器可能会在 maze-model 容器运行并能够响应之前失败。由于这两个容器在此作业定义中都标记为主要容器,因此整个作业将以失败告终。

控制台屏幕截图。

我检查所有配置并完成作业定义。下面,我可以启动作业了。

在导航窗格的作业部分中,我提交一个新作业。我输入作业名称,然后选择刚刚创建的作业队列和作业定义。

控制台屏幕截图。

在接下来的步骤中,我不需要覆盖任何配置,直接创建作业即可。几分钟后,作业创建成功,我可以访问两个容器的日志。

控制台屏幕截图。

代理解开了迷宫,我可以从日志中了解所有详细信息。以下是作业的输出,可查看代理如何开始、获得密钥,然后找到出口。

SIZE {'width': 80, 'height': 20}
START {'x': 0, 'y': 18}
(32, 2) key found
(79, 16) exit
Solution length: 437
[(0, 18), (1, 18), (0, 18), ..., (79, 14), (79, 15), (79, 16)]

在下图中,红色星号(*)表示代理在起点(S)、密钥(K)和出口(E)位置之间使用的路径。

已解开迷宫的 ASCII 图。

使用“边车”容器提高可观测性
使用多个组件运行复杂的作业时,有助于更好地了解这些组件的作用。例如,如果出现错误或性能问题,这些信息可以帮助您找到问题所在。

为了检测我的应用程序,我使用适用于 OpenTelemetry 的 AWS Distro

使用以这种方式收集的遥测数据,我可以设置控制面板(例如,使用 CloudWatch 或 Amazon Managed Grafana)和警报(使用 CloudWatch 或 Prometheus),从而帮助我更好地了解正在发生的情况并缩短解决问题的时间。总体来说,“边车”容器可以帮助将来自 AWS Batch 作业的遥测数据与您的监控和可观测性平台相集成。

注意事项
目前,在所有提供 Batch 的 AWS 区域中,都已在 AWS 管理控制台AWS 命令行界面(AWS CLI)AWS SDK 中推出了 AWS Batch 对多容器作业的支持。有关更多信息,请参阅 AWS 区域服务列表

在 AWS Batch 中使用多容器作业不会产生额外费用。实际上,使用 AWS Batch 就不会产生额外费用。您只需为存储和运行应用程序而创建的 EC2 实例和 Fargate 容器等 AWS 资源付费。要优化成本,您可以在计算环境中使用预留实例节省计划、EC2 竞价型实例和 Fargate。

使用多容器作业可以减少作业准备工作,从而缩短开发时间,并且无需使用自定义工具将多个团队的工作合并到一个容器中。另外,还可以定义明确的组件职责,从而简化 DevOps,这样团队就可以快速识别和修复自己专业领域的问题,而不会分散注意力。

要了解更多信息,请参阅 AWS Batch 用户指南中的如何设置多容器作业。

Danilo