Amazon Web Services ブログ

AWS Batch マルチコンテナジョブを使用して大規模なシミュレーションを実行する

自動車、ロボット工学、金融などの業界では、製品を改善するために、シミュレーション、機械学習 (ML) モデルのトレーニング、ビッグデータ分析などの計算ワークロードの実装が増加の一途をたどっています。例えば、自動車メーカーはシミュレーションに依拠して自動運転機能をテストし、ロボット工学分野の企業は ML アルゴリズムをトレーニングしてロボットの認識機能を強化しているほか、金融業界の企業は詳細な分析を実行して、リスク管理、取引処理、不正行為の検出を改善しています。

シミュレーションを含むこれらのワークロードの一部は、コンポーネントの多様性と厳しい計算要件により、実行が特に複雑です。例えば、運転シミュレーションには、3D 仮想環境、車両センサーデータ、車両の挙動を制御する車両ダイナミクスなどの生成が伴います。ロボット工学シミュレーションでは、大規模な倉庫環境で、相互に、および他のシステムとインタラクションする何百もの自律型配送ロボットをテストする可能性があります。

AWS Batch は、Amazon Elastic Container Service (Amazon ECS)Amazon Elastic Kubernetes Service (Amazon EKS)AWS Fargate、および Amazon EC2 スポットまたはオンデマンドインスタンスなど、さまざまな AWS コンピューティングサービスにわたってバッチワークロードを実行するのに役立つフルマネージドサービスです。従来、AWS Batch では単一コンテナのジョブのみが許可されており、すべてのコンポーネントをモノリシックコンテナにマージするには追加のステップが必要でした。また、データログ記録などの追加サービスを提供することでメインアプリケーションを補完する補助コンテナである、別個の「サイドカー」コンテナも使用できませんでした。コードの変更はコンテナ全体の再構築を意味するため、この追加の作業には、ソフトウェア開発、IT 運用、品質保証 (QA) などの複数のチームにわたる調整が必要でした。

AWS Batch はマルチコンテナジョブを提供するようになりました。これにより、自動運転車やロボット工学などの分野で大規模なシミュレーションをより簡単かつ迅速に実行できます。これらのワークロードは通常、シミュレーション自体と、シミュレーションとインタラクションするテスト対象のシステム (エージェントとも呼ばれます) に分割されます。これらの 2 つのコンポーネントは、多くの場合、異なるチームによって開発および最適化されます。ジョブごとに複数のコンテナを実行できるため、AWS Batch が提供する高度なスケーリング、スケジューリング、コストの最適化を利用できるほか、3D 環境、ロボットセンサー、モニタリングサイドカーなどのさまざまなコンポーネントを表すモジュール式コンテナを使用できます。実際、IPG AutomotiveMORAIRobotec.ai などのお客様は既に AWS Batch マルチコンテナジョブを使用してクラウドでシミュレーションソフトウェアを実行しています。

簡単な例を使用して実際にこれがどのように機能するかを見て、迷路を解くのを楽しんでみましょう。

コンテナ上で実行されるシミュレーションの構築
本番環境では、おそらく既存のシミュレーションソフトウェアを使用することになります。この記事では、エージェント/モデルシミュレーションの簡易バージョンを構築しました。コードの詳細に興味がない場合は、このセクションを飛ばして、AWS Batch の設定方法にお進みください。

このシミュレーションでは、探索する世界はランダムに生成された 2D の迷路です。エージェントには、迷路を探索して鍵を見つけ、出口に到達するというタスクが課されています。ある意味、これは 3 つの場所がある経路探索問題の典型的な例です。

これは、開始 (S)、終了 (E)、およびキー (K) の位置を示した迷路のサンプルマップです。

ASCII の迷路マップのサンプル。

エージェントとモデルを 2 つの別々のコンテナに分離することで、異なるチームがそれぞれを個別に作業できるようになります。各チームは、シミュレーションに詳細を追加したり、エージェントが迷路を探索する方法についてより優れた戦略を見つけたりするなど、自分の担当部分の改善に集中できます。

迷路モデルのコードは次のとおりです (app.py)。どちらの例でも Python を使用しました。このモデルは、エージェントが迷路内を移動したり、鍵を見つけて出口に到達したかどうかを把握したりするために使用できる REST API を公開します。迷路モデルは REST API に Flask を使用します。

import json
import random
from flask import Flask, request, Response

ready = False

# マップデータを迷路内に保存する方法
# サイズ指定あり (幅 x 高さ) = (4 x 3)
#
#    012345678
# 0: +-+-+ +-+
# 1: | |   | |
# 2: +-+ +-+-+
# 3: | |   | |
# 4: +-+-+ +-+
# 5: | | | | |
# 6: +-+-+-+-+
# 7: 未使用

class WrongDirection(Exception):
    pass

class Maze:
    UP, RIGHT, DOWN, LEFT = 0, 1, 2, 3
    OPEN, WALL = 0, 1
    

    @staticmethod
    def distance(p1, p2):
        (x1, y1) = p1
        (x2, y2) = p2
        return abs(y2-y1) + abs(x2-x1)


    @staticmethod
    def random_dir():
        return random.randrange(4)


    @staticmethod
    def go_dir(x, y, d):
        if d == Maze.UP:
            return (x, y - 1)
        elif d == Maze.RIGHT:
            return (x + 1, y)
        elif d == Maze.DOWN:
            return (x, y + 1)
        elif d == Maze.LEFT:
            return (x - 1, y)
        else:
            raise WrongDirection(f"Direction: {d}")


    def __init__(self, width, height):
        self.width = width
        self.height = height        
        self.generate()
        

    def area(self):
        return self.width * self.height
        

    def min_lenght(self):
        return self.area() / 5
    

    def min_distance(self):
        return (self.width + self.height) / 5
    

    def get_pos_dir(self, x, y, d):
        if d == Maze.UP:
            return self.maze[y][2 * x + 1]
        elif d == Maze.RIGHT:
            return self.maze[y][2 * x + 2]
        elif d == Maze.DOWN:
            return self.maze[y + 1][2 * x + 1]
        elif d ==  Maze.LEFT:
            return self.maze[y][2 * x]
        else:
            raise WrongDirection(f"Direction: {d}")


    def set_pos_dir(self, x, y, d, v):
        if d == Maze.UP:
            self.maze[y][2 * x + 1] = v
        elif d == Maze.RIGHT:
            self.maze[y][2 * x + 2] = v
        elif d == Maze.DOWN:
            self.maze[y + 1][2 * x + 1] = v
        elif d ==  Maze.LEFT:
            self.maze[y][2 * x] = v
        else:
            WrongDirection(f"Direction: {d}  Value: {v}")


    def is_inside(self, x, y):
        return 0 <= y < self.height and 0 <= x < self.width


    def generate(self):
        self.maze = []
        # すべての境界を閉じます
        for y in range(0, self.height + 1):
            self.maze.append([Maze.WALL] * (2 * self.width + 1))
        # いずれかの境界線でランダムな開始点を取得します
        if random.random() < 0.5:
            sx = random.randrange(self.width)
            if random.random() < 0.5:
                sy = 0
                self.set_pos_dir(sx, sy, Maze.UP, Maze.OPEN)
            else:
                sy = self.height - 1
                self.set_pos_dir(sx, sy, Maze.DOWN, Maze.OPEN)
        else:
            sy = random.randrange(self.height)
            if random.random() < 0.5:
                sx = 0
                self.set_pos_dir(sx, sy, Maze.LEFT, Maze.OPEN)
            else:
                sx = self.width - 1
                self.set_pos_dir(sx, sy, Maze.RIGHT, Maze.OPEN)
        self.start = (sx, sy)
        been = [self.start]
        pos = -1
        solved = False
        generate_status = 0
        old_generate_status = 0                    
        while len(been) < self.area():
            (x, y) = been[pos]
            sd = Maze.random_dir()
            for nd in range(4):
                d = (sd + nd) % 4
                if self.get_pos_dir(x, y, d) != Maze.WALL:
                    continue
                (nx, ny) = Maze.go_dir(x, y, d)
                if (nx, ny) in been:
                    continue
                if self.is_inside(nx, ny):
                    self.set_pos_dir(x, y, d, Maze.OPEN)
                    been.append((nx, ny))
                    pos = -1
                    generate_status = len(been) / self.area()
                    if generate_status - old_generate_status > 0.1:
                        old_generate_status = generate_status
                        print(f"{generate_status * 100:.2f}%")
                    break
                elif solved or len(been) < self.min_lenght():
                    continue
                else:
                    self.set_pos_dir(x, y, d, Maze.OPEN)
                    self.end = (x, y)
                    solved = True
                    pos = -1 - random.randrange(len(been))
                    break
            else:
                pos -= 1
                if pos < -len(been):
                    pos = -1
                    
        self.key = None
        while(self.key == None):
            kx = random.randrange(self.width)
            ky = random.randrange(self.height)
            if (Maze.distance(self.start, (kx,ky)) > self.min_distance()
                and Maze.distance(self.end, (kx,ky)) > self.min_distance()):
                self.key = (kx, ky)


    def get_label(self, x, y):
        if (x, y) == self.start:
            c = 'S'
        elif (x, y) == self.end:
            c = 'E'
        elif (x, y) == self.key:
            c = 'K'
        else:
            c = ' '
        return c

                    
    def map(self, moves=[]):
        map = ''
        for py in range(self.height * 2 + 1):
            row = ''
            for px in range(self.width * 2 + 1):
                x = int(px / 2)
                y = int(py / 2)
                if py % 2 == 0: # 偶数行
                    if px % 2 == 0:
                        c = '+'
                    else:
                        v = self.get_pos_dir(x, y, self.UP)
                        if v == Maze.OPEN:
                            c = ' '
                        elif v == Maze.WALL:
                            c = '-'
                else: # 奇数行
                    if px % 2 == 0:
                        v = self.get_pos_dir(x, y, self.LEFT)
                        if v == Maze.OPEN:
                            c = ' '
                        elif v == Maze.WALL:
                            c = '|'
                    else:
                        c = self.get_label(x, y)
                        if c == ' ' and [x, y] in moves:
                            c = '*'
                row += c
            map += row + '\n'
        return map


app = Flask(__name__)

@app.route('/')
def hello_maze():
    return "<p>Hello, Maze!</p>"

@app.route('/maze/map', methods=['GET', 'POST'])
def maze_map():
    if not ready:
        return Response(status=503, retry_after=10)
    if request.method == 'GET':
        return '<pre>' + maze.map() + '</pre>'
    else:
        moves = request.get_json()
        return maze.map(moves)

@app.route('/maze/start')
def maze_start():
    if not ready:
        return Response(status=503, retry_after=10)
    start = { 'x': maze.start[0], 'y': maze.start[1] }
    return json.dumps(start)

@app.route('/maze/size')
def maze_size():
    if not ready:
        return Response(status=503, retry_after=10)
    size = { 'width': maze.width, 'height': maze.height }
    return json.dumps(size)

@app.route('/maze/pos/<int:y>/<int:x>')
def maze_pos(y, x):
    if not ready:
        return Response(status=503, retry_after=10)
    pos = {
        'here': maze.get_label(x, y),
        'up': maze.get_pos_dir(x, y, Maze.UP),
        'down': maze.get_pos_dir(x, y, Maze.DOWN),
        'left': maze.get_pos_dir(x, y, Maze.LEFT),
        'right': maze.get_pos_dir(x, y, Maze.RIGHT),

    }
    return json.dumps(pos)


WIDTH = 80
HEIGHT = 20
maze = Maze(WIDTH, HEIGHT)
ready = True

迷路モデル (requirements.txt 内) の唯一の要件は、Flask モジュールです。

迷路モデルを実行するコンテナイメージを作成するには、この Dockerfile を使用します。

FROM --platform=linux/amd64 public.ecr.aws/docker/library/python:3.12-alpine

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY . .

CMD [ "python3", "-m" , "flask", "run", "--host=0.0.0.0", "--port=5555"]

エージェントのコードは次のとおりです (agent.py)。まず、エージェントはモデルに迷路のサイズと開始位置をたずねます。その後、独自の戦略を適用して迷路を探索し、解決します。この実装では、エージェントはルートをランダムに選択し、同じパスを複数回たどることを避けようとします。

import random
import requests
from requests.adapters import HTTPAdapter, Retry

HOST = '127.0.0.1'
PORT = 5555

BASE_URL = f"http://{HOST}:{PORT}/maze"

UP, RIGHT, DOWN, LEFT = 0, 1, 2, 3
OPEN, WALL = 0, 1

s = requests.Session()

retries = Retry(total=10,
                backoff_factor=1)

s.mount('http://', HTTPAdapter(max_retries=retries))

r = s.get(f"{BASE_URL}/size")
size = r.json()
print('SIZE', size)

r = s.get(f"{BASE_URL}/start")
start = r.json()
print('START', start)

y = start['y']
x = start['x']

found_key = False
been = set((x, y))
moves = [(x, y)]
moves_stack = [(x, y)]

while True:
    r = s.get(f"{BASE_URL}/pos/{y}/{x}")
    pos = r.json()
    if pos['here'] == 'K' and not found_key:
        print(f"({x}, {y}) key found")
        found_key = True
        been = set((x, y))
        moves_stack = [(x, y)]
    if pos['here'] == 'E' and found_key:
        print(f"({x}, {y}) exit")
        break
    dirs = list(range(4))
    random.shuffle(dirs)
    for d in dirs:
        nx, ny = x, y
        if d == UP and pos['up'] == 0:
            ny -= 1
        if d == RIGHT and pos['right'] == 0:
            nx += 1
        if d == DOWN and pos['down'] == 0:
            ny += 1
        if d == LEFT and pos['left'] == 0:
            nx -= 1 

        if nx < 0 or nx >= size['width'] or ny < 0 or ny >= size['height']:
            continue

        if (nx, ny) in been:
            continue

        x, y = nx, ny
        been.add((x, y))
        moves.append((x, y))
        moves_stack.append((x, y))
        break
    else:
        if len(moves_stack) > 0:
            x, y = moves_stack.pop()
        else:
            print("No moves left")
            break

print(f"Solution length: {len(moves)}")
print(moves)

r = s.post(f'{BASE_URL}/map', json=moves)

print(r.text)

s.close()

エージェントの唯一の依存関係 (requirements.txt 内) は、requests モジュールです。

これは、エージェントのコンテナイメージを作成するために使用する Dockerfile です。

FROM --platform=linux/amd64 public.ecr.aws/docker/library/python:3.12-alpine

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

COPY . .

CMD [ "python3", "agent.py"]

この簡略化されたバージョンのシミュレーションはローカルで簡単に実行できますが、クラウドを利用すると、より大規模に実行し (より大規模で詳細な迷路で実行するなど)、複数のエージェントをテストして使用する最適な戦略を見つけることができます。現実のシナリオでは、エージェントの改善はその後、自動運転車やロボット掃除機などの物理デバイスに実装されます。

マルチコンテナジョブを使用したシミュレーションの実行
AWS Batch を利用してジョブを実行するには、次の 3 つのリソースを設定する必要があります:

  • ジョブを実行するコンピューティング環境
  • ジョブを送信するジョブキュー
  • 使用するコンテナイメージなど、ジョブの実行方法を記述するジョブ定義

AWS Batch コンソールで、ナビゲーションペインから [コンピューティング環境] を選択し、次に [作成] を選択します。現在、Fargate、Amazon EC2、または Amazon EKS を利用する選択肢があります。Fargate を利用すると、ジョブ定義で指定したリソース要件に厳密に一致させることができます。ただし、シミュレーションでは通常、大量ではあるものの静的な量のリソースにアクセスする必要があり、計算を高速化するために GPU が使用されます。そのため、ここでは [Amazon EC2] を選択します。

コンソールのスクリーンショット。

AWS Batch が EC2 インスタンスをスケールおよび設定できるように、[マネージド] オーケストレーションタイプを選択します。その後、コンピューティング環境の名前を入力し、サービスにリンクされたロール (AWS Batch が以前に作成したもの) と、(EC2 インスタンス上で実行されている) ECS コンテナエージェントが私に代わって AWS API に対して呼び出しを実行するために使用するインスタンスロールを選択します。[次へ] を選択します。

コンソールのスクリーンショット。

[インスタンス構成] 設定で、EC2 インスタンスのサイズとタイプを選択します。例えば、GPU を備えたインスタンスタイプや Graviton プロセッサを使用するインスタンスタイプを選択できます。特別な要件はなく、すべての設定をデフォルト値のままにします。[ネットワーク構成] で、コンソールでは既に [デフォルト VPC][デフォルトセキュリティグループ] が選択されています。最後のステップでは、すべての設定を確認し、コンピューティング環境の作成を完了します。

ここで、ナビゲーションペインから [ジョブキュー] を選択し、次に [作成] を選択します。その後、コンピューティング環境に使用したのと同じオーケストレーションタイプ (Amazon EC2) を選択します。[ジョブキューの設定] で、ジョブキューの名前を入力します。[接続されたコンピューティング環境] ドロップダウンで、作成したばかりのコンピューティング環境を選択し、キューの作成を完了します。

コンソールのスクリーンショット。

ナビゲーションペインから [ジョブ定義] を選択し、次に [作成] を選択します。前と同様に、オーケストレーションタイプとして [Amazon EC2] を選択します。

複数のコンテナを使用するには、[従来の containerProperties 構造を使用] オプションを無効にして、次のステップに進みます。デフォルトでは、アカウントに従来のジョブ定義が既に存在する場合、コンソールは従来の単一コンテナジョブ定義を作成します。私の場合はそのようになります。従来のジョブ定義を持たないアカウントの場合、コンソールではこのオプションが無効になっています。

コンソールのスクリーンショット。

ジョブ定義の名前を入力します。その後、このジョブにどの許可が必要かを考える必要があります。このジョブに使用するコンテナイメージは、Amazon ECR プライベートリポジトリに保存されています。AWS Batch がこれらのイメージをコンピューティング環境にダウンロードできるようにするには、[タスクプロパティ] セクションで、ECR リポジトリに対する読み取り専用アクセスを付与する [実行ロール] を選択します。シミュレーションコードは AWS API を呼び出していないため、[タスクロール] を設定する必要はありません。例えば、コードが結果を Amazon Simple Storage Service (Amazon S3) バケットにアップロードする場合、そのための許可を付与するロールをここで選択できます。

次のステップでは、このジョブで使用される 2 つのコンテナを設定します。1 つ目は maze-model です。名前と画像の場所を入力します。ここでは、vCPU、メモリ、GPU の観点からコンテナのリソース要件を指定できます。これは、ECS タスクのコンテナの設定に似ています。

コンソールのスクリーンショット。

エージェント用の 2 番目のコンテナを追加し、以前と同様に名前、イメージの場所、リソース要件を入力します。エージェントは迷路の開始直後に迷路にアクセスする必要があるため、[依存関係] セクションを使用してコンテナの依存関係を追加します。コンテナ名として maze-model を選択し、条件として START を選択します。この依存関係を追加しない場合、maze-model コンテナが実行されて応答できるようになる前に、agent コンテナが失敗する可能性があります。このジョブ定義では両方のコンテナに必須のフラグが設定されているため、ジョブ全体が失敗して終了します。

コンソールのスクリーンショット。

すべての設定を確認し、ジョブ定義を完了します。これで、仕事を始めることができます。

ナビゲーションペインの [ジョブ] セクションで、新しいジョブを送信します。名前を入力し、ジョブキューと作成したばかりのジョブ定義を選択します。

コンソールのスクリーンショット。

次のステップでは、設定を上書きしてジョブを作成する必要はありません。数分後、ジョブは成功し、2 つのコンテナのログにアクセスできるようになりました。

コンソールのスクリーンショット。

エージェントは迷路を解決したので、ログからすべての詳細を取得できます。エージェントがどのように開始され、キーを取得し、出口を見つけたかを示すジョブの出力を次に示します。

SIZE {'width': 80, 'height': 20}
START {'x': 0, 'y': 18}
(32, 2) key found
(79, 16) exit
Solution length: 437
[(0, 18), (1, 18), (0, 18), ..., (79, 14), (79, 15), (79, 16)]

マップでは、赤いアスタリスク (*) は、開始 (S)、キー (K)、および終了 (E) の場所間でエージェントがたどるパスを示しています。

解決された迷路の ASCII ベースの地図。

サイドカーコンテナによるオブザーバビリティの向上
複数のコンポーネントを使用して複雑なジョブを実行する場合、これらのコンポーネントが何を行っているかをより明確に把握することができます。例えば、エラーやパフォーマンスの問題が発生した場合、この情報は問題が発生した場所とその内容を見つけるのに役立ちます。

アプリケーションをインストルメント化するには、AWS Distro for OpenTelemetry を使用します。

この方法で収集されたテレメトリデータを使用すると、何が起こっているかをより良く理解し、問題を解決するための時間を短縮するのに役立つダッシュボード (例えば、CloudWatch または Amazon Managed Grafana を利用) やアラーム (CloudWatch または Prometheus を利用) を設定できます。より一般的には、サイドカーコンテナは、AWS Batch ジョブからのテレメトリデータをモニタリングおよびオブザーバビリティプラットフォームに統合するのに役立ちます。

知っておくべきこと
AWS Batch によるマルチコンテナジョブのサポートは現在、Batch が提供されているすべての AWS リージョンにおいて、AWS マネジメントコンソールAWS コマンドラインインターフェイス (AWS CLI)AWS SDK でご利用いただけます。詳細については、AWS サービス (リージョン別) をご覧ください。

AWS Batch でマルチコンテナジョブを使用する場合、追加料金はかかりません。実際、AWS Batch の利用には追加料金はかかりません。お支払いいただくのは、EC2 インスタンスや Fargate コンテナなど、アプリケーションを保存および実行するために作成した AWS リソースの料金のみです。コストを最適化するために、コンピューティング環境でリザーブドインスタンスSavings Plan、EC2 スポットインスタンス、Fargate を使用できます。

マルチコンテナジョブを使用すると、ジョブの準備作業が減ることで開発時間が短縮されるほか、複数のチームの作業を 1 つのコンテナにマージするカスタムツールの必要性がなくなります。また、コンポーネントの責任を明確に定義することで DevOps を簡素化し、チームが他の作業に煩わされることなく自分の専門分野の問題を迅速に特定して修正できるようにします。

詳細については、「AWS Batch ユーザーガイド」でマルチコンテナジョブを設定する方法をご覧ください。

Danilo

原文はこちらです。