Business Productivity

Compositing videos from media capture pipelines with Amazon Chime SDK

In July 2021 we announced Chime SDK media capture pipelines as way to capture video, audio, and other data generated during a Chime SDK meeting into an S3 bucket. A new feature was recently released that allows the capture to be customized. This feature provides options to capture individual videos of attendees separately so you have the power of flexibility to apply machine learning analysis (AWS Rekognition, Amazon Comprehend) to the captured data but requires some additional compositing to create a single output file.

In this blog post, we will share one of the many ways you can composite individual videos produced from media capture pipelines into single video file. This can help with use-cases like 1) remote coaching or distance learning where video of meeting attendees are aligned next to each other 2) customer service with screen sharing where screen sharing as main video with individuals showing as small tiles at the side.

Prerequisites

  • Basic understanding of Amazon Chime SDK and AWS Lambda (writing simple Python code)
  • Basic understanding of Amazon Chime media capture pipelines (for detail, see this document)
  • Running Amazon Chime SDK architecture with media capture pipelines ready to store videos and audio into your S3 bucket (for how, see this blog post and github sample)
  • Basic understanding of creating Lambda FFmpeg layer (for how to, see this blog post)
  • The sample code shown in this post requires FFmpeg version 4.2 or later to use tpad filter option

License

The code in this blog post is licensed under the terms of the MIT-0 license.

Demo Application Architecture

This demo assumes you already have media capture pipeline architecture running that captures meeting data (video and audio files) into an S3 bucket. The Python code snippets presented in steps 1 – 3 are targeted to run as AWS Lambda functions in the red box represented in the below architecture diagram.

Step 1 – Concatenating audio chunk files into single audio file

First, let’s combine all of the audio chunk files into a single file. With Chime SDK, regardless of how many attendees there are in the meeting, all audio is saved as single series of chunk files. In the code example below, it shows how to retrieve the list of objects in S3 bucket under the ‘/audio’ prefix and add them as a list of S3 signed urls in a text file. FFmpeg will take that list of audio objects from the file and concatenate them into a single audio file. After the concatenating is complete, we will upload the audio file to S3 and return the Cloudfront endpoint of uploaded the audio file for later processing when compositing.

def audio_process():
    prefix = SOURCE_PREFIX + '/' + MEETING_ID + '/audio'
    
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects')
    operation_parameters = {'Bucket': SOURCE_BUCKET, 'Prefix': prefix}
    page_iterator = paginator.paginate(**operation_parameters)
    objects = []
    for page in page_iterator:
        objects.extend(page.get('Contents', []))
    
    #set timestamp of audio stream start to help calculate offset of video later on
    register_offset('audio', objects[0]['Key'])
    
    with open('/tmp/audio_list.txt', 'w') as f:
        for object in objects:
            s3_source_signed_url = client.generate_presigned_url('get_object',
                Params={'Bucket': SOURCE_BUCKET, 'Key': object['Key']},
                ExpiresIn=SIGNED_URL_TIMEOUT)
            f.write(f'file \'{s3_source_signed_url}\'\n')
    
    ffmpeg_cmd = "ffmpeg -f concat -safe 0 -protocol_whitelist file,https,tls,tcp -i /tmp/audio_list.txt -c copy /tmp/ "+AUDIO_FILE+" -y"
    p1 = subprocess.run(shlex.split(ffmpeg_cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    client.upload_file('/tmp/'+AUDIO_FILE , SOURCE_BUCKET,  SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+AUDIO_FILE)
    
    #cleanup /tmp
    if os.path.exists('/tmp/'+AUDIO_FILE):
        os.remove('/tmp/'+AUDIO_FILE)

    #as FFmpeg doesn't take S3 signed url directly, using cloudfront endpoint instead
    processed_audio_url = CLOUDFRONT_ENDPOINT + SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+AUDIO_FILE
    
    return processed_audio_url

If each attendee joins the meeting and enables their camera at different times, you will need to apply delay of each video against the audio stream to synchronize the video and audio. The function below helps to store the timestamp of each streams start that will be used at step 3 to calculate the number of seconds delay from the audio stream.

def register_offset(type, key):
    global AUDIO_TIME, VIDEO1_TIME, VIDEO2_TIME, CONTENT_TIME
    date_format_str = '%Y-%m-%d-%H-%M-%S'
    
    filename = key[key.rfind('/')+1:]
    start_timestamp = datetime.strptime(filename[0:19], date_format_str)
    
    if type == 'audio':
        AUDIO_TIME = start_timestamp
    if type == 'video1':
        VIDEO1_TIME = start_timestamp
    if type == 'video2':
        VIDEO2_TIME = start_timestamp
    if type == 'content':
        CONTENT_TIME = start_timestamp

Step 2 – Concatenating separate attendee files into single video file

For concatenating a single attendee’s video files, the steps are similar to previous audio processing but adds one extra step. With media capture pipelines, all the video chunk files of attendees are stored into the same S3 prefix (under ‘/video’), so we need to sort the files by attendee. The file name of each chunk files include attendeeId as an identifier so we can use this to distinguish between two attendees and put their files in separate lists. Once each attendees video chunk list is complete, concatenating the chunks is similar to step 1.

When content stream is enabled in the meeting, media capture pipelines has option to capture it as additional video stream. Content stream files contains ‘#content.mp4’ at the end of each file name. The example uses that to identify content stream files and add them as separate list.

naming convention of individual video files is ‘YYYY-MM-DD-HH-MM-SS-MS-<attendeeID>.mp4’
naming convention of content stream video files is ‘YYYY-MM-DD-HH-MM-SS-MS-<attendeeID>#content.mp4’

def video_process():
    prefix = SOURCE_PREFIX + '/' + MEETING_ID + '/video'
    
    # abbreviated objects retrieval from S3 as same as above audio_process() code
    ....
        
    global USER_A, USER_B, CONTENT_TIME
    userA_list=[]
    userB_list=[]
    content_list=[]
    for object in objects:
        filename = object['Key'][object['Key'].rfind('/')+1:]
        val = -1
        for i in range(0, 7):
            val = filename.find('-', val + 1)
        attendeeId = filename[val+1:]
        
        #set timestamp of each video stream start to calculate offset later on
        if ('#content' in filename) and (CONTENT_TIME == ''):
            register_offset('content', object['Key'])
        elif (USER_A == '') and ('#content' not in filename):
            USER_A = userid
            register_offset('video1', object['Key'])
        elif (USER_B == '') and (userid != USER_A) and ('#content' not in filename):
            USER_B = userid
            register_offset('video2', object['Key'])
        
        s3_source_signed_url = client.generate_presigned_url('get_object',
            Params={'Bucket': SOURCE_BUCKET, 'Key': object['Key']},
            ExpiresIn=SIGNED_URL_TIMEOUT) 
        
        if '#content' in filename:
            content_list.append(s3_source_signed_url)
        elif USER_A == userid:
            userA_list.append(s3_source_signed_url)
        elif USER_B == userid:
            userB_list.append(s3_source_signed_url)
            
    with open('/tmp/userA_list.txt', 'w') as f:
        for k in userA_list:
            f.write(f'file \'{k}\'\n')
    with open('/tmp/userB_list.txt', 'w') as f:
        for k in userB_list:
            f.write(f'file \'{k}\'\n')
    with open('/tmp/content_list.txt', 'w') as f:
        for k in content_list:
            f.write(f'file \'{k}\'\n')
              
    ffmpeg_cmd = "ffmpeg -f concat -safe 0 -protocol_whitelist file,https,tls,tcp -i /tmp/userA_list.txt -c copy -f mp4 -movflags frag_keyframe+empty_moov -"
    command = shlex.split(ffmpeg_cmd)
    p1 = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    client.put_object(Body=p1.stdout, Bucket=SOURCE_BUCKET, Key="captures/" + MEETING_ID + "/processed/" + USER_A)
    
    # abbreviated as same as above process for USER_B
    ....
    
    #as FFmpeg doesn't take S3 signed url directly, using cloudfront endpoint instead
    processed_userA_video_url = CLOUDFRONT_ENDPOINT + SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+USER_A
    processed_userB_video_url = CLOUDFRONT_ENDPOINT + SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+USER_B
    
    if len(content_list) != 0:
        ffmpeg_cmd = "ffmpeg -f concat -safe 0 -protocol_whitelist file,https,tls,tcp -i /tmp/content_list.txt -c copy -f mp4 -movflags frag_keyframe+empty_moov -"
        command1 = shlex.split(ffmpeg_cmd)
        p1 = subprocess.run(command1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        S3CLIENT.put_object(Body=p1.stdout, Bucket=SOURCE_BUCKET, Key="captures/" + MEETING_ID + "/processed/" + 'content.mp4')
        processed_content_video_url = CLOUDFRONT_ENDPOINT + SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+'content.mp4'
        
        return [processed_userA_video_url,processed_userB_video_url,processed_content_video_url]
    else:
        return [processed_userA_video_url,processed_userB_video_url]

Notice: to keep this concatenating sample simple, the code assumes camera is turned on consistantly when attendees enable, instead of turning camera on/off multiple times during media capture.

Step 3 – Compositing multiple videos and audio file into single video file

Once steps 1 and 2 are complete, you should have 3 or 4 separate concatenated files (audio, 2 individual videos, and optional content stream) in your S3 bucket for compositing. FFmpeg’s overlay filter allows compositing multiple videos in any layout you want. For the following code, we are placing two videos next to each other in horizontal structure. Once the outputted video file is uploaded to S3, you are done!

def composite_process(audio_url,video_url_list):
    global AUDIO_TIME, VIDEO1_TIME, VIDEO2_TIME, CONTENT_TIME
    
    #calculate how long delay of videos (userA,userB, and content share) should start after audio
    video1_delay = str(int((VIDEO1_TIME - AUDIO_TIME).total_seconds()))
    video2_delay = str(int((VIDEO2_TIME - AUDIO_TIME).total_seconds()))
    if CONTENT_TIME != '':
        content_delay = str(int((CONTENT_TIME - AUDIO_TIME).total_seconds()))
    
    # video_url_list have following videos in the order
    # video_url_list[0] = userA video
    # video_url_list[1] = userB video
    # video_url_list[2] = content share video if exists
    if len(video_url_list) > 2:
        ffmpeg_cmd = 'ffmpeg -i '+video_url_list[0] +' -i '+video_url_list[1]+' -i '+audio_url+' -i '+video_url_list[2]+' -filter_complex "[3:v] scale=640:480, tpad=start_duration='+content_delay+':start_mode=add:color=blue[content]; [0:v] scale=120:90, tpad=start_duration='+video1_delay+':start_mode=add:color=blue[userA]; [1:v] scale=120:90, tpad=start_duration='+video2_delay+':start_mode=add:color=blue[userB]; [content][userA] overlay=510:10[content-userA]; [content-userA][userB] overlay=510:110[final]" -map "[final]" -map 2:a -f mp4 -movflags +faststart /tmp/'+FINAL_FILE
    else:
        ffmpeg_cmd = 'ffmpeg -i '+video_url_list[0] +' -i '+video_url_list[1]+' -i '+audio_url+' -filter_complex "[0:v] scale=640:480, pad=640*2:480, tpad=start_duration='+video1_delay+':start_mode=add:color=blue[left]; [1:v] scale=640:480, pad=640*2:480, tpad=start_duration='+video2_delay+':start_mode=add:color=blue[right]; [left][right] overlay=main_w/2:0[final]" -map "[final]" -map 2:a -f mp4 -movflags +faststart /tmp/'+FINAL_FILE
    
    command1 = shlex.split(ffmpeg_cmd)
    p1 = subprocess.run(command1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    S3CLIENT.upload_file('/tmp/'+FINAL_FILE , SOURCE_BUCKET,  SOURCE_PREFIX+'/'+MEETING_ID+'/processed/'+FINAL_FILE)

The final video should look like the following with two video feeds and synchronized audio from the meeting. When individual video is turned on after media capture is started, the final video shows blue screen until the individual video is available.

When the third video file of content stream is produced, the sample uses different FFmpeg command to specify content stream as the main video and overlay the two individual video on the top right corner.

Tips when running process on AWS Lambda

Running this kind of processing on serverless infrastructure is cost effective in many situations and simple to implement. However, you should also remember there are resource constraints on AWS Lambda.

Currently, AWS Lambda allows function to run up to 15 minutes and /tmp directory storage size is up to 512 MB. Multiple FFmpeg processes on many video files (capture from long duration meeting) could take a few minutes, so you should investigate on how much memory allocation to apply to AWS Lambda functions. Also depending on the use-case, you might want to consider dividing up the process into multiple Lambda functions, step-functions, or an on-demand instance depending on your need.

Conclusion

In this blog post, I shared one of the ways to composite video files captured from Chime SDK media capture pipelines for a simple one-to-one video (with or without content sharing) meeting using Chime SDK. To learn more about adding audio, video, and screen sharing to your own applications with Amazon Chime SDK, read the developer guide or find Chime SDK JavaScript SDK for Javascript on GitHub.