Amazon AWS Official Blog

Implementing the Transcribe Streaming WebSocket Protocol in Python

Overview

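Amazon Transcribe is an automatic speech recognition (ASR) service that makes it easy for developers to add speech-to-text capabilities to their applications. Transcribe accepts audio in two ways: as files and as streams (Streaming). Transcribe Streaming suits scenarios such as meeting transcription, voice-controlled interaction, and real-time language translation, and it supports two protocols: HTTP/2 and WebSocket. This article describes how to implement the Transcribe Streaming WebSocket protocol in Python.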

Streaming transcription API overview

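The streaming transcription API receives an audio stream, converts it to text in real time, and returns the results to the client. Each result also contains an IsPartial field that indicates whether the sentence has ended: true means the text may still change as more audio arrives, and false means the sentence is complete and its transcription is final.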
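To illustrate how IsPartial is typically used, here is a minimal sketch that takes one decoded response, shaped like the {"Transcript": {"Results": [...]}} JSON that the receiving code later in this article prints, and keeps only the text of results whose IsPartial is false. The helper name final_transcripts is hypothetical and not part of any SDK; taking the first alternative is a simplification, since streaming results usually carry a single alternative.

```python
from typing import Any, Dict, List


def final_transcripts(response: Dict[str, Any]) -> List[str]:
    """Return the transcript text of every result that is no longer partial.

    `response` is one decoded transcript event payload, for example
    {"Transcript": {"Results": [{"IsPartial": False, "Alternatives": [...]}]}}
    """
    texts = []
    for result in response.get("Transcript", {}).get("Results", []):
        # Partial results may still change as more audio arrives; skip them
        if result.get("IsPartial", True):
            continue
        alternatives = result.get("Alternatives", [])
        if alternatives:
            # Simplification: use the first alternative only
            texts.append(alternatives[0].get("Transcript", ""))
    return texts


if __name__ == "__main__":
    sample = {
        "Transcript": {
            "Results": [
                {"IsPartial": True, "Alternatives": [{"Transcript": "hello wor"}]},
                {"IsPartial": False, "Alternatives": [{"Transcript": "Hello world."}]},
            ]
        }
    }
    print(final_transcripts(sample))  # ['Hello world.']
```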

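Data on the stream is binary-encoded in the event stream format: each message consists of a prelude and data. For details of the encoding, see: https://docs.aws.amazon.com/transcribe/latest/dg/event-stream.html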
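To make the layout concrete, the following is a stdlib-only sketch of the encoding described on the linked page: a 12-byte prelude (total length, headers length, and a CRC32 of those 8 bytes), the headers, the payload, and a trailing CRC32 of everything before it. It only supports string header values (type 7), which is all the AudioEvent headers in this article need. The function names are hypothetical; the sample program itself relies on amazon_transcribe's EventStreamMessageSerializer and EventStreamBuffer for this work.

```python
import struct
import zlib
from typing import Dict, Tuple

STRING_TYPE = 7  # header value type code for string values


def encode_headers(headers: Dict[str, str]) -> bytes:
    out = b""
    for name, value in headers.items():
        name_bytes = name.encode("utf-8")
        value_bytes = value.encode("utf-8")
        # name length (1 byte), name, value type (1 byte), value length (2 bytes), value
        out += struct.pack(">B", len(name_bytes)) + name_bytes
        out += struct.pack(">BH", STRING_TYPE, len(value_bytes)) + value_bytes
    return out


def encode_message(headers: Dict[str, str], payload: bytes) -> bytes:
    header_bytes = encode_headers(headers)
    # prelude (12) + headers + payload + message CRC (4)
    total_length = 12 + len(header_bytes) + len(payload) + 4
    prelude = struct.pack(">II", total_length, len(header_bytes))
    prelude += struct.pack(">I", zlib.crc32(prelude) & 0xFFFFFFFF)
    message = prelude + header_bytes + payload
    return message + struct.pack(">I", zlib.crc32(message) & 0xFFFFFFFF)


def decode_message(data: bytes) -> Tuple[Dict[str, str], bytes]:
    total_length, headers_length, prelude_crc = struct.unpack(">III", data[:12])
    if zlib.crc32(data[:8]) & 0xFFFFFFFF != prelude_crc:
        raise ValueError("prelude CRC mismatch")
    (message_crc,) = struct.unpack(">I", data[total_length - 4:total_length])
    if zlib.crc32(data[:total_length - 4]) & 0xFFFFFFFF != message_crc:
        raise ValueError("message CRC mismatch")
    headers = {}
    pos, end = 12, 12 + headers_length
    while pos < end:
        name_len = data[pos]
        name = data[pos + 1:pos + 1 + name_len].decode("utf-8")
        pos += 1 + name_len
        value_type, value_len = struct.unpack(">BH", data[pos:pos + 3])
        if value_type != STRING_TYPE:
            raise ValueError("this sketch only handles string header values")
        headers[name] = data[pos + 3:pos + 3 + value_len].decode("utf-8")
        pos += 3 + value_len
    payload = data[end:total_length - 4]
    return headers, payload
```

Round-tripping a message with the AudioEvent headers used later in the article, for example decode_message(encode_message(headers, audio_chunk)), returns the original headers and payload; flipping any byte makes one of the CRC checks fail.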

Python implementation and example

The sample program was run on Python 3.7.9.

  • Add the following IAM policy to the IAM user you use:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "transcribestreaming",
            "Effect": "Allow",
            "Action": "transcribe:StartStreamTranscriptionWebSocket",
            "Resource": "*"
        }
    ]
}
  • Install the Python packages

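The sample program requires three packages: websocket-client, boto3, and amazon_transcribe. boto3 is the AWS SDK for Python, and amazon_transcribe is the Amazon Transcribe Streaming SDK; together they simplify integration with the Amazon Transcribe service. Here, boto3 is used to obtain AWS credentials, and amazon_transcribe provides the event stream encoder and decoder. For details on amazon_transcribe, see: https://github.com/awslabs/amazon-transcribe-streaming-sdk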

Commands to install the packages:

pip3 install boto3
pip3 install amazon_transcribe
pip3 install websocket-client

The import section of the Python program:

import hashlib
import hmac
import urllib.parse
from datetime import datetime
import time
import ssl
import json
import websocket
import _thread
from amazon_transcribe.eventstream import EventStreamMessageSerializer
from amazon_transcribe.eventstream import EventStreamBuffer
from boto3.session import Session
  • Create the function that builds the signed URL

For details on URL signing, see: https://docs.aws.amazon.com/transcribe/latest/dg/websocket.html#websocket-url

Python implementation example:

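The main function in the code below is create_pre_signed_url. It generates the URL for accessing the streaming transcription API, including the required query parameters and the signature, and takes 4 parameters: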

  • region: the AWS Region to call. For the Regions that support streaming, see the Amazon Transcribe Streaming section of the documentation (https://docs.aws.amazon.com/general/latest/gr/transcribe.html).
  • language_code, media_encoding, and sample_rate: parameters of the stream-transcription-websocket endpoint, passed as strings (for example "en-US", "pcm", and "16000"). They are defined at https://docs.aws.amazon.com/transcribe/latest/dg/websocket.html#websocket-url
def sign(key, msg):
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

def getSignatureKey(key, dateStamp, region, serviceName):
    kDate = sign(("AWS4" + key).encode("utf-8"), dateStamp)
    kRegion = sign(kDate, region)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, "aws4_request")
    return kSigning

def create_pre_signed_url(region, language_code, media_encoding, sample_rate):
    # Get the access key, the secret key, and (for temporary credentials) the session token
    credentials = Session().get_credentials()
    access_key_id = credentials.access_key
    secret_access_key = credentials.secret_key
    session_token = credentials.token

    method = "GET"
    service = "transcribe"
    endpoint = "wss://transcribestreaming." + region + ".amazonaws.com:8443"
    host = "transcribestreaming." + region + ".amazonaws.com:8443"
    algorithm = "AWS4-HMAC-SHA256"

    t = datetime.utcnow()
    amz_date = t.strftime('%Y%m%dT%H%M%SZ')
    datestamp = t.strftime('%Y%m%d')

    canonical_uri = "/stream-transcription-websocket"

    canonical_headers = "host:" + host + "\n"
    signed_headers = "host"

    credential_scope = datestamp + "/" + region + "/" + service + "/" + "aws4_request"

    # Query parameters must appear in ascending order of parameter name
    canonical_querystring = "X-Amz-Algorithm=" + algorithm
    canonical_querystring += "&X-Amz-Credential=" + urllib.parse.quote_plus(access_key_id + "/" + credential_scope)
    canonical_querystring += "&X-Amz-Date=" + amz_date
    canonical_querystring += "&X-Amz-Expires=300"
    # Temporary credentials (e.g. from an IAM role) also need the session token
    if session_token:
        canonical_querystring += "&X-Amz-Security-Token=" + urllib.parse.quote_plus(session_token)
    canonical_querystring += "&X-Amz-SignedHeaders=" + signed_headers
    canonical_querystring += "&language-code=" + language_code + "&media-encoding=" + media_encoding + "&sample-rate=" + sample_rate

    # The connection request has an empty payload; hash the empty string
    payload_hash = hashlib.sha256(("").encode('utf-8')).hexdigest()

    canonical_request = method + '\n' \
                        + canonical_uri + '\n' \
                        + canonical_querystring + '\n' \
                        + canonical_headers + '\n' \
                        + signed_headers + '\n' \
                        + payload_hash

    string_to_sign = algorithm + "\n" \
                     + amz_date + "\n" \
                     + credential_scope + "\n" \
                     + hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()

    signing_key = getSignatureKey(secret_access_key, datestamp, region, service)

    signature = hmac.new(signing_key, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()

    canonical_querystring += "&X-Amz-Signature=" + signature

    request_url = endpoint + canonical_uri + "?" + canonical_querystring

    return request_url
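The function above concatenates the query string by hand, which only works because the parameters are written in sorted order. As an alternative, the sketch below builds the same canonical query string from a dict, sorting keys and URI-encoding values so that "/" becomes %2F. Both helper names are hypothetical. Including X-Amz-Security-Token before signing when temporary credentials are used is an assumption based on the parameter list in the WebSocket documentation linked earlier.

```python
import urllib.parse
from typing import Dict, Optional


def canonical_querystring(params: Dict[str, str]) -> str:
    """Join query parameters as SigV4 expects: sorted by key, URI-encoded."""
    # safe="-_.~" leaves only RFC 3986 unreserved characters unencoded,
    # so "/" becomes %2F, "+" becomes %2B and "=" becomes %3D
    return "&".join(
        urllib.parse.quote(key, safe="-_.~") + "=" + urllib.parse.quote(value, safe="-_.~")
        for key, value in sorted(params.items())
    )


def transcribe_query_params(access_key_id: str, credential_scope: str, amz_date: str,
                            language_code: str, media_encoding: str, sample_rate: str,
                            session_token: Optional[str] = None) -> Dict[str, str]:
    """Same parameters that create_pre_signed_url adds before the signature."""
    params = {
        "X-Amz-Algorithm": "AWS4-HMAC-SHA256",
        "X-Amz-Credential": access_key_id + "/" + credential_scope,
        "X-Amz-Date": amz_date,
        "X-Amz-Expires": "300",
        "X-Amz-SignedHeaders": "host",
        "language-code": language_code,
        "media-encoding": media_encoding,
        "sample-rate": sample_rate,
    }
    if session_token:
        # Assumption: only needed with temporary credentials, signed like the others
        params["X-Amz-Security-Token"] = session_token
    return params
```

Because uppercase letters sort before lowercase ones, the X-Amz-* parameters come first and language-code, media-encoding, and sample-rate follow, matching the hand-written order in create_pre_signed_url.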
  • Write the main function

In the code below, loop_receiving receives messages from the Amazon Transcribe service, and send_data sends audio messages to it.

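def main():
    url = create_pre_signed_url("us-east-1", "en-US", "pcm", "16000")
    # TLS certificate verification stays enabled (websocket-client's default)
    ws = websocket.create_connection(url)

    # Receive results in a background thread while audio is sent from this one
    _thread.start_new_thread(loop_receiving, (ws,))
    print("Receiving...")
    send_data(ws)

    # Keep the main thread alive so the receiving thread can keep printing; press Ctrl+C to exit
    while True:
        time.sleep(1)

if __name__ == "__main__":
    main()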
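The main function above keeps the process alive with an endless sleep loop. A possible alternative, sketched below under the assumption that the receive loop returns once the connection closes (as loop_receiving does when it catches the closed-connection exception), is to run the receiver in a threading.Thread and join it with a timeout after sending finishes. The helper run_session is hypothetical.

```python
import threading
from typing import Any, Callable


def run_session(ws: Any, send: Callable[[Any], None], receive: Callable[[Any], None],
                timeout: float = 30.0) -> bool:
    """Run `receive(ws)` in a background thread while `send(ws)` streams audio.

    Returns True if the receiver finished within `timeout` seconds after sending ended.
    """
    receiver = threading.Thread(target=receive, args=(ws,), daemon=True)
    receiver.start()
    send(ws)
    # Assumption: after the empty end-of-stream event, the remaining results
    # arrive and the connection closes, which ends the receive loop
    receiver.join(timeout)
    return not receiver.is_alive()
```

In the sample program this could be called as run_session(ws, send_data, loop_receiving), followed by ws.close(). The daemon flag ensures that a receiver still blocked in ws.recv() cannot keep the process running after main returns.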
  • Write the loop_receiving function

Place this function above main. It receives the data returned by the Amazon Transcribe Streaming service and prints it.

def loop_receiving(ws):
    try:
        while True:
            result = ws.recv()

            # Skip empty frames (binary frames arrive as bytes, not str)
            if not result:
                continue

            # Each WebSocket frame carries one complete event stream message
            eventStreamBuffer = EventStreamBuffer()
            eventStreamBuffer.add_data(result)
            eventStreamMessage = eventStreamBuffer.next()

            stream_payload = eventStreamMessage.payload
            transcript = json.loads(stream_payload.decode("utf-8"))

            # Errors arrive as messages whose :message-type header is "exception"
            if eventStreamMessage.headers.get(":message-type") == "exception":
                print("exception:", transcript)
                break

            print("response:", transcript)

            for res in transcript['Transcript']['Results']:
                if 'IsPartial' in res:
                    print('IsPartial:', res['IsPartial'])

                for alternative in res.get('Alternatives', []):
                    if 'Transcript' in alternative:
                        print('Transcript:', alternative['Transcript'])

    except websocket.WebSocketConnectionClosedException:
        print("Error: websocket connection is closed")
    except Exception as e:
        print(f"Exception Name: {e.__class__.__name__}, {e}")
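Printing every field is useful for debugging, but a live caption usually wants the latest version of each sentence. The sketch below, a hypothetical CaptionBuilder class, assumes that successive partial versions of the same segment share a ResultId: it overwrites pending text for that ResultId until a result with IsPartial set to false commits it.

```python
from typing import Any, Dict, List


class CaptionBuilder:
    """Hypothetical helper that merges streaming results into a running caption."""

    def __init__(self) -> None:
        self.final: List[str] = []          # committed sentences, in arrival order
        self.pending: Dict[str, str] = {}   # ResultId -> latest partial text

    def update(self, response: Dict[str, Any]) -> None:
        for result in response.get("Transcript", {}).get("Results", []):
            alternatives = result.get("Alternatives", [])
            if not alternatives:
                continue
            text = alternatives[0].get("Transcript", "")
            result_id = result.get("ResultId", "")
            if result.get("IsPartial", False):
                # A newer partial replaces the older one for the same segment
                self.pending[result_id] = text
            else:
                # Final text: drop the partial and commit the sentence
                self.pending.pop(result_id, None)
                self.final.append(text)

    def caption(self) -> str:
        # Committed sentences followed by whatever is still being refined
        return " ".join(self.final + list(self.pending.values()))
```

Inside loop_receiving, one would call builder.update(transcript) for each decoded response and display builder.caption() instead of printing each field.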
  • Write the send_data function

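Place this function above main. It sends audio data to the Amazon Transcribe Streaming service. The testFile variable is the path of the test audio file; the test audio is raw PCM (16-bit signed little-endian, mono), in English, with a sample rate of 16000 Hz.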

def send_data(ws):

    testFile = "xxx.pcm"  # path to the raw PCM test audio

    bufferSize = 1024*16

    stream_headers = {
        ":message-type": "event",
        ":event-type": "AudioEvent",
        ":content-type": "application/octet-stream",
    }

    eventstream_serializer = EventStreamMessageSerializer()

    with open(testFile, "rb") as source:
        while True:
            audio_chunk = source.read(bufferSize)
            # Encode the audio chunk as an event stream message
            event_bytes = eventstream_serializer.serialize(stream_headers, audio_chunk)

            ws.send(event_bytes, opcode=websocket.ABNF.OPCODE_BINARY)  # send as a binary frame

            # An AudioEvent with an empty payload (b'') marks the end of the stream
            if len(audio_chunk) == 0:
                break
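send_data reads a headerless PCM file in 16 KB chunks, which at 16 kHz, 16-bit mono is about 0.512 s of audio per chunk, and sends them as fast as it can. If your test audio is a WAV file instead, the sketch below (hypothetical helpers, assuming a 16 kHz, 16-bit, mono WAV) strips the header with the standard wave module and yields chunks of a chosen duration. Standard 16-bit WAV data is already signed little-endian, which matches the pcm encoding. The optional sleep simulates live microphone capture.

```python
import time
import wave
from typing import Iterator

SAMPLE_RATE = 16000  # must match the sample-rate query parameter
SAMPLE_WIDTH = 2     # bytes per sample (16-bit)


def chunk_size_bytes(duration_ms: int, sample_rate: int = SAMPLE_RATE,
                     sample_width: int = SAMPLE_WIDTH, channels: int = 1) -> int:
    """Number of PCM bytes covering `duration_ms` milliseconds of audio."""
    return sample_rate * sample_width * channels * duration_ms // 1000


def pcm_chunks_from_wav(path: str, duration_ms: int = 100,
                        realtime: bool = True) -> Iterator[bytes]:
    """Yield raw PCM chunks from a 16 kHz, 16-bit, mono WAV file."""
    with wave.open(path, "rb") as wav:
        fmt = (wav.getframerate(), wav.getsampwidth(), wav.getnchannels())
        if fmt != (SAMPLE_RATE, SAMPLE_WIDTH, 1):
            raise ValueError("expected 16 kHz, 16-bit, mono WAV audio")
        frames_per_chunk = SAMPLE_RATE * duration_ms // 1000
        while True:
            data = wav.readframes(frames_per_chunk)
            if not data:
                break
            yield data
            if realtime:
                # Pace delivery at roughly the speed the audio was recorded
                time.sleep(duration_ms / 1000)
```

For example, chunk_size_bytes(512) is 16384, the article's bufferSize, and chunk_size_bytes(100) is 3200. In send_data, the file-reading loop could become a for loop over pcm_chunks_from_wav("xxx.wav") that serializes and sends each chunk, followed by one final empty AudioEvent.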

Conclusion

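This article showed how to implement the Transcribe Streaming WebSocket protocol in Python and provided a Python example for reference, covering URL signing, data encoding, and sending and receiving the data stream. For the complete code, see: https://github.com/xuemark/transcribe/blob/master/transcribe_streaming_websocket.py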


About the Author

薛召兵

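AWS Solutions Architect, responsible for helping customers design their cloud architectures and providing consulting. He is also dedicated to applying and promoting AWS container, media, and machine learning services among commercial customers in China and worldwide, helping enterprises migrate to the cloud. He has more than 10 years of experience in software development, pre-sales technical support, and system architecture design.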