Amazon Web Services 한국 블로그

Amazon Kinesis 비디오 스트림 및 Amazon SageMaker를 사용한 실시간 대규모 영상 분석

오늘은 Amazon Kinesis Video Streams Inference Template(KIT) for Amazon SageMaker의 기능에 대해 소개합니다. 이 기능은 고객이 Kinesis 비디오 스트림을 Amazon SageMaker 엔드포인트에 몇 분 만에 연결할 수 있습니다. 따라서 서비스를 통합하기 위해 다른 라이브러리를 사용하거나 맞춤형 소프트웨어를 작성하지 않고도 실시간 추론이 가능합니다.

KIT는 Docker 컨테이너로 패키징된 Kinesis Video Client Library 소프트웨어와 필요한 모든 AWS 리소스의 배포를 자동화하는 AWS CloudFormation 템플릿으로 구성됩니다. Amazon Kinesis Video Streams는 오디오, 비디오 및 관련 메타데이터를 분석, ML(기계 학습), 재생 및 기타 처리를 목적으로 연결된 디바이스에서 AWS로 쉽고 안전하게 스트리밍해 줍니다. Amazon SageMaker는 개발자와 데이터 과학자가 ML 모델을 쉽고 빠르게 구축하고 학습시키고 배포할 수 있게 해 주는 관리형 플랫폼입니다.

고객은 주택 보안 카메라, 엔터프라이즈 IP 카메라, 교통 카메라, AWS DeepLens, 휴대폰 등의 소스로부터 오디오 및 비디오 피드를 Kinesis Video Streams로 수집합니다. 스마트 홈부터 스마트 시티에 이르기까지, 지능형 제조업부터 소매업에 이르기까지, 다양한 업종에 걸쳐 개발자와 데이터 과학자는 AWS 클라우드에서 이러한 비디오 피드를 분석하기 위해 자신의 기계 학습 알고리즘을 배포하려고 합니다.

이러한 고객들은 Kinesis Video Streams를 최소한의 운영 오버헤드로 확장 가능한 실시간 ML 기반 비디오 분석 파이프라인을 구축할 수 있도록 Amazon SageMaker 엔드포인트에 안정적으로 연결할 수 있는 솔루션을 원합니다.

이 블로그 게시물에서는 이 새로운 기능을 소개하고 Kinesis Video Streams Client Library와 CloudFormation 템플릿의 기능을 설명합니다. 또한 Kinesis Video Streams를 Amazon SageMaker using KIT에 통합하는 과정을 단계별로 보여 주는 예제를 제공합니다.

Kinesis Video Streams와 기계 학습 기반 분석

Amazon Kinesis Video Streams는 re:Invent 2017에서 출시되었습니다. 출시 당시 이미 Amazon Rekognition Video와 통합되어 안면 메타데이터의 프라이빗 데이터베이스를 사용한 실시간 안면 인식을 손쉽게 수행할 수 있는 기능을 지원했습니다. 이 이전 블로그 게시물에서는 안면 인식 기능을 사용하여 Amazon Kinesis Video Streams와 Amazon Rekognition Video를 사용한 하이엔드 소비자 환경을 제공하는 방법을 자세히 설명했습니다.

고객이 Kinesis Video Streams를 사용하여 다양한 비디오 피드를 수집함에 따라 사용 사례, 학습 데이터 세트, 수행하는 추론 유형도 다양해집니다. 일례로 한 유수의 주택 보안 업체는 Kinesis Video Streams를 사용하여 주택 보안 카메라에서 오디오와 비디오를 수집하고자 했습니다. 그런 다음 Amazon SageMaker에서 실행 중인 자체적인 맞춤형 ML 모델을 연결하여 반려동물과 물체를 탐지하여 더 풍부한 사용자 경험을 만들어냅니다. 한 매장 내 물리적 소매에 대한 인텔리전스를 제공하는 업체는 매장 내에 설치된 카메라에서 비디오를 스트리밍하여 Amazon SageMaker를 사용한 맞춤형 사람 수 세기 모델을 학습시키고자 했습니다. 이를 통해 매장의 쇼핑객 수를 추정하는 실시간 추론을 수행하여 매장 운영에 필요한 정보를 얻을 수 있습니다. 

KIT를 사용하여 Amazon SageMaker와 Kinesis 비디오 스트림 통합

이제 Amazon SageMaker의 KIT를 구성하는 두 가지 구성 요소에 대해서 설명하겠습니다.

Kinesis Video Streams Client Library는 분산된 작업자 세트 전반에 걸쳐 미디어를 최소 한 번 이상 처리하는 확장 가능한 기능을 지원하고, Amazon SageMaker 엔드포인트의 안정적인 호출과 후속 처리를 위해 Kinesis 데이터 스트림에 추론 결과를 게시하는 작업을 관리합니다. 특히 이 라이브러리는 처리해야 할 Kinesis 비디오 스트림을 결정하고, 스트림에 연결하고, 주기적으로 스트림을 새로 고쳐 처리할 스트림을 포함 또는 제외합니다. 이 소프트웨어는 특정 시점에 Kinesis 비디오 스트림의 처리를 담당하는 소비자를 실행하는 작업자를 인스턴스화합니다. 이 같은 인스턴스화의 일부로서, 다양한 스트림을 처리하는 기능을 조정하기 위해 하나 이상의 작업자에서 실행되는 모든 소비자에 대한 리스도 유지합니다. 또한 리스 스트림 단위로 체크포인트를 관리하여 미디어 조각이 한 번 이상 안정적으로 처리되도록 보장합니다.

이 소프트웨어는 실시간 Kinesis Video Streams GetMedia API 작업을 사용하여 스트림에서 미디어 조각을 가져오고 미디어 조각을 구문 분석하여 H264 청크를 추출하며 디코딩이 필요한 프레임을 샘플링한 후 Amazon SageMaker 엔드포인트를 호출하기 전에 I-프레임을 디코딩하여 JPEG/PNG와 같은 이미지 형식으로 변환합니다. Amazon SageMaker에서 호스팅되는 모델이 추론을 반환하면 KIT는 그 결과를 캡처하여 Kinesis 데이터 스트림에 게시합니다. 그러면 고객이 그 결과를 AWS Lambda와 같은 유용한 서비스에 활용할 수 있습니다. 마지막으로, 이 라이브러리는 고객이 대시보드를 구축하고 프로덕션 환경에 배포되는 임계값을 모니터링하고 경보를 발생시킬 수 있도록 다양한 지표를 Amazon CloudWatch에 게시합니다.

AWS CloudFormation 템플릿은 고객의 자체 계정에서 모든 관련 AWS 인프라의 구축을 자동화하고 Kinesis Video Streams에서 미디어를 읽으며 ML 기반 분석을 위해 Amazon SageMaker 엔드포인트를 호출합니다. 따라서 통합 기능을 구축하고 운영하며 확장하는 데 소요되는 시간을 절약할 수 있습니다.

CloudFormation 템플릿은 먼저, Docker 컨테이너에서 호스팅되는 라이브러리 소프트웨어를 실행하는 AWS Fargate 컴퓨팅 엔진을 사용하여 Amazon Elastic Container Services(ECS) 클러스터를 생성합니다.

또한 Fargate Tasks 및 Amazon Kinesis Data Streams에서 실행되며 Amazon SageMaker에서 생성된 추론 출력을 캡처하는 여러 작업자에 걸쳐 체크포인트와 관련 상태를 유지 관리하기 위해 Amazon DynamoDB 테이블을 스핀업합니다.  그 뿐만 아니라 전체 인프라를 모니터링하기 위한 필수 AWS Identity and Access Management(IAM) 정책과 Amazon CloudWatch 리소스도 생성합니다. KIT for Amazon SageMaker는 이미지 데이터를 받는 모든 Amazon SageMaker 엔드포인트와 호환됩니다. 고객은 필요에 따라 구체적인 사용 사례에 맞게 템플릿을 수정할 수 있습니다.

KIT 설정 방법

사전 조건

KIT 배포를 위한 단계별 지침

  • CloudFormation을 통해 웹 사이트 구축
  • CloudFormation은 템플릿으로부터 반복해서 인프라 리소스를 구축할 수 있게 해 주는 강력한 Infrastructure-as-Code 도구입니다.
    1. 이 도구가 없는 경우 AWS 계정에 로그인하십시오. 이미 로그인되어 있는 경우 다음 URL을 통해 2단계를 진행하십시오.
      https://xxxxxxxxxxxx.signin.aws.amazon.com/console 앞부분의 x는 12자리 숫자로 된 여러분의 계정ID로 대체합니다.
    2. AWS Services 검색 표시줄에서 CloudFormation을 선택합니다.
    3. 이 위치에서 대상 리전에 대한 [CloudFormation Template]을 선택합니다.
    4. 스택의 이름을 지정하고 파라미터를 채운 후 Next를 선택합니다.
      • AppName – 모든 리소스를 생성하는 데 사용되는 고유한 애플리케이션 이름
      • DockerImageRepository – Kinesis Video Streams 및 SageMaker 드라이버용 Docker 이미지
      • EndPointAcceptContentType – 현재 SageMaker 엔드포인트를 호출하는 데 지원되는 image/jPEG 또는 image/png 이미지 형식
      • LambdaFunctionBucket – 맞춤형 Lambda 함수의 Amazon S3 버킷 위치
      • LambdaFunctionKey – 맞춤형 Lambda 함수 코드 zip 파일의 Amazon S3 객체 키
      • SageMaker Endpoint – 기계 학습 모델을 호스팅하는 Amazon SageMaker 엔드포인트
      • StreamNames – 스트림 이름을 지정하는 문자열의 CSV 목록
      • TagFilters – 태그 필터의 JSON 문자열
    5. Options 페이지의 파라미터를 기본값으로 두고 Next를 선택합니다.
    6. Review의 구성 정보를 검토하고 IAM 역할 생성을 확인하는 Acknowledge 확인란을 선택한 후 Create를 선택합니다.

솔루션 확장

사용 사례에 맞춰 Lambda 함수를 업데이트하고 다른 AWS 서비스와 통합하는 방식으로 이 솔루션을 확장할 수 있습니다.

이 예에서는 Kinesis 비디오 조각을 검색하여 탐지 데이터와 함께 Amazon S3 버킷에 저장합니다.

  1. Amazon S3 버킷을 생성합니다.
  2. 올바른 버킷 이름과 Kinesis 비디오 스트림 ARN으로 바꾸어 다음 추가 권한을 AWS Lambda Execution 역할에 추가합니다. 이 추가 권한은 AWS Lambda가 Kinesis 비디오 스트림에서 조각을 검색하여 S3 버킷에 쓸 수 있도록 합니다.
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject",
        ],
        "Resource": [
            "arn:aws:s3:::<<YOUR BUCKET>>/*",
        ]
    },
    {
        "Effect": "Allow",
        "Action": [
            "kinesisvideo:GetMediaForFragmentList",
            "kinesisvideo:GetDataEndpoint",
        ],
        "Resource": [
            "<< YOUR KINESIS VIDEO STREAM ARNs>>",
        ]
    }
    
  3. 다음 코드에서 <<YOUR BUCKET>>을 Lambda 함수 코드로 바꾸십시오.
    from __future__ import print_function
    import base64
    import json
    import boto3
    import os
    import datetime
    import time
    from botocore.exceptions import ClientError
    
    bucket='<<YOUR BUCKET>>'
    
    #Lambda 함수는 Amazon SageMaker 예제의 출력을 기반으로 작성되었습니다. 
    #https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/object_detection_pascalvoc_coco/object_detection_image_json_format.ipynb
    object_categories = ['person', 'bicycle', 'car',  'motorbike', 'aeroplane', 'bus', 'train', 'truck', 'boat', 
                         'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog',
                         'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag',
                         'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat',
                         'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
                         'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot',
                         'hot dog', 'pizza', 'donut', 'cake', 'chair', 'sofa', 'pottedplant', 'bed', 'diningtable',
                         'toilet', 'tvmonitor', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven',
                         'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
                         'toothbrush']
    
    def lambda_handler(event, context):
      for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        #Kinesis Data Stream 출력의 Json 형식 가져오기
        result = json.loads(payload)
        #FragmentMetaData 가져오기
        fragment = result['fragmentMetaData']
        
        # Fragment ID 및 Timestamp 추출
        frag_id = fragment[17:-1].split(",")[0].split("=")[1]
        srv_ts = datetime.datetime.fromtimestamp(float(fragment[17:-1].split(",")[1].split("=")[1])/1000)
        srv_ts1 = srv_ts.strftime("%A, %d %B %Y %H:%M:%S")
        
        #FrameMetaData 가져오기
        frame = result['frameMetaData']
        #StreamName 가져오기
        streamName = result['streamName']
       
        #Json 형식의 SageMaker 응답
        sageMakerOutput = json.loads(base64.b64decode(result['sageMakerOutput']))
        #확률이 가장 높은 5개 탐지된 객체 인쇄
        for i in range(5):
          print("detected object: " + object_categories[int(sageMakerOutput['prediction'][i][0])] + ", with probability: " + str(sageMakerOutput['prediction'][i][1]))
        
        detections={}
        detections['StreamName']=streamName
        detections['fragmentMetaData']=fragment
        detections['frameMetaData']=frame
        detections['sageMakerOutput']=sageMakerOutput
    
        #KVS fragment를 가져오고 .webm 파일 및 탐지 세부 정보를 S3에 작성
        s3 = boto3.client('s3')
        kv = boto3.client('kinesisvideo')
        get_ep = kv.get_data_endpoint(StreamName=streamName, APIName='GET_MEDIA_FOR_FRAGMENT_LIST')
        kvam_ep = get_ep['DataEndpoint']
        kvam = boto3.client('kinesis-video-archived-media', endpoint_url=kvam_ep)
        getmedia = kvam.get_media_for_fragment_list(
                                StreamName=streamName,
                                Fragments=[frag_id])
        base_key=streamName+"_"+time.strftime("%Y%m%d-%H%M%S")
        webm_key=base_key+'.webm'
        text_key=base_key+'.txt'
        s3.put_object(Bucket=bucket, Key=webm_key, Body=getmedia['Payload'].read())
        s3.put_object(Bucket=bucket, Key=text_key, Body=json.dumps(detections))
        print("Detection details and fragment stored in the S3 bucket "+bucket+" with object names : "+webm_key+" & "+text_key)
      return 'Successfully processed {} records.'.format(len(event['Records']))
    

비디오 조각과 탐지 세부 정보가 있는 S3 버킷

다음 스크린샷에는 Amazon S3 버킷으로 탐지된 비디오 조각과 해당 추론을 내보내는 KIT for Amazon SageMaker가 나와 있습니다.

처리된 출력을 보여 주는 AWS Lambda 함수 로그

이 솔루션은 다양한 사용 사례에 맞추어 확장할 수 있습니다. 예를 들어 Computer Vision OpenCV 라이브러리와 Amazon SageMaker 예측 세부 정보를 결합하여 경계 상자를 비디오 프레임의 탐지된 객체에 추가하고 실시간 알림 포털에 공급할 수 있습니다.

KIT에서 관리되는 인프라스트럭처 모니터링

이 라이브러리 소프트웨어는 고객이 개별 스트림을 처리하는 작업의 진행률을 모니터링할 수 있는 다양한 CloudWatch 지표를 기본적으로 제공합니다. 여기에는 클러스터에서 작업자의 리소스 소비량, Amazon SageMaker 엔드포인트가 호출되는 속도, Kinesis Data Stream에 추론 결과가 게시되는 방식을 결정하는 지표가 포함됩니다. CloudFormation 템플릿은 고객이 바로 사용할 수 있는 CloudWatch 대시보드를 생성하여 솔루션의 용도를 확장할 수 있도록 합니다. 기본적으로 이 대시보드는 KIT의 기반 서비스에 대한 주요 지표와 지연 시간, 안정성 및 확장성 특성에 대한 맞춤형 지표를 캡처합니다.

CloudWatch 대시보드 – KIT 지표

마치면서

이제 여러분은 KIT for Amazon SageMaker를 사용하여 실시간 ML 기반 미디어 스트림 처리를 높은 안정성과 확장성을 지니면서도 구현은 단순화 할 수 있습니다. 고객은 모든 Kinesis 비디오 스트림을 Amazon SageMaker 엔드포인트에 연결하여 최소한의 운영 오버헤드로 ML 기반 사용 사례를 지원할 수 있습니다. 이 기능에 대한 자세한 내용은 설명서를 참조하십시오. 앞으로 모든 개발자가 사용 사례에 맞추어 더 효과적으로 맞춤화할 수 있도록 고객 피드백에 따라 향상된 기본 Kinesis Video Client Library 소프트웨어를 꾸준히 발표할 예정입니다.

Aditya Krishnan은 Amazon Kinesis Video Streams의 책임자로, 고객, 하드웨어 및 소프트웨어 파트너 및 뛰어난 엔지니어링 팀과 협력하며 인터넷 지원 카메라 디바이스에서 대규모로 매우 손쉽게 비디오를 스트리밍할 수 있도록 한다는 비전을 실현해나가고 있습니다.

 

 

 

Jagadeesh Pusapadi는 AWS의 솔루션스 아키텍트로, 고객과 함께 전략적 이니셔티브를 진행하고 있습니다. 또한 고객이 원하는 비즈니스 성과를 실현할 수 있도록 아키텍처와 관련한 지침을 제공하며 고객이 AWS 클라우드에서 혁신적인 솔루션을 구축하도록 돕습니다.

 

 

 

이 글은 AWS Machine Learning Blog의 Analyze live video at scale in real time using Amazon Kinesis Video Streams and Amazon SageMaker의 한국어 번역으로 정도현 AWS 테크니컬 트레이너가 감수하였습니다.