亚马逊AWS官方博客

Amazon SageMaker 为短剧换脸服务保驾护航

引言

在当今多元化的数字时代,基于人工智能的换脸技术为影视制作带来了全新的可能性。Amazon SageMaker 作为功能强大的机器学习平台,为换脸服务提供了高效、可扩展的解决方案。该技术不仅可应用于影视制作,还能为口播数字人提供丰富的口播素材,满足直播、广告等多种场景需求。与传统换脸方案相比,SageMaker 解决方案部署简单,能够高效、经济地满足业务需求。

FaceFusion 是一款开源的换脸工具,具备高质量的换脸效果和丰富功能,通过将其部署到 SageMaker 上,可以充分利用 AWS 的计算资源,实现高效、可扩展的换脸服务。Amazon SageMaker 作为一站式的机器学习解决方案,可以轻松部署和扩展换脸服务。

FaceFusion 使用

通过该 github 地址可以快速部署测试,同时支持通过 grok 实现代理方式访问 SageMaker notebook 上运行的环境。环境访问显示如下图,这样我们可以通过 UI 界面测试了。

FaceFusion 命令行模型

使用 python run.py 的命令行方式,也可以实现 UI 能够实现的功能。

python run.py [options]
options:
-h, —help show this help message and exit
-s SOURCE_PATHS, —source SOURCE_PATHS choose single or multiple source images or audios
-t TARGET_PATH, —target TARGET_PATH choose single target image or video
-o OUTPUT_PATH, —output OUTPUT_PATH specify the output file or directory
-v, —version show program

当我们测试完毕后,经常需要将这些功能集成到已有系统中,此时就需要一个稳定的、可扩展的、安全的后端服务来支撑模型的推理。

解决方案介绍

在这个解决方案里,我会重点介绍如何将 Facefusion 模型快速部署到 SageMaker,实现基于 SageMaker 的实时推理和异步推理。如果您想从训练阶段就开始使用 SageMaker,可以参考博客如何将开源项目迁移到 SageMaker。如果您对基于 SageMaker 的数据预处理和批量推理更感兴趣,可以参考 LightGBM 算法框架运行在 Amazon Sagemaker

在我们开始介绍部署方法之前,先来了解一下 SageMaker endpoint 是如何工作的。

  1. 发送请求给 SageMaker endpoint。
  2. 当 SageMaker endpoint 接收到请求后,可以针对请求进行预处理。
  3. 将预处理完的请求发送给模型推理。
  4. 推理完成后将输出结果返回,当然这里也可以做一些后处理工作。

按照如上的流程来看,SageMaker 帮我们做了很多事情,比如数据的前处理,后处理,模型的加载等。下面我们再深入地看下 SageMaker endpoint 内部。

  1. 当 SageMaker 接收到请求后,首先请求会被部署在 SageMaker endpoint 中的 nginx server 接收。
  2. nginx 接收到请求后,会转发给托管模型的 web 服务,这里使用的 gunicorn,一款支持 WSGI 的 HTTP 服务器,用于运行基于 Python 的 web 应用服务。
  3. gunicorn 接收到请求后,将请求转发给基于 python flask 开发的 web 服务中。

为了让 SageMaker endpoint 能够满足如上流程,我们采用 Docker 镜像的方式来集成。

项目结构介绍

需要在项目中添加三个文件,以便于满足上一节介绍的流程。

  1. 安装相关依赖。
  2. 添加 nginx.conf 配置文件,定义 nginx 部分。
  3. 添加 serve 和 wsgi.py 文件,定义 gunicorn 部分。
  4. 添加 predictor.py 文件,定义 web 服务部分,这部分是集成现有模型的关键步骤。
  5. 添加 Dockerfile,为了便于集成到 SageMaker 中,我们需要添加 Dockerfile 以便安装依赖和同步代码到镜像中。

Dockerfile 介绍

FROM python:3.10
ARG DEBIAN_FRONTEND=noninteractive
ARG FACEFUSION_VERSION=2.3.0
ENV GRADIO_SERVER_NAME=0.0.0.0
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"

WORKDIR /opt/program
RUN apt-get update
RUN apt-get install curl -y
RUN apt-get install ffmpeg -y
##安装sagemaker endpoint所需的组件
RUN apt-get install nginx -y 
RUN pip install --no-cache-dir boto3 flask gunicorn
# RUN git clone https://github.com/facefusion/facefusion.git --branch ${FACEFUSION_VERSION} --single-branch .
##拷贝包含sagemaker endpoint所需的python和配置文件
COPY facefusion /opt/program
RUN python install.py --torch cpu --onnxruntime default
WORKDIR /opt/program
ENTRYPOINT ["python"]
# serve is a python script under code/ directory that launches nginx and gunicorn processes
CMD [ "serve" ]

这里需要注意安装必要的依赖,包括 nginx,gunicorn,flask 等,另外需要将添加了新文件的项目,拷贝到镜像中。

配置文件介绍

nginx.conf,wsgi 和 serve 文件,一般保持默认即可,这里就不再赘述了。

推理挂钩函数 

上面提到的 predictor.py 就是推理服务的挂钩函数,默认情况下 predictor.py 需要实现两个方法:

/ping 将接收来自基础设施的 GET 请求。如果容器已启动并接受请求,您的程序将返回 200。
/invocations 是接收客户端推理 POST 请求的端点。请求和响应的格式由算法决定。如果客户端提供了 ContentType 和 Accept 头,这些也会被传递。

# This is the file that implements a flask server to do inferences. It's the file that you will modify to
# implement the scoring for your own algorithm.

from __future__ import print_function

import io
import json
import os
import sys
import flask
import subprocess
from facefusion import core

#https://github.com/aws-samples/sagemaker-stablediffusion-quick-kit/blob/main/inference/sagemaker/byoc_sdxl/code/inference.py
prefix = "/opt/ml/"
model_path = os.path.join(prefix, "model")

# The flask app for serving predictions
app = flask.Flask(__name__)

@app.route("/ping", methods=["GET"])
def ping():
health=200
status = 200 if health else 404
return flask.Response(response="\n", status=status, mimetype="application/json")

@app.route("/invocations", methods=["POST"])
def invocations():
# 获取命令行参数
input_json = flask.request.get_json()
args = input_json['input']
print(args)
# 执行 run.py,并传递命令行参数
process = subprocess.Popen(['python', 'run.py'] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# 获取输出
stdout, stderr = process.communicate()
print(stdout.decode())
# 检查是否有错误输出
if stderr:
print(f'Error: {stderr.decode()}')
return

# 获取返回的字符串
predictions = stdout.decode().strip()

# 将输出格式化为指定的格式
result = {'results': predictions}

# 返回结果
print(json.dumps(result))

return flask.Response(response=result, status=200, mimetype="application/json")

这里的 ping 方法可以用于检测模型部署端点的状态。invocations 方法通过调用现有代码的入口类实现请求的转发,因为 FaceFusion 这个模型包含了多个模型,并基于已有逻辑构建了 pipeline,通过 subprocess.Popen 集成更加简洁。如果推理模型是单独的模型文件,那么可以直接在这里加载模型文件来实现推理。

部署流程介绍

核心步骤如下:

  1. 修改 FaceFusion 的推理文件(Predictor.py),使其能够与 Flask 服务器集成,接收 HTTP 请求并执行换脸任务。
  2. 编辑 Docker 文件,将 FaceFusion 及其依赖项打包到容器镜像中,包括 Nginx、Gunicorn、Flask 等服务组件。
  3. 构建 CPU 和 GPU 两种镜像版本,以支持不同的硬件环境。
  4. 在本地测试镜像,验证换脸功能是否正常。
  5. 使用 AWS CLI 或 Python SDK,将镜像推送到 Amazon ECR,并在 SageMaker 上创建模型。
  6. 部署 SageMaker 端点,通过 HTTP 调用实现换脸请求。
  7. 客户端发送 POST 请求到 SageMaker 端点,传递源图像/视频和目标图像/视频路径等参数。
  8. SageMaker 端点运行 FaceFusion 模型,完成换脸处理,并将结果返回给客户端。

构建镜像

# 在项目目录执行如下命令,如上 docker file 是以 CPU 举例的,可以修改使用 GPU 可以参考 gpu_Dockerfile
!./build_and_push.sh faces-swap-on-sagemaker

本地启动镜像

# 镜像构建完毕后,使用如下命令在项目目录执行,本地启动 docker 镜像
./local_test/serve_local.sh facefusion-sagemaker

本地测试

# 新建 CLI 终端执行如下命令用于本地测试
curl -XPOST localhost:8080/invocations -H 'content-type:application/json' -d '{"input":"python run.py -s image1.jpg -t test.mp4 -o . —headless"}‘

创建 SageMaker 模型

import boto3
import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers
role = sagemaker.get_execution_role() # execution role for the endpoint
sess = sagemaker.session.Session() # sagemaker session for interacting with different AWS APIs
region = sess._region_name # region name of the current SageMaker Studio environment
account_id = sess.account_id() # account_id of the current SageMaker Studio environment
bucket = sess.default_bucket()
image="facefusion-sagemaker"
s3_client = boto3.client("s3")
sm_client = boto3.client("sagemaker")
smr_client = boto3.client("sagemaker-runtime")
full_image_uri=f"{account_id}.dkr.ecr.{region}.amazonaws.com/{image}:latest"
print(full_image_uri)

创建 SageMaker endpoint 配置

variantName="facefusion-sagemaker"+strftime("%Y-%m-%d-%H-%M-%S", gmtime())+"-variant"

def create_endpoint_configuration():
endpointConfigName="facefusion-sagemaker-configuration"+strftime("%Y-%m-%d-%H-%M-%S", gmtime())
create_endpoint_config_response = sm_client.create_endpoint_config(
EndpointConfigName=endpointConfigName,
ProductionVariants=[
{
"ModelName":model_name,
"VariantName": variantName,
"InitialInstanceCount": 1,
"InitialVariantWeight": 1.0,
"InstanceType": "ml.g5.xlarge"
}
]
)
print(create_endpoint_config_response)
return endpointConfigName

创建 SageMaker endpoint

endpointName="facefusion-sagemaker-endpoint"+strftime("%Y-%m-%d-%H-%M-%S", gmtime())
def create_endpoint():
create_endpoint_response = sm_client.create_endpoint(
EndpointName=endpointName,
#EndpointConfigName="facefusion-sagemaker-configuration2024-03-28-04-03-53",
EndpointConfigName=endpointConfigName
)
print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])
resp = sm_client.describe_endpoint(EndpointName=endpointName)
print("Endpoint Status: " + resp["EndpointStatus"])
print("Waiting for {} endpoint to be in service".format(endpointName))
waiter = sm_client.get_waiter("endpoint_in_service")
waiter.wait(EndpointName=endpointName)

远程调用测试

import json
runtime_sm_client = boto3.client(service_name="sagemaker-runtime")
endpointName="endpoint-face-swap-test-gpu"
# request = {"input":'s3://sagemaker-us-west-2-687912291502/video/test_out2.mp4',"method":"get_status"} 
# request = {"method":"submit","input":['-s','image1.jpg','-t','test.mp4','-o','/tmp/','-u','s3://zuimei-poc/facefusion/input/test_out.mp4','--headless'],}

request ={
"method": "submit","input": {"s3_source_path": "s3://zuimei-poc/facefusion/input/1acc1a7a7e2a11ee9ce99e9536155493_cover.jpg","s3_target_path": "s3://zuimei-poc/facefusion/input/2bcd97ef6b864609b4a8076a68be9f4a.jpg","source":"/opt/program/input/1acc1a7a7e2a11ee9ce99e9536155493_cover.jpg","target": "/opt/program/input/2bcd97ef6b864609b4a8076a68be9f4a.jpg","output": "/opt/program/output/","execution-providers": "cpu","s3_output_path": "s3://zuimei-poc/facefusion/output/","face-detector-model":"scrfd","log-level":"debug"}}


def invoke_endpoint():
content_type = "application/json"
#request_body = {"input":['-s', 'taotao.jpeg', '-t', 'lht.jpg', '-o', '.', '--headless']} ##输入是s3地址
#request_body = {"method":"submit","input":['-s','image1.jpg','-t','test.mp4','-o','/tmp/','-u','s3://sagemaker-us-west-2-687912291502/video/test_out.mp4','--headless'],}
request_body = request
payload = json.dumps(request_body)
print(payload)
response = runtime_sm_client.invoke_endpoint(
EndpointName=endpointName,
ContentType=content_type,
Body=payload,
)
result = response['Body'].read().decode()
print('返回:',result)

异步推理配置

_time_tag = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
_variant_name = 'facusion-'+ _time_tag
endpoint_config_name = f'facefusion-{str(uuid.uuid4())}'

response = client.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
ProductionVariants=[
{
'VariantName': _variant_name,
'ModelName': ,
'InitialInstanceCount': 1,
'InstanceType': 'ml.c5.large',
'InitialVariantWeight': 1
},
]
,
AsyncInferenceConfig={
'OutputConfig': {
'S3OutputPath': "s3://zuimei-poc/facefusion/output/"
}
}
)

异步推理测试

import time
def predict_async(endpoint_name,payload):
runtime_client = boto3.client('runtime.sagemaker')
input_file=str(uuid.uuid4())+".json"
s3 = boto3.resource('s3')

# 上传字节数据
bucket="zuimei-poc"
key = f'facefusion/params/{input_file}'

# 要上传的字符串内容
content = json.dumps(payload).encode('utf-8')

# 将字符串内容上传到 S3 对象
s3.Object(bucket, key).put(Body=content)
input_location=f's3://{bucket}/facefusion/params/{input_file}'
print(f'input_location: {input_location}')

response = runtime_client.invoke_endpoint_async(
EndpointName=endpoint_name,
InputLocation=input_location
)
result =response.get("OutputLocation",'')
wait_async_result(result)

def wait_async_result(output_location,timeout=60):
current_time=0
while current_time<timeout:
if s3_object_exists(output_location):
print("have async result")
draw_image(output_location)
break
else:
time.sleep(5)
def s3_object_exists(s3_path):
"""
s3_object_exists
"""
try:
s3 = boto3.client('s3')
base_name=os.path.basename(s3_path)
_,ext_name=os.path.splitext(base_name)
bucket,key=get_bucket_and_key(s3_path)

s3.head_object(Bucket=bucket, Key=key)
return True
except Exception as ex:
print("job is not completed, waiting...") 
return False

常见问题

Q:如何避免 FaceFusion 启动 UI 界面只执行处理代码?
A:在启动处理任务时,设置 --headless 参数可以关闭 UI 界面。

Q:如何简化命令行参数的传递?
A:可以使用 subprocess.Popen(data, shell=True) 直接将传递过来的命令作为参数执行,无需逐个解析参数。

Q:使用 SageMaker Serverless 可能出现什么问题?
A:Serverless 可能会出现超时问题,因为 SageMaker endpoint 默认超时时间是 60 秒,对于视频处理请使用异步推理方案。

Inference Endpoint 推理端点远程登录

  • 在机器学习模型的生命周期中,模型部署和推理阶段是至关重要的一环。无论是使用 SageMaker Training Job 训练完成的模型,还是从 HuggingFace 等开源库中获取的预训练模型,最终都需要在 GPU 机器上进行部署,以便于进行模型推理和性能调优。
  • 在部署 FaceFusion 模型之前,我们需要对模型进行全面的推理测试,以确保其在生产环境中可以正常工作。在生产环境中,模型可能会遇到各种意料之外的问题,例如内存泄漏、资源竞争等。以及在推理实例上进行各种性能测试和分析,包括 CPU/GPU 使用率、内存占用、延迟分布等。这些工程化及实施层面均需要登录和操作推理实例的 GPU 服务器,查看日志、监控系统指标,从而快速定位并解决问题。
  • 与 Training Job 一样,SageMaker 的 Inference Endpoint 可以通过 ssh helper utility 登陆到 SageMaker endpoint 推理实例的容器镜像中。
  • ssh helper 是 Amazon 提供的一个开源 utility 工具 lib,其中包了丰富的 api 和封装功能,比如 Training Job/Inference Endpoint 等的 ssh 连接。详见附录参考资料。

具体操作步骤如下:

  • 与 training Job 训练任务类似,在推理代码中,我们通过 ssh helper 的 sdk 启动 ssm 服务端。
    import os
    import sys
    
    ## for debug only
    import os
    import sagemaker_ssh_helper
    sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))
    sagemaker_ssh_helper.setup_and_start_ssh()
    
  • Model 创建的时候,指定 ssh helper 的 warpper。
  • Deploy model,与 training job 类似,使用 SSHModelWrapper 包装常规的 SageMaker endpoint 的 Model,代码示例如下:
    from sagemaker_ssh_helper.wrapper import SSHModelWrapper
    instance_type = "ml.g5.xlarge"
    endpoint_name = sagemaker.utils.name_from_base("facefusion-byoc")
    
    model = Model(image_uri=full_image_uri, 
                  model_data=model_data, 
                  role=role,dependencies=[SSHModelWrapper.dependency_dir()] )
    ssh_wrapper = SSHModelWrapper.create(model, connection_wait_time_seconds=0) 
    
    predictor = model.deploy(
        initial_instance_count=1,
        instance_type=instance_type,
        endpoint_name=endpoint_name,
        wait=True
    )
    
  • SageMaker Inference endpoint 部署成功后,终端节点的 instance id 同样可以通过 sshModelWrapper 包装器 API 获得,从而可通过 ssm 客户端登陆到 endpoint 推理终端节点。

SageMaker 实时推理和异步推理的选择

  1. 实时推理需要持续运行的端点,可快速响应,异步步推理根据工作负载自动扩展计算资源,按需付费更节省。
  2. 实时推理对低延迟有很高要求,适合交互式应用,异步推理可处理运行时间较长的任务,成本效率更高。

通过如上对比不难看出,本次实验的场景我们更倾向于使用异步推理,达到成本效益最佳。

总结

Amazon SageMaker 为短剧换脸服务提供了高效、可扩展的解决方案。通过将 FaceFusion 部署到 SageMaker 上,可以充分利用 AWS 的计算资源和弹性扩展能力,实现稳定的、高质量的换脸服务。尽管本文以 FaceFusion 来举例,但是整个推理的方案可以适配不同的模型。

参考链接

SageMaker ssh helper 远程登录:

https://github.com/aws-samples/sagemaker-ssh-helper

SageMaker Byoc 部署:

https://aws.amazon.com/blogs/machine-learning/build-and-deploy-ml-inference-applications-from-scratch-using-amazon-sagemaker/

本篇作者

刘恒涛

亚马逊云科技解决方案架构师,负责基于 AWS 的云计算方案架构咨询和设计。同时致力于 AWS 云服务在国内的应用和推广,当前重点关注机器学习以及 Serverless 领域。

唐清原

亚马逊云科技高级解决方案架构师,负责 Data Analytic & AIML 产品服务架构设计以及解决方案。10+数据领域研发及架构设计经验,历任 IBM 咨询顾问,Oracle 高级咨询顾问,澳新银行数据部领域架构师职务。在大数据 BI,数据湖,推荐系统,MLOps 等平台项目有丰富实战经验。