AWS 기술 블로그

티오더 DevOps팀의 Amazon Bedrock을 활용한 AWS 자원 감시 사례

티오더는 ‘테이블 오더’ 서비스를 시작으로, F&B 시장의 새로운 패러다임을 만들어나가고 있는 기업입니다. 누적 25만 대 이상의 태블릿 설치 대수와 매월 2,500만 명 이상의 사용자를 보유하며 태블릿 메뉴판 시장 점유율 1위의 위치에서 업계를 선도해 나가고 있습니다.

압도적인 시장 점유율을 기반으로 사용자의 데이터를 수집 및 가공하여 F&B 시장에 꼭 필요한 정보를 추출하고, 가공한 데이터를 다시 매장의 사장님께 제공함으로써 서로 상생해나가는 선순환 구조를 확립하였습니다. 또한, 단순한 태블릿 메뉴판의 기능을 넘어서 새로운 고객 경험 제공과 광고 매체로써 무궁무진한 활용 능력을 인정받으며 국내를 넘어 해외에서도 만나볼 수 있는 글로벌 기업으로 폭발적인 성장을 이루어 나가고 있습니다.

솔루션 개요

티오더는 국내 최대 규모의 태블릿 오더 서비스로서, 무엇보다 보안과 서비스 관리에 중점을 두고 ­­있습니다. 회사 규모가 해마다 커지고 서비스가 팀별로 나뉘어 운영됨에 따라 각 팀에서 AWS 서비스에 수많은 리소스가 추가되면서, 보안상의 문제가 있는 활동을 감시하는 리소스 관리가 더욱 중요해졌습니다.

프로덕트 별로 AWS 계정을 나누어 사용하고 있기 때문에, 리소스 관리를 중앙화하고 누가 어디서 어떤 행동을 했는지 쉽고 빠르게 확인하는 시스템을 구축하는 것이 주요 목표였습니다.

아키텍처

모든 AWS 서비스, 사용자 및 역할을 통해 수행되는 활동들은 AWS CloudTrail에 기록되기 때문에, 적재되는 이벤트를 활용하여 리소스 변경 사항을 감시하였습니다. 기존에는 CloudTrail 이벤트를 단순하게 파싱해서 모니터링하였으나, 이것은 비 개발자 또는 관리자가 한눈에 파악하기 어려웠으며, 이벤트 형식이 다양하기 때문에 모든 파싱 로직을 작성하여 예외 처리하는 것은 복잡한 문제였습니다.

이 문제를 유연하게 해결하기 위해 Amazon Bedrock의 Claude 3.5 Sonnet 모델을 사용하였습니다. Amazon Bedrock을 통해 CloudTrail 이벤트 내용을 분석함으로써 색상으로 표현되는 위험도와 함께 Generative AI가 자체적으로 해석해 주는 내용을 활용하여 감시 시스템을 구축하였습니다. 결과적으로 이벤트 해석이 쉬워지면서, 비 개발자 및 개발자 모두 인프라 변경 사항에 대한 모니터링 및 추적이 간편해졌습니다.

여러 계정에서 SNS 구독을 편하게 연결하기 위해 Lambda URL을 사용하였으며, 다른 AWS 계정 및 리전에서 Amazon EventBridge를 여러 개 만들어 전송하여도 해당 Lambda URL을 거쳐 손쉽게 연결하고 데이터를 처리할 수 있습니다. 이는 추후에 계정이 지속적으로 추가되어도 연결에 유리한 구조로 활용됩니다.

정보 수집

  • 이벤트를 추적 및 감시할 계정에 CloudTrail을 활성화하고 EventBridge와 Amazon SNS를 연결합니다. 이때 감시할 계정에서 중앙화 계정의 Lambda 함수 URL를 SNS에서 구독합니다. Amazon SQS에 메시지를 쌓아 5분 간격으로 메시지를 풀링하여 메시지를 처리합니다.
  • 여러 개의 AWS 계정을 추가로 생성하여도 CloudTrail, EventBridge, SNS의 3가지의 서비스만 활성화하여 쉽게 변경 사항을 수집할 수 있습니다.

정보 처리 및 알림

  • 슬랙으로 알림을 보내기 전에, 수집된 SQS 메시지를 Amazon Bedrock Claude 3.5 Sonnet 모델 API 전송하여 구조화된 메시지로 변환합니다.
  • 이후 메시지를 특정 알림 채널에 계정 별로 분류된 이벤트 메시지를 전송합니다.

솔루션 구현 방법

초기 단계부터 최적의 프롬프트를 만들기 위해 노력하였습니다. 초기 요구 사항이었던, 단순하게 요약하는 주문부터 Slack 메시지 포맷에 맞춰 나가는 과정을 거쳐 구현하였습니다.

  1. EventBridge를 사용하면 AWS 서비스에서 생성된 이벤트를 사용하여 AWS 환경을 모니터링 및 감사할 수 있도록 하는 애플리케이션을 구축할 수 있습니다. EventBridge Rules (규칙)에서 감시할 이벤트 패턴을 정의하면, 일치하는 이벤트에 대해 타겟으로 이벤트를 라우팅할 수 있습니다. 여기에서는 SNS를 타겟으로 하는 규칙을 생성합니다. EventBridge 이벤트에서 수신할 이벤트 목록은 다음과 같이 정의합니다. 추가한 이벤트들은 슬랙으로 알림을 받는 데에 사용됩니다. 모든 이벤트에 대해 수신 받는 대신, 필요한 리소스 및 보안 이슈와 관련된 이벤트만 선별하여 적시에 모니터링할 수 있도록 합니다. (예: AWS Identity and Access Management (IAM), Amazon Elastic Compute Cloud (Amazon EC2)) 이렇게 SNS에 게시된 이벤트들은 Lambda 함수를 트리거 하게 됩니다.
{
   "source":[
      "aws.iam",
      "aws.ec2",
      "s3.amazonaws.com",
      "aws.s3"
   ],
   "detail-type":[
      "AWS API Call via CloudTrail"
   ],
   "detail":{
      "eventName":[
         "AttachGroupPolicy",
         "AttachRolePolicy",
         "AttachUserPolicy",
         "ChangePassword",
         "CreateAccessKey",
         "CreateGroup",
         "CreatePolicy",
         "CreateRole",
         "CreateUser",
         "DeleteAccessKey",
         "DeleteGroup",
         "DeleteGroupPolicy",
         "DeletePolicy",
         "DeleteRole",
         "DeleteRolePolicy",
         "DeleteUser",
         "DeleteUserPolicy",
         "DetachGroupPolicy",
         "DetachRolePolicy",
         "DetachUserPolicy",
         "PutGroupPolicy",
         "PutRolePolicy",
         "PutUserPolicy",
         "CreateSecurityGroup",
         "DeleteSecurityGroup",
         "CreateBucket",
         "DeleteBucket",
         "StartInstances",
         "StopInstances",
         "RebootInstances",
         "terminated",
         "RunInstances"
      ]
   }
}
JSON
  1. 프로젝트의 목표인 영향도에 따른 색 표현 및 이벤트 발생 정보 전달을 수행하기 위해, 간단 명료하게 발전된 형태의 프롬프트 구문을 사용하였습니다. 초기에는 이벤트가 발생되면 바로 Lambda 함수로 받아 Slack 메시지를 보냈으나, Terraform 작업 및 대량 작업을 거쳐야 할 때에는 수많은 메시지가 쏟아지는 이슈가 생겨 SQS를 사용하여 여러 건의 메시지를 AWS Account 별로 목록화하여 나타낼 수 있도록 최적화 과정을 거쳤습니다.

첫 번째로 메시지를 수신 받은 Receiver 역할의 Lambda 함수에서는 메시지 검증을 거쳐 SQS로 전송하는 간단한 로직을 수행합니다. 다음은 Lambda 함수에서 사용한 코드입니다.

import json
import re
import os
import logging
import boto3
from botocore.exceptions import ClientError

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

def lambda_handler(event, context):
    LOGGER.info("Event : " + str(event))
    message = verify_message(event)
    send_message_to_sqs(message)

#sqs 전송
def send_message_to_sqs(message):
    try:
        # SQS 클라이언트 생성
        sqs = boto3.client('sqs', region_name='us-east-1')

        # 람다 환경 변수에서 SQS 큐 URL 가져오기
        queue_url = os.environ.get('SQS_QUEUE_URL')

        if not queue_url:
            raise ValueError("SQS_QUEUE_URL 환경 변수가 설정되지 않았습니다.")

        # 메시지 전송
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message)
        )

        LOGGER.info(f"메시지가 SQS에 성공적으로 전송되었습니다. MessageId: {response['MessageId']}")
        return response
    except ClientError as e:
        LOGGER.error(f"SQS 메시지 전송 중 오류 발생: {e}")
        return None
    except ValueError as e:
        LOGGER.error(str(e))
        return None

#evenbridge로 날라온 메세지 검증
def verify_message(event):
    try:
        if not event or 'body' not in event:
            LOGGER.error("이벤트에 'body' 필드가 없습니다.")
            return
        
        body = json.loads(event['body'])
        message = body.get('Message')
        
        if not message:
            LOGGER.info("'Message' 필드를 찾을 수 없습니다.")
            return
        if isinstance(message, str):
            message = json.loads(message)
        
        if isinstance(message, dict) and 'source' in message:
            return message
        else:
            LOGGER.info("유효한 'source' 키를 찾을 수 없습니다.")
    except json.JSONDecodeError as e:
        LOGGER.error(f"JSON 디코딩 오류: {e}")
    except Exception as e:
        LOGGER.error(f"예상치 못한 오류 발생: {e}")
        raise
Python
  1. 두 번째 Process의 역할을 수행하는 Lambda 함수에서는, 메시지를 풀링하여 Bedrock에 질의한 뒤 Slack 채널로 응답하는 로직을 수행합니다. 프롬프트 명령은 아래 Lambda에서 확인 가능합니다.

다음 코드에서 사용한 환경 변수는 다음과 같습니다:

  • CHANNEL_ID: 수신 받을 슬랙 채널 ID
  • SLACK_BOT_TOKEN: 슬랙 봇 토큰
  • SQS_URL: SQS 주소
  • USE_BEDROCK: Bedrock 사용 여부
import json
import re
import os
import logging
from slack_sdk import WebClient
import boto3
from botocore.exceptions import ClientError
from string import Template

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

SQS = boto3.client('sqs')
SQS_URL = os.getenv('SQS_URL')
USE_BEDROCK = os.getenv('USE_BEDROCK')

def lambda_handler(event, context):
    process_messages()

def process_messages():
    message_count = get_message_count()
    messages = []
    
    while message_count > 0:
        # 메시지 수가 10 이하일 경우 한 번만 처리
        if message_count <= 10:
            pulled_messages = pull_messages() # 메시지를 배열에 추가
            if pulled_messages: # 반환값이 있을 경우에만 추가
                messages.extend(pulled_messages)
            break
        else:
            pulled_messages = pull_messages() # 메시지를 배열에 추가
            if pulled_messages: # 반환값이 있을 경우에만 추가
                messages.extend(pulled_messages)
            message_count -= 10 # 10개씩 줄여나감

    if messages:
        if USE_BEDROCK == "use":
            process_ai_process(messages)
        else:
            process_just_log(messages)

#큐 메세지 풀링
def pull_messages():
    # 최대 10개씩 메시지 수신 (SQS의 최대 수신 개수 제한)
    response = SQS.receive_message(
        QueueUrl=SQS_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=2
    )
    if 'Messages' not in response:
        LOGGER.warning("No 'Messages' key in the response from SQS.")
        return # 'Messages' 키가 없을 경우 함수 종료
    
    messages = response.get('Messages', [])
    message_count = len(messages)
    LOGGER.info("Messages : " + str(response['Messages']))
    LOGGER.info("message_count : " + str(message_count))

    return messages

#큐의 메세지 카운트를 읽어옴
def get_message_count():
    # 큐 속성 가져오기
    response = SQS.get_queue_attributes(
        QueueUrl=SQS_URL,
        AttributeNames=['ApproximateNumberOfMessages']
    )

    # 메시지 수 출력
    message_count = response['Attributes']['ApproximateNumberOfMessages']
    LOGGER.info("get_message_count : " + str(message_count))
    return int(message_count)

#sqs 메시지 삭제
def delete_messages(messages):
    for message in messages:
        try:
            # 처리 완료 후 메시지 삭제
            SQS.delete_message(
            QueueUrl=SQS_URL,
            ReceiptHandle=message['ReceiptHandle']
        )
        except ClientError as e:
            LOGGER.error(f"ClientError: {e}")
        except Exception as e:
            LOGGER.error(f"Unexpected error while deleting message: {e}")

#베드락 미사용시 로그만 처리
def process_just_log(event):
    try:
        LOGGER.info("body message : " + str(event))
    except KeyError as e:
        LOGGER.error(f"KeyError: {e}")
    except json.JSONDecodeError as e:
        LOGGER.error(f"JSONDecodeError: {e}")
    except Exception as e:
        LOGGER.error(f"Unexpected error: {e}")

#ai로부터 포멧팅된 메세지를 받아 슬랙으로 메세지를 보내기
def process_ai_process(messages):
    try:
        summarized_message = message_sumerized(messages)
        LOGGER.info("summarized_message : " + str(summarized_message))
        #응답받은 slack 메세지 포멧을 배열로 받아 계정 알림 별 전송
        json_array = extract_json_from_string(summarized_message)
        for message in json_array:
            if message:
                #슬랙메세지 전송
                send_message(message)
        #처리완료된 메세지 삭제
        delete_messages(messages)
    except KeyError as e:
        LOGGER.error(f"process_ai_process KeyError: {e}")
    except json.JSONDecodeError as e:
        LOGGER.error(f"process_ai_process JSONDecodeError: {e}")
    except Exception as e:
        LOGGER.error(f"process_ai_process Unexpected error: {e}")

#프롬프트에서 json array만 추출
def extract_json_from_string(input_string):
    # JSON 배열을 찾기 위한 정규 표현식
    json_pattern = r'(\[.*\])' # 대괄호로 시작하고 끝나는 문자열을 찾음

    match = re.search(json_pattern, input_string, re.DOTALL)
    if match:
        json_string = match.group(1) # 첫 번째 그룹을 가져옴
        try:
            return json.loads(json_string) # JSON 파싱
        except json.JSONDecodeError as e:
            LOGGER.error(f"JSONDecodeError: {e} - JSON String: {json_string}")
            return None
    else:
        LOGGER.info("No valid JSON found in the input string.")
        return None

#request event message sumerized to bedrock
def message_sumerized(event_message):
    # Create a Bedrock Runtime client in the AWS Region you want to use.
    client = boto3.client("bedrock-runtime", region_name="us-east-1")

    # Set the model ID, e.g., Titan Text Premier.
    model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"

    #bedrock request message
    user_message = Template(
        """
        I will be sending a Slack explain message to monitor AWS resources.

        1.The response should be in Korean, and the message format should follow the provided example, with the
        necessary content filled in.
        2.If there are multiple messages, they should be displayed in a tree structure so the user can easily see
        everything at a glance.
        3.The message should clearly indicate who performed the action, what event occurred, when it happened,
        and what was affected do not missed event.
        4.If there is insufficient information, do not display any message. If the data is sufficient, list all
        events in an organized format.
        5.Match the information based on the AWS account number to complete the message as needed.
        111111111(A계정), 222222222(B계정), 33333333333(C계정)
        6.The event messages are sourced from SQS, and the content of those messages consists of events
        originating from EventBridge.
        7.Clearly distinguish between actions triggered by Terraform and actions performed by users. Ensure
        sensitive information is not displayed.
        8.If events occur across multiple AWS accounts, make json array and separate by aws account number.
        9.color-coded based on the impact on service, using red, yellow, and green to represent different levels.

        Here is the format example for the response:
        [{"attachments":[{"fallback":"","color":"#FF9900","blocks":[{"type":"section","fields": [{"type":"mrkdwn","text":"Performer\n alb-controller"},{"type":"mrkdwn","text":"Event Time (KST)\n 2024-09-05 14:09:17"},{"type":"mrkdwn","text":"Region\n ap-northeast-2"},{"type":"mrkdwn","text":"Account\n 111111111(A계정)"}]},{"type":"actions","elements":[{"type":"button","text":{"type":"plain_text","text":"View Details:waving_white_flag:"},"style":"primary","url":"https://ap-northeast-2.console.aws.amazon.com/cloudtrail/home?region=ap-northeast-2#/events"}]},{"type":"divider"},{"type":"rich_text","elements":[{"type":"rich_text_section","elements":[{"type":"text","text":"Two security group deletion events have occurred. Please check the affected resources.\n","style":{"bold":true}},{"type":"text","text":"\n"}]},{"type":"rich_text_list","style":"bullet","elements":[{"type":"rich_text_section","elements":[{"type":"text","text":"Security group deletion failed (sg-01231222222)\nDeleteSecurityGroup\n"},{"type":"text","text":"\n"}]},{"type":"rich_text_section","elements":[{"type":"text","text":"Security group deletion failed (sg-033422222222)\nDeleteSecurityGroup\n"},{"type":"text","text":"\n"}]}]}]}]}]}]

        and also here is the sqs messages
        ${event_message}
        """
        )

    request_message = user_message.substitute(event_message=event_message)
    
    try:
        native_request = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4000,
            "temperature": 0,
            "messages": [
                {
                    "role": "user",
                    "content": [{"type": "text", "text": request_message}],
                }
            ],
        }
        
        # Convert the native request to JSON.
        request = json.dumps(native_request)
        response = client.invoke_model(modelId=model_id, body=request)
        model_response = json.loads(response["body"].read())
        response_text = model_response["content"][0]["text"]
        return response_text

    except (ClientError, Exception) as e:
        LOGGER.error(f"ERROR: Can't invoke '{model_id}'. Reason: {e}")
        exit(1)

#슬랙 채널에 메세지 보내기
def send_message(message):
    LOGGER.info(message)
    #채널 아이디
    channel_id = os.getenv('CHANNEL_ID')
    #슬랙봇 토큰
    client = WebClient(os.getenv('SLACK_BOT_TOKEN'))
    if message:
        result = client.chat_postMessage(
            channel=channel_id,
            attachments=message['attachments']
        )
        return result
Python

마무리

시스템 구축 이전까지는 DevOps 팀의 부재와 컨벤션이 존재하지 않아 무작위성의 서비스 구축 및 운영이 이루어졌지만, AWS 인프라 감시 시스템을 구축한 뒤에는 손쉬운 자원 감시가 이루어졌습니다. 또한 자체 네이밍 컨벤션 및 프로세스를 확립하여 멀티 어카운트 환경에서 각 계정의 서비스 감시 추적이 용이하도록 구현하였습니다. 알림을 통해서 개발자 뿐만 아니라 비 개발자들도 메시지 이벤트의 내용을 파악하고 대응할 수 있게 되었습니다.

추후에는 DevOps에서만 사용하는 AI 서비스를 넘어서, 태블릿 오더 주문 서비스의 메뉴 추천 및 광고 서비스에도 AI 서비스를 도입하여 고객 경험 개선을 개선하고 고객중심 경영 서비스를 제공할 계획입니다.

이름

박중균

박중균 개발자는 데브옵스팀 팀원으로서 티오더 R&D그룹 소속되어 티오더 플랫폼의 서비스에 대한 배포 파이프라인 제공과 개발자의 생산성 향상을 위한 노력하고 있습니다.

이름

정태환

정태환 개발자는 데브옵스팀 팀장으로서 티오더 R&D그룹 소속되어 티오더 플랫폼의 안정적인 서비스를 제공하기위해 아키텍팅의 전반을 담당하고 있습니다.

이름

민지수

민지수 솔루션즈 아키텍트는 고객들이 AWS에서 워크로드를 성공적으로 설계하고 운영할 수 있도록 기술 지원을 드리고 있습니다. 그리고 AWS 서비스를 활용해 데이터를 기반으로 가치 있는 인사이트를 도출할 수 있는 분석 솔루션의 설계와 최적화를 전문으로 하고 있습니다.

이준우

이준우

이준우 어카운트 매니저는 스타트업 고객들이 직면하는 문제를 함께 고민하고 가장 효율적으로 해결할 수 있도록, 적합한 AWS 서비스 및 적용을 도와주는 다양한 프로그램 활용을 통해 기술 및 비즈니스 모두 지원드리고 있습니다.