Amazon AWS Official Blog

Building a Real-Time AI Assistant with WebRTC

Background

Compared with traditional voice assistants such as Alexa and Siri, the new generation of LLM-based AI assistants, such as GPT-4o and moshi, has improved substantially in multimodal input, language coverage, response latency, and reasoning ability. It also means that AI assistants have moved from the half-duplex, question-and-answer mode to a full-duplex, real-time conversational mode.

These improvements require not only models that support streaming, multimodal input and output, but also, at the communication layer, real-time frameworks such as WebRTC rather than HTTP or WebSocket. Compared with WebSocket, WebRTC lowers end-to-end latency by running over UDP, dynamically adjusting the sampling rate, and communicating peer-to-peer. In addition, WebRTC clients typically provide voice-call features such as noise suppression and echo cancellation.

Left: half-duplex mode over HTTP/WebSocket. Right: full-duplex mode over WebRTC

This post describes the overall architecture for building a real-time AI assistant with WebRTC, and then uses LiveKit WebRTC as an example to show how to build a WebRTC-based real-time AI translation assistant (Agent).

Overall Architecture

Because WebRTC targets multi-party, real-time audio and video calls, the AI assistant joins the call as another Participant in a Room (a minimal join sketch follows the list below).

  • Room: a Room is a virtual space in which multiple participants communicate over audio and video. It allows several users to join the same session and interact in real time.
  • Participant: a Participant is a single user who has joined a Room. Each participant can publish and subscribe to audio and video streams.
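
For intuition, here is a minimal sketch of a process joining a Room as a Participant with the LiveKit Python SDK (the livekit and livekit-api packages). The key/secret, server URL, and room name are placeholders for illustration; the actual agent later in this post uses the higher-level LiveKit Agents framework instead.

# Minimal sketch: join a LiveKit Room as a Participant (placeholder credentials and URL)
import asyncio
from livekit import api, rtc

async def main():
    # Mint an access token that grants this identity permission to join the room
    token = (
        api.AccessToken("devkey", "secret")  # API key / secret of the LiveKit Server
        .with_identity("ai-agent")           # identity of this Participant
        .with_grants(api.VideoGrants(room_join=True, room="translator-room"))
        .to_jwt()
    )

    room = rtc.Room()

    @room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        print(f"participant joined: {participant.identity}")

    # Connect to the Room; from here the agent can publish and subscribe to audio tracks
    await room.connect("wss://YOUR_LIVEKIT_SERVER_URL", token)
    print(f"connected to room {room.name} as {room.local_participant.identity}")
    await asyncio.sleep(60)  # stay in the room for a minute, then exit

if __name__ == "__main__":
    asyncio.run(main())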

For the model service itself, it can be exposed over HTTP or WebSocket, depending on whether it accepts streaming input.
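
As an illustration of the non-streaming case, a model can be wrapped behind a plain HTTP endpoint. The sketch below uses FastAPI; translate_speech is a hypothetical stand-in for the real model call and is not part of the demo later in this post.

# Minimal sketch: expose a non-streaming model over HTTP with FastAPI
# translate_speech() is a hypothetical placeholder for the real model.
from fastapi import FastAPI, UploadFile

app = FastAPI()

def translate_speech(audio_bytes: bytes, src: str, tgt: str) -> str:
    # Placeholder: run the speech-to-translated-text model here
    return "translated text"

@app.post("/translate")
async def translate(file: UploadFile, src: str = "zho", tgt: str = "eng"):
    audio = await file.read()
    return {"text": translate_speech(audio, src, tgt)}

# Run with: uvicorn model_server:app --port 8000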

Overall architecture

A Simple Demo

Because LiveKit WebRTC provides good abstractions and makes Agent integration straightforward, the rest of this section uses LiveKit WebRTC and LiveKit Agents to build a simple real-time AI translation assistant.

1. [WebRTC Server] Install and deploy LiveKit Server on EC2. Note that the server needs a public IP; if the services below are installed on the same instance, keep their Python environments isolated.

# https://docs.livekit.io/home/self-hosting/local/
curl -sSL https://get.livekit.io | bash
# Node IP is required because a cloud server doesn't know its own public IP
livekit-server --dev --bind 0.0.0.0 --node-ip YOUR_EC2_PUBLIC_IP
# Security group should open 7880 for http/ws, 7881 for tcp, 50000-60000 for udp
# and other ports according to https://docs.livekit.io/home/self-hosting/deployment/

2. Use an ALB or CloudFront to expose HTTPS/WSS externally and forward traffic to port 7880 on the EC2 instance internally. A custom domain name can be used if desired.

3. [Client] Install LiveKit Playground on your local machine to act as the client. It can also be installed on the EC2 instance, but in that case port 3000 must be port-forwarded to your local machine.

# Install Node.js via nvm: https://nodejs.org/en/download/package-manager
# installs nvm (Node Version Manager)
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.0/install.sh | bash

# download and install Node.js (you may need to restart the terminal)
nvm install 20

# verifies the right Node.js version is in the environment
node -v # should print `v20.17.0`

# verifies the right npm version is in the environment
npm -v # should print `10.8.2`
# install LiveKit Playground as a mock frontend
# https://docs.livekit.io/agents/playground/
git clone https://github.com/livekit/agents-playground.git
cd agents-playground
cat << EOF > .env.local
LIVEKIT_API_KEY=devkey
LIVEKIT_API_SECRET=secret

# Public configuration
NEXT_PUBLIC_LIVEKIT_URL=wss://YOUR_LIVEKIT_SERVER_URL

NEXT_PUBLIC_APP_CONFIG="
title: 'LiveKit Agent Playground'
description: 'LiveKit Agent Playground allows you to test your LiveKit Agent integration by connecting to your LiveKit Cloud or self-hosted instance.'
github_link: 'https://github.com/livekit/agents-playground'
video_fit: 'cover' # 'contain' or 'cover'
settings:
  editable: true # Should the user be able to edit settings in-app
  theme_color: 'cyan'
  chat: true  # Enable or disable chat feature
  outputs:
    audio: true # Enable or disable audio output
    video: true # Enable or disable video output
  inputs:
    mic: true    # Enable or disable microphone input
    camera: true # Enable or disable camera input
    sip: true    # Enable or disable SIP input
"
EOF
npm install
npm run dev

4. Open http://localhost:3000 in a browser. If the installation succeeded, you should be able to connect to a room.

5. [Agent Server] Install and test the sample LiveKit Agent on EC2. If the installation succeeded, the Agent registers with the LiveKit Server, and the Video Track area on the LiveKit Playground page flashes random colors.

# https://github.com/livekit/agents
# A simple example
git clone https://github.com/livekit/agents
cd agents/examples/simple-color
# Edit requirements.txt so that it contains:
#   livekit-agents>=0.8.12
#   python-dotenv
pip install -r requirements.txt
cat << EOF > .env
LIVEKIT_API_KEY=devkey
LIVEKIT_API_SECRET=secret
LIVEKIT_URL=wss://YOUR_LIVEKIT_SERVER_URL
EOF

python agent.py dev

6. [Model Server] Use m4tv2 (SeamlessM4T v2) to build a speech-to-translated-text service. For simplicity, we run it on the same server as the Agent Server. The demo program uses Gradio and listens on http://localhost:7860 by default (a quick client-side test follows the commands below).

git clone <https://github.com/facebookresearch/seamless_communication.git>
cd seamless_communication/
pip install .
cd demo/m4tv2
# add fastapi==0.112.2 to requirements.txt if you encounter compatibility issues
pip install -r requirements.txt
python app.py
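
Once the Gradio demo is up, you can sanity-check the /s2tt endpoint from Python before wiring it into the agent. The sketch below assumes the default local address and a short Chinese speech recording named sample.wav (a placeholder file name); it makes the same call as the agent code in step 7.

# Quick sanity check of the m4t Gradio service (same call as the agent below)
from gradio_client import Client, handle_file

client = Client("http://localhost:7860")  # default Gradio address of the demo
text = client.predict(
    handle_file("sample.wav"),   # placeholder: any short Chinese speech clip
    "Mandarin Chinese",          # source language
    "English",                   # target language
    api_name="/s2tt",            # speech-to-translated-text endpoint
)
print(text)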

7. [Agent] Create the AI translation Agent.

# .env
LIVEKIT_URL=wss://YOUR_LIVEKIT_SERVER_URL
LIVEKIT_API_KEY=devkey
LIVEKIT_API_SECRET=secret
# requirements.txt
livekit-agents>=0.8.12
livekit-plugins-silero
gradio-client
# m4t_translate.py
import asyncio
import logging
import wave

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    WorkerOptions,
    cli,
    stt,
    transcription,
    JobProcess,
)
from livekit.plugins import silero
from livekit.agents.stt import StreamAdapter
from livekit.agents.utils import AudioBuffer
from gradio_client import Client
from gradio_client import handle_file
import tempfile

load_dotenv()

logger = logging.getLogger("m4t-translate-demo")
logger.setLevel(logging.INFO)

from livekit.agents import Plugin

__version__ = "0.0.1"

class M4tPlugin(Plugin):
    def __init__(self):
        super().__init__(__name__, __version__, __package__, logger)

Plugin.register_plugin(M4tPlugin())

async def _forward_transcription(
    stt_stream: stt.SpeechStream, stt_forwarder: transcription.STTSegmentsForwarder
):
    """Forward the transcription to the client and log the transcript in the console"""
    async for ev in stt_stream:
        stt_forwarder.update(ev)
        if ev.type == stt.SpeechEventType.INTERIM_TRANSCRIPT:
            print(ev.alternatives[0].text, end="")
        elif ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("\n")
            print(" -> ", ev.alternatives[0].text)

class M4tTranslate(stt.STT):
    def __init__(self) -> None:
        self.client = Client("YOUR_GRADIO_URL")  # connect to the m4t Gradio service from step 6
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=False, interim_results=False
            )
        )
    
    async def recognize(
        self, buffer: AudioBuffer, *, language: str | None = None
    ) -> stt.SpeechEvent:
        
        # Write the buffered audio into a temporary WAV file for the Gradio client
        temp = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
        filename = temp.name
        with wave.open(temp, "wb") as wav:
            wav.setnchannels(buffer.num_channels)
            wav.setsampwidth(2)  # 16-bit
            wav.setframerate(buffer.sample_rate)
            wav.writeframes(buffer.data)

        temp.close()
        text = ""
        try:
            # Call the m4t Gradio endpoint: speech -> translated text (zh -> en)
            text = self.client.predict(handle_file(filename), "Mandarin Chinese", "English", api_name="/s2tt")
        except Exception as e:
            logger.exception(f"Exception {e} when calling m4t")

        return stt.SpeechEvent(
            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
            alternatives=[
                stt.SpeechData(
                    language=language,
                    text=text,
                )
            ]
        )

def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()

async def entrypoint(ctx: JobContext):
    logger.info("starting speech-to-text example")
    stt = M4tTranslate()
    vad = ctx.proc.userdata["vad"]
    # M4tTranslate is non-streaming, so wrap it in a VAD-based StreamAdapter
    stt = StreamAdapter(stt=stt, vad=vad)

    async def transcribe_track(participant: rtc.RemoteParticipant, track: rtc.Track):
        audio_stream = rtc.AudioStream(track)
        stt_forwarder = transcription.STTSegmentsForwarder(
            room=ctx.room, participant=participant, track=track
        )
        stt_stream = stt.stream()
        asyncio.create_task(_forward_transcription(stt_stream, stt_forwarder))

        async for ev in audio_stream:
            stt_stream.push_frame(ev.frame)

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    @ctx.room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.TrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            asyncio.create_task(transcribe_track(participant, track))

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))

pip install -r requirements.txt
python m4t_translate.py dev

8. Test the end-to-end result

(Demo video: livekit-agent-demo.mov)

Conclusion

This post presented a technical approach to building a full-duplex, real-time interactive AI assistant with WebRTC. Compared with traditional HTTP/WebSocket approaches, it supports barge-in (interrupting the assistant mid-response), delivers lower latency, and offers an interaction experience much closer to talking with a real person.

Authors

施俊

Solutions Architect at Amazon Web Services, responsible for architecture consulting and design based on the AWS cloud, and committed to promoting and applying AWS technologies across industries, currently focusing on mobile applications and IoT.

曹阳

Solutions Architect at Amazon Web Services, responsible for architecture consulting and design based on the AWS cloud, and committed to promoting and applying AWS technologies across industries, currently focusing on mobile applications and IoT.