亚马逊AWS官方博客

基于 AWS IoT Greengrass 端侧实时人脸检测

AWS IoT Greengrass很容易部署在设备侧/网关侧,同时也提供良好的运行时环境,针对安防监控厂商Camera设备可以结合AWS IoT Greengrass来实现边缘侧AI/ML场景。这里通过树莓派部署AWS IoT Greengrass跑dlib库从摄像机实时视频流中抽取视频帧来实现人脸识别和比对。

 

准备工作:

  • 一台树莓派设备,本方案采用RaspberryPi 4B ,CSI摄像头。
  • 将CSI摄像头处理为Raspbian OS能识别的设备,需开启V4l2 Module
  • 树莓派上安装python3运行环境
  • 安装AWS IoT Greengrass,参考官方文档,这里创建名称为“raspberrypiGroup”组
  • 上传一张照片到树莓派指定目录下用于后续人脸比对。

架构图

创建存储桶

创建名称为”greengrass-detect-realtime-video”图片桶,区域选择”新加坡”


创建访问密钥

IAM服务中新建一个用户,选择编程访问,选择


选择创建策略,选择JSON,输入内容如下

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:ListAllMyBuckets",
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::greengrass-detect-realtime-video"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:GetObject",
                "s3:GetObjectAcl",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::greengrass-detect-realtime-video/*"
        }
    ]
}

选择安全证书,创建访问密钥

下载保存密钥后,回到服务界面选择AWS Secrets Manager服务,选择“存储新的密钥”,选择“其他类型的密钥”,密钥键/值新增1行,输入上面保存的密钥信息。密钥ID名称”access_key_id”,密钥名称“access_secret_key”。
输入密钥名称“greengrass-lambda-access-s3-secretkey”

 创建Local Lambda函数

进入AWS Lambda控制台,创建名称为 “local_face_detection” Lambda函数,选择运行时环境Python 3.7,导入Greengrass Core Python SDK. SDK下载地址:https://github.com/aws/aws-greengrass-core-sdk-python.

localFaceDetection.py主要代码实现

 初始化Greengrass 客户端

client = greengrasssdk.client('iot-data')

指定iot topic,face_recongnition用于上报人脸识别结果,recognition_failed用于识别错误日志

iotTopic = 'face_recognition'

errTopic = 'recognition_failed'

获取Secret Manager中存储的访问密钥,密钥内容写入secret

get_secret()
secret_json = json.loads(secret)
#提取访问密钥ID
access_key_id = secret_json['access_key_id']
#提取访问密钥KEY
access_secret_key = secret_json['access_secret_key']

初始化S3客户端

clientS3 = boto3.client(
    's3',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_secret_key
)

指定树莓派上传人脸图片的bucket名称

bucket='greengrass-detect-realtime-video'

指定树莓派上进行人脸比对的基准照片,这个基准照需要提前上传到树莓派,比如:/home/pi/XXX.jpg,基准照也可以从S3下载,这里略过。

filesUrl = ['<RASPBERRYPI_LOCAL_FACE_IMAGE>']

从Secrets Manager提取访问密钥

def get_secret():
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        print(e)
    else:
        if 'SecretString' in get_secret_value_response:
            global secret
            secret = get_secret_value_response['SecretString']
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])

从Camera设备或RTSP Proxy获取实时视频流,将视频帧写入队列,队列中始终保持留存1帧,丢弃队列中其他帧,这里主要解决消费视频帧过慢导致视频帧积压处理不及时。

def frame_input(q):
    cap = cv2.VideoCapture('/dev/video0')
    print("Raspberry Pi 4B - connected")
    while True:
        st = time.time()
        ret,frm = cap.read()
        if not(ret):
            cap.release()
            cap = cv2.VideoCapture('/dev/video0')
            #cap = cv2.VideoCapture("rtsp://localhost:8554/unicast")
            print("total time lost due to reinitialization : ",time.time()-st)
            continue
        q.put(frm)
        if q.qsize() > 1:
            for i in range(q.qsize()-1):
                q.get()

定义线程从Camera获取实时视频流

class Frame_Thread(Thread):
    def __init__(self):
        ''' Constructor. '''
        Thread.__init__(self)
        
    def run(self):
        print("start queue read frame")
        mp.set_start_method('fork',True)
        process = mp.Process(target=frame_input,args=(queue,))
        process.daemon = True
        process.start()

定义队列深度为4,启动线程读取视频帧

queue = mp.Queue(maxsize=4)
frame_thread=Frame_Thread()
frame_thread.start()
print("started read read-time video frame from src [camera device / rtsp server]")

提取基准照片的面部编码和文件路径的文件名key-value形式存入faces_dict中

def load_local_image(filesToLoad,newFile):
    global filesUrl
    global faces_dict
    for url in filesToLoad:
        img=scipy.misc.imread(url,mode='RGB')
        faces_dict.update({remove_file_ext(url):face_recognition.face_encodings(img)[0]})
        if(newFile):
            filesUrl.append(url)
        client.publish(topic=iotTopic, payload="images are loaded from local")

load_local_image(filesUrl,0)

从队列queue中获取视频帧,resize ¼ ,颜色转换从BGR转RGB

frame = queue.get()
if frame is None:
    client.publish(topic=errTopic, payload="Failed to get frame from the stream")
    continue
else:
    # 为加速人脸识别处理这里将视频帧size缩小为原有1/4 
    small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
    # 将图像从BGR颜色(OpenCV使用颜色)转换为RGB颜色(face_recognition使用颜色)
    rgb_small_frame = small_frame[:, :, ::-1]
# 仅处理其他视频帧以节省时间

查找当前视频帧中的所有面部和面部编码,面部编码匹配人脸

face_locations = face_recognition.face_locations(rgb_small_frame)
face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)
print('process this frame start:',len(face_locations),len(face_encodings))
face_names = []
for face_encoding in face_encodings:
    name = "Unknown"
    images_encodings = list(faces_dict.values())
    global dist
    dist = 0
    #面部编码匹配人脸
    match_result = face_recognition.compare_faces(images_encodings,face_encoding,tolerance=0.45)
    for idx, match in enumerate(match_result):
        if match:
            image_encoding = images_encodings[idx]
            dist = face_recognition.face_distance([image_encoding],face_encoding)[0]
            dist = (1.0 - dist) * 100
            print("name : {} face_recogniton dist value : {}".format(list(faces_dict.keys())[idx],dist))

 

如果dist>70 上传图片 & 上报识别结果

global jpeg
if dist > 70.0 :
    try:
        imgID = time.strftime("%Y%m%d%H%M%S")+str(random.randint(0,99))
        #将置信度>70的视频帧转成图片上传S3指定桶'greengrass-detect-realtime-video'
        s3Resp =clientS3.put_object(Bucket='greengrass-detect-realtime-video', Key=imgID+'.jpg', Body=jpeg.tobytes(), ACL="private")
        print(s3Resp)
        #上报人脸识别结果和识别图片名称
        msg = '{{"FaceName":"{0}","dist":"{1}","imageName":"{2}","time":"{3}","desc":"{4}"}}'.format(str(name),str(dist),(imgID+".jpg"),time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()),"Uploaded Rapsberry Pi face detection image.")
        #上报IoT Core 人脸名称,置信度,S3图片名称,时间戳等
        client.publish(topic=iotTopic, payload=msg)

local_face_detection.py的完整源代码可以从这里获取。

在local_face_detection函数console界面选择“操作”,选择“发布新版本”,这里版本号是37。选择“创建别名”,输入名称“prod”,选择版本37

AWS Lambda添加Greengrass

将函数local_face_detection添加Greengrass组“raspberrypiGroup

选择上面别名”prod”

添加完成后,选择“编辑”:

  • 运行身份:使用默认组 (当前: ggc_user/ggc_group)
  • 容器化:Greengrass 容器 (始终)
  • 内存限制:512MB , 超时:默认3秒
  • AWS Lambda 生命周期:使此函数长时间生存,保持其无限期运行
  • 对 /sys 目录的只读访问权限:启用
  • 输入负载数据类型:json

添加资源camera

  • 资源名称:Camera
  • 资源类型:设备
  • 设备路径:/dev/video0
  • 组拥有文件访问权限:自动添加拥有资源的 Linux 组的操作系统组权限
  • 为此 AWS Lambda 函数选择权限:读写访问权限

添加资源tmp

  • 资源名称:tmp
  • 资源类型:卷
  • 源路径:/tmp 目的地路径: /tmp
  • 组拥有文件访问权限:自动添加拥有资源的 Linux 组的操作系统组权限
  • 为此 AWS Lambda 函数选择权限:读写访问权限

 

创建Greengrass组角色

Identity and Access Management (IAM)控制台创建一个角色GreengrassRole,将策略AWSGreengrassResourceAccessRolePolicy添加到角色

选择“添加内联策略”


输入下面的策略

{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "secretsmanager:GetSecretValue",
        "Resource": "*"
    }
}

 

 

输入策略名称GreengrassSecretValuePolicy

创建订阅

创建订阅界面,选择源为“local_face_detection”,选择目标“IoT Cloud”


主题筛选条件输入“#”

测试

测试界面选择订阅主题“face_recognition”

设备检测到人脸,从实时视频流获取frame生成图片推到S3。

从S3存储桶中可以看到设备传上来的图片

小结,本方案展示了树莓派系统上运行 AWS IoT Greengrass 实现人脸检测功能。很多客户希望通过边缘侧实现AI/ML 场景可以结合Greengrass+Lambda来实现。权限部分可以选择AWS Secret Manager来管理密钥,AWS IoT Greengrass角色添加AWS Secret Manager 访问权限后,设备可按需获取密钥从而避免本地硬编码。

本篇作者

周晓明

AWS GCR解决方案架构师。