AWS 기술 블로그

Amazon Bedrock, AWS Chatbot을 이용한 ChatOps 모니터링 솔루션 구축하기

Background

Troubleshooting
고객들은 시스템을 운영하며 다양한 장애 상황을 마주합니다. 각 장애 상황에는 장애가 발생한 리소스에서 에러 로그가 발생하고, 이러한 에러들 중 긴급하게 처리되어야 하는 에러들은 실시간으로 보고되고 즉시 처리되어야 합니다.
개발자들은 서비스 장애 상황을 해결하기 위해, 리소스에서 발생한 에러 로그를 검색엔진등을 이용해 솔루션을 찾습니다. 또한 많은 회사들은 그들의 서비스를 운영하며 대표적인 장애 상황에 대한 문제 해결 방법을 기술하고 있는 컨플루언스와 같은 내부 위키페이지를 가지고 있습니다. 이를 이용해 장애 발생 시 문제 해결에 도움을 받고 있습니다.

Amazon Bedrock
최근에는 대규모 언어 모델(LLM)의 발전으로 장애 상황을 생성형 AI (Generative AI) 에게 질문하고 답변받으며 문제 해결의 생산성을 높이고 있습니다. 시중에 출시된 다양한 생성형 AI를 이용하면 기존에 직접 검색을 통한 해결 방법 대비 빠르게 문제해결이 가능하며,
특히 지난 10월 23일, Amazon Bedrock이 정식 출시되어, 강력한 생성형 AI의 기능을 API 호출 방식으로 활용할 수 있습니다.

RAG (Retrieval Augmented Generation)
RAG는 LLM에게 미리 질문과 관련된 참고자료를 알려주어, GPT의 가장 큰 단점인 환각을 줄이고 보다 정확하게 대답을 생성할 수 있습니다. 사용자의 질문이 주어지면 문서 저장소에서 가장 관련성이 높은 문서 또는 문단을 검색하고, 해당 정보를 바탕으로 답변을 생성합니다. 이러한 과정을 통해 기존 단독 LLM 보다 더 광범위한 지식을 활용할 수 있게 합니다. RAG에 대한 자세한 설명은 “검색 증강 세대 (RAG)” 문서를 참조하십시오.

What is ChatOps?
ChatOps는 채팅 플랫폼을 통해 IT 운영을 자동화하고 협업하는 접근 방식입니다. 이 접근 방식은 DevOps 원칙과 연관되어 있으며, 팀이 채팅 플랫폼 내에서 직접 명령을 입력하거나 결과를 볼 수 있게 함으로써 운영 생산성을 높일 수 있습니다. 운영중인 서비스에서 에러가 발생했을 때, 에러 상황과 관련된 알림을 채팅 플랫폼으로 자동으로 받아보는 것도 ChatOps의 대표적인 사례 중 하나입니다.

본 문서에서는 서비스 장애 상황 발생시, RAG 아키텍처를 이용해 빠른 대응을 할 수 있는 ChatOps 솔루션에 대해 소개하고자 합니다.
서비스 장애 상황에서 발생하는 에러 로그를 RAG를 통한 내부 위키 페이지 컨텍스트와 함께 Amazon Bedrock에게 질문하고, 장애에 대한 솔루션을 생성합니다. 에러 로그와 생성된 솔루션을 자동으로 Slack 채널이나 Microsoft Teams 메세지를 통해 알림 받아 서비스 운영 효율성을 높일 수 있습니다.


Overview of Solution

ChatOps

전체적인 워크플로우는 아래와 같습니다.

  1. EC2, Lambda, EKS와 같은 서비스가 운영되는 리소스에서 로그를 발생시킵니다.
  2. 발생된 로그가 CloudWatch의 Log Group에 저장됩니다.
  3. Log Group에 저장되는 로그 중에서, “[ERROR]”와 같이 에러 로그의 패턴을 가진 로그가 Subscription filter에 의해 kinesis로 전송됩니다.
  4. Eventbridge pipe에 의해 kinesis로 전송된 에러 로그가 lambda로 전송됩니다.
  5. lambda는 전송된 에러로그와 함께 trigger됩니다. kendra와 bedrock을 활용한 RAG 워크플로우를 거쳐 에러 로그에 대한 솔루션을 생성합니다. 생성된 솔루션을 에러로그와 함께 sns에 publish합니다.
  6. SNS를 구독하고 있는 AWS Chatbot은 사용자의 Slack 채널에 에러로그와 솔루션을 메세지로 보냅니다.

CloudWatch

  • CloudWatch Subscription Filter를 사용하여 CloudWatch Logs에 발생한 로그 이벤트의 실시간 피드에 액세스하고, 다른 시스템으로의 로드를 위해 다른 서비스(예: Amazon Kinesis 스트림, Amazon Kinesis Data Firehose 스트림 또는 AWS Lambda)에 이를 전송할 수 있습니다. 여기서는 Amazon Kinesis Stream으로 전송합니다.
  • 로그 이벤트는 base64로 인코딩되고 gzip 형식으로 압축되어 수신 서비스로 전송됩니다.

Amazon Kinesis

  • 여러개의 CloudWatch Log Group 으로부터 로그 이벤트를 실시간으로 수집합니다.

Amazon EventBridge Pipe

  • 이벤트 소스(Kinesis)로부터 수신한 로그를 필터링하고 전처리하여 타겟(lambda)으로 전송합니다.

Lambda

  • RAG아키텍쳐로 구성된 GenAI(Bedrock)에게 수신된 에러 로그를 질문하여 솔루션을 생성하고, 원본 로그 이벤트와 생성된 솔루션을 SNS토픽에 publish합니다.

Amazon Bedrock

  • Amazon Bedrock은 API를 통해 Amazon 및 주요 AI 스타트업의 파운데이션 모델(FM)을 사용할 수 있게 하는 완전관리형 서비스입니다. 로그 이벤트를 분석하여 솔루션을 제안합니다.

Amazon Kendra

  • Amazon Kendra 의 Web Crawler 2.0을 사용하여 wiki의 데이터를 crawling하고 data source로서 사용합니다.

Amazon SNS

  • SNS Topic에 이벤트 로그가 publish됩니다. SNS Topic은 AWS Chatbot에 의해 구독됩니다.

AWS Chatbot

  • SNS Topic에 메세지가 publish되면 이를 사용자의 Slack이나 Teams에 메세지로 전송합니다.
  • 최근, AWS Chatbot에서 사용자 지정 알림 기능이 정식 출시되었습니다. 이를 이용해 고객은 원하는 정보를 알림에 담아 Slack 채널 및 Microsoft Teams에 전송함으로서 다양하게 활용할 수 있습니다.

Prerequisites

본 블로그의 튜토리얼을 진행하기 전, 아래 항목이 준비되어 있어야 합니다.

  • AWS 계정
  • Slack 계정
  • Slack workspace ID 과 channel ID

* Slack workspace 관리자는 workspace와 channel에서 AWS Chatbot app을 사용할 수 있도록 허용해야 합니다.


Tutorial

[Create Kinesis Data Stream]
Kinesis data stream 을 생성합니다. “Error-log-flow”라는 이름의 Kinesis data stream을 생성합니다.

[Create Subscription filters on CloudWatch log group]
CloudWatch로 이동합니다. 에러 로그를 검출하고자 하는 log group에 Kinesis subscription filter를 생성 합니다. AWS lambda function에서 에러 로그를 생성한다고 가정하고, 해당 lambda의 log group에 Kinesis subscription filter를 생성 합니다.

* 스트림에 데이터를 입력하는 데 필요한 권한을 CloudWatch Logs에 부여하는 IAM 역할을 생성해야 합니다. IAM role 생성에 대한 자세한 내용은 “CloudWatch Logs 구독 필터 사용” 문서를 참조하십시오.

로그에서 Filtering하고자 하는 패턴을 입력하고, filter 이름을 입력 합니다. 패턴이 의도한 대로 동작 하는지 테스트 해볼 수 있습니다. 예시에서는 “ERROR” 또는 “Error” 또는 “error” 라는 문자열이 포함된 로그를 필터링 하는 것을 확인 하였습니다.

패턴 작성 방법에 대한 자세한 내용은 “지표 필터, 구독 필터 및 필터 로그 이벤트에 대한 필터 패턴 구문” 문서를 참조 하십시오. 패턴을 작성 한 후 “Start streaming” 버튼을 클릭 합니다.

[Create SNS Topic and AWS Chatbot]
Standard Type의 SNS Topic을 생성합니다. 이 Topic은 AWS Chatbot에 의해 구독되며, Topic에 게시된 메세지는 AWS Chatbot에 의해 Slack channel에 전송됩니다.

Slack Workspace에 접속 후, 새로운 Private channel을 하나 만듭니다. 그리고 “AWS”라는 이름의 AWS Chatbot를 초대합니다. AWS Chatbot을 사용하기 위해서는 관리자의 승인이 필요할 수 있습니다.

AWS Chatbot에서 Slack chat client를 생성합니다.

Slack Workspace URL을 입력합니다.

Chatbot의 configuration name과, 전 단계에서 생성한 Private channel의 id를 입력합니다.

전 단계에서 생성한 SNS Topic을 등록합니다.

[Model access – Claude v2]
Amazon Bedrock에서 Claude v2를 활성화합니다. (Claude v2는 현재us-east-1us-west-2리전에서 사용할 수 있습니다.)

Access granted가 되면, 이제 api를 통해 Claude를 사용 가능합니다.

[Create Kendra Web crawler 2.0]
Kendra index를 생성하고, index에 “Web Crawler 2.0” Data sources를 추가합니다.

Source URLs 에 위키 페이지 url을 추가하고, 나머지 항목을 작성하여 data sources를 생성을 완료합니다.

[Create Lambda for publish message on SNS Topic]
bedrock을 호출해서 solution을 만들고 sns topic에 에러 로그와 솔루션을 publish하는 람다함수를 생성합니다.

from langchain.retrievers import AmazonKendraRetriever
from langchain.prompts import PromptTemplate
from langchain.chains import ConversationalRetrievalChain
from langchain.llms.bedrock import Bedrock

import json
import boto3
import base64
import gzip
import io
import os

region = os.environ["AWS_REGION"]
kendra_index_id = os.environ["KENDRA_INDEX_ID"]
Topic_Arn = os.environ["TOPIC_ARN"]
AWS_ACCESS_KEY_ID = os.environ["ACCESS_KEY"]
AWS_SECRET_ACCESS_KEY = os.environ["SECRET_KEY"]

session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=region
)

def build_chain():
  BEDROCK_CLIENT = session.client("bedrock-runtime", "us-east-1")
  
  llm = Bedrock(
      region_name = region,
      model_kwargs={"max_tokens_to_sample":300,"temperature":1,"top_k":250,"top_p":0.999,"anthropic_version":"bedrock-2023-05-31"},
      model_id="anthropic.claude-v2",
      client=BEDROCK_CLIENT
  )
      
  retriever = AmazonKendraRetriever(index_id=kendra_index_id,top_k=5,region_name=region)


  prompt_template = """Human: This is a friendly conversation between a human and an AI. 
  The AI is talkative and provides specific details from its context but limits it to 240 tokens.
  If the AI does not know the answer to a question, it truthfully says it 
  does not know.

  Assistant: OK, got it, I'll be a talkative truthful AI assistant.

  Human: Here are a few documents in <documents> tags:
  <documents>
  {context}
  </documents>
  If you can't provide a detailed answer for {question} yourself, Please refer to the document above.
  If you are still unable to provide an answer, then answer 'I don't know'.

  Assistant:
  """
  
  PROMPT = PromptTemplate(
      template=prompt_template, input_variables=["context", "question"]
  )
  qa = ConversationalRetrievalChain.from_llm(
        llm=llm, 
        retriever=retriever, 
        return_source_documents=True, 
        combine_docs_chain_kwargs={"prompt":PROMPT},
        verbose=True)

  return qa

def run_chain(chain, prompt: str, history=[]):
    result = chain({"question": prompt, "chat_history": history})
    return {
        "answer": result['answer'],
        "source_documents": result['source_documents']
    }

def decode_event_data(event):
    encoded_data = event[0]['data']
    decoded_data = base64.b64decode(encoded_data)
    buffer = io.BytesIO(decoded_data)

    with gzip.GzipFile(fileobj=buffer, mode='rb') as f:
        decompressed_data = f.read()

    data_json = json.loads(decompressed_data.decode())
    
    return [log_event['message'] for log_event in data_json['logEvents']]

def publish_messages(messages):
    SNS_CLIENT = session.client('sns')
    
    chain = build_chain()
    for msg in messages:
        result = run_chain(chain, msg)
        sources = []
        if 'source_documents' in result:
            for d in result['source_documents']:
                sources.append(d.metadata['source'])
        message = {
            "version": "1.0",
            "source": "custom",
            "content": {
                  "textType": "client-markdown",
                  "title": ":warning:*"+msg+"*",
                  "description": ":robot_face:*Suggested Solution(by Claude v2)* : "+result['answer'],
                  "nextSteps": sources
            }
        }
        response = SNS_CLIENT.publish(TopicArn=Topic_Arn, Message=json.dumps(message))

def handler(event, context):
    print("Publish Message Function Started!")
    print("boto3 version:", boto3.__version__)
    messages = decode_event_data(event)
    publish_messages(messages)
    print("Message published!")
    return(messages)

*위 코드는 Langchain Library를 사용합니다. Langchain Library를 Lambda function에서 사용하려면, Container image를 통해 lambda를 생성해야 정상적으로 동작합니다. Container Image로 lambda를 생성하는 방법은 “컨테이너 이미지로 Python Lambda 함수 배포” 문서를 참조 하십시오.

아래와 같이, Environment variables를 등록해줍니다. KENDRA index id와 SNS Topic ARN 을 입력합니다. 아래 ACCESS_KEY와 SECRET_KEY는 SNS와 Bedrock에 access 가 가능한 iam User의 Access Key를 입력합니다.

[Create EventBridge Pipe]
Amazon EventBridge의 pipe를 생성합니다. Source에 전단계에서 생성한 Kinesis Stream을 지정하고, Target에 전단계에서 생성한 Lambda Function을 지정합니다.


Demo

에러를 발생시키고, 에러 로그와 솔루션이 Slack 메세지로 수신되는지 확인합니다.

먼저, Kendra data source에 source url로 위키 페이지 url을 등록합니다. 여기선 Lambda와 관련된 AWS 공식 문서를 등록해줬습니다.

에러 로그를 생성합니다. Lambda를 이용해, IAM 과 관련된 에러를 생성해 봅니다.
Question :

[ERROR] User: arn:AWS:iam::123456789012:user/developer is not authorized to perform: lambda:InvokeFunction on resource: my-function

Data Source에 등록한 문서에서 아래와 같이 관련된 솔루션을 찾을 수 있습니다.

Reference Document : https://docs.aws.amazon.com/en_us/lambda/latest/dg/troubleshooting-invocation.html

Answer :
Slack channel의 AWS Chatbot이 에러 로그와 data source의 문서를 기반으로 생성된 솔루션을 slack 메세지로 보내줍니다.


Conclusion

지금까지 Amazon Bedrock과 AWS Chatbot을 이용해서 ChatOps 모니터링 솔루션을 구축해보았습니다. 이 밖에도 Amazon Bedrock을 활용해 개발 및 운영 생산성을 높일 수 있는 방법은 다양하며, 그 가능성은 무궁무진합니다. 이제 Amazon Bedrock을 이용해서 운영 생산성을 높여 보시길 바랍니다!

Seungje Mun

Seungje Mun

AWS Professional Service팀에서 Application Migration & Modernization 과정을 돕고 있습니다. Application Developer로서 다양한 고객과 클라우드로의 마이그레이션 여정에 함께하며, 그 과정에서 새로운 기술 도입에 대한 가능성을 제안합니다. 또한 고객의 생산성을 높일 수 있는 Agile Coach 역할을 함께 하고 있습니다.