差别
这里会显示出您选择的修订版和当前版本之间的差别。
两侧同时换到之前的修订记录 前一修订版 后一修订版 | 前一修订版 | ||
pulse_oximeter_sensor [2023/07/27 10:28] wangzihao [5. 参考案例] |
pulse_oximeter_sensor [2023/08/08 16:06] (当前版本) zili [5. 参考案例] |
||
---|---|---|---|
行 68: | 行 68: | ||
### 5. 参考案例 | ### 5. 参考案例 | ||
- | 在Thonny使用Mircopython编写程序控制RP2040控制Max30102读取心率数据 | + | **在Thonny使用Mircopython编写程序控制RP2040控制Max30102读取心率数据** |
####Max30102芯片介绍 | ####Max30102芯片介绍 | ||
行 83: | 行 83: | ||
####程序代码 | ####程序代码 | ||
+ | |||
+ | 程序文件需要文件 | ||
+ | |||
+ | - _init_.py | ||
+ | |||
+ | - circular_buffer.py | ||
+ | |||
+ | - spo2cal.py | ||
+ | |||
+ | - HR_SpO2.py | ||
+ | |||
+ | 前面两个文件可在github上面下载https://github.com/n-elia/MAX30102-MicroPython-driver/tree/main/max30102 | ||
+ | |||
+ | spo2cal.py程序如下 | ||
+ | |||
+ | <code verilog> | ||
+ | |||
+ | # -*-coding:utf-8 | ||
+ | |||
+ | # 25 samples per second (in algorithm.h) | ||
+ | SAMPLE_FREQ = 25 | ||
+ | # taking moving average of 4 samples when calculating HR | ||
+ | # in algorithm.h, "DONOT CHANGE" comment is attached | ||
+ | MA_SIZE = 4 | ||
+ | # sampling frequency * 4 (in algorithm.h) | ||
+ | BUFFER_SIZE = 100 | ||
+ | |||
+ | |||
+ | # this assumes ir_data and red_data as np.array | ||
+ | def calc_hr_and_spo2(ir_data, red_data): | ||
+ | """ | ||
+ | By detecting peaks of PPG cycle and corresponding AC/DC | ||
+ | of red/infra-red signal, the an_ratio for the SPO2 is computed. | ||
+ | """ | ||
+ | # get dc mean | ||
+ | ir_mean = int(sum(ir_data) / len(ir_data)) | ||
+ | |||
+ | # remove DC mean and inver signal | ||
+ | # this lets peak detecter detect valley | ||
+ | x = [ir_mean - x for x in ir_data] | ||
+ | |||
+ | # 4 point moving average | ||
+ | # x is np.array with int values, so automatically casted to int | ||
+ | for i in range(len(x) - MA_SIZE): | ||
+ | x[i] = sum(x[i:i + MA_SIZE]) / MA_SIZE | ||
+ | |||
+ | # calculate threshold | ||
+ | n_th = int(sum(x) / len(x)) | ||
+ | n_th = 30 if n_th < 30 else n_th # min allowed | ||
+ | n_th = 60 if n_th > 60 else n_th # max allowed | ||
+ | |||
+ | ir_valley_locs, n_peaks = find_peaks(x, BUFFER_SIZE, n_th, 4, 15) | ||
+ | # print(ir_valley_locs[:n_peaks], ",", end="") | ||
+ | peak_interval_sum = 0 | ||
+ | if n_peaks >= 2: | ||
+ | for i in range(1, n_peaks): | ||
+ | peak_interval_sum += (ir_valley_locs[i] - ir_valley_locs[i - 1]) | ||
+ | peak_interval_sum = int(peak_interval_sum / (n_peaks - 1)) | ||
+ | hr = int(SAMPLE_FREQ * 60 / peak_interval_sum) | ||
+ | hr_valid = True | ||
+ | else: | ||
+ | hr = -999 # unable to calculate because # of peaks are too small | ||
+ | hr_valid = False | ||
+ | |||
+ | # ---------spo2--------- | ||
+ | |||
+ | # find precise min near ir_valley_locs (???) | ||
+ | exact_ir_valley_locs_count = n_peaks | ||
+ | |||
+ | # find ir-red DC and ir-red AC for SPO2 calibration ratio | ||
+ | # find AC/DC maximum of raw | ||
+ | |||
+ | # FIXME: needed?? | ||
+ | for i in range(exact_ir_valley_locs_count): | ||
+ | if ir_valley_locs[i] > BUFFER_SIZE: | ||
+ | spo2 = -999 # do not use SPO2 since valley loc is out of range | ||
+ | spo2_valid = False | ||
+ | return hr, hr_valid, spo2, spo2_valid | ||
+ | |||
+ | i_ratio_count = 0 | ||
+ | ratio = [] | ||
+ | |||
+ | # find max between two valley locations | ||
+ | # and use ratio between AC component of Ir and Red DC component of Ir and Red for SpO2 | ||
+ | red_dc_max_index = -1 | ||
+ | ir_dc_max_index = -1 | ||
+ | for k in range(exact_ir_valley_locs_count - 1): | ||
+ | red_dc_max = -16777216 | ||
+ | ir_dc_max = -16777216 | ||
+ | if ir_valley_locs[k + 1] - ir_valley_locs[k] > 3: | ||
+ | for i in range(ir_valley_locs[k], ir_valley_locs[k + 1]): | ||
+ | if ir_data[i] > ir_dc_max: | ||
+ | ir_dc_max = ir_data[i] | ||
+ | ir_dc_max_index = i | ||
+ | if red_data[i] > red_dc_max: | ||
+ | red_dc_max = red_data[i] | ||
+ | red_dc_max_index = i | ||
+ | |||
+ | red_ac = int((red_data[ir_valley_locs[k + 1]] - red_data[ir_valley_locs[k]]) * (red_dc_max_index - ir_valley_locs[k])) | ||
+ | red_ac = red_data[ir_valley_locs[k]] + int(red_ac / (ir_valley_locs[k + 1] - ir_valley_locs[k])) | ||
+ | red_ac = red_data[red_dc_max_index] - red_ac # subtract linear DC components from raw | ||
+ | |||
+ | ir_ac = int((ir_data[ir_valley_locs[k + 1]] - ir_data[ir_valley_locs[k]]) * (ir_dc_max_index - ir_valley_locs[k])) | ||
+ | ir_ac = ir_data[ir_valley_locs[k]] + int(ir_ac / (ir_valley_locs[k + 1] - ir_valley_locs[k])) | ||
+ | ir_ac = ir_data[ir_dc_max_index] - ir_ac # subtract linear DC components from raw | ||
+ | |||
+ | nume = red_ac * ir_dc_max | ||
+ | denom = ir_ac * red_dc_max | ||
+ | if (denom > 0 and i_ratio_count < 5) and nume != 0: | ||
+ | # original cpp implementation uses overflow intentionally. | ||
+ | # but at 64-bit OS, Pyhthon 3.X uses 64-bit int and nume*100/denom does not trigger overflow | ||
+ | # so using bit operation ( &0xffffffff ) is needed | ||
+ | ratio.append(int(((nume * 100) & 0xffffffff) / denom)) | ||
+ | i_ratio_count += 1 | ||
+ | |||
+ | # choose median value since PPG signal may vary from beat to beat | ||
+ | ratio = sorted(ratio) # sort to ascending order | ||
+ | mid_index = int(i_ratio_count / 2) | ||
+ | |||
+ | ratio_ave = 0 | ||
+ | if mid_index > 1: | ||
+ | ratio_ave = int((ratio[mid_index - 1] + ratio[mid_index]) / 2) | ||
+ | else: | ||
+ | if len(ratio) != 0: | ||
+ | ratio_ave = ratio[mid_index] | ||
+ | |||
+ | # why 184? | ||
+ | # print("ratio average: ", ratio_ave) | ||
+ | if ratio_ave > 2 and ratio_ave < 184: | ||
+ | # -45.060 * ratioAverage * ratioAverage / 10000 + 30.354 * ratioAverage / 100 + 94.845 | ||
+ | spo2 = -45.060 * (ratio_ave ** 2) / 10000.0 + 30.054 * ratio_ave / 100.0 + 94.845 | ||
+ | spo2_valid = True | ||
+ | else: | ||
+ | spo2 = -999 | ||
+ | spo2_valid = False | ||
+ | |||
+ | return hr - 20, hr_valid, spo2, spo2_valid | ||
+ | |||
+ | |||
+ | def find_peaks(x, size, min_height, min_dist, max_num): | ||
+ | """ | ||
+ | Find at most MAX_NUM peaks above MIN_HEIGHT separated by at least MIN_DISTANCE | ||
+ | """ | ||
+ | ir_valley_locs, n_peaks = find_peaks_above_min_height(x, size, min_height, max_num) | ||
+ | ir_valley_locs, n_peaks = remove_close_peaks(n_peaks, ir_valley_locs, x, min_dist) | ||
+ | |||
+ | n_peaks = min([n_peaks, max_num]) | ||
+ | |||
+ | return ir_valley_locs, n_peaks | ||
+ | |||
+ | |||
+ | def find_peaks_above_min_height(x, size, min_height, max_num): | ||
+ | """ | ||
+ | Find all peaks above MIN_HEIGHT | ||
+ | """ | ||
+ | |||
+ | i = 0 | ||
+ | n_peaks = 0 | ||
+ | ir_valley_locs = [] # [0 for i in range(max_num)] | ||
+ | while i < size - 1: | ||
+ | if x[i] > min_height and x[i] > x[i - 1]: # find the left edge of potential peaks | ||
+ | n_width = 1 | ||
+ | # original condition i+n_width < size may cause IndexError | ||
+ | # so I changed the condition to i+n_width < size - 1 | ||
+ | while i + n_width < size - 1 and x[i] == x[i + n_width]: # find flat peaks | ||
+ | n_width += 1 | ||
+ | if x[i] > x[i + n_width] and n_peaks < max_num: # find the right edge of peaks | ||
+ | # ir_valley_locs[n_peaks] = i | ||
+ | ir_valley_locs.append(i) | ||
+ | n_peaks += 1 # original uses post increment | ||
+ | i += n_width + 1 | ||
+ | else: | ||
+ | i += n_width | ||
+ | else: | ||
+ | i += 1 | ||
+ | |||
+ | return ir_valley_locs, n_peaks | ||
+ | |||
+ | |||
+ | def remove_close_peaks(n_peaks, ir_valley_locs, x, min_dist): | ||
+ | """ | ||
+ | Remove peaks separated by less than MIN_DISTANCE | ||
+ | """ | ||
+ | |||
+ | # should be equal to maxim_sort_indices_descend | ||
+ | # order peaks from large to small | ||
+ | # should ignore index:0 | ||
+ | sorted_indices = sorted(ir_valley_locs, key=lambda i: x[i]) | ||
+ | sorted_indices.reverse() | ||
+ | |||
+ | # this "for" loop expression does not check finish condition | ||
+ | # for i in range(-1, n_peaks): | ||
+ | i = -1 | ||
+ | while i < n_peaks: | ||
+ | old_n_peaks = n_peaks | ||
+ | n_peaks = i + 1 | ||
+ | # this "for" loop expression does not check finish condition | ||
+ | # for j in (i + 1, old_n_peaks): | ||
+ | j = i + 1 | ||
+ | while j < old_n_peaks: | ||
+ | n_dist = (sorted_indices[j] - sorted_indices[i]) if i != -1 else (sorted_indices[j] + 1) # lag-zero peak of autocorr is at index -1 | ||
+ | if n_dist > min_dist or n_dist < -1 * min_dist: | ||
+ | sorted_indices[n_peaks] = sorted_indices[j] | ||
+ | n_peaks += 1 # original uses post increment | ||
+ | j += 1 | ||
+ | i += 1 | ||
+ | |||
+ | sorted_indices[:n_peaks] = sorted(sorted_indices[:n_peaks]) | ||
+ | |||
+ | return sorted_indices, n_peaks | ||
+ | |||
+ | |||
+ | if __name__ == "__main__": | ||
+ | hr, hrb, sp, spb = calc_hr_and_spo2([12853, 15573, 15580, 15586, 15587, 15567, 15520, 15480, 15464, 15460, 15462, 15466, 15473, 15479, 15485, 15490, 15495, 15503, 15512, 15518, 15521, 15521, 15518, 15517, 15522, 15527, 15536, 15547, 15558, 15568, 15577, 15587, 15594, 15604, 15610, 15616, 15620, 15624, 15625, 15615, 15576, 15531, 15508, 15500, 15502, 15509, 15516, 15523, 15528, 15533, 15538, 15547, 15556, 15564, 15564, 15560, 15556, 15556, 15559, 15564, 15570, 15579, 15588, 15599, 15610, 15619, 15628, 15635, 15642, 15649, 15655, 15662, 15669, 15672, 15661, 15621, 15571, 15546, 15537, 15538, 15545, 15553, 15560, 15565, 15570, 15577, 15585, 15593, 15600, 15601, 15597, 15592, 15591, 15594, 15600, 15608, 15617, 15626, 15633, 15640], [12258, 14318, 14322, 14324, 14326, 14317, 14299, 14284, 14280, 14279, 14280, 14283, 14285, 14288, 14292, 14294, 14297, 14299, 14302, 14304, 14305, 14305, 14304, 14304, 14306, 14308, 14311, 14316, 14321, 14325, 14329, 14333, 14329, 14329, 14332, 14335, 14336, 14338, 14338, 14333, 14315, 14295, 14286, 14283, 14285, 14288, 14292, 14295, 14297, 14298, 14301, 14305, 14309, 14312, 14312, 14310, 14308, 14308, 14309, 14312, 14315, 14318, 14322, 14327, 14332, 14336, 14341, 14344, 14347, 14350, 14351, 14354, 14357, 14359, 14353, 14335, 14313, 14304, 14300, 14302, 14305, 14309, 14312, 14314, 14316, 14319, 14323, 14326, 14329, 14329, 14326, 14325, 14324, 14326, 14328, 14332, 14336, 14341, 14345, 14349]) | ||
+ | |||
+ | print("hr detected:", hrb) | ||
+ | print("sp detected:", spb) | ||
+ | |||
+ | if (hrb == True and hr != -999): | ||
+ | hr2 = int(hr) | ||
+ | print("Heart Rate : ", hr2) | ||
+ | if (spb == True and sp != -999): | ||
+ | sp2 = int(sp) | ||
+ | print("SPO2 : ", sp2) | ||
+ | |||
+ | </code> | ||
+ | | ||
+ | HR_SpO2.py程序如下: | ||
+ | |||
+ | <code verilog> | ||
+ | |||
+ | from machine import SoftI2C, Pin, Timer | ||
+ | from utime import ticks_diff, ticks_us | ||
+ | from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM | ||
+ | from spo2cal import calc_hr_and_spo2 | ||
+ | |||
+ | |||
+ | BEATS = 0 # 存储心率 | ||
+ | FINGER_FLAG = False # 默认表示未检测到手指 | ||
+ | | ||
+ | SPO2 = 0 # 存储血氧 | ||
+ | TEMPERATURE = 0 # 存储温度 | ||
+ | |||
+ | |||
+ | def display_info(t): | ||
+ | # 如果没有检测到手指,那么就不显示 | ||
+ | if FINGER_FLAG is False: | ||
+ | return | ||
+ | | ||
+ | print('Heart Rate: ', BEATS, " SpO2:", SPO2, " Temperture:", TEMPERATURE) | ||
+ | |||
+ | |||
+ | def main(): | ||
+ | global BEATS, FINGER_FLAG, SPO2, TEMPERATURE # 如果需要对全局变量修改,则需要global声明 | ||
+ | | ||
+ | # 创建I2C对象(检测MAX30102) | ||
+ | i2c = SoftI2C(sda=Pin(16), scl=Pin(17), freq=400000) # Fast: 400kHz, slow: 100kHz | ||
+ | |||
+ | # 创建传感器对象 | ||
+ | sensor = MAX30102(i2c=i2c) | ||
+ | |||
+ | # 检测是否有传感器 | ||
+ | if sensor.i2c_address not in i2c.scan(): | ||
+ | print("没有找到传感器") | ||
+ | return | ||
+ | elif not (sensor.check_part_id()): | ||
+ | # 检查传感器是否兼容 | ||
+ | print("检测到的I2C设备不是MAX30102或者MAX30105") | ||
+ | return | ||
+ | else: | ||
+ | print("传感器已识别到") | ||
+ | |||
+ | # 配置 | ||
+ | sensor.setup_sensor() | ||
+ | sensor.set_sample_rate(400) | ||
+ | sensor.set_fifo_average(8) | ||
+ | sensor.set_active_leds_amplitude(MAX30105_PULSE_AMP_MEDIUM) | ||
+ | |||
+ | t_start = ticks_us() # Starting time of the acquisition | ||
+ | |||
+ | MAX_HISTORY = 32 | ||
+ | history = [] | ||
+ | beats_history = [] | ||
+ | beat = False | ||
+ | |||
+ | red_list = [] | ||
+ | ir_list = [] | ||
+ | |||
+ | while True: | ||
+ | sensor.check() | ||
+ | if sensor.available(): | ||
+ | # FIFO 先进先出,从队列中取数据。都是整形int | ||
+ | red_reading = sensor.pop_red_from_storage() | ||
+ | ir_reading = sensor.pop_ir_from_storage() | ||
+ | | ||
+ | if red_reading < 1000: | ||
+ | print('No finger') | ||
+ | FINGER_FLAG = False # 表示没有放手指 | ||
+ | continue | ||
+ | else: | ||
+ | FINGER_FLAG = True # 表示手指已放 | ||
+ | |||
+ | # 计算心率 | ||
+ | history.append(red_reading) | ||
+ | | ||
+ | # 为了防止列表过大,这里取列表的后32个元素 | ||
+ | history = history[-MAX_HISTORY:] | ||
+ | | ||
+ | # 提取必要数据 | ||
+ | minima, maxima = min(history), max(history) | ||
+ | threshold_on = (minima + maxima * 3) // 4 # 3/4 | ||
+ | threshold_off = (minima + maxima) // 2 # 1/2 | ||
+ | | ||
+ | if not beat and red_reading > threshold_on: | ||
+ | beat = True | ||
+ | t_us = ticks_diff(ticks_us(), t_start) | ||
+ | t_s = t_us/1000000 | ||
+ | f = 1/t_s | ||
+ | bpm = f * 60 | ||
+ | if bpm < 500: | ||
+ | t_start = ticks_us() | ||
+ | beats_history.append(bpm) | ||
+ | beats_history = beats_history[-MAX_HISTORY:] # 只保留最大30个元素数据 | ||
+ | BEATS = round(sum(beats_history)/len(beats_history), 2) # 四舍五入 | ||
+ | if beat and red_reading < threshold_off: | ||
+ | beat = False | ||
+ | | ||
+ | # 计算血氧 | ||
+ | red_list.append(red_reading) | ||
+ | ir_list.append(ir_reading) | ||
+ | # 最多 只保留最新的100个 | ||
+ | red_list = red_list[-100:] | ||
+ | ir_list = ir_list[-100:] | ||
+ | # 计算血氧值 | ||
+ | if len(red_list) == 100 and len(ir_list) == 100: | ||
+ | hr, hrb, sp, spb = calc_hr_and_spo2(red_list, ir_list) | ||
+ | if hrb is True and spb is True: | ||
+ | if sp != -999: | ||
+ | SPO2 = int(sp) | ||
+ | |||
+ | # 计算温度 | ||
+ | TEMPERATURE = sensor.read_temperature() | ||
+ | |||
+ | |||
+ | if __name__ == '__main__': | ||
+ | |||
+ | tim = Timer(period=1000, mode=Timer.PERIODIC, callback=display_info) | ||
+ | |||
+ | main() | ||
+ | |||
+ | </code> | ||
+ | |||
####运行效果 | ####运行效果 | ||
{{ :spo2_test.png?700 |}} | {{ :spo2_test.png?700 |}} |