差别

这里会显示出您选择的修订版和当前版本之间的差别。

到此差别页面的链接

两侧同时换到之前的修订记录 前一修订版
后一修订版
前一修订版
pulse_oximeter_sensor [2023/07/27 10:08]
wangzihao [5. 参考案例]
pulse_oximeter_sensor [2023/08/08 16:06] (当前版本)
zili [5. 参考案例]
行 68: 行 68:
 ### 5. 参考案例 ### 5. 参考案例
  
-在Thonny使用Mircopython编写程序控制RP2040控制Max30102读取心率数据+**在Thonny中使用MicroPython编写程序,通过RP2040控制Max30102读取心率数据**
  
 ####​Max30102芯片介绍 ####​Max30102芯片介绍
行 81: 行 81:
  
 {{ :​rp2040-max30102.png?​900 |}} {{ :​rp2040-max30102.png?​900 |}}
 +
 +####​程序代码
 +
 +程序运行需要以下文件:
 +
 +- __init__.py
 +
 +- circular_buffer.py
 +
 +- spo2cal.py
 +
 +- HR_SpO2.py
 +
 +前面两个文件可在 GitHub 上下载:https://github.com/n-elia/MAX30102-MicroPython-driver/tree/main/max30102
 +
 +spo2cal.py程序如下
 +
 +<code python>
 +
 +    # -*- coding: utf-8 -*-
 +
 +    # Sampling rate assumed by the algorithm: 25 samples per second (from algorithm.h).
 +    SAMPLE_FREQ = 25
 +    # Window length of the moving average applied before heart-rate peak detection.
 +    # algorithm.h marks this value with a "DO NOT CHANGE" comment.
 +    MA_SIZE = 4
 +    # Samples per analysis window: sampling frequency * 4 (from algorithm.h).
 +    BUFFER_SIZE = 100
 +
 +
 +    # this assumes ir_data and red_data as np.array
 +    def calc_hr_and_spo2(ir_data,​ red_data):
 +        """​
 +        By detecting ​ peaks of PPG cycle and corresponding AC/DC
 +        of red/​infra-red signal, the an_ratio for the SPO2 is computed.
 +        """​
 +        # get dc mean
 +        ir_mean = int(sum(ir_data) / len(ir_data))
 +
 +        # remove DC mean and inver signal
 +        # this lets peak detecter detect valley
 +        x = [ir_mean - x for x in ir_data]
 +
 +        # 4 point moving average
 +        # x is np.array with int values, so automatically casted to int
 +        for i in range(len(x) - MA_SIZE):
 +            x[i] = sum(x[i:i + MA_SIZE]) / MA_SIZE
 +
 +        # calculate threshold
 +        n_th = int(sum(x) / len(x))
 +        n_th = 30 if n_th < 30 else n_th  # min allowed
 +        n_th = 60 if n_th > 60 else n_th  # max allowed
 +
 +        ir_valley_locs,​ n_peaks = find_peaks(x,​ BUFFER_SIZE,​ n_th, 4, 15)
 +        # print(ir_valley_locs[:​n_peaks],​ ",",​ end=""​)
 +        peak_interval_sum = 0
 +        if n_peaks >= 2:
 +            for i in range(1, n_peaks):
 +                peak_interval_sum += (ir_valley_locs[i] - ir_valley_locs[i - 1])
 +            peak_interval_sum = int(peak_interval_sum / (n_peaks - 1))
 +            hr = int(SAMPLE_FREQ * 60 / peak_interval_sum)
 +            hr_valid = True
 +        else:
 +            hr = -999  # unable to calculate because # of peaks are too small
 +            hr_valid = False
 +
 +        # ---------spo2---------
 +
 +        # find precise min near ir_valley_locs (???)
 +        exact_ir_valley_locs_count = n_peaks
 +
 +        # find ir-red DC and ir-red AC for SPO2 calibration ratio
 +        # find AC/DC maximum of raw
 +
 +        # FIXME: needed??
 +        for i in range(exact_ir_valley_locs_count):​
 +            if ir_valley_locs[i] > BUFFER_SIZE:​
 +                spo2 = -999  # do not use SPO2 since valley loc is out of range
 +                spo2_valid = False
 +                return hr, hr_valid, spo2, spo2_valid
 +
 +        i_ratio_count = 0
 +        ratio = []
 +
 +        # find max between two valley locations
 +        # and use ratio between AC component of Ir and Red DC component of Ir and Red for SpO2
 +        red_dc_max_index = -1
 +        ir_dc_max_index = -1
 +        for k in range(exact_ir_valley_locs_count - 1):
 +            red_dc_max = -16777216
 +            ir_dc_max = -16777216
 +            if ir_valley_locs[k + 1] - ir_valley_locs[k] > 3:
 +                for i in range(ir_valley_locs[k],​ ir_valley_locs[k + 1]):
 +                    if ir_data[i] > ir_dc_max:
 +                        ir_dc_max = ir_data[i]
 +                        ir_dc_max_index = i
 +                    if red_data[i] > red_dc_max:
 +                        red_dc_max = red_data[i]
 +                        red_dc_max_index = i
 +
 +                red_ac = int((red_data[ir_valley_locs[k + 1]] - red_data[ir_valley_locs[k]]) * (red_dc_max_index - ir_valley_locs[k]))
 +                red_ac = red_data[ir_valley_locs[k]] + int(red_ac / (ir_valley_locs[k + 1] - ir_valley_locs[k]))
 +                red_ac = red_data[red_dc_max_index] - red_ac ​ # subtract linear DC components from raw
 +
 +                ir_ac = int((ir_data[ir_valley_locs[k + 1]] - ir_data[ir_valley_locs[k]]) * (ir_dc_max_index - ir_valley_locs[k]))
 +                ir_ac = ir_data[ir_valley_locs[k]] + int(ir_ac / (ir_valley_locs[k + 1] - ir_valley_locs[k]))
 +                ir_ac = ir_data[ir_dc_max_index] - ir_ac  # subtract linear DC components from raw
 +
 +                nume = red_ac * ir_dc_max
 +                denom = ir_ac * red_dc_max
 +                if (denom > 0 and i_ratio_count < 5) and nume != 0:
 +                    # original cpp implementation uses overflow intentionally.
 +                    # but at 64-bit OS, Pyhthon 3.X uses 64-bit int and nume*100/​denom does not trigger overflow
 +                    # so using bit operation ( &​0xffffffff ) is needed
 +                    ratio.append(int(((nume * 100) & 0xffffffff) / denom))
 +                    i_ratio_count += 1
 +
 +        # choose median value since PPG signal may vary from beat to beat
 +        ratio = sorted(ratio) ​ # sort to ascending order
 +        mid_index = int(i_ratio_count / 2)
 +
 +        ratio_ave = 0
 +        if mid_index > 1:
 +            ratio_ave = int((ratio[mid_index - 1] + ratio[mid_index]) / 2)
 +        else:
 +            if len(ratio) != 0:
 +                ratio_ave = ratio[mid_index]
 +
 +        # why 184?
 +        # print("​ratio average: ", ratio_ave)
 +        if ratio_ave > 2 and ratio_ave < 184:
 +            # -45.060 * ratioAverage * ratioAverage / 10000 + 30.354 * ratioAverage / 100 + 94.845
 +            spo2 = -45.060 * (ratio_ave ** 2) / 10000.0 + 30.054 * ratio_ave / 100.0 + 94.845
 +            spo2_valid = True
 +        else:
 +            spo2 = -999
 +            spo2_valid = False
 +
 +        return hr - 20, hr_valid, spo2, spo2_valid
 +
 +
 +    def find_peaks(x,​ size, min_height, min_dist, max_num):
 +        """​
 +        Find at most MAX_NUM peaks above MIN_HEIGHT separated by at least MIN_DISTANCE
 +        """​
 +        ir_valley_locs,​ n_peaks = find_peaks_above_min_height(x,​ size, min_height, max_num)
 +        ir_valley_locs,​ n_peaks = remove_close_peaks(n_peaks,​ ir_valley_locs,​ x, min_dist)
 +
 +        n_peaks = min([n_peaks,​ max_num])
 +
 +        return ir_valley_locs,​ n_peaks
 +
 +
 +    def find_peaks_above_min_height(x,​ size, min_height, max_num):
 +        """​
 +        Find all peaks above MIN_HEIGHT
 +        """​
 +
 +        i = 0
 +        n_peaks = 0
 +        ir_valley_locs = []  # [0 for i in range(max_num)]
 +        while i < size - 1:
 +            if x[i] > min_height and x[i] > x[i - 1]:  # find the left edge of potential peaks
 +                n_width = 1
 +                # original condition i+n_width < size may cause IndexError
 +                # so I changed the condition to i+n_width < size - 1
 +                while i + n_width < size - 1 and x[i] == x[i + n_width]: ​ # find flat peaks
 +                    n_width += 1
 +                if x[i] > x[i + n_width] and n_peaks < max_num: ​ # find the right edge of peaks
 +                    # ir_valley_locs[n_peaks] = i
 +                    ir_valley_locs.append(i)
 +                    n_peaks += 1  # original uses post increment
 +                    i += n_width + 1
 +                else:
 +                    i += n_width
 +            else:
 +                i += 1
 +
 +        return ir_valley_locs,​ n_peaks
 +
 +
 +    def remove_close_peaks(n_peaks,​ ir_valley_locs,​ x, min_dist):
 +        """​
 +        Remove peaks separated by less than MIN_DISTANCE
 +        """​
 +
 +        # should be equal to maxim_sort_indices_descend
 +        # order peaks from large to small
 +        # should ignore index:0
 +        sorted_indices = sorted(ir_valley_locs,​ key=lambda i: x[i])
 +        sorted_indices.reverse()
 +
 +        # this "​for"​ loop expression does not check finish condition
 +        # for i in range(-1, n_peaks):
 +        i = -1
 +        while i < n_peaks:
 +            old_n_peaks = n_peaks
 +            n_peaks = i + 1
 +            # this "​for"​ loop expression does not check finish condition
 +            # for j in (i + 1, old_n_peaks):​
 +            j = i + 1
 +            while j < old_n_peaks:​
 +                n_dist = (sorted_indices[j] - sorted_indices[i]) if i != -1 else (sorted_indices[j] + 1)  # lag-zero peak of autocorr is at index -1
 +                if n_dist > min_dist or n_dist < -1 * min_dist:
 +                    sorted_indices[n_peaks] = sorted_indices[j]
 +                    n_peaks += 1  # original uses post increment
 +                j += 1
 +            i += 1
 +
 +        sorted_indices[:​n_peaks] = sorted(sorted_indices[:​n_peaks])
 +
 +        return sorted_indices,​ n_peaks
 +
 +
 +    if __name__ == "​__main__":​
 +        hr, hrb, sp, spb = calc_hr_and_spo2([12853,​ 15573, 15580, 15586, 15587, 15567, 15520, 15480, 15464, 15460, 15462, 15466, 15473, 15479, 15485, 15490, 15495, 15503, 15512, 15518, 15521, 15521, 15518, 15517, 15522, 15527, 15536, 15547, 15558, 15568, 15577, 15587, 15594, 15604, 15610, 15616, 15620, 15624, 15625, 15615, 15576, 15531, 15508, 15500, 15502, 15509, 15516, 15523, 15528, 15533, 15538, 15547, 15556, 15564, 15564, 15560, 15556, 15556, 15559, 15564, 15570, 15579, 15588, 15599, 15610, 15619, 15628, 15635, 15642, 15649, 15655, 15662, 15669, 15672, 15661, 15621, 15571, 15546, 15537, 15538, 15545, 15553, 15560, 15565, 15570, 15577, 15585, 15593, 15600, 15601, 15597, 15592, 15591, 15594, 15600, 15608, 15617, 15626, 15633, 15640], [12258, 14318, 14322, 14324, 14326, 14317, 14299, 14284, 14280, 14279, 14280, 14283, 14285, 14288, 14292, 14294, 14297, 14299, 14302, 14304, 14305, 14305, 14304, 14304, 14306, 14308, 14311, 14316, 14321, 14325, 14329, 14333, 14329, 14329, 14332, 14335, 14336, 14338, 14338, 14333, 14315, 14295, 14286, 14283, 14285, 14288, 14292, 14295, 14297, 14298, 14301, 14305, 14309, 14312, 14312, 14310, 14308, 14308, 14309, 14312, 14315, 14318, 14322, 14327, 14332, 14336, 14341, 14344, 14347, 14350, 14351, 14354, 14357, 14359, 14353, 14335, 14313, 14304, 14300, 14302, 14305, 14309, 14312, 14314, 14316, 14319, 14323, 14326, 14329, 14329, 14326, 14325, 14324, 14326, 14328, 14332, 14336, 14341, 14345, 14349])
 +
 +        print("​hr detected:",​ hrb)
 +        print("​sp detected:",​ spb)
 +
 +        if (hrb == True and hr != -999):
 +            hr2 = int(hr)
 +            print("​Heart Rate : ", hr2)
 +        if (spb == True and sp != -999):
 +            sp2 = int(sp)
 +            print("​SPO2 ​      : ", sp2)
 +
 +</code>
 +  ​
 +HR_SpO2.py程序如下:
 +
 +<code python>
 +
 +    from machine import SoftI2C, Pin, Timer
 +    from utime import ticks_diff, ticks_us
 +    from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM
 +    from spo2cal import calc_hr_and_spo2
 +
 +
 +    # Module-level state written by main() and read by display_info().
 +    BEATS = 0  # latest averaged heart rate
 +    FINGER_FLAG = False  # False until a finger is detected on the sensor
 +
 +    SPO2 = 0  # latest blood-oxygen reading
 +    TEMPERATURE = 0  # latest temperature reading
 +
 +
 +    def display_info(t):​
 +        # 如果没有检测到手指,那么就不显示
 +        if FINGER_FLAG is False:
 +            return
 +            ​
 +        print('​Heart Rate: ', BEATS, " SpO2:",​ SPO2, " Temperture:",​ TEMPERATURE)
 +
 +
 +    def main():
 +        global BEATS, FINGER_FLAG,​ SPO2, TEMPERATURE ​ # 如果需要对全局变量修改,则需要global声明
 +        ​
 +        # 创建I2C对象(检测MAX30102)
 +        i2c = SoftI2C(sda=Pin(16),​ scl=Pin(17),​ freq=400000) ​ # Fast: 400kHz, slow: 100kHz
 +
 +        # 创建传感器对象
 +        sensor = MAX30102(i2c=i2c)
 +
 +        # 检测是否有传感器
 +        if sensor.i2c_address not in i2c.scan():
 +            print("​没有找到传感器"​)
 +            return
 +        elif not (sensor.check_part_id()):​
 +            # 检查传感器是否兼容
 +            print("​检测到的I2C设备不是MAX30102或者MAX30105"​)
 +            return
 +        else:
 +            print("​传感器已识别到"​)
 +
 +        # 配置
 +        sensor.setup_sensor()
 +        sensor.set_sample_rate(400)
 +        sensor.set_fifo_average(8)
 +        sensor.set_active_leds_amplitude(MAX30105_PULSE_AMP_MEDIUM)
 +
 +        t_start = ticks_us() ​ # Starting time of the acquisition
 +
 +        MAX_HISTORY = 32
 +        history = []
 +        beats_history = []
 +        beat = False
 +
 +        red_list = []
 +        ir_list = []
 +
 +        while True:
 +            sensor.check()
 +            if sensor.available():​
 +                # FIFO 先进先出,从队列中取数据。都是整形int
 +                red_reading = sensor.pop_red_from_storage()
 +                ir_reading = sensor.pop_ir_from_storage()
 +                ​
 +                if red_reading < 1000:
 +                    print('​No finger'​)
 +                    FINGER_FLAG = False  # 表示没有放手指
 +                    continue
 +                else:
 +                    FINGER_FLAG = True  # 表示手指已放
 +
 +                # 计算心率
 +                history.append(red_reading)
 +                ​
 +                # 为了防止列表过大,这里取列表的后32个元素
 +                history = history[-MAX_HISTORY:​]
 +                ​
 +                # 提取必要数据
 +                minima, maxima = min(history),​ max(history)
 +                threshold_on = (minima + maxima * 3) // 4   # 3/4
 +                threshold_off = (minima + maxima) // 2      # 1/2
 +                ​
 +                if not beat and red_reading > threshold_on:​
 +                    beat = True                    ​
 +                    t_us = ticks_diff(ticks_us(),​ t_start)
 +                    t_s = t_us/​1000000
 +                    f = 1/t_s
 +                    bpm = f * 60
 +                    if bpm < 500:
 +                        t_start = ticks_us()
 +                        beats_history.append(bpm) ​                   ​
 +                        beats_history = beats_history[-MAX_HISTORY:​] ​  # 只保留最大30个元素数据
 +                        BEATS = round(sum(beats_history)/​len(beats_history),​ 2)  # 四舍五入
 +                if beat and red_reading < threshold_off:​
 +                    beat = False
 +                    ​
 +                # 计算血氧
 +                red_list.append(red_reading)
 +                ir_list.append(ir_reading)
 +                # 最多 只保留最新的100个
 +                red_list = red_list[-100:​]
 +                ir_list = ir_list[-100:​]
 +                # 计算血氧值
 +                if len(red_list) == 100 and len(ir_list) == 100:
 +                    hr, hrb, sp, spb = calc_hr_and_spo2(red_list,​ ir_list)
 +                    if hrb is True and spb is True:
 +                        if sp != -999:
 +                            SPO2 = int(sp)
 +
 +                # 计算温度
 +                TEMPERATURE = sensor.read_temperature()
 +
 +
 +    if __name__ == '​__main__':​
 +
 +        tim = Timer(period=1000,​ mode=Timer.PERIODIC,​ callback=display_info)
 +
 +        main()
 +
 +</code>
 +
 +
 +####​运行效果
 +
 +{{ :​spo2_test.png?​700 |}}