AWS IoT Greengrass很容易部署在设备侧/网关侧,同时也提供良好的运行时环境,针对安防监控厂商Camera设备可以结合AWS IoT Greengrass来实现边缘侧AI/ML场景。这里通过树莓派部署AWS IoT Greengrass跑dlib库从摄像机实时视频流中抽取视频帧来实现人脸识别和比对。
 
        
 
       准备工作:
 
        
        - 一台树莓派设备,本方案采用RaspberryPi 4B ,CSI摄像头。
- 将CSI摄像头处理为Raspbian OS能识别的设备,需开启V4l2 Module
- 树莓派上安装python3运行环境
- 安装AWS IoT Greengrass,参考官方文档,这里创建名称为“raspberrypiGroup”组
- 上传一张照片到树莓派指定目录下用于后续人脸比对。

 
       架构图
 
 
       创建存储桶
 
       创建名称为”greengrass-detect-realtime-video”图片桶,区域选择”新加坡”
 
       
 创建访问密钥
 
       IAM服务中新建一个用户,选择编程访问,选择
 
 
 选择创建策略,选择JSON,输入内容如下
 
        
        {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:ListAllMyBuckets",
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::greengrass-detect-realtime-video"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:GetObject",
                "s3:GetObjectAcl",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::greengrass-detect-realtime-video/*"
        }
    ]
}
 
         
       选择安全证书,创建访问密钥
 
 下载保存密钥后,回到服务界面选择AWS Secrets Manager服务,选择“存储新的密钥”,选择“其他类型的密钥”,密钥键/值新增1行,输入上面保存的密钥信息。密钥ID名称”access_key_id”,密钥名称“access_secret_key”。
  输入密钥名称“greengrass-lambda-access-s3-secretkey”
输入密钥名称“greengrass-lambda-access-s3-secretkey”
 
       
 
        创建Local Lambda函数
 
       进入AWS Lambda控制台,创建名称为 “local_face_detection” Lambda函数,选择运行时环境Python 3.7,导入Greengrass Core Python SDK. SDK下载地址:https://github.com/aws/aws-greengrass-core-sdk-python.
 
       localFaceDetection.py主要代码实现
 
        初始化Greengrass 客户端
 
       client = greengrasssdk.client('iot-data')
 
       指定iot topic,face_recongnition用于上报人脸识别结果,recognition_failed用于识别错误日志
 
       iotTopic = 'face_recognition'
 
       errTopic = 'recognition_failed'
 
       获取Secret Manager中存储的访问密钥,密钥内容写入secret
 
        
        get_secret()
secret_json = json.loads(secret)
#提取访问密钥ID
access_key_id = secret_json['access_key_id']
#提取访问密钥KEY
access_secret_key = secret_json['access_secret_key']
 
         
       初始化S3客户端
 
        
        clientS3 = boto3.client(
    's3',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=access_secret_key
)
 
         
       指定树莓派上传人脸图片的bucket名称
 
        
        bucket='greengrass-detect-realtime-video'
 
         
       指定树莓派上进行人脸比对的基准照片,这个基准照需要提前上传到树莓派,比如:/home/pi/XXX.jpg,基准照也可以从S3下载,这里略过。
 
        
        filesUrl = ['<RASPBERRYPI_LOCAL_FACE_IMAGE>']
 
         
       从Secrets Manager提取访问密钥
 
        
        def get_secret():
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        print(e)
    else:
        if 'SecretString' in get_secret_value_response:
            global secret
            secret = get_secret_value_response['SecretString']
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
 
         
       从Camera设备或RTSP Proxy获取实时视频流,将视频帧写入队列,队列中始终保持留存1帧,丢弃队列中其他帧,这里主要解决消费视频帧过慢导致视频帧积压处理不及时。
 
        
        def frame_input(q):
    cap = cv2.VideoCapture('/dev/video0')
    print("Raspberry Pi 4B - connected")
    while True:
        st = time.time()
        ret,frm = cap.read()
        if not(ret):
            cap.release()
            cap = cv2.VideoCapture('/dev/video0')
            #cap = cv2.VideoCapture("rtsp://localhost:8554/unicast")
            print("total time lost due to reinitialization : ",time.time()-st)
            continue
        q.put(frm)
        if q.qsize() > 1:
            for i in range(q.qsize()-1):
                q.get()
 
         
       定义线程从Camera获取实时视频流
 
        
        class Frame_Thread(Thread):
    def __init__(self):
        ''' Constructor. '''
        Thread.__init__(self)
        
    def run(self):
        print("start queue read frame")
        mp.set_start_method('fork',True)
        process = mp.Process(target=frame_input,args=(queue,))
        process.daemon = True
        process.start()
 
         
       定义队列深度为4,启动线程读取视频帧
 
        
        queue = mp.Queue(maxsize=4)
frame_thread=Frame_Thread()
frame_thread.start()
print("started read read-time video frame from src [camera device / rtsp server]")
 
         
       提取基准照片的面部编码和文件路径的文件名key-value形式存入faces_dict中
 
        
        def load_local_image(filesToLoad,newFile):
    global filesUrl
    global faces_dict
    for url in filesToLoad:
        img=scipy.misc.imread(url,mode='RGB')
        faces_dict.update({remove_file_ext(url):face_recognition.face_encodings(img)[0]})
        if(newFile):
            filesUrl.append(url)
        client.publish(topic=iotTopic, payload="images are loaded from local")
load_local_image(filesUrl,0)
 
         
       从队列queue中获取视频帧,resize ¼ ,颜色转换从BGR转RGB
 
        
        frame = queue.get()
if frame is None:
    client.publish(topic=errTopic, payload="Failed to get frame from the stream")
    continue
else:
    # 为加速人脸识别处理这里将视频帧size缩小为原有1/4 
    small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
    # 将图像从BGR颜色(OpenCV使用颜色)转换为RGB颜色(face_recognition使用颜色)
    rgb_small_frame = small_frame[:, :, ::-1]
# 仅处理其他视频帧以节省时间
 
         
       查找当前视频帧中的所有面部和面部编码,面部编码匹配人脸
 
        
        face_locations = face_recognition.face_locations(rgb_small_frame)
face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)
print('process this frame start:',len(face_locations),len(face_encodings))
face_names = []
for face_encoding in face_encodings:
    name = "Unknown"
    images_encodings = list(faces_dict.values())
    global dist
    dist = 0
    #面部编码匹配人脸
    match_result = face_recognition.compare_faces(images_encodings,face_encoding,tolerance=0.45)
    for idx, match in enumerate(match_result):
        if match:
            image_encoding = images_encodings[idx]
            dist = face_recognition.face_distance([image_encoding],face_encoding)[0]
            dist = (1.0 - dist) * 100
            print("name : {} face_recogniton dist value : {}".format(list(faces_dict.keys())[idx],dist))
 
         
        
 
       如果dist>70 上传图片 & 上报识别结果
 
        
        global jpeg
if dist > 70.0 :
    try:
        imgID = time.strftime("%Y%m%d%H%M%S")+str(random.randint(0,99))
        #将置信度>70的视频帧转成图片上传S3指定桶'greengrass-detect-realtime-video'
        s3Resp =clientS3.put_object(Bucket='greengrass-detect-realtime-video', Key=imgID+'.jpg', Body=jpeg.tobytes(), ACL="private")
        print(s3Resp)
        #上报人脸识别结果和识别图片名称
        msg = '{{"FaceName":"{0}","dist":"{1}","imageName":"{2}","time":"{3}","desc":"{4}"}}'.format(str(name),str(dist),(imgID+".jpg"),time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()),"Uploaded Rapsberry Pi face detection image.")
        #上报IoT Core 人脸名称,置信度,S3图片名称,时间戳等
        client.publish(topic=iotTopic, payload=msg)
 
         
       local_face_detection.py的完整源代码可以从这里获取。
 
       在local_face_detection函数console界面选择“操作”,选择“发布新版本”,这里版本号是37。选择“创建别名”,输入名称“prod”,选择版本37
 
 AWS Lambda添加Greengrass组
 
       将函数local_face_detection添加Greengrass组“raspberrypiGroup”
 
 选择上面别名”prod”
 
 添加完成后,选择“编辑”:
 
        
        - 运行身份:使用默认组 (当前: ggc_user/ggc_group)
- 容器化:Greengrass 容器 (始终)
- 内存限制:512MB , 超时:默认3秒
- AWS Lambda 生命周期:使此函数长时间生存,保持其无限期运行
- 对 /sys 目录的只读访问权限:启用
- 输入负载数据类型:json
添加资源camera
 
        
        - 资源名称:Camera
- 资源类型:设备
- 设备路径:/dev/video0
- 组拥有文件访问权限:自动添加拥有资源的 Linux 组的操作系统组权限
- 为此 AWS Lambda 函数选择权限:读写访问权限
添加资源tmp
 
        
        - 资源名称:tmp
- 资源类型:卷
- 源路径:/tmp 目的地路径: /tmp
- 组拥有文件访问权限:自动添加拥有资源的 Linux 组的操作系统组权限
- 为此 AWS Lambda 函数选择权限:读写访问权限
 
 
       创建Greengrass组角色
 
       Identity and Access Management (IAM)控制台创建一个角色GreengrassRole,将策略AWSGreengrassResourceAccessRolePolicy添加到角色
 
 选择“添加内联策略”
 
       
 
 
       输入下面的策略
 
        
        {
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "secretsmanager:GetSecretValue",
        "Resource": "*"
    }
}
 
         
        
 
        
 
       输入策略名称GreengrassSecretValuePolicy
 
       
 
       创建订阅
 
       创建订阅界面,选择源为“local_face_detection”,选择目标“IoT Cloud”
 
       
 主题筛选条件输入“#”
 
       
 
       测试
 
       测试界面选择订阅主题“face_recognition”
 
       
 
       设备检测到人脸,从实时视频流获取frame生成图片推到S3。
 
       从S3存储桶中可以看到设备传上来的图片
 
 
       小结,本方案展示了树莓派系统上运行 AWS IoT Greengrass 实现人脸检测功能。很多客户希望通过边缘侧实现AI/ML 场景可以结合Greengrass+Lambda来实现。权限部分可以选择AWS Secret Manager来管理密钥,AWS IoT Greengrass角色添加AWS Secret Manager 访问权限后,设备可按需获取密钥从而避免本地硬编码。
 
       本篇作者