AWS IoT Greengrass很容易部署在设备侧/网关侧,同时也提供良好的运行时环境,针对安防监控厂商Camera设备可以结合AWS IoT Greengrass来实现边缘侧AI/ML场景。这里通过树莓派部署AWS IoT Greengrass跑dlib库从摄像机实时视频流中抽取视频帧来实现人脸识别和比对。
准备工作:
- 一台树莓派设备,本方案采用RaspberryPi 4B ,CSI摄像头。
- 将CSI摄像头处理为Raspbian OS能识别的设备,需开启V4l2 Module
- 树莓派上安装python3运行环境
- 安装AWS IoT Greengrass,参考官方文档,这里创建名称为“raspberrypiGroup”组
- 上传一张照片到树莓派指定目录下用于后续人脸比对。
架构图
创建存储桶
创建名称为”greengrass-detect-realtime-video”图片桶,区域选择”新加坡”
创建访问密钥
IAM服务中新建一个用户,选择编程访问,选择
选择创建策略,选择JSON,输入内容如下
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:ListAllMyBuckets",
"Resource": "arn:aws:s3:::*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::greengrass-detect-realtime-video"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:GetObjectAcl",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::greengrass-detect-realtime-video/*"
}
]
}
选择安全证书,创建访问密钥
下载保存密钥后,回到服务界面选择AWS Secrets Manager服务,选择“存储新的密钥”,选择“其他类型的密钥”,密钥键/值新增1行,输入上面保存的密钥信息。密钥ID名称”access_key_id”,密钥名称“access_secret_key”。
输入密钥名称“greengrass-lambda-access-s3-secretkey”
创建Local Lambda函数
进入AWS Lambda控制台,创建名称为 “local_face_detection” Lambda函数,选择运行时环境Python 3.7,导入Greengrass Core Python SDK. SDK下载地址:https://github.com/aws/aws-greengrass-core-sdk-python.
localFaceDetection.py主要代码实现
初始化Greengrass 客户端
client = greengrasssdk.client('iot-data')
指定iot topic,face_recongnition用于上报人脸识别结果,recognition_failed用于识别错误日志
iotTopic = 'face_recognition'
errTopic = 'recognition_failed'
获取Secret Manager中存储的访问密钥,密钥内容写入secret
get_secret()
secret_json = json.loads(secret)
#提取访问密钥ID
access_key_id = secret_json['access_key_id']
#提取访问密钥KEY
access_secret_key = secret_json['access_secret_key']
初始化S3客户端
clientS3 = boto3.client(
's3',
aws_access_key_id=access_key_id,
aws_secret_access_key=access_secret_key
)
指定树莓派上传人脸图片的bucket名称
bucket='greengrass-detect-realtime-video'
指定树莓派上进行人脸比对的基准照片,这个基准照需要提前上传到树莓派,比如:/home/pi/XXX.jpg,基准照也可以从S3下载,这里略过。
filesUrl = ['<RASPBERRYPI_LOCAL_FACE_IMAGE>']
从Secrets Manager提取访问密钥
def get_secret():
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
print(e)
else:
if 'SecretString' in get_secret_value_response:
global secret
secret = get_secret_value_response['SecretString']
else:
decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
从Camera设备或RTSP Proxy获取实时视频流,将视频帧写入队列,队列中始终保持留存1帧,丢弃队列中其他帧,这里主要解决消费视频帧过慢导致视频帧积压处理不及时。
def frame_input(q):
cap = cv2.VideoCapture('/dev/video0')
print("Raspberry Pi 4B - connected")
while True:
st = time.time()
ret,frm = cap.read()
if not(ret):
cap.release()
cap = cv2.VideoCapture('/dev/video0')
#cap = cv2.VideoCapture("rtsp://localhost:8554/unicast")
print("total time lost due to reinitialization : ",time.time()-st)
continue
q.put(frm)
if q.qsize() > 1:
for i in range(q.qsize()-1):
q.get()
定义线程从Camera获取实时视频流
class Frame_Thread(Thread):
def __init__(self):
''' Constructor. '''
Thread.__init__(self)
def run(self):
print("start queue read frame")
mp.set_start_method('fork',True)
process = mp.Process(target=frame_input,args=(queue,))
process.daemon = True
process.start()
定义队列深度为4,启动线程读取视频帧
queue = mp.Queue(maxsize=4)
frame_thread=Frame_Thread()
frame_thread.start()
print("started read read-time video frame from src [camera device / rtsp server]")
提取基准照片的面部编码和文件路径的文件名key-value形式存入faces_dict中
def load_local_image(filesToLoad,newFile):
global filesUrl
global faces_dict
for url in filesToLoad:
img=scipy.misc.imread(url,mode='RGB')
faces_dict.update({remove_file_ext(url):face_recognition.face_encodings(img)[0]})
if(newFile):
filesUrl.append(url)
client.publish(topic=iotTopic, payload="images are loaded from local")
load_local_image(filesUrl,0)
从队列queue中获取视频帧,resize ¼ ,颜色转换从BGR转RGB
frame = queue.get()
if frame is None:
client.publish(topic=errTopic, payload="Failed to get frame from the stream")
continue
else:
# 为加速人脸识别处理这里将视频帧size缩小为原有1/4
small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
# 将图像从BGR颜色(OpenCV使用颜色)转换为RGB颜色(face_recognition使用颜色)
rgb_small_frame = small_frame[:, :, ::-1]
# 仅处理其他视频帧以节省时间
查找当前视频帧中的所有面部和面部编码,面部编码匹配人脸
face_locations = face_recognition.face_locations(rgb_small_frame)
face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)
print('process this frame start:',len(face_locations),len(face_encodings))
face_names = []
for face_encoding in face_encodings:
name = "Unknown"
images_encodings = list(faces_dict.values())
global dist
dist = 0
#面部编码匹配人脸
match_result = face_recognition.compare_faces(images_encodings,face_encoding,tolerance=0.45)
for idx, match in enumerate(match_result):
if match:
image_encoding = images_encodings[idx]
dist = face_recognition.face_distance([image_encoding],face_encoding)[0]
dist = (1.0 - dist) * 100
print("name : {} face_recogniton dist value : {}".format(list(faces_dict.keys())[idx],dist))
如果dist>70 上传图片 & 上报识别结果
global jpeg
if dist > 70.0 :
try:
imgID = time.strftime("%Y%m%d%H%M%S")+str(random.randint(0,99))
#将置信度>70的视频帧转成图片上传S3指定桶'greengrass-detect-realtime-video'
s3Resp =clientS3.put_object(Bucket='greengrass-detect-realtime-video', Key=imgID+'.jpg', Body=jpeg.tobytes(), ACL="private")
print(s3Resp)
#上报人脸识别结果和识别图片名称
msg = '{{"FaceName":"{0}","dist":"{1}","imageName":"{2}","time":"{3}","desc":"{4}"}}'.format(str(name),str(dist),(imgID+".jpg"),time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()),"Uploaded Rapsberry Pi face detection image.")
#上报IoT Core 人脸名称,置信度,S3图片名称,时间戳等
client.publish(topic=iotTopic, payload=msg)
local_face_detection.py的完整源代码可以从这里获取。
在local_face_detection函数console界面选择“操作”,选择“发布新版本”,这里版本号是37。选择“创建别名”,输入名称“prod”,选择版本37
AWS Lambda添加Greengrass组
将函数local_face_detection添加Greengrass组“raspberrypiGroup”
选择上面别名”prod”
添加完成后,选择“编辑”:
- 运行身份:使用默认组 (当前: ggc_user/ggc_group)
- 容器化:Greengrass 容器 (始终)
- 内存限制:512MB , 超时:默认3秒
- AWS Lambda 生命周期:使此函数长时间生存,保持其无限期运行
- 对 /sys 目录的只读访问权限:启用
- 输入负载数据类型:json
添加资源camera
- 资源名称:Camera
- 资源类型:设备
- 设备路径:/dev/video0
- 组拥有文件访问权限:自动添加拥有资源的 Linux 组的操作系统组权限
- 为此 AWS Lambda 函数选择权限:读写访问权限
添加资源tmp
- 资源名称:tmp
- 资源类型:卷
- 源路径:/tmp 目的地路径: /tmp
- 组拥有文件访问权限:自动添加拥有资源的 Linux 组的操作系统组权限
- 为此 AWS Lambda 函数选择权限:读写访问权限
创建Greengrass组角色
Identity and Access Management (IAM)控制台创建一个角色GreengrassRole,将策略AWSGreengrassResourceAccessRolePolicy添加到角色
选择“添加内联策略”
输入下面的策略
{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "secretsmanager:GetSecretValue",
"Resource": "*"
}
}
输入策略名称GreengrassSecretValuePolicy
创建订阅
创建订阅界面,选择源为“local_face_detection”,选择目标“IoT Cloud”
主题筛选条件输入“#”
测试
测试界面选择订阅主题“face_recognition”
设备检测到人脸,从实时视频流获取frame生成图片推到S3。
从S3存储桶中可以看到设备传上来的图片
小结,本方案展示了树莓派系统上运行 AWS IoT Greengrass 实现人脸检测功能。很多客户希望通过边缘侧实现AI/ML 场景可以结合Greengrass+Lambda来实现。权限部分可以选择AWS Secret Manager来管理密钥,AWS IoT Greengrass角色添加AWS Secret Manager 访问权限后,设备可按需获取密钥从而避免本地硬编码。
本篇作者