M5Stack UnitV2 -检查是否戴口罩
用M5Stack UnitV2识别办公室/宿舍出入人员是否有正常佩戴口罩,发现异常立即报警,并将每日的数据上传到PC端,生成报表。
标签
嵌入式系统
网络与通信
lxb
更新2021-07-23
701

硬件介绍:
      本次项目使用的是M5Stack UnitV2 - AI摄像头 (SSD202D)。它是M5Stack推出的一款高效率的AI识别模块,采用Sigmstar SSD202D(集成双核Cortex-A7 1.2Ghz处理器)控制核心,集成128MB-DDR3内存,512MB NAND Flash, 1080P摄像头。还有麦克风、TF卡插槽、UART端口、USB-C、WiFi 2.4GHz等外设。在次硬件上跑着一个linux操作系统,支持Python。本次任务使用Python来实现。

原理框图如下:FnDuUdR_vNFGOp5uBdIQVxf0yEi2

任务介绍:
      任务4:识别办公室/宿舍出入人员是否有正常佩戴口罩,发现异常立即报警,并将每日的数据上传到PC端,生成报表。

      任务流程如下:AI摄像头端,通过收集数据,训练模型,使用模型判断人员是否戴口罩,将是否戴口罩的信息上传至PC同时如果未佩戴口罩则亮起红灯警示。

流程图如下:

Fs0lg61PV1RbiR73A50sizZehDRE

方案思路:

      1.拍摄照片用作生成模型的基础数据,模型生成:可以使用 V-Training (m5stack.com) 提供的在线模型生成,也可以离线生成。

      这里拍摄照片是按照老师直播中那样一个一个点那个拍摄再保存的按钮,挺麻烦的,后来意识到其实可以用它内部的micropython识别到人后直接拍照再存到SD卡中,会方便快捷的多。

FmxeHfLQ-CW23YWgv7jbfF2X68e5

若损失曲线显示一张下降的曲线,则说明模型训练成功。

FjCwQYgIrputPT9pY1OP_DmwdoLA

2.生成后的压缩包可以直接上传到 UnitV2 中进行使用。

FjwMxSrv_GQsjaG93Ea8OHCpZzMi

3.使用模型来识别口罩佩戴情况,根据佩戴情况决定是否亮灯警示(录屏所以放了图片),将情况反馈至PC端(使用python爬虫获取json格式的数据),PC端根据反馈的数据生成相应的弹窗提示,并将数据记录至txt文件中。

戴口罩时:

FrLISphsH0be77cWBt-ph7gTbFRt

没有戴口罩时:

Ft-_afjFks0nP2rC39KoqoOeYY7T

 

设备端:

from json.decoder import JSONDecodeError
import subprocess
import json
import time
import sys
import base64
import requests


def control_white_led(value):
    open('/sys/class/gpio/export', 'w').write('0')  # Export the GPIO0
    open('/sys/class/gpio/gpio0/direction', 'w').write('out')  # Set the direction of the GPIO
    open('/sys/class/gpio/gpio0/value', 'w').write(str(value))  # Set the calute, '1' or '0'


def control_red_led(value):
    open('/sys/class/gpio/export', 'w').write('1')  # Export the GPIO0
    open('/sys/class/gpio/gpio1/direction', 'w').write('out')  # Set the direction of the GPIO
    open('/sys/class/gpio/gpio1/value', 'w').write(str(value))  # Set the calute, '1' or '0'


reconizer = subprocess.Popen(
    ['/home/m5stack/payload/bin/object_recognition', '/home/m5stack/payload/uploads/models/v2model_0b5431c82a9b1d32'],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

reconizer.stdin.write("_{\"stream\":0}\r\n".encode('utf-8'))  # 不需要获得图片
reconizer.stdin.flush()


def ismask(objdict):
    try:
        if objdict.get('num') == 2:
            obj0 = objdict["obj"][0]
            obj1 = objdict["obj"][1]
            if obj0["type"] == "person":
                res = obj1
            else:
                res = obj0
            for objnode in res.get('obj'):
                if objnode.get('prob') and objnode.get('type') :
                    prob = objnode.get('prob')
                    if objnode.get('type') == 'Mask' and prob > 0.6:
                        control_white_led(0)
                        control_red_led(1)
                    else:
                        control_white_led(1)
                        control_red_led(0)
    except json.decoder.JSONDecodeError:
        print("数据异常!")
        

while (True):
    objdict = json.loads(reconizer.stdout.readline().decode('utf-8'))
    #print(objdict)
    if objdict.get('num') and objdict.get('obj') and objdict["running"] == "Object Recognition":
        ismask(objdict)

 

桌面的提醒:

无人时:

Fu-d7cCe0D-T7QuX8ZlY1i0pjqbk

 

未佩戴口罩时:

FtjQzX7ICx35FPCQOlz4VGOtZDcy

 

佩戴口罩时:

Fs3sxWr212t8ZbHFSwPPP8IJRn0O

 

txt文本记录:

FndPIEBkxYeDUV7RhR_P76qd_43x

 

PC端:

import ctypes
import json
import threading
import time
import urllib.request
import requests
from tkinter import *

ctypes.windll.shcore.SetProcessDpiAwareness(1)

remote = "http://10.254.239.1"
lastTime = 0

status = "NoBody"
count = 0

stoptime = 3000
width = 380
height = 300
screenwidth = 1920
screenheight = 1080
alignstr = '%dx%d+%d+%d' % (width, height, (screenwidth - width) / 2, (screenheight - height) / 2)


def writeDict2File(fileName,dict_data):
    js = json.dumps(dict_data)
    file = open(fileName, 'a', encoding='utf-8')
    file.write('\n'+js)
    file.close()


def analyze2(s):
    global lastTime, status, count
    try:  # 处理解析异常的问题
        dict_data = json.loads(s)  # 字符串转换成字典
        print(dict_data)
        if dict_data["running"] == "Object Recognition" :
            if dict_data["num"] == 1 and dict_data["obj"][0]["type"] == "person":
                status = "NoMask"
                dict_target = {"date": "", "type": ""}
                date = time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())
                dict_target["date"] = date
                dict_target["type"] = "NoMask"
                writeDict2File('daily_mask_wearing_data.txt', dict_target)
                print("NoMask_1")
                lastTime = time.time()
            elif dict_data["num"] == 1 and dict_data["obj"][0]["type"] != "person":
                status = "NoBody"
                dict_target = {"date": "", "type": ""}
                date = time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())
                dict_target["date"] = date
                dict_target["type"] = "NoBody"
                writeDict2File('daily_mask_wearing_data.txt', dict_target)
                print("NoBody")
                lastTime = time.time()
            elif dict_data["num"] == 2 :
                obj0 = dict_data["obj"][0]
                obj1 = dict_data["obj"][1]
                if obj0["type"] == "person":
                    res = obj1
                else:
                    res = obj0
                if res["prob"] > 0.6:  # 概率值为60%以上才认为识别到口罩
                    status = "Mask"
                    dict_target = {"date": "", "type": ""}
                    date = time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())
                    dict_target["date"] = date
                    if res["type"] == "NoMask":
                        status = "NoMask"
                        print("NoMask")
                        dict_target["type"] = "NoMask"
                        print("NoMask_2")
                        writeDict2File('daily_mask_wearing_data.txt', dict_target)
                        lastTime = time.time()
                    elif res["type"] == "Mask":
                        print("Mask")
                        status = "Mask"
                        dict_target["type"] = "Mask"
                        writeDict2File('daily_mask_wearing_data.txt', dict_target)
                        print("Mask")
                        lastTime = time.time()
    except json.decoder.JSONDecodeError:
        print("数据异常!")


def timecalc():
    global status, count
    while True:
        if (time.time() - lastTime > 2):
            if status != "NoBody":
                status = "NoBody"
                count = 0


# 流读取
def stream(url):
    bytes = b''
    with urllib.request.urlopen(url, data=b'') as f:
        while True:
            bytes += f.read(100)
            str = bytes.decode('utf-8')
            strobj = str.split('|')
            if len(strobj) > 1:
                for i in range(len(strobj)-1):
                    analyze2(strobj[i])
            try:
                json.loads(strobj[-1])
                analyze2(strobj[-1])
                bytes = b''
            except:
                bytes = strobj[-1].encode('utf-8')


def res(url, data=None, headers=None, json=True):
    res = requests.post(url, json=data, headers=headers)
    return res.json() if json else res.text


def switchFunction():
    return res(remote + "/func", {"type_id": "1", "type_name": "object_recognition", "args": ["v2model_0b5431c82a9b1d32"]})


def getResult():
    return stream(remote + "/func/result")


def tixing():
    global lastTime, status, count
    myWindow = Tk()
    myWindow.title('当前口罩佩戴状态')
    myWindow.geometry(alignstr)
    myWindow.resizable(width=False, height=True)
    if status == "NoBody" :
        neirong = "暂未识别到人"
    elif status == "NoMask" :
        neirong = "请尽快佩戴口罩"
    else:
        neirong = "Good Job!"
    Message(myWindow, text=neirong, padx=20, pady=20).pack()
    myWindow.after(stoptime, myWindow.destroy)
    myWindow.mainloop()


if __name__ == '__main__':
    switchFunction()
    print("Ready")
    while(True):
        # 定时器
        th1 = threading.Thread(target=timecalc)
        th1.start()
        # 数据获取
        th2 = threading.Thread(target=getResult)
        th2.start()
        th3 = threading.Thread(target=tixing)
        th3.start()

项目心得:

(1)对于json数据格式有了更为明确地掌握

(2)学习了python爬虫获取网页的json数据

(3)在AI模型的训练上还要选取更多更为优质的训练集;

(4)应该适当选取阈值,不然结果会很不理想

 

困难:

(1)设备与PC的通信

(2)如何免密ssh远程登录

(3)如何将数据上传至云端

(4)提高识别的准确度

 

未来展望:

(1)更智能化,让拍照到保存图片至SD卡再到上传至V-training平台实现一体化

(2)多多学习ssh远程登录

(3)学习如何利用python实现更为人性化的提醒措施

(4)学习如何得到AI模型

 

 

 

附件下载
tasks.json
v2model_0b5431c82a9b1d32.tar
main.py
团队介绍
哈尔滨工业大学电信学院学生
团队成员
lxb
哈尔滨工业大学电信学院信息对抗专业
评论
0 / 100
查看更多
目录
硬禾服务号
关注最新动态
0512-67862536
info@eetree.cn
江苏省苏州市苏州工业园区新平街388号腾飞创新园A2幢815室
苏州硬禾信息科技有限公司
Copyright © 2023 苏州硬禾信息科技有限公司 All Rights Reserved 苏ICP备19040198号