AWS 기술 블로그

Amazon CloudFront를 활용한 미디어 서비스 모니터링 방안, 2부 : 실시간 로그 기반 모니터링

이전 1부, 표준 로그 기반 분석 게시물에서는 미디어 서비스를 운영하는 과정에서 Amazon CloudFront의 표준 로그를 기반으로 Amazon Athena, Amazon QuickSight를 활용한 모니터링 및 통계 분석 솔루션을 구축하는 방법을 제시합니다. 이를 통해 접속자의 상태, 트래픽 추이, 어플리케이션의 개선 포인트 등 인사이트를 통해 나은 서비스로 기획할 수 있는 방안으로 고려할 수 있습니다. 이 게시물에서는 Amazon CloudFront의 실시간 로그를 Amazon OpenSearch와 연동하여 실시간 대시보드를 구축하고 문제 발생 시 즉각 대응 할 수 있는 방법에 대해서 설명합니다.

솔루션 개요

Amazon Kinesis Data StreamsData Firehose를 활용하여 실시간 스트리밍 데이터를 수집하고 Amazon OpenSearch로 전송하도록 처리합니다. 그리고 Lambda 함수를 활용하여 로그에 대한 데이터 처리 변환 작업을 진행할 수 있습니다. OpenSearch에서는 인입되는 인덱스를 기반으로 실시간 대시보드를 생성하고 알람을 설정하여 실시간 운영 이슈를 확인할 수 있고 장애에 신속하게 대응할 수 있습니다.

단계 요약

·       단계 1 : Amazon Kinesis Data Streams를 활용한 저장소 생성
·       단계 2 : Amazon CloudFront에서 실시간 로그 설정
·       단계 3 : Amazon Kinesis Data Firehose 전처리용 Lambda 함수 생성
·       단계 4 : Amazon OpenSearch 생성
·       단계 5 : Amazon Kinesis Data Firehose를 통해 분석 파이프 라인 설정
·       단계 6 : Amazon OpenSearch에서 Kinesis Data Firehose 서비스 설정
·       단계 7 : Kibana 대시보드 설정
·       단계 8 : Kibana 알람 설정

단계 1 : Amazon Kinesis Data Streams를 활용한 저장소 생성

Amazon Kinesis 콘솔로 이동하여 데이터 스트림을 선택하고, 데이터 스트림을 생성합니다. 용량 모드는 온디맨드로 설정하고, 스트림의 이름을 cloudfront-real-time-log-data-stream으로 지정합니다. 쓰기 용량 : 200MiB/초, 200,000개 레코드/초, 읽기 용량 : 400MiB/초가 할당됩니다. 데이터 스트림 생성 완료 후, 구성 탭에서 데이터 스트림 용량 모드를 편집할 수 있고, 데이터 보존 기간을 필요에 따라 수정할 수 있습니다. 기본값은 24시간이며 365시간 이내 임의의 시간으로 확장 가능합니다.

단계 2 : Amazon CloudFront에서 실시간 로그 설정

Amazon CloudFront 콘솔로 이동하여 원격측정 메뉴에서 로그를 선택하고 실시간 구성 메뉴를 클릭합니다. 구성 생성 버튼을 클릭하고 이름은 CloudFrontRealTimeConfigName으로 설정합니다.
샘플링 속도는 기본 값인 100을 유지합니다. 샘플링 속도를 사용하면 로그 레코드의 백분율을 받을 수 있습니다. 예를 들어 트래픽 볼륨이 높고 모든 로그 데이터를 처리하고 싶지 않을 경우 높은 수준의 추세 분석을 위해 전체 로그의 백분율을 샘플링 하도록 선택할 수 있습니다. 필드는 실시간 로그 구성에 포함할 정보를 선택하시면 됩니다. 다만, 해당 항목과 단계 3에서 적용할 Lambda 함수의 realtimelog_fields_dict 내용과 반드시 일치해야 합니다.
실시간 로그 활성화를 계속하려면 로그를 생성 할 배포 및 캐시 동작을 선택해야 합니다. 엔드포인트는 단계 1에서 생성한 Kinesis Data Streams으로 지정합니다. 배포는 별다른 옵션을 설정하지 않고 구성 생성 버튼을 클릭합니다.

배포하고자 하는 지점을 선택하고 동작 탭에서 실시간 로그 구성을 배포의 캐시 동작에 연결할 수 있습니다.

추가 설정을 클릭하셔서 확장 해보면 실시간 로그 활성화 제어가 가능합니다. Yes를 선택하고 실시간 로그 구성은 단계 2에서 생성한 로그 정보를 선택하시면 됩니다. (예: CloudFrontRealTimeConfigName) 기존 배포에서 캐시 동작 설정을 편집 또는 생성하거나 새 배포를 생성할 때 실시간 로그 구성을 첨부할 수 있습니다.

단계 3 : Amazon Kinesis Data Firehose 전처리용 Lambda 함수 생성하기

AWS Lambda 콘솔로 이동하여 Lambda 함수를 만들 수 있습니다. 새로 함수 만들기를 눌러 기본 정보 섹션에서 cf-real-time-logs-transformer 함수 이름으로 사용하고, 런타임으로 Python 3.8을 선택합니다. 권한 섹션에서 기본 Lambda 권한을 가진 새로운 역할 만들기를 선택한 후 함수를 만듭니다. 기본 설정 내 제한 시간에서 편집 버튼을 클릭하여 기본값인 3 초에서 1 분으로 변경합니다. 람다 함수 작성하는 과정에서 다음 두 가지를 꼭 명심하셔서 작성해주셔야 합니다.

1. 단계 1 에서 설정한 로그 내용과 동일한 항목으로 세팅합니다. 만약 변경이 필요한 경우 꼭 단계 2에서 로그의 필드 구성과 람다 함수의 realtimelog_fields_dict 정보가 일치해야 합니다. 함수의 코드는 아래를 복사해서 넣습니다.
2. Lambda에서 중요한 내용은 Date 항목을 OpenSearch에서 인덱스 처리가 가능할 수 있도록 전처리하는 과정입니다. 코드처럼 해당 timestamp 항목을 전처리하면 kibana에서 시간 별로 인덱스 처리가 가능합니다.
(예 : _date = datetime.datetime.fromtimestamp(float(payload_list[counter].strip())).strftime(‘%Y-%m-%dT%H:%M:%S’))

import json
import base64
import datetime

def lambda_handler(event, context):
    output = []
    realtimelog_fields_dict = {
        'timestamp' : 'float', 
        'c-ip' : 'str', 
        'time-to-first-byte' : 'str', 
        'sc-status' : 'int', 
        'sc-bytes' : 'int', 
        'cs-method' : 'str', 
        'cs-protocol' : 'str',
        'cs-host' : 'str', 
        'cs-uri-stem' : 'str', 
        'cs-bytes' : 'int',
        'x-edge-location' : 'str', 
        'x-edge-request-id' : 'str', 
        'x-host-header' : 'str', 
        'time-taken' : 'float', 
        'cs-protocol-version' : 'str',
        'c-ip-version' : 'str', 
        'cs-user-agent' : 'str', 
        'cs-referer' : 'str', 
        'cs-cookie' : 'str', 
        'cs-uri-query' : 'str', 
        'x-edge-detailed-result-type' : 'str',
        'x-forwarded-for' : 'str', 
        'ssl-protocol' : 'str', 
        'ssl-cipher' : 'str',
        'x-edge-result-type' : 'str', 
        'fle-encrypted-fields': 'str', 
        'fle-status' : 'str',
        'sc-content-type' : 'str', 
        'sc-content-len' : 'int', 
        'sc-range-start' : 'int', 
        'sc-range-end' : 'int', 
        'c-port' : 'int', 
        'x-edge-response-result-type' : 'str', 
        'c-country' : 'str', 
        'cs-accept-encoding' : 'str',
        'cs-accept' : 'str',
        'cache-behavior-path-pattern' : 'str',
        'cs-headers' : 'str', 
        'cs-header-names' : 'str', 
        'cs-headers-count' : 'int',
        'origin-fbl' : 'str',
        'origin-lbl' : 'str'
        }
        
    for record in event["records"]:
        payload_in_bytes = base64.b64decode(record['data'])
            
        # Converting the bytes payload to string
        payload = "".join(map(chr, payload_in_bytes))

        # dictionary where all the field and record value pairing will end up
        payload_dict = {}

        # counter to iterate over the record fields
        counter = 0

        # generate list from the tab-delimited log entry
        payload_list = payload.strip().split('\t')
        payload_list_len = len(payload_list) -1

        # perform the field, value pairing and any necessary type casting.
        for field, field_type in realtimelog_fields_dict.items():
            if(payload_list[counter].strip() == '-'):
                field_type = "str"
            if(field_type == "int"):
                payload_dict[field] = int(payload_list[counter].strip())
            elif(field_type == "float"):
                if(field == "timestamp"):
                    _date = datetime.datetime.fromtimestamp(float(payload_list[counter].strip())).strftime('%Y-%m-%dT%H:%M:%S')
                    payload_dict[field] = _date
                else:
                    payload_dict[field] = float(payload_list[counter].strip())
            else:
                payload_dict[field] = payload_list[counter].strip()
            if(counter < payload_list_len):
                counter = counter + 1
            else:
                break
                    
        # JSON version of the dictionary type
        payload_json = json.dumps(payload_dict)
        payload_json_ascii = payload_json.encode('ascii')
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload_json_ascii).decode("utf-8")
        }
        output.append(output_record)
     
    print('Successfully processed {} records.'.format(len(event['records'])))
     
    return {'records': output}

단계 4 : Amazon OpenSearch 생성

Amazon OpenSearch Service 콘솔로 이동하여 도메인 생성을 클릭하고, 아래 화면처럼 배포 유형은 프로덕션, 버전은 7.10버전, 도메인 이름은 cf-realtime-log-es-domain으로 입력합니다.

인스턴스 유형 및 노드 수에서 데이터 노드 섹션 등은 기본 설정으로 그대로 둡니다.

다른 항목들은 기본값 그대로 두고 네트워크 구성에서 퍼블릭 액세스를 선택합니다. (테스트용이므로 외부 접근을 허락하는 것이고, 필요에 따라 VPC를 선택하시기 바랍니다.) 세분화된 액세스 제어 활성화를 선택하고, 마스터 사용자 생성을 눌러 마스터 사용자 이름, 마스터 암호 및 마스터 암호 확인 필드를 입력합니다. 이는 Kibana 대시 보드에 접속하는 데 필요하므로 해당 정보를 꼭 기록해두셔야 합니다.

도메인 액세스 정책에서 세분화된 액세스 제어만 사용을 선택합니다. 고급 클러스터 설정을 클릭하고 최대 절 수를 100으로 설정합니다.

설정을 다시 확인하고 마무리합니다. OpenSearch 리소스를 만드는데 약 10 분이 걸립니다. 도메인이 Active 상태가 되면 Kinesis Data Firehose 리소스 프로비저닝을 진행합니다. Kinesis Data Firehose가 OpenSearch 클러스터로 전송하지 못할 수 있는 레코드를 백업하기위해 동일한 리전에 S3 버킷을 생성합니다. 버킷 이름은 다음 예시처럼 생성할 수 있습니다. cf-backup-xxx (사용자 임의명)

단계 5 : Amazon Kinesis Data Firehose를 통해 분석 파이프 라인 설정

Amazon Kinesis 콘솔 대시보드에서 전송 스트림 생성을 선택합니다. 소스 선택 섹션에서 Amazon Kinesis Data Streams를 선택하고 대상은 Amazon OpenSearch Service를 선택합니다. 소스 설정에서 Kinesis Data Streams는 단계 1에서 생성한 cloudfront-real-time-log-data-stream을 선택 합니다. 전송 스트림 이름은 cloudfront-real-time-log-data-kinesis-firehose-consumer으로 사용합니다.

레코드 변형 섹션에서 데이터 변환 활성화를 선택하고, 단계 3에서 만든 cf-real-time-logs-transformer 이름의 Lambda 함수 선택합니다. 나머지 값은 기본 설정으로 둡니다. Amazon OpenSearch Service 대상 섹션에서 cf-realtime-log-es-domain을 도메인으로 선택하고, 인덱스에 realtime이라고 입력합니다. S3 백업 섹션에서 실패한 데이터만 또는 모든 데이터중 워크로드에 따라 임의로 한개 선택하고 백업하시고자 하는 S3 버킷을 선택합니다. 모든 설정을 기본값으로 두고 진행합니다. 고급설정의 권한 섹션에서 Firehose가 다른 서비스와 상호 작용하는 동안 담당하는 IAM 역할을 생성한다는 것을 알 수 있습니다.이 역할의 예제 ARN은 다음과 같습니다.
(ex: arn:aws:iam::xxxxx:role/service-role/KinesisFirehoseServiceRole-coupang—ap-northeast-2-xxxxx) 해당 값은 Kibana에서 권한 설정을 위해 꼭 확인이 필요합니다.

단계 6 : Amazon OpenSearch에서 Kinesis Data Firehose 서비스 설정하기

OpenSearch 도메인에 대한 세분화된 접근 제어를 구성하는 동안, 이전에 설정한 마스터 사용자 이름과 암호를 사용하여 Kibana 대시 보드에 접속합니다. Kinesis Data Firehose를 허용하고 OpenSearch 도메인 인덱스를 생성하고 데이터를 쓰기 위한 서비스 역할 등 필요한 구성 변경을 수행합니다. Amazon OpenSearch Service 콘솔로 이동하셔서 단계 4에서 생성한 도메인을 선택하고 일반 정보의 Kibana URL에 접속합니다. 그리고 단계 4에서 저장한 마스터 사용자 이름, 마스터 암호를 입력하여 Kibana에 접속합니다. Kibana에서 왼쪽 Security 탭을 클릭하고 Roles와 Role Mappings을 각각 설정해야 합니다. 보안에서 역할을 선택 후 새 역할을 추가합니다. (예 : firehose-role). 클러스터 권한 탭에서 클러스터 전체 권한으로 cluster_composite_ops 및 cluster_monitor 액션 그룹을 추가합니다.

색인 권한 탭에서 추가를 눌러 Index Patterns를 선택하고 realtime*을 입력합니다. 액션 그룹 권한에는 crud, create_index, manage을 선택합니다.

Save Role Definition 버튼을 클릭하여 역할에 대한 권한 설정을 완료합니다. 이제 Role Mappings 권한 설정을 진행합니다. 단계 5에서 생성한 Kinesis Data Firehose가 사용하는 IAM 역할을 방금 생성한 역할에 Backend roles에서 매핑합니다.

단계 7 : Kibana 대시보드 설정

Kibana Home에서 Use Elasticsearch data 을 선택하여 index pattern을 정의합니다. 이는 단계 5에서 설정한 realtime 인덱스 정보를 입력하고 Next step으로 이동하여 설정을 완료합니다. Configure settings에서 Time Filter field name 항목은 단계 3에서 전처리를 한 timestamp 항목을 선택합니다.

Index 생성을 완료한 이후 Discover 탭으로 이동합니다. Index Pattern을 위에서 생성한 index(예제는 realtime)로 선택하면 다음과 같이 실시간으로 인입이 되는 로그 정보를 모니터링 할 수 있습니다. 또한 Search 영역에서 Kibana Query Language로 원하시는 필드 또는 값을 입력하여 퀴리로 원하는 정보 조회할 수 있습니다.

Amazon OpenSearch를 사용하여 데이터의 추세를 관찰하고 그래프를 대시 보드로 결합 할 수 있습니다. 예를 들어 서로 다른 리전의 실시간 요청 수를 확인할 수 있습니다. 이를 위해 Kibana 대시보드의 Visualizations 섹션으로 이동하여 새 시각화 메뉴 중 Pie Chart를 선택합니다. sc-status 항목에 대해서 filter를 추가하여 Miss 및 Hit 경우를 각자 필터링 하여 비례를 통계 할 수 있는 파이 차트를 구성할 수 있습니다. Buckets에 Add 버튼을 클릭하고 Split slices를 선택합니다. Aggregation 항목은 Filters를 입력하고 Filter1에는 sc-status:200 or sc-status:204, Filter 2에는 sc-status>=400 AND sc-status<50, Filter 3에는 sc-status>=500 AND sc-status<600를 입력합니다. Update 버튼을 클릭하면 다음과 같이 시각화 파이차트를 확인할 수 있습니다.

표준로그와 달리 실시간 로그에 오리진 지연 시간 관련 정보들이 2022년 4분기에 추가되었고 이를 유용하게 사용할 수 있습니다. origin-fbl (CloudFront와 오리진 간의 첫 바이트 지연 시간, 단위 : 초) 및 origin-lbl (CloudFront와 오리진 간의 마지막 바이트 지연 시간, 단위:초) 항목에 대한 분포를 통계 할 수 있는 파이 차트를 구성할 수 있습니다. Buckets에 Add 버튼을 클릭하고 Split slices를 선택합니다. Aggregation 항목은 Terms를 입력하고 Field는 origin-lbl.keyword를 입력합니다. Update 버튼을 클릭하면 다음과 같이 시각화 파이차트를 확인할 수 있습니다. 동일한 방법으로 origin-fbl 분포 파이차트를 생성할 수 있습니다.

단계 8 : Kibana 알람 설정

해당 블로그에서는 슬랙을 통한 실시간 알람을 받을 수 있는 솔루션을 구현합니다. 전체적인 흐름은 다음과 같고 세가지 작업이 필요합니다.
·       첫 번째 : Destinations 설정
·       두 번째 : Monitor 설정 (모니터링 하고자 하는 쿼리 입력, 해당 쿼리 수행 주기 설정)
·       세 번째 : Trigger 설정 (Destinations에 전달할 메시지 정보 설정)

첫 번째 : Destinations 설정 : 우선 Slack 앱에 접속하셔서 알람을 받고자 하는 workspace에서 More > Apps > WebHooks 를 선택합니다.

다음 kibana로 돌아와서 왼쪽에 있는 Alerting 섹션을 클릭하고 새로운 Destinations를 생성합니다. 위 단계에서 복사한 webhookURL 정보를 다음과 같이 입력합니다.

두 번째 : Monitor 설정 : Monitors 탭을 클릭하고 Create monitor 버튼을 클릭하여 새로운 monitor를 생성합니다. 그리고 Configure monitor섹션에서 monitor name은 CF-Demo-Alarm을 입력합니다.
Define monitor 섹션에서는 Method of definition을 Define using extraction query를 선택하고 index는 단계 7에서 설정한 인덱스(예: realtime)를 사용하고 query는 다음과 같은 코드를 복사합니다.
예제 : 해당 쿼리는 5분 간격으로 서버 응답의 HTTP 상태 코드 성공 (2XX) 및 실패 (4XX) 비율을 확인하는 쿼리 입니다.

{
    "size": 0,
    "query": {
        "bool": {
            "filter": [
                {
                    "range": {
                        "timestamp": {
                            "from": "{{period_end}}||-5m",
                            "to": "{{period_end}}",
                            "include_lower": true,
                            "include_upper": true,
                            "format": "epoch_millis",
                            "boost": 1
                        }
                    }
                }
            ],
            "adjust_pure_negative": true,
            "boost": 1
        }
    },
    "aggregations": {
        "2": {
            "filters": {
                "filters": {
                    "Success": {
                        "bool": {
                            "filter": [
                                {
                                    "bool": {
                                        "should": [
                                            {
                                                "match": {
                                                    "sc-status": {
                                                        "query": "200",
                                                        "operator": "OR",
                                                        "prefix_length": 0,
                                                        "max_expansions": 50,
                                                        "fuzzy_transpositions": true,
                                                        "lenient": false,
                                                        "zero_terms_query": "NONE",
                                                        "auto_generate_synonyms_phrase_query": true,
                                                        "boost": 1
                                                    }
                                                }
                                            }
                                        ],
                                        "adjust_pure_negative": true,
                                        "minimum_should_match": "1",
                                        "boost": 1
                                    }
                                }
                            ],
                            "adjust_pure_negative": true,
                            "boost": 1
                        }
                    },
                    "Failed": {
                        "bool": {
                            "filter": [
                                {
                                    "bool": {
                                        "should": [
                                            {
                                                "match": {
                                                    "sc-status": {
                                                        "query": "404",
                                                        "operator": "OR",
                                                        "prefix_length": 0,
                                                        "max_expansions": 50,
                                                        "fuzzy_transpositions": true,
                                                        "lenient": false,
                                                        "zero_terms_query": "NONE",
                                                        "auto_generate_synonyms_phrase_query": true,
                                                        "boost": 1
                                                    }
                                                }
                                            }
                                        ],
                                        "adjust_pure_negative": true,
                                        "minimum_should_match": "1",
                                        "boost": 1
                                    }
                                }
                            ],
                            "adjust_pure_negative": true,
                            "boost": 1
                        }
                    }
                },
                "other_bucket": false,
                "other_bucket_key": "_other_"
            }
        }
    }
}

Define extraction query에 입력한 쿼리를 기반으로 오른쪽 Run 버튼을 클릭하면 결과를 바로 확인할 수 있습니다.
다음 Monitor Schedule을 설정합니다. Frequency는 By interval를 선택하고 주기는 1 분으로 설정합니다. 마지막으로 알람을 받을 수 있는 조건을 설정하고 알람을 받고자하는 destinations에 전달할 메시지 template를 설정할 수 있습니다.

세 번째 : Trigger 설정: 알람 전달 조건 및 slack에 전달할 메시지 정보 설정

예제2의 쿼리를 사용했고 Trigger condition은 다음과 같이 설정합니다. Http 상태코드가 404와 같이 실패 코드 비율이 전체 상태코드의 10%를 초과하는 경우 알람을 전송하도록 설정하였습니다. Run 버튼을 클릭하면 결과를 시뮬레이션 할 수 있습니다.
ctx.results[0].hits.total.value * 0.1 < ctx.results[0].aggregations.2.buckets.Failed.doc_count

Configure actions에서는 슬랙에 전달할 Message 템플릿을 만들 수 있습니다. 이는 원하시는 형태로 Message 섹션에서 변경하시면 되고 Message preview에서 손쉽게 확인이 가능합니다.

Monitor {{ctx.monitor.name}} just entered alert status. Please investigate the issue.
- Trigger: {{ctx.trigger.name}}
- Total Count: {{ctx.results.0.hits.total.value}}
- Count of success: {{ctx.results.0.aggregations.2.buckets.Success.doc_count}}
- Count of failed: {{ctx.results.0.aggregations.2.buckets.Failed.doc_count}}
- Period start: {{ctx.periodStart}}
- Period end: {{ctx.periodEnd}}

위의 모든 단계를 완료하게 되면 다음과 같이 슬랙 채널에서 kibana가 전달한 알람을 확인할 수 있습니다. 그리고 메세지로는 설정한 포맷으로 원하는 형태로 받을 수 있습니다.

결론

이 글에서는 미디어 서비스를 제공하는 고객사에서 Amazon CloudFront의 실시간 로그를 기반으로 다양한 AWS 솔루션을 사용하여 사용자가 손쉽게 쿼리를 통한 로그 분석, 실시간 로그 스트리밍 및 대시 보드를 통한 모니터링 체계를 구축하는 방법을 설명하였습니다. 또한 Amazon Opensearch에서 설정한 룰을 통한 알람을 활용하여 미디어 워크로드의 Observablity를 높이는 방법을 설명하였습니다. 이를 통해 고객사에서 미디어 서비스 어플리케이션에 대한 장애 포인트를 빠르게 분석할 수 있습니다.

추가 자료

Amazon CloudFront를 활용한 미디어 서비스 모니터링 방안, 1부: 표준 로그 기반 분석

Yongzhe Ren

Yongzhe Ren

렌(Yongzhe Ren) 솔루션즈 아키텍트는 다양한 분야의 엔지니어 경험을 바탕으로, 고객의 비즈니스 성과를 달성하도록 최적의 아키텍처를 구성하는 역할을 수행하고 있습니다.

Gabin Lee

Gabin Lee

이갑인 솔루션즈 아키텍트는 다양한 AWS 엣지 서비스를 활용하여 확장 가능하고 탄력적이며 안전한 아키텍처를 구축하는 것에 열정적이며, 특히 CloudFront, Global Accelerator, WAF, Shield 등과 같은 AWS 엣지 서비스를 활용하고 있는 한국 고객들을 집중적으로 지원하고 있습니다.