基于M5Stack UnitV2 的口罩监控系统
使用M5Stack UnitV2建立用Python编辑口罩检测监控识别并且上传数据到云端
标签
M5Stack
Unit V2
Sholder
更新2021-07-26
828

硬件介绍
    本次项目使用的是M5Stack UnitV2 - AI摄像头 (SSD202D)。这是一个使用Sigmstar SSD202D做主控芯片,带Neno和FPU的ARM,双核 Cortex-A7 1.2Ghz 处理器,嵌入式128MB DDR3,512MB NAND Flash;使用GC2145 1080P Colored Sensor摄像头,还有麦克风、TF卡插槽、UART端口、USB-C、WiFi 2.4GHz等外设。在次硬件上跑着一个linux操作系统,支持Python。FnDuUdR_vNFGOp5uBdIQVxf0yEi2

规格参数

Sigmstar SSD202D Dual Cortex-A7 1.2Ghz Processor
Flash 512MB NAND
RAM 128MB-DDR3
Camera GC2145 1080P Colored Sensor
镜头 FOV 68° , DOF= 60cm- ∞
输入电压 5V @ 500mA
硬件外设 TypeC x1, UART x1, TFCard x1, Button x1, Microphone x1, Fan x1
指示灯 红, 白
WiFi 150Mbps 2.4GHz 802.11 b/g/n
工作温度 32°F to 104°F ( 0°C to 40°C )
净重 18g
毛重 62g
产品尺寸 48*18.5*24mm
包装尺寸 157*38*38mm
外壳材质 Plastic ( PC )


unitv2_06.webp


unitv2_07.webp

    任务介绍:
    本次活动我选择了口罩这个任务,在检测摄像头监控范围内识别人脸并判断是否佩戴口罩,如果没带口罩就报警,报警的办法有多种,选择适当合理的就行

    任务实现:

1、在内嵌网址中的照相功能直接照相就行,然后下载下来

FmSzItfDja8rnJMzpi61CaQinckn   

    2、

最复杂的训练部分,本次活动公司已经有了官方的训练网址 http://v-training.m5stack.com/build/index.html

在这里可以进行傻瓜式操作

然后训练好了界面如图,直接下载下来就可了

FqZNNFdVEX6bqNp1XfJK43kHhC3U

Fu0S6Nwfot2W8LN5OFTiKZhAPNX8


    3、然后下载回来就可以进行Python部分的事情了

Fp9xM3sthvUimg_hujw_QWpMoSv7

采用插件自己打开服务器然后输入跳转至目标网址

'''这个需要插件,自己下载相应的'''
driver = webdriver.Chrome('C:\Program Files\Google\Chrome\Application\chrome.exe')
# get 方法 打开指定网址
driver.get('http://10.254.239.1/')
html = driver.page_source
time.sleep(10)
driver.find_element_by_id("object_recognition")  
driver.find_element_by_id("object_recognition").click()  
time.sleep(10)
driver.find_element_by_id("yolo_run")  
driver.find_element_by_id("yolo_run").click()  
time.sleep(10)

然后上传到云端

productKey = '自己找,阿里云有'
deviceName = '自己找,阿里云有'
deviceSecret = '自己找,阿里云有'
# MQTT - 合成connect报文中使用的 ClientID、Username、Password
mqttClientId = deviceName + '|securemode=3,signmethod=hmacsha1|'
mqttUsername = deviceName + '&' + productKey
content = 'clientId' + deviceName + 'deviceName' + deviceName + 'productKey' + productKey
mqttPassword = hmac.new(deviceSecret.encode(), content.encode(), sha1).hexdigest()
# 接入的服务器地址
regionId = '阿里云左上角那个填这里'
# MQTT 接入点域名
brokerUrl = productKey + '.iot-as-mqtt.' + regionId + '.aliyuncs.com'
# Topic,post,客户端向服务器上报消息
topic_post = '/sys/' + productKey + '/' + deviceName + '/thing/event/property/post'
# Topic,set,服务器向客户端下发消息
topic_set = '/sys/' + productKey + '/' + deviceName + '/thing/service/property/set'
# 物模型名称的前缀(去除后缀的数字)
modelName = 'mark_person_'
# json合成上报开关状态的报文
def json_switch_set(num, status):
    switch_info = {}
    switch_data = json.loads(json.dumps(switch_info))
    switch_data['method'] = '/thing/event/property/post'
    switch_data['id'] = random.randint(0000, 9999)  
    switch_status = {modelName + num: status}
    switch_data['params'] = switch_status
    return json.dumps(switch_data, ensure_ascii=False)
# 戴口罩初始值0
mark_person: int = 0
nomark_person = 0
# 建立mqtt连接对象
client = mqtt.Client(mqttClientId, protocol=mqtt.MQTTv311, clean_session=True)
def on_log(client, userdata, level, buf):
    if level == MQTT_LOG_INFO:
        head = 'INFO'
    elif level == MQTT_LOG_NOTICE:
        head = 'NOTICE'
    elif level == MQTT_LOG_WARNING:
        head = 'WARN'
    elif level == MQTT_LOG_ERR:
        head = 'ERR'
    elif level == MQTT_LOG_DEBUG:
        head = 'DEBUG'
    else:
        head = level
    print('%s: %s' % (head, buf))
# MQTT成功连接到服务器的回调处理函数
def on_connect(client, userdata, flags, rc):
    print('Connected with result code ' + str(rc))
    # 与MQTT服务器连接成功,之后订阅主题
    client.subscribe(topic_post, qos=0)
    client.subscribe(topic_set, qos=0)
    # 向服务器发布测试消息
    client.publish(topic_post, payload='test msg', qos=0)
# MQTT接收到服务器消息的回调处理函数
def on_message(client, userdata, msg):
    print('recv:', msg.topic + ' ' + str(msg.payload))
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print('Unexpected disconnection %s' % rc)
def mqtt_connect_aliyun_iot_platform():
    client.on_log = on_log
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.username_pw_set(mqttUsername, mqttPassword)
    print('clientId:', mqttClientId)
    print('userName:', mqttUsername)
    print('password:', mqttPassword)
    print('brokerUrl:', brokerUrl)
    # ssl设置,并且port=8883 client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
    # tls_version=ssl.PROTOCOL_TLS, ciphers=None)
    try:
        client.connect(brokerUrl, 1883, 60)
    except:
        print('阿里云物联网平台MQTT服务器连接错误,请检查设备证书三元组、及接入点的域名!')
    client.loop_forever()

 然后进行判断步骤,判断完直接报警

def publish_loop():
    global mark_person, nomark_person
    mark_person = 0
    while 1:
        element_keyword = driver.find_element_by_id('func-result-pre')
        json_date = element_keyword.text
        json_jiexi = json.loads(json_date)
        try:
            person_num: object = json_jiexi['num']
            assert isinstance(person_num, object)
            print(person_num)
            if person_num == 1:
                nomark_person += 1
                print('1000')
                ##这里选择你喜欢的报警模式
            elif person_num == 2:
                mark_person += 1
                print('1001')
                ##这里选择你喜欢的报警模式
            time.sleep(5)
            switchPost = json_switch_set('1', mark_person)
            client.publish(topic_post, payload=switchPost, qos=0)
        except:
            time.sleep(1)
if __name__ == '__main__':
    # 建立线程t1:mqtt连接阿里云物联网平台
    # 建立线程t2:定时向阿里云发布消息
    t1: Thread = threading.Thread(target=mqtt_connect_aliyun_iot_platform, )
    t2 = threading.Thread(target=publish_loop, )
    t1.start()
    t2.start()

因为我没有多个摄像头,后来表达报警方式改成了钉钉机器人报警,设置的是关键词加密

我就单po出来了

import requests
import json

def getDingMes():

    baseUrl = "https://oapi.dingtalk.com/robot/send?access_token=你自己的"

    # please set charset= utf-8
    HEADERS = {
        "Content-Type": "application/json ;charset=utf-8"
    }

# 这里的message是你想要推送的文字消息
    message = "有人没带口罩"
    stringBody ={
        "msgtype": "text",
        "text": {"content": message},
        "at": {
            "atMobiles": [""],
               "isAtAll":1    #@所有人  时为true,上面的atMobiles就失效了
        }
 }
    MessageBody = json.dumps(stringBody)
    result = requests.post(url=baseUrl, data=MessageBody, headers=HEADERS)
    print(result.text)

if __name__ == '__main__':
    getDingMes()

  下面是网址识别时的界面

 

Fh4OBmGkHPLbQHWp52rWvD5qH78J

然后下面是云端接受到的数据

Fo_yg0bhKY_FsVvbJcYbsKQJRblK

报警

FmF_Qe3T2ABgP_O6MmfQ8yMpabDK

非常nice

    活动感想:

非常感谢电子森林举办的这次活动,能够有机会接触到这样的智能摄像头。

也是我第一次接触Python语言的事情

不过就是这个摄像头发热太严重了,建议改一下散热方案

附件下载
unitv2.py
新建文件夹 (2).rar
训练的模型
团队介绍
评论
0 / 100
查看更多
目录
硬禾服务号
关注最新动态
0512-67862536
info@eetree.cn
江苏省苏州市苏州工业园区新平街388号腾飞创新园A2幢815室
苏州硬禾信息科技有限公司
Copyright © 2023 苏州硬禾信息科技有限公司 All Rights Reserved 苏ICP备19040198号