AWS Official Blog
Building a Real-Time AI Assistant with WebRTC
Background
Compared with traditional voice assistants such as Alexa and Siri, the new generation of LLM-based AI assistants, such as GPT-4o and moshi, has improved greatly in multimodal input, supported languages, response speed, and reasoning ability. It also means that AI assistants have moved from the half-duplex, question-and-answer mode to a full-duplex, real-time conversational mode.
These improvements require not only multimodal models that support streaming input and output, but also, at the communication layer, a real-time framework such as WebRTC rather than HTTP or WebSocket. Compared with WebSocket, WebRTC lowers end-to-end latency through UDP transport, dynamic sample-rate adjustment, and peer-to-peer communication; in addition, WebRTC clients usually provide voice-call features such as noise suppression and echo cancellation.
Left: half-duplex mode over HTTP/WebSocket. Right: full-duplex mode over WebRTC.
This post introduces the overall architecture for building a real-time AI assistant with WebRTC, and then, using LiveKit WebRTC as an example, shows how to build a WebRTC-based real-time AI translation assistant (Agent).
Overall Architecture
Because WebRTC is a technology for multi-party real-time audio and video calls, the AI assistant joins the conversation as a Participant in a Room.
- Room: a Room is the virtual space in which multiple participants communicate over audio and video. It allows several users to join the same session and interact in real time.
- Participant: a Participant is a single user who has joined a Room. Each participant can publish and receive audio and video streams.
As for the model service, it can be exposed over HTTP or WebSocket, depending on whether it supports streaming input.
Overall architecture
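To make the Room/Participant model concrete, the sketch below shows how a server-side process could join a room as an ordinary participant using the LiveKit Python SDK (the livekit and livekit-api packages). This is a minimal sketch rather than part of the demo that follows; the room name, identity, and the devkey/secret credentials are placeholders matching the development setup used later in this post.

import asyncio
from livekit import api, rtc

async def join_room_as_participant() -> rtc.Room:
    # Mint an access token for a participant named "ai-agent".
    # devkey/secret match the --dev credentials of the LiveKit server below.
    token = (
        api.AccessToken("devkey", "secret")
        .with_identity("ai-agent")
        .with_grants(api.VideoGrants(room_join=True, room="demo-room"))
        .to_jwt()
    )

    room = rtc.Room()

    # React to other participants joining the same room
    @room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        print("participant joined:", participant.identity)

    # The AI assistant joins the room like any other participant
    await room.connect("wss://YOUR_LIVEKIT_SERVER_URL", token)
    return room

if __name__ == "__main__":
    asyncio.run(join_room_as_participant())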
A Simple Demo
Because LiveKit WebRTC provides well-packaged capabilities and a simple way to plug in an Agent, the walkthrough below uses LiveKit WebRTC and LiveKit Agents to build a simple real-time AI translation assistant.
1. [WebRTC Server] Install and run LiveKit Server on an EC2 instance. Note that the server needs a public IP, and if the services below are installed on the same instance, take care to isolate their Python environments.
# https://docs.livekit.io/home/self-hosting/local/
curl -sSL https://get.livekit.io | bash
# The node IP is required because a cloud server doesn't know its own public IP
livekit-server --dev --bind 0.0.0.0 --node-ip YOUR_EC2_PUBLIC_IP
# Security group should open 7880 for http/ws, 7881 for tcp, 50000-60000 for udp,
# and other ports according to https://docs.livekit.io/home/self-hosting/deployment/
2. Use an ALB or CloudFront to expose the service over https/wss and forward traffic internally to port 7880 on the EC2 instance; a custom domain can optionally be used.
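The ALB part of step 2 can be done in the console; for reference, here is a rough boto3 sketch of the same idea (an HTTPS listener on 443 forwarding to port 7880 on the instance; ALB handles WebSocket upgrades on the same listener). The VPC ID, instance ID, ALB ARN, and certificate ARN are placeholders, and an existing internet-facing ALB plus an ACM certificate are assumed.

import boto3

elbv2 = boto3.client("elbv2")

# Placeholders: replace with your own resources
VPC_ID = "vpc-xxxxxxxx"
INSTANCE_ID = "i-xxxxxxxxxxxxxxxxx"  # the EC2 instance running livekit-server
ALB_ARN = "arn:aws:elasticloadbalancing:...:loadbalancer/app/livekit/..."
CERT_ARN = "arn:aws:acm:...:certificate/..."

# Target group pointing at LiveKit's http/ws port 7880
tg = elbv2.create_target_group(
    Name="livekit-http",
    Protocol="HTTP",
    Port=7880,
    VpcId=VPC_ID,
    TargetType="instance",
)
tg_arn = tg["TargetGroups"][0]["TargetGroupArn"]

elbv2.register_targets(
    TargetGroupArn=tg_arn, Targets=[{"Id": INSTANCE_ID, "Port": 7880}]
)

# HTTPS listener on 443 forwarding to the target group
elbv2.create_listener(
    LoadBalancerArn=ALB_ARN,
    Protocol="HTTPS",
    Port=443,
    Certificates=[{"CertificateArn": CERT_ARN}],
    DefaultActions=[{"Type": "forward", "TargetGroupArn": tg_arn}],
)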
3. [Client] Install the LiveKit Agents Playground on your local machine as a mock client. It can also be installed on the EC2 instance, but then port 3000 needs to be port-forwarded to your local machine.
# install the latest Node.js: https://nodejs.org/en/download/package-manager
# installs nvm (Node Version Manager)
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.0/install.sh | bash
# download and install Node.js (you may need to restart the terminal)
nvm install 20
# verifies the right Node.js version is in the environment
node -v # should print `v20.17.0`
# verifies the right npm version is in the environment
npm -v # should print `10.8.2`
# install LiveKit Agents Playground as a mock frontend
# https://docs.livekit.io/agents/playground/
git clone https://github.com/livekit/agents-playground.git
cd agents-playground
cat << EOF > .env.local
LIVEKIT_API_KEY=devkey
LIVEKIT_API_SECRET=secret
# Public configuration
NEXT_PUBLIC_LIVEKIT_URL=wss://YOUR_LIVEKIT_SERVER_URL
NEXT_PUBLIC_APP_CONFIG="
title: 'LiveKit Agent Playground'
description: 'LiveKit Agent Playground allows you to test your LiveKit Agent integration by connecting to your LiveKit Cloud or self-hosted instance.'
github_link: 'https://github.com/livekit/agents-playground'
video_fit: 'cover' # 'contain' or 'cover'
settings:
  editable: true # Should the user be able to edit settings in-app
  theme_color: 'cyan'
  chat: true # Enable or disable chat feature
  outputs:
    audio: true # Enable or disable audio output
    video: true # Enable or disable video output
  inputs:
    mic: true # Enable or disable microphone input
    camera: true # Enable or disable camera input
    sip: true # Enable or disable SIP input
"
EOF
npm install
npm run dev
4. Open a browser and test http://localhost:3000 . If the installation succeeded, you should be able to connect to a room.
5. [Agent Server] Install and test the sample LiveKit Agent on the EC2 instance. If the installation succeeded, you will see the Agent register itself with the LiveKit Server, and the Video Track area on the LiveKit Playground page will flash random colors.
# https://github.com/livekit/agents
# A simple example
git clone https://github.com/livekit/agents
cd agents/examples/simple-color
# Fix requirements.txt if needed so that it contains:
# livekit-agents>=0.8.12
# python-dotenv
pip install -r requirements.txt
cat << EOF > .env
LIVEKIT_API_KEY=devkey
LIVEKIT_API_SECRET=secret
LIVEKIT_URL=wss://YOUR_LIVEKIT_SERVER_URL
EOF
python agent.py dev
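For reference, the core of this example is roughly the following: the agent joins the room as a participant, publishes a video track, and keeps pushing frames filled with a random solid color, which is why the Video Track area flashes. This is a condensed sketch of the upstream simple-color example at the time of writing; check the repository for the authoritative version.

import asyncio
import random
from livekit import rtc
from livekit.agents import JobContext, WorkerOptions, cli

WIDTH, HEIGHT = 640, 480

async def entrypoint(ctx: JobContext):
    await ctx.connect()

    # Publish a video track backed by a frame source we control
    source = rtc.VideoSource(WIDTH, HEIGHT)
    track = rtc.LocalVideoTrack.create_video_track("single-color", source)
    await ctx.room.local_participant.publish_track(
        track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA)
    )

    async def _draw_color():
        frame_data = bytearray(WIDTH * HEIGHT * 4)
        while True:
            await asyncio.sleep(0.1)
            # Fill the whole frame with one random RGBA color
            rgba = bytes([random.randint(0, 255) for _ in range(3)] + [255])
            frame_data[:] = rgba * (WIDTH * HEIGHT)
            source.capture_frame(
                rtc.VideoFrame(WIDTH, HEIGHT, rtc.VideoBufferType.RGBA, frame_data)
            )

    asyncio.create_task(_draw_color())

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))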
6. [Model Server] Use m4tv2 (SeamlessM4T v2) to create a speech-to-translated-text service. For simplicity, we run it on the same server as the Agent Server. The demo program uses Gradio, and by default the service runs at http://localhost:7860.
git clone <https://github.com/facebookresearch/seamless_communication.git>
cd seamless_communication/
pip install .
cd demo/m4tv2
# add fastapi==0.112.2 to requirements.txt if you encounter compatibility issues
pip install -r requirements.txt
python app.py
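Before wiring the model into the agent, it can be useful to verify the Gradio endpoint with gradio_client directly. The snippet below is a quick sanity check; sample_zh.wav is a hypothetical local test recording, and the /s2tt endpoint with (source language, target language) arguments matches what the agent code in step 7 uses.

from gradio_client import Client, handle_file

# Gradio m4t demo started in step 6 (default port 7860)
client = Client("http://localhost:7860")

# /s2tt: speech in, translated text out
result = client.predict(
    handle_file("sample_zh.wav"),  # hypothetical local test recording
    "Mandarin Chinese",            # source language
    "English",                     # target language
    api_name="/s2tt",
)
print(result)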
7. [Agent] Create the AI translation Agent. The agent subscribes to participants' audio tracks, uses Silero VAD together with a StreamAdapter to segment the audio into utterances, sends each utterance to the m4t service through gradio_client, and forwards the translated text back to the room as transcriptions.
# .env
LIVEKIT_URL=wss://YOUR_LIVEKIT_SERVER_URL
LIVEKIT_API_KEY=devkey
LIVEKIT_API_SECRET=secret
# requirements.txt
livekit-agents>=0.8.12
livekit-plugins-silero
gradio-client
# m4t_translate.py
import asyncio
import logging
import tempfile
import wave

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    Plugin,
    WorkerOptions,
    cli,
    stt,
    transcription,
)
from livekit.agents.stt import StreamAdapter
from livekit.agents.utils import AudioBuffer
from livekit.plugins import silero
from gradio_client import Client, handle_file

load_dotenv()

logger = logging.getLogger("m4t-translate-demo")
logger.setLevel(logging.INFO)

__version__ = "0.0.1"


class M4tPlugin(Plugin):
    def __init__(self):
        super().__init__(__name__, __version__, __package__, logger)


Plugin.register_plugin(M4tPlugin())


async def _forward_transcription(
    stt_stream: stt.SpeechStream, stt_forwarder: transcription.STTSegmentsForwarder
):
    """Forward the transcription to the client and log the transcript in the console"""
    async for ev in stt_stream:
        stt_forwarder.update(ev)
        if ev.type == stt.SpeechEventType.INTERIM_TRANSCRIPT:
            print(ev.alternatives[0].text, end="")
        elif ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("\n")
            print(" -> ", ev.alternatives[0].text)


class M4tTranslate(stt.STT):
    def __init__(self) -> None:
        # connect to the Gradio m4t service started in step 6
        self.client = Client("YOUR_GRADIO_URL")
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=False, interim_results=False)
        )

    async def recognize(
        self, buffer: AudioBuffer, *, language: str | None = None
    ) -> stt.SpeechEvent:
        # write the buffered audio into a temporary wav file for the Gradio client
        temp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
        filename = temp.name
        with wave.open(temp, "wb") as wav:
            wav.setnchannels(buffer.num_channels)
            wav.setsampwidth(2)  # 16-bit
            wav.setframerate(buffer.sample_rate)
            wav.writeframes(buffer.data)
        temp.close()
        text = ""
        try:
            # /s2tt: speech in, translated text out
            text = self.client.predict(
                handle_file(filename), "Mandarin Chinese", "English", api_name="/s2tt"
            )
        except Exception as e:
            logger.exception(f"Exception {e} when calling m4t")
        return stt.SpeechEvent(
            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
            alternatives=[
                stt.SpeechData(
                    language=language,
                    text=text,
                )
            ],
        )


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
    logger.info("starting speech-to-text example")
    stt_impl = M4tTranslate()
    vad = ctx.proc.userdata["vad"]
    # wrap the non-streaming STT with a VAD-based stream adapter
    stt_impl = StreamAdapter(stt=stt_impl, vad=vad)

    async def transcribe_track(participant: rtc.RemoteParticipant, track: rtc.Track):
        audio_stream = rtc.AudioStream(track)
        stt_forwarder = transcription.STTSegmentsForwarder(
            room=ctx.room, participant=participant, track=track
        )
        stt_stream = stt_impl.stream()
        asyncio.create_task(_forward_transcription(stt_stream, stt_forwarder))
        async for ev in audio_stream:
            stt_stream.push_frame(ev.frame)

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    @ctx.room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.TrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            asyncio.create_task(transcribe_track(participant, track))


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
pip install -r requirements.txt
python m4t_translate.py
8. Test the result: speak Mandarin Chinese into the Playground microphone; the translated English text is forwarded back by the Agent and displayed as the transcription in the Playground.
Conclusion
This post introduced a technical approach for building a full-duplex, real-time interactive AI assistant over WebRTC. Compared with the traditional HTTP/WebSocket approach, this approach supports interruption (barge-in) and offers lower latency, making the interaction experience much closer to a natural human conversation.