AWS 기술 블로그

Amazon Bedrock을 이용하여 Stream 방식의 한국어 Chatbot 구현하기

2023년 9월 Amazon Bedrock이 정식버전을 출시하면서 Amazon TitanAnthropic Claude등의 다양한 LLM (Large Language Model)을 AWS 환경에서 편리하게 사용할 수 있게 되었습니다. 특히 Anthropic의 Claude 모델은 한국어를 비교적 잘 지원하고 있습니다. Chatbot과 원활한 대화를 위해서는 사용자의 질문(Question)에 대한 답변(Answer)을 완전히 얻을 때까지 기다리기 보다는 Stream 형태로 대화하듯이 보여주는 것이 사용성에서 좋습니다. 본 게시글은 Amazon Bedrock을 사용하여 Stream을 지원하는 한국어 Chatbot을 만드는 방법을 설명합니다.

Stream 방식은 하나의 요청에 여러 번의 응답을 얻게 되므로, HTTP 방식보다는 세션을 통해 메시지를 교환하는 WebSocket 방식이 유용합니다. 또한 서버리스(Serverless) 아키텍처를 사용하면 인프라의 유지보수에 대한 부담 없이 인프라를 효율적으로 관리할 수 있습니다. 여기서는 서버리스 서비스인 Amazon API Gateway를 이용해 Client와 서버를 WebSocket로 연결하고 AWS Lambda를 이용하여 세션을 관리합니다. 본 게시글이 사용하는 Client는 Web으로 제공되고, 채팅 이력은 로컬 디바이스가 아니라 서버에 저장되게 됩니다. Amazon DynamoDB는 Json형태로 채팅 이력을 저장하는데 유용합니다. 이와 같이 Client에서는 로그인 시에 DynamoDB에 저장된 채팅 이력을 보여줍니다. 또한 채팅 이력은 LLM에 질문할 때에도 문맥(Context)을 파악하기 위하여 유용하게 사용되므로, 사용자 아이디를 이용하여 DynamoDB에서 채팅 이력을 읽어서 로컬 메모리에 저장하여 활용합니다.

Architecture 개요

전체적인 Architecture는 아래와 같습니다. 사용자가 Web Client를 이용하여 로그인하면, DynamoDB에서 이전 대화 이력을 가져와 채팅 화면에서 확인할 수 있습니다. 이후 질문을 입력하면 WebSocket를 이용하여 API Gateway를 거쳐 Lambda로 질문이 전달됩니다. Lambda는 DynamoDB에서 채팅이력을 확인하여 채팅을 위한 메모리를 할당합니다. 이후 질문과 채팅이력을 포함한 메시지를 Amazon Bedrock에 전달하여 질문에 대한 답변을 요청합니다. 이때 Bedrock의 Anthropic Claude 모델이 답변을 생성하면, Lambda와 API Gateway를 거쳐서 사용자에게 전달되게 됩니다. 전체 인프라는 AWS CDK를 이용해 쉽게 배포되고 관리할 수 있습니다.

상세하게 단계별로 설명하면 아래와 같습니다.

  • 단계1: 브라우저를 이용하여 사용자가 CloudFront 주소로 접속하면, Amazon S3에서 HTML, CSS, JS등의 파일을 전달합니다. 이때 로그인을 수행하고 채팅 화면으로 진입합니다.
  • 단계2: Client는 사용자 아이디를 이용하여 ‘/history’ API로 채팅이력을 요청합니다. 이 요청은 API Gateway를 거쳐서 lambda-history에 전달됩니다. 이후 DynamoDB에서 채팅 이력을 조회한 후에 다시 API Gateway와 lambda-history를 통해 사용자에게 전달합니다.
  • 단계3: Client가 API Gateway로 WebSocket 연결을 시도하면, API Gateway를 거쳐서 lambda-chat-ws로 WebSocket connection event가 전달됩니다. 이후 사용자가 메시지를 입력하면, API Gateway를 거쳐서 lambda-chat-ws로 메시지가 전달됩니다.
  • 단계4: lambda-chat-ws는 사용자 아이디를 이용하여 DynamoDB의 기존 채팅이력을 읽어와서, 채팅 메모리에 저장합니다.
  • 단계5: lambda-chat-ws는 사용자의 질문(Question)과 채팅 이력(Chat history)을 Amazon Bedrock의 Enpoint로 전달합니다.
  • 단계6: Amazon Bedrock의 사용자의 질문과 채팅이력이 전달되면, Anthropic LLM을 이용하여 적절한 답변(Answer)을 사용자에게 전달합니다. 이때, Stream을 사용하여 답변이 완성되기 전에 답변(Answer)를 사용자에게 보여줄 수 있습니다.

이때의 시퀀스 다이어그램(Sequence diagram)은 아래와 같습니다.

주요 시스템 구성

전체 시스템의 상세 구현에 대하여 아래와 같이 설명합니다.

서버리스 기반으로 WebSocket 연결하기

Client는 서버리스 서비스인 API Gateway를 이용하여 WebSocket과 연결합니다. 이 때 Client가 연결하는 Endpoint는 API Gateway 주소입니다. 아래와 같이 WebSocket을 선언한 후에 onmessage로 메시지가 들어오면, Event의 ‘data’에서 메시지를 추출합니다. 세션을 유지하기 위해 일정간격으로 keep alive 동작을 수행합니다.

const ws = new WebSocket(endpoint);

ws.onmessage = function (event) {        
    response = JSON.parse(event.data)

    if(response.request_id) {
        addReceivedMessage(response.request_id, response.msg);
    }
};

ws.onopen = function () {
    isConnected = true;
    if(type == 'initial')
        setInterval(ping, 57000); 
};

ws.onclose = function () {
    isConnected = false;
    ws.close();
};

발신 메시지는 JSON 포맷으로 아래와 같이 userId, 요청시간, 메시지 타입과 메시지를 포함합니다. 발신할 때에 WebSocket의 send()을 이용하여 아래와 같이 발신합니다. 만약 발신시점에 세션이 연결되어 있지 않다면 연결하고 재시도하도록 알림을 표시합니다.

sendMessage({
    "user_id": userId,
    "request_id": requestId,
    "request_time": requestTime,        
    "type": "text",
    "body": message.value
})

WebSocket = connect(endpoint, 'initial');
function sendMessage(message) {
    if(!isConnected) {
        WebSocket = connect(endpoint, 'reconnect');
        
        addNotifyMessage("재연결중입니다. 잠시후 다시시도하세요.");
    }
    else {
        WebSocket.send(JSON.stringify(message));     
    }     
}

Stream 사용하기

lambda-chat-ws는 Bedrock을 사용하기 위하여 아래와 같이 Boto3로 Bedrock client를 정의합니다. 여기서는 Chatbot과 관련된 인프라는 서울 리전을 사용하고, Bedrock은 N.Virginia (us-east-1)을 사용합니다.

import boto3

boto3_bedrock = boto3.client(
    service_name='bedrock-runtime',
    region_name=bedrock_region,
)

아래와 같이 LLM에서 어플리케이션을 편리하게 만드는 프레임워크인 LangChain을 사용하여 Bedrock을 정의합니다. 이때 Stream으로 출력을 보여줄 수 있도록 streaming을 True로 설정합니다. 또한 StreamingStdOutCallbackHandler을 callback으로 등록 합니다.

from langchain.llms.bedrock import Bedrock
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

llm = Bedrock(
    model_id=modelId, 
    client=boto3_bedrock, 
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    model_kwargs=parameters)

채팅 이력은 ConversationBufferMemory을 이용하여 chat_memory로 저장합니다.

from langchain.memory import ConversationBufferMemory
chat_memory = ConversationBufferMemory(human_prefix='Human', ai_prefix='Assistant')

채팅 이력까지 고려한 답변을 구하기 위하여, ConversationChain을 이용합니다. 사용자가 WebSocket를 이용하여 API Gateway로 보낸 메시지가 lambda-chat-ws에 전달되면, 아래와 같이 event에서 connectionId와 routeKey를 추출할 수 있습니다. routeKey가 “default”일 때 사용자에게 보낸 메시지가 들어오는데 여기서 ‘body”를 추출하여, json포맷의 데이터에서 사용자의 입력인 ‘text’를 추출합니다. 이후 아래와 같이 conversation.predict()을 이용하여 LLM에 답변을 요청합니다.

from langchain.chains import ConversationChain
conversation = ConversationChain(llm=llm, verbose=False, memory=chat_memory)

def lambda_handler(event, context):
    if event['requestContext']: 
        connectionId = event['requestContext']['connectionId']
        routeKey = event['requestContext']['routeKey']

        if routeKey == '$connect':
            print('connected!')
        elif routeKey == '$disconnect':
            print('disconnected!')
        else:   # $default
            jsonBody = json.loads(event.get("body", ""))
            text = jsonBody['body']

        stream = conversation.predict(input=text)
        msg = readStreamMsg(connectionId, requestId, msg)

LLM의 답변은 Stream으로 들어오는데, 아래와 같이 “stream”에서 “event”를 추출한 후에 sendMessage() 이용하여 Client로 답변을 전달합니다. 또한, Client에서 답변 메시지를 구분하여 표시하기 위해서, “request_id”를 함께 전달합니다.

def readStreamMsg(connectionId, requestId, stream):
    msg = ""
    if stream:
        for event in stream:
            msg = msg + event

            result = {
                'request_id': requestId,
                'msg': msg
            }
            sendMessage(connectionId, result)
    return msg

아래와 같이 sendMessage()는 Boto3의 post_to_connection을 이용하여 client로 응답을 전송 합니다. 이 때 lambda-chat-ws가 메시지를 전달하는 Endpoint는 WebSocket를 지원하는 API Gateway 주소입니다.

import boto3
client = boto3.client('apigatewaymanagementapi', endpoint_url=connection_url)

def sendMessage(id, body):
    try:
        client.post_to_connection(
            ConnectionId=id, 
            Data=json.dumps(body)
        )
    except: 
        raise Exception ("Not able to send a message")

대화 이력의 관리

Client는 이전 채팅 이력을 가져오기 위하여 ‘/history’ API와 연결된 lambda-gethistory로 아래와 같이 사용자 아이디(userId)와 얻어올 시간(allowTime)을 포함한 요청을 전달합니다. 이에 대한 전달받은 history를 풀어서 채팅창에 표시합니다.

getHistory(userId, allowTime);

function getHistory(userId, allowTime) {
    const uri = "history";
    const xhr = new XMLHttpRequest();

    xhr.open("POST", uri, true);
    xhr.onreadystatechange = () => {
        if (xhr.readyState === 4 && xhr.status === 200) {
            let response = JSON.parse(xhr.responseText);
            let history = JSON.parse(response['msg']);
                        
            for(let i=0; i<history.length; i++) {
                if(history[i].type=='text') {                
                    let requestId = history[i].request_id;
                    let timestr = history[i].request_time;
                    let body = history[i].body;
                    let msg = history[i].msg;

                    addSentMessage(requestId, timestr, body)
                    addReceivedMessage(requestId, msg);                            
                }                 
            }         
        }
    };
    
    var requestObj = {
        "userId": userId,
        "allowTime": allowTime
    }
    var blob = new Blob([JSON.stringify(requestObj)], {type: 'application/json'});

    xhr.send(blob);            
}

lambda-gethistory은 아래와 같이 userId와 allowTime을 이용하여 DynamoDB로부터 채팅 이력을 query하여, 결과를 Client로 전달합니다.

const aws = require('aws-sdk');
var dynamo = new aws.DynamoDB();

let queryParams = {
    TableName: tableName,
    KeyConditionExpression: "user_id = :userId and request_time > :allowTime",
    ExpressionAttributeValues: {
        ":userId": {'S': userId},
        ":allowTime": {'S': allowTime}
    }
};

try {
    let result = await dynamo.query(queryParams).promise();

    let history = [];
    for(let item of result['Items']) {
        console.log('item: ', item);
        let request_time = item['request_time']['S'];
        let request_id = item['request_id']['S'];
        let body = item['body']['S'];
        let msg = item['msg']['S'];
        let type = item['type']['S'];

        history.push({
            'request_time': request_time,
            'request_id': request_id,
            'type': type,
            'body': body,
            'msg': msg,
        });
    }

    const response = {
        statusCode: 200,
        msg: JSON.stringify(history)
    };
    return response;        
}

lambda-chat-ws는 아래와 같이 채팅 이력을 저장하는 map을 관리합니다. 사용자의 요청에서 userId를 추출하여 lambda-chat-ws가 채팅 이력을 가지고 있지 않은 경우에 chat_memory를 생성하여, 기존 이력은 load_chatHistory()로 읽어서 ConversationChain으로 관리합니다.

map = dict() # Conversation

if userId in map:  
    chat_memory = map[userId]
else: 
    chat_memory = ConversationBufferMemory(human_prefix='Human', ai_prefix='Assistant')
    map[userId] = chat_memory

    allowTime = getAllowTime()
    load_chatHistory(userId, allowTime, chat_memory)

    conversation = ConversationChain(llm=llm, verbose=False, memory=chat_memory)

load_chatHistory()은 아래와 같이 userId와 allowTime를 이용하여 DynamoDB에서 채팅 이력을 조회하여 chat_memory에 저장합니다.

def load_chatHistory(userId, allowTime, chat_memory):
    dynamodb_client = boto3.client('dynamodb')

    response = dynamodb_client.query(
        TableName=callLogTableName,
        KeyConditionExpression='user_id = :userId AND request_time > :allowTime',
        ExpressionAttributeValues={
            ':userId': {'S': userId},
            ':allowTime': {'S': allowTime}
        }
    )

    for item in response['Items']:
        text = item['body']['S']
        msg = item['msg']['S']
        type = item['type']['S']

        if type == 'text':
            chat_memory.save_context({"input": text}, {"output": msg})    

WebSocket를 지원하는 API Gateway를 구현하기

cdk-stream-chatbot-stack.ts에서는 아래와 같이 WebSocket를 지원하는 API Gateway를 정의합니다. 여기서 Client가 접속하는 API Gateway의 Endpoint는 wss_url이고, lambda-chat-ws가 접속하는 API Gateway의 endpoint는 connection_url입니다.

const WebSocketapi = new apigatewayv2.CfnApi(this, `ws-api-for-${projectName}`, {
    description: 'API Gateway for chatbot using WebSocket',
    apiKeySelectionExpression: "$request.header.x-api-key",
    name: 'ws-api-for-' + projectName,
    protocolType: "WebSocket", 
    routeSelectionExpression: "$request.body.action",
});
const wss_url = `wss://${WebSocketapi.attrApiId}.execute-api.${region}.amazonaws.com/${stage}`;
const connection_url = `https://${WebSocketapi.attrApiId}.execute-api.${region}.amazonaws.com/${stage}`;

직접 실습 해보기

여기서는 AWS Cloud9에서 AWS CDK를 이용하여 인프라를 설치합니다. 편의상 서울 리전을 통해 실습합니다.

사전 준비 사항

이 솔루션을 사용하기 위해서는 사전에 AWS Account가 준비되어 있어야 합니다.

Bedrock 사용 권한 설정하기

본 실습에서 Bedrock은 US East (N. Virginia)리전을 사용합니다. Model access에 접속해서 [Edit]를 선택하여 모든 모델을 사용할 수 있도록 설정합니다. 특히 Anthropic Claude는 반드시 사용이 가능해야 합니다.

CDK를 이용한 인프라 설치

  1. 서울 리전의 Cloud9 Console에 접속하여 [Create environment]-[Name]에서 “chatbot”으로 이름을 입력하고, EC2 instance는 “m5.large”를 선택합니다. 나머지는 기본값을 유지하고, 하단으로 스크롤하여 [Create]를 선택합니다.
  2. Environment에서 “chatbot”을 [Open]한 후에 아래와 같이 터미널을 실행합니다.
  3. EBS 크기 변경
    아래와 같이 스크립트를 다운로드 합니다.
    curl https://raw.githubusercontent.com/aws-samples/generative-ai-demo-using-amazon-sagemaker-jumpstart-kr/main/blogs/stream-chatbot-for-amazon-bedrock/resize.sh -o resize.sh이후 아래 명령어로 용량을 80G로 변경합니다.
    chmod a+rx resize.sh && ./resize.sh 80 
  4. 소스를 다운로드합니다.
    curl https://raw.githubusercontent.com/aws-samples/generative-ai-demo-using-amazon-sagemaker-jumpstart-kr/main/blogs/stream-chatbot-for-amazon-bedrock/stream-chatbot-for-amazon-bedrock.zip -o stream-chatbot-for-amazon-bedrock.zip && unzip stream-chatbot-for-amazon-bedrock.zip
  5. cdk 폴더로 이동하여 필요한 라이브러리를 설치합니다.
    cd stream-chatbot-for-amazon-bedrock/cdk-stream-chatbot/ && npm install
  6. CDK 사용을 위해 Bootstrapping을 수행합니다.
    아래 명령어로 Account ID를 확인합니다.
    aws sts get-caller-identity --query Account --output text

    아래와 같이 bootstrap을 수행합니다. 여기서 “account-id”는 상기 명령어로 확인한 12자리의 Account ID입니다. bootstrap 1회만 수행하면 되므로, 기존에 cdk를 사용하고 있었다면 bootstrap은 건너뛰어도 됩니다.cdk bootstrap aws://account-id/ap-northeast-2
  7. 인프라를 설치합니다.
    cdk deploy --all
  8. 아래와 같이 webSocketUrl을 확인합니다. 여기서는 “wss://etl2hxx4la.execute-api.ap-northeast-1.amazonaws.com/dev” 입니다.
  9. 아래와 같이 “/html/chat.js”파일을 열어서, endpoint를 업데이트합니다.
  10. 아래와 같이 ” UpdateCommendforstreamchatbot”에 있는 명령어를 확인합니다. 여기서는 “aws s3 cp ../html/chat.js s3://cdkstreamchatbotstack-storagestreamchatbote10ee90-sh19etaljvog”입니다.
    터미널에서 아래와 같이 명령어를 입력하여 chat.js 파일을 S3에 복사합니다.
  11. 복사가 성공하면 아래와 같이 WebUrlforstreamchatbot의 URL을 이용하여 브라우저로 접속합니다. 접속시 “User Id”를 입력하고 [Submit] 버튼을 선택합니다.

실행결과

채팅창에서 “서울을 여행하고 싶어. 무엇을 타고 여행하는것이 좋을까?”로 입력하면 서울의 정보를 알려 줍니다. 대용량 언어 모델(LLM)의 특성상 실습의  답변은 블로그의 화면과 조금 다를 수 있습니다. 만약, 첫번째 접속시 “재접속중입니다”로 나오면 웹페이지를 새로고침 합니다.

대명사를 이용해 “그럼 그 도시에서 지하철 요금은 얼마야?”로 문의하면 아래와 같이 서울 지하철 요금을 알려줍니다. 2023년 10월에 지하철 요금이 150원이 올라서 1400원(교통카드 기준)이므로 최신 정보를 반영하지 못하고 있지만, 대화 이력을 통해 서울의 지하철 요금을 확인할 수 있었습니다.

‘서울’과 ‘지하철’이라는 단어를 넣지 않고 “그럼 환승도 가능해?”로 물었을 때에 아래와 같이 이전 대화 이력(Chat history)을 이용하여 서울 지하철의 환승에 대해 설명하여 줍니다.

리소스 정리하기

더이상 인프라를 사용하지 않는 경우에 아래처럼 모든 리소스를 삭제할 수 있습니다.

  1. API Gateway Console로 접속하여 “rest-api-for-stream-chatbot”, “ws-api-for-stream-chatbot”을 삭제합니다.
  2. Cloud9의 터미널로 복귀하여 아래의 명령어로 전체 삭제를 합니다.
    cdk destroy --all

결론

Amazon Bedrock이 정식 버전을 출시하면서, AWS 환경에서 한국어 Chatbot을 쉽게 구현할 수 있게 되었습니다. 본 게시글은 Anthropic의 Claude LLM과 대표적인 LLM 어플리케이션 개발 프레임워크인 LangChain을 이용하여 한국어 Chatbot을 만들었습니다. 이때 WebSocket를 이용하여 Client와 서버를 연결하였고, Stream 형태로 답변을 표시할 수 있었습니다. Amazon Bedrock은 다양한 Foundation Model로 쉽게 생성형 AI 어플리케이션을 개발할 수 있도록 도와줍니다. 또한 한국어 Chatbot을 위한 인프라는 서버리스로 구성해서 인프라 관리에 대한 부담을 줄였고, AWS CDK를 이용하여 인프라의 배포 및 관리를 쉽게 할 수 있도록 하였습니다. 대용량 언어 모델을 이용한 한국어 Chatbot은 기존 Rule 방식과 비교하여, 훨씬 개선된 대화 능력을 보여줍니다. 향후 Amazon Bedrock을 이용하여 다양한 어플리케이션을 쉽고 효과적으로 개발할 수 있을 것으로 기대됩니다.

실습 코드 및 도움이 되는 참조 블로그

아래의 링크에서 실습 소스 파일 및 기계 학습(ML)과 관련된 자료를 확인하실 수 있습니다.

Kyoung-Su Park

Kyoung-Su Park

박경수 솔루션즈 아키텍트는 다양한 워크로드에 대한 개발 경험을 바탕으로 고객이 최적의 솔루션을 선택하여 비즈니스 성과를 달성할 수 있도록 고객과 함께 효율적인 아키텍처를 구성하는 역할을 수행하고 있습니다. 현재 AWS의 Machine Learning, IoT, Analytics TFC에서 활동하고 있습니다.

Juheon Choi

Juheon Choi

최주헌 솔루션즈 아키텍트는 IoT 및 데이터 분석 경험을 바탕으로고객이 클라우드를 통한 비즈니스의 가치 창출을 달성 할 수 있도록고객과 함께 효율적인 아키텍처를 구성하는 역할을 수행하고 있습니다.