Amazon Web Services ブログ

Amazon Chime SDKを使ってMedia Capture Pipelineからビデオを合成する

2021年7月にAWSは、Chime SDKのミーティング中に生成されるビデオ、オーディオ、およびその他のデータをS3バケットにキャプチャする方法として、Chime SDK Media Capture Pipelineを発表しました。続いて新機能が最近リリースされ、キャプチャ方法をカスタマイズできるようになりました。この機能には、参加者の個々のビデオを個別にキャプチャするオプションがあります。キャプチャしたデータに機械学習分析 (AWS Rekognition、Amazon Comprehend) を適用でき、柔軟性が向上する一方、単一のアウトプットファイルを作成するには追加の合成が必要になります。

このブログ記事では、Media Capture Pipelineから生成された個々のビデオを単一のビデオファイルに合成する数多くの方法のうちの1つについてご紹介します。この方法は、1)リモートコーチングや遠隔学習で、ミーティング出席者のビデオが横並びに配置されている場合や、2)スクリーン共有を使うカスタマーサービスで、共有スクリーンがメインビデオとなり、個人が画面脇に小さなタイルとして表示される場合などのユースケースに役立ちます。

前提条件

  • Amazon Chime SDKとAWS Lambdaについての基本的な理解 (簡単なPythonコードを書くことができる)
  • Amazon Chime Media Capture Pipelineについての基本的な理解 (詳細については、このドキュメントを参照してください)
  • Media Capture Pipelineを使用したAmazon Chime SDKアーキテクチャがすでに実行中で、ビデオとオーディオがS3バケットに保存可能であること(方法については、このブログ記事GitHub のサンプルを参照してください)
  • Lambda FFmpegレイヤーの作成に関する基本的な理解 (方法については、このブログ記事を参照してください)
  • FFmpegバージョン4.2以降(この記事で示すサンプルコードではtpadフィルターオプションを使用するため)

ライセンス

このブログ記事内のコードは、MIT-0 ライセンス条件下でライセンスされているものです。

デモ版アプリケーションアーキテクチャ

このデモでは、ミーティングデータ(ビデオおよびオーディオファイル)をS3バケットにキャプチャするMedia Capture Pipelineのアーキテクチャがすでに実行されていることが前提となります。ステップ1~3に示すPythonコードスニペットは、以下のアーキテクチャ図に示す赤いボックスでAWS Lambda関数として実行するためのものです。

ステップ1 — オーディオチャンクファイルを単一のオーディオファイルに連結

まず、すべてのオーディオチャンクファイルを単一のファイルにまとめます。Chime SDKでは、ミーティングに参加している出席者の数に関係なく、すべてのオーディオが単一の、一連のチャンクファイルとして保存されます。以下のコード例では、S3バケット内の「/audio」プレフィックスの付いたオブジェクトのリストを取得し、それらをS3署名付きURLのリストとしてテキストファイルに追加する方法を示します。FFmpegは、ファイルからそのオーディオオブジェクトのリストを取得し、それらを単一のオーディオファイルに連結します。連結が完了するとオーディオファイルをS3にアップロードし、後の合成時の処理に向けて、アップロードされたオーディオファイルのCloudfrontエンドポイントを返します。

def audio_process():
    prefix = SOURCE_PREFIX + '/' + MEETING_ID + '/audio'
    
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects')
    operation_parameters = {'Bucket': SOURCE_BUCKET, 'Prefix': prefix}
    page_iterator = paginator.paginate(**operation_parameters)
    objects = []
    for page in page_iterator:
        objects.extend(page.get('Contents', []))
    
    #set timestamp of audio stream start to help calculate offset of video later on
    register_offset('audio', objects[0]['Key'])
    
    with open('/tmp/audio_list.txt', 'w') as f:
        for object in objects:
            s3_source_signed_url = client.generate_presigned_url('get_object',
                Params={'Bucket': SOURCE_BUCKET, 'Key': object['Key']},
                ExpiresIn=SIGNED_URL_TIMEOUT)
            f.write(f'file \'{s3_source_signed_url}\'\n')
    
    ffmpeg_cmd = "ffmpeg -f concat -safe 0 -protocol_whitelist file,https,tls,tcp -i /tmp/audio_list.txt -c copy /tmp/ "+AUDIO_FILE+" -y"
    p1 = subprocess.run(shlex.split(ffmpeg_cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    client.upload_file('/tmp/'+AUDIO_FILE , SOURCE_BUCKET,  SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+AUDIO_FILE)
    
    #cleanup /tmp
    if os.path.exists('/tmp/'+AUDIO_FILE):
        os.remove('/tmp/'+AUDIO_FILE)

    #as FFmpeg doesn't take S3 signed url directly, using cloudfront endpoint instead
    processed_audio_url = CLOUDFRONT_ENDPOINT + SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+AUDIO_FILE
    
    return processed_audio_url

各出席者がミーティングに参加し、異なるタイミングでカメラを有効にする場合は、オーディオストリームと対照して各ビデオに遅延補正を適用し、ビデオとオーディオを同期させる必要があります。以下の関数は、ステップ3でオーディオストリームからの遅延秒数を計算するために使用される、各ストリーム開始のタイムスタンプを保存するのに役立ちます。

def register_offset(type, key):
    global AUDIO_TIME, VIDEO1_TIME, VIDEO2_TIME, CONTENT_TIME
    date_format_str = '%Y-%m-%d-%H-%M-%S'
    
    filename = key[key.rfind('/')+1:]
    start_timestamp = datetime.strptime(filename[0:19], date_format_str)
    
    if type == 'audio':
        AUDIO_TIME = start_timestamp
    if type == 'video1':
        VIDEO1_TIME = start_timestamp
    if type == 'video2':
        VIDEO2_TIME = start_timestamp
    if type == 'content':
        CONTENT_TIME = start_timestamp

ステップ2 — 個別の出席者ファイルを単一のビデオファイルに連結

1人の参加者のビデオファイルを連結する場合、手順は以前のオーディオ処理と似ていますが、ステップが1つ追加されます。Media Capture Pipelineでは、出席者のビデオチャンクファイルはすべて、同じS3プレフィックス(「/video」の下)に格納されるため、参加者別にファイルを並べ替える必要があります。各チャンクファイルのファイル名には識別子としてattendeeIDが含まれるため、これを使用して2人の出席者を区別し、そのファイルを別々のリストに入れることができます。出席者それぞれのビデオチャンクファイルのリストが完成すれば、チャンクファイルの連結はステップ1と同様です。

Media Capture Pipelineには、ミーティングでコンテンツストリームが有効になっている場合に、それを追加のビデオストリームとしてキャプチャするオプションがあります。コンテンツストリームファイルには、各ファイル名の末尾に「#content.mp4」が付きます。この例では、これを使用してコンテンツストリームファイルを特定し、個別のリストとして追加します。

個々のビデオファイルの命名規則は「YYYY-MM-DD-HH-MM-SS-MS-<attendeeID>.mp4」です。
コンテンツストリームビデオファイルの命名規則は「YYYY-MM-DD-HH-MM-SS-MS-<attendeeID>#content.mp4」です。

def video_process():
    prefix = SOURCE_PREFIX + '/' + MEETING_ID + '/video'
    
    # abbreviated objects retrieval from S3 as same as above audio_process() code
    ....
        
    global USER_A, USER_B, CONTENT_TIME
    userA_list=[]
    userB_list=[]
    content_list=[]
    for object in objects:
        filename = object['Key'][object['Key'].rfind('/')+1:]
        val = -1
        for i in range(0, 7):
            val = filename.find('-', val + 1)
        attendeeId = filename[val+1:]
        
        #set timestamp of each video stream start to calculate offset later on
        if ('#content' in filename) and (CONTENT_TIME == ''):
            register_offset('content', object['Key'])
        elif (USER_A == '') and ('#content' not in filename):
            USER_A = userid
            register_offset('video1', object['Key'])
        elif (USER_B == '') and (userid != USER_A) and ('#content' not in filename):
            USER_B = userid
            register_offset('video2', object['Key'])
        
        s3_source_signed_url = client.generate_presigned_url('get_object',
            Params={'Bucket': SOURCE_BUCKET, 'Key': object['Key']},
            ExpiresIn=SIGNED_URL_TIMEOUT) 
        
        if '#content' in filename:
            content_list.append(s3_source_signed_url)
        elif USER_A == userid:
            userA_list.append(s3_source_signed_url)
        elif USER_B == userid:
            userB_list.append(s3_source_signed_url)
            
    with open('/tmp/userA_list.txt', 'w') as f:
        for k in userA_list:
            f.write(f'file \'{k}\'\n')
    with open('/tmp/userB_list.txt', 'w') as f:
        for k in userB_list:
            f.write(f'file \'{k}\'\n')
    with open('/tmp/content_list.txt', 'w') as f:
        for k in content_list:
            f.write(f'file \'{k}\'\n')
              
    ffmpeg_cmd = "ffmpeg -f concat -safe 0 -protocol_whitelist file,https,tls,tcp -i /tmp/userA_list.txt -c copy -f mp4 -movflags frag_keyframe+empty_moov -"
    command = shlex.split(ffmpeg_cmd)
    p1 = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    client.put_object(Body=p1.stdout, Bucket=SOURCE_BUCKET, Key="captures/" + MEETING_ID + "/processed/" + USER_A)
    
    # abbreviated as same as above process for USER_B
    ....
    
    #as FFmpeg doesn't take S3 signed url directly, using cloudfront endpoint instead
    processed_userA_video_url = CLOUDFRONT_ENDPOINT + SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+USER_A
    processed_userB_video_url = CLOUDFRONT_ENDPOINT + SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+USER_B
    
    if len(content_list) != 0:
        ffmpeg_cmd = "ffmpeg -f concat -safe 0 -protocol_whitelist file,https,tls,tcp -i /tmp/content_list.txt -c copy -f mp4 -movflags frag_keyframe+empty_moov -"
        command1 = shlex.split(ffmpeg_cmd)
        p1 = subprocess.run(command1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        S3CLIENT.put_object(Body=p1.stdout, Bucket=SOURCE_BUCKET, Key="captures/" + MEETING_ID + "/processed/" + 'content.mp4')
        processed_content_video_url = CLOUDFRONT_ENDPOINT + SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+'content.mp4'
        
        return [processed_userA_video_url,processed_userB_video_url,processed_content_video_url]
    else:
        return [processed_userA_video_url,processed_userB_video_url]

注意:連結サンプルをシンプルにするために、このコードでは、出席者が有効にした時点でカメラが常時オンになると仮定しており、メディアキャプチャ中にカメラのオン/オフが複数回繰り返されないものとしています。

ステップ3 — 複数のビデオとオーディオファイルを単一のビデオファイルに合成

ステップ1、ステップ2が完了した時点で、S3バケットには合成用の3つまたは4つの個別の連結ファイル(オーディオ、2つの個別ビデオ、および任意のコンテンツストリーム)が入っているはずです。FFmpegのオーバーレイフィルターを使用すると、任意のレイアウトで複数のビデオを合成できます。次のコードでは、2つのビデオを横並びに配置しています。出力されたビデオファイルがS3にアップロードされたら、完了です!

def composite_process(audio_url,video_url_list):
    global AUDIO_TIME, VIDEO1_TIME, VIDEO2_TIME, CONTENT_TIME
    
    #calculate how long delay of videos (userA,userB, and content share) should start after audio
    video1_delay = str(int((VIDEO1_TIME - AUDIO_TIME).total_seconds()))
    video2_delay = str(int((VIDEO2_TIME - AUDIO_TIME).total_seconds()))
    if CONTENT_TIME != '':
        content_delay = str(int((CONTENT_TIME - AUDIO_TIME).total_seconds()))
    
    # video_url_list have following videos in the order
    # video_url_list[0] = userA video
    # video_url_list[1] = userB video
    # video_url_list[2] = content share video if exists
    if len(video_url_list) > 2:
        ffmpeg_cmd = 'ffmpeg -i '+video_url_list[0] +' -i '+video_url_list[1]+' -i '+audio_url+' -i '+video_url_list[2]+' -filter_complex "[3:v] scale=640:480, tpad=start_duration='+content_delay+':start_mode=add:color=blue[content]; [0:v] scale=120:90, tpad=start_duration='+video1_delay+':start_mode=add:color=blue[userA]; [1:v] scale=120:90, tpad=start_duration='+video2_delay+':start_mode=add:color=blue[userB]; [content][userA] overlay=510:10[content-userA]; [content-userA][userB] overlay=510:110[final]" -map "[final]" -map 2:a -f mp4 -movflags +faststart /tmp/'+FINAL_FILE
    else:
        ffmpeg_cmd = 'ffmpeg -i '+video_url_list[0] +' -i '+video_url_list[1]+' -i '+audio_url+' -filter_complex "[0:v] scale=640:480, pad=640*2:480, tpad=start_duration='+video1_delay+':start_mode=add:color=blue[left]; [1:v] scale=640:480, pad=640*2:480, tpad=start_duration='+video2_delay+':start_mode=add:color=blue[right]; [left][right] overlay=main_w/2:0[final]" -map "[final]" -map 2:a -f mp4 -movflags +faststart /tmp/'+FINAL_FILE
    
    command1 = shlex.split(ffmpeg_cmd)
    p1 = subprocess.run(command1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    S3CLIENT.upload_file('/tmp/'+FINAL_FILE , SOURCE_BUCKET,  SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+FINAL_FILE)

最終ビデオには、2つのビデオフィードおよびミーティングの同期音声が入り、次のように表示されます。メディアキャプチャの開始後に個々のビデオがオンにされた場合、最終ビデオでは、その個々のビデオが使用可能になるまでブルースクリーンが表示されます。

3番目のコンテンツストリームのビデオファイルが生成された場合、サンプルでは、異なるFFmpegコマンドを使用してコンテンツストリームをメインビデオとして指定し、2つの個々のビデオを画面右上にオーバーレイします。

AWS Lambdaでプロセスを実行する際のコツ

このようなプロセスは、多くの場合、サーバーレスインフラストラクチャで実行することで費用対効果高く簡単に実装できます。ただし、AWS Lambdaにはリソースの制約があることも覚えておく必要があります。

現在、AWS Lambdaでは関数を最大15分まで実行することができ、/tmpディレクトリのストレージサイズは最大512 MBです。多数のビデオファイルに対する複数のFFmpegプロセス(長時間のミーティングからのキャプチャ)には数分かかる可能性があるため、AWS Lambda関数に適用するメモリ割り当ての必要サイズを調査しておく必要があります。また、ユースケースによっては、必要に応じてプロセスを複数のLambda関数、ステップ関数、またはオンデマンドインスタンスに分割することも検討した方が良い場合もあります。

結論

このブログ記事では、Chime SDKを使用したシンプルな1対1のビデオミーティング(コンテンツ共有は任意)について、Chime SDK Media Capture Pipelineからキャプチャされたビデオファイルを合成する方法の1つをご紹介しました。Amazon Chime SDKを使用して独自のアプリケーションにオーディオ、ビデオ、および画面共有を追加する方法の詳細については、開発者ガイドをお読みいただくか、GitHubでChime SDK JavaScript SDK for Javascript をご覧ください。


参考リンク

AWS Media Services
AWS Media & Entertainment Blog (日本語)
AWS Media & Entertainment Blog (英語)

AWSのメディアチームの問い合わせ先: awsmedia@amazon.co.jp
※ 毎月のメルマガをはじめました。最新のニュースやイベント情報を発信していきます。購読希望は上記宛先にご連絡ください。

翻訳は BD山口、SA小林が担当しました。原文はこちらをご覧ください。