micro:bit の MicroPython で Weather:bit 用コードの実装を試みた。
weatherbit.py ではメモリ不足(メモリ溢れ)が発生した。
| from struct import unpack | |
| from microbit import i2c | |
| bme280_calib = {} | |
def bme280_read(addr, length):
    """Read `length` bytes from the device at I2C address 0x76.

    The register address `addr` is written first (with ``repeat=True``),
    then the data is fetched with a separate read call and returned.
    """
    register = bytes([addr])
    i2c.write(0x76, register, repeat=True)
    payload = i2c.read(0x76, length)
    return payload
def bme280_write(*addr_and_data):
    """Send the given values to I2C address 0x76 as one write.

    The arguments are expected to come as (register, value) pairs, so an
    odd number of arguments trips the assertion before anything is sent.
    """
    count = len(addr_and_data)
    assert count % 2 == 0
    payload = bytes(addr_and_data)
    i2c.write(0x76, payload)
def bme280_init():
    """Configure the sensor and cache its calibration values.

    Fills the module-level `bme280_calib` dict with three tuples:
    "t" (3 values), "p" (9 values) and "h" (6 values), which the
    compensate functions unpack later.
    """
    # One (register, value) pair per I2C write, in this order.
    for register, value in (
        (0xF2, 0x01),  # ctrlHum
        (0xF4, 0x27),  # ctrlMeas
        (0xF5, 0x00),  # config
    ):
        bme280_write(register, value)

    # 24 bytes from 0x88: twelve little-endian 16-bit words
    # (unsigned, 2 signed, unsigned, 8 signed).
    words = unpack("<H2hH8h", bme280_read(0x88, 24))
    bme280_calib["t"] = words[:3]
    bme280_calib["p"] = words[3:]

    h1 = unpack("B", bme280_read(0xA1, 1))
    raw = bme280_read(0xE1, 7)
    h2_h3 = unpack("<hB", raw)
    # The fourth and fifth humidity values are 12 bits wide and share
    # the two nibbles of raw[4].
    # NOTE(review): both are assembled as unsigned here - check the sensor
    # datasheet for whether they need sign extension.
    h4 = raw[3] << 4 | raw[4] & 0x0f
    h5 = raw[4] >> 4 | raw[5] << 4
    h6 = unpack("b", raw[6:])
    bme280_calib["h"] = h1 + h2_h3 + (h4, h5) + h6
def bme280_reset():
    """Write the value 0xB6 to register 0xE0 via `bme280_write`."""
    reset_register, reset_value = 0xE0, 0xB6
    bme280_write(reset_register, reset_value)
def bme280_compensate_t(adc_T):
    """Convert a raw temperature reading to compensated values.

    Uses the three values stored under ``bme280_calib["t"]``.

    Returns a tuple ``(t_fine, temperature)``; `t_fine` is the
    intermediate that the pressure and humidity functions take as input,
    and the temperature is in units of 0.01 degC (5123 means 51.23 degC).
    """
    dig_t1, dig_t2, dig_t3 = bme280_calib["t"]
    linear = (((adc_T >> 3) - (dig_t1 << 1)) * dig_t2) >> 11
    delta = (adc_T >> 4) - dig_t1
    quadratic = (((delta * delta) >> 12) * dig_t3) >> 14
    t_fine = linear + quadratic
    centi_deg = (t_fine * 5 + 128) >> 8
    return t_fine, centi_deg
def bme280_compensate_p(t_fine, adc_P):
    """Convert a raw pressure reading to a compensated value.

    Uses the nine values stored under ``bme280_calib["p"]`` and the
    `t_fine` value produced by the temperature compensation.

    Returns the pressure in Q24.8 format (value / 256 is Pa), or 0 when
    the scaling term evaluates to zero, which would otherwise be a
    division by zero.
    """
    d1, d2, d3, d4, d5, d6, d7, d8, d9 = bme280_calib["p"]

    dt = t_fine - 128000
    dt_sq = dt * dt
    offset = dt_sq * d6 + ((dt * d5) << 17) + (d4 << 35)
    scale = ((dt_sq * d3) >> 8) + ((dt * d2) << 12)
    scale = (((1 << 47) + scale) * d1) >> 33
    if scale == 0:
        return 0

    raw = ((((1048576 - adc_P) << 31) - offset) * 3125) // scale
    high = raw >> 13
    square_corr = (d9 * high * high) >> 25
    linear_corr = (d8 * raw) >> 19
    return ((raw + square_corr + linear_corr) >> 8) + (d7 << 4)
def bme280_compensate_h(t_fine, adc_H):
    """Convert a raw humidity reading to a compensated value.

    Uses the six values stored under ``bme280_calib["h"]`` and the
    `t_fine` value produced by the temperature compensation.

    Returns relative humidity in Q22.10 format (value / 1024 is %RH).
    The intermediate is clamped to 0..419430400 before the final shift,
    so the result lies in 0..102400.
    """
    h1, h2, h3, h4, h5, h6 = bme280_calib["h"]

    dt = t_fine - 76800
    reading_term = ((adc_H << 14) - (h4 << 20) - h5 * dt + 16384) >> 15
    temp_term = (((dt * h6) >> 10) * (((dt * h3) >> 11) + 32768)) >> 10
    gain_term = ((temp_term + 2097152) * h2 + 8192) >> 14

    hum = reading_term * gain_term
    hum_high = hum >> 15
    hum -= (((hum_high * hum_high) >> 7) * h1) >> 4

    return min(max(hum, 0), 419430400) >> 12
def bme280_measure():
    """Read one sample (8 bytes from 0xF7) and return compensated values.

    Returns a dict with keys "T" (0.01 degC), "P" (Q24.8 Pa),
    "H" (Q22.10 %RH) and "t_fine" (the temperature intermediate).
    """
    raw = bme280_read(0xF7, 8)
    # Bytes 0-2 and 3-5 each hold a 20-bit value; bytes 6-7 a 16-bit value.
    adc_p = raw[0] << 12 | raw[1] << 4 | raw[2] >> 4
    adc_t = raw[3] << 12 | raw[4] << 4 | raw[5] >> 4
    adc_h = raw[6] << 8 | raw[7]

    # Temperature goes first: the other two need its t_fine output.
    t_fine, T = bme280_compensate_t(adc_t)
    P = bme280_compensate_p(t_fine, adc_p)
    H = bme280_compensate_h(t_fine, adc_h)
    return {"T": T, "P": P, "H": H, "t_fine": t_fine}
| import utime | |
| from struct import unpack, unpack_from | |
| from microbit import i2c, pin1, pin2, pin8 | |
class Bme280(object):
    """BME280 access over the micro:bit I2C bus with integer compensation.

    Construction configures the sensor and caches its calibration values;
    `measure()` then returns compensated temperature, pressure and
    humidity as fixed-point integers.
    """

    def __init__(self, i2c_addr=0x76):
        """Configure the sensor at `i2c_addr` and load its calibration."""
        self.i2c_addr = i2c_addr
        self.write(0xF2, 0x01)  # ctrlHum
        self.write(0xF4, 0x27)  # ctrlMeas
        self.write(0xF5, 0x00)  # config
        self.prefetch_calibration()

    def read(self, addr, length):
        """Return `length` bytes read starting at register `addr`."""
        i2c.write(self.i2c_addr, bytes([addr]), repeat=True)
        return i2c.read(self.i2c_addr, length)

    def write(self, *addr_and_data):
        """Send (register, value) pairs to the sensor in one I2C write.

        An odd number of arguments trips the assertion.
        """
        assert len(addr_and_data) % 2 == 0
        i2c.write(self.i2c_addr, bytes(addr_and_data))

    def reset(self):
        """Write 0xB6 to register 0xE0."""
        self.write(0xE0, 0xB6)

    def prefetch_calibration(self):
        """Read the calibration registers into calib_t / calib_p / calib_h.

        calib_t holds 3 values, calib_p 9 values and calib_h 6 values, in
        the order the compensate methods unpack them.
        """
        # 24 bytes from 0x88: twelve little-endian 16-bit words
        # (unsigned, 2 signed, unsigned, 8 signed).
        t = unpack("<H2hH8h", self.read(0x88, 24))
        self.calib_t = t[:3]
        self.calib_p = t[3:]
        t = unpack("B", self.read(0xA1, 1))
        d = self.read(0xE1, 7)
        # unpack_from only needs the buffer to be at least as long as the
        # format, so the 7-byte block can be passed directly.
        t += unpack_from("<hB", d)
        # The fourth and fifth humidity values are 12 bits wide and share
        # the two nibbles of d[4].
        # NOTE(review): both are assembled as unsigned here - check the
        # sensor datasheet for whether they need sign extension.
        t += ((d[3] << 4 | d[4] & 0x0f),
              (d[4] >> 4 | d[5] << 4))
        t += unpack_from("b", d, 6)
        self.calib_h = t

    def compensate_t(self, adc_T):
        """Convert a raw temperature reading.

        Returns ``(t_fine, T)``; T is in 0.01 degC ("5123" equals
        51.23 degC) and t_fine feeds the other two compensations.
        """
        t1, t2, t3 = self.calib_t
        v1 = (((adc_T >> 3) - (t1 << 1)) * t2) >> 11
        v2 = (((((adc_T >> 4) - t1) * ((adc_T >> 4) - t1)) >> 12) * t3) >> 14
        t_fine = v1 + v2
        return t_fine, (t_fine * 5 + 128) >> 8

    def compensate_p(self, t_fine, adc_P):
        """Convert a raw pressure reading.

        Returns pressure in Q24.8 format (24 integer bits and 8
        fractional): 24674867 represents 24674867/256 = 96386.2 Pa
        (963.862 hPa). Returns 0 when the scaling term is zero, which
        would otherwise be a division by zero.
        """
        p1, p2, p3, p4, p5, p6, p7, p8, p9 = self.calib_p
        v1 = t_fine - 128000
        v2 = v1 * v1 * p6
        v2 = v2 + ((v1 * p5) << 17)
        v2 = v2 + (p4 << 35)
        v1 = ((v1 * v1 * p3) >> 8) + ((v1 * p2) << 12)
        v1 = ((1 << 47) + v1) * p1 >> 33
        if v1 == 0:
            return 0
        p = 1048576 - adc_P
        p = ((p << 31) - v2) * 3125 // v1
        v1 = (p9 * (p >> 13) * (p >> 13)) >> 25
        v2 = (p8 * p) >> 19
        return ((p + v1 + v2) >> 8) + (p7 << 4)

    def compensate_h(self, t_fine, adc_H):
        """Convert a raw humidity reading.

        Returns humidity in %RH in Q22.10 format (22 integer and 10
        fractional bits): 47445 represents 47445/1024 = 46.333 %RH.
        """
        h1, h2, h3, h4, h5, h6 = self.calib_h
        x1 = t_fine - 76800
        x1 = (
            ((adc_H << 14) - (h4 << 20) - h5 * x1 + 16384) >> 15
        ) * ((((
            (((x1 * h6) >> 10) * (((x1 * h3) >> 11) + 32768)) >> 10
        ) + 2097152) * h2 + 8192) >> 14)
        x1 = x1 - (((((x1 >> 15) * (x1 >> 15)) >> 7) * h1) >> 4)
        # Clamp to the range that maps to 0..100 %RH after the shift.
        if x1 < 0:
            x1 = 0
        if x1 > 419430400:
            x1 = 419430400
        return x1 >> 12

    def measure(self):
        """Read one sample (8 bytes from 0xF7) and return compensated values.

        Returns a dict with keys "T", "P", "H" and "t_fine"; see the
        compensate_* methods for the units.
        """
        d = self.read(0xF7, 8)
        # Temperature goes first: the other two need its t_fine output.
        t_fine, T = self.compensate_t(d[3] << 12 | d[4] << 4 | d[5] >> 4)
        P = self.compensate_p(t_fine, d[0] << 12 | d[1] << 4 | d[2] >> 4)
        H = self.compensate_h(t_fine, d[6] << 8 | d[7])
        return dict(T=T, P=P, H=H, t_fine=t_fine)
class Rain(object):
    """Counts 0 -> 1 transitions on pin2 and converts the count to a total."""

    def __init__(self):
        """Remember the current pin2 level and start the count at zero."""
        self.pin = pin2.read_digital()
        self.dumps = 0

    def spin(self):
        """Poll pin2 once; a 0 -> 1 change since the last poll adds one dump."""
        level = pin2.read_digital()
        rising_edge = level == 1 and self.pin == 0
        if rising_edge:
            self.dumps += 1
        self.pin = level

    def value(self):
        """Return the accumulated total: dumps * 11 / 1000."""
        scaled = self.dumps * 11
        return scaled / 1000
class Wind(object):
    """Counts 0 -> 1 transitions on pin8 and derives a speed every 2 s.

    Attributes:
        turns:   transitions counted in the current window.
        turn32:  running transition count, wrapped to 32 bits.
        mph:     speed computed from the last completed window.
        ms_old:  ticks_ms() value at the start of the current window.
        ms_next: ticks_ms() value at which the current window ends.
    """

    def __init__(self):
        """Remember the current pin8 level and open the first window."""
        self.pin = pin8.read_digital()
        self.turns = 0
        self.turn32 = 0
        self.window_mph()

    def spin(self):
        """Poll pin8 once and close the window if its deadline has passed.

        Must be called often enough to see every pulse.
        """
        n = pin8.read_digital()
        if self.pin == 0 and n == 1:
            self.turns += 1
            self.turn32 = (self.turn32 + 1) & 0xFFFFFFFF
        self.pin = n
        # ticks_diff is wraparound-safe, so no separate handling is needed
        # for the case where the tick counter rolls over inside a window.
        if utime.ticks_diff(utime.ticks_ms(), self.ms_next) > 0:
            self.window_mph()

    def window_mph(self):
        """Publish the speed for the finished window and start a new one."""
        self.mph = (self.turns / 2) / (1492 / 1000)
        self.turns = 0
        self.ms_old = utime.ticks_ms()
        # Base the deadline on the timestamp just taken.
        self.ms_next = utime.ticks_add(self.ms_old, 2000)
def wind_direction():
    """Classify the analog reading on pin1 into a compass label.

    Each label owns a window of raw readings (both bounds exclusive); a
    reading outside every window is reported as "???".

    Returns a dict with the raw reading under "winDirRaw" and the label
    under "windDirStr".
    """
    raw = pin1.read_analog()
    if 886 < raw < 906:
        label = "N"
    elif 692 < raw < 712:
        label = "NE"
    elif 395 < raw < 415:
        label = "E"
    elif 478 < raw < 498:
        label = "SE"
    elif 564 < raw < 584:
        label = "S"
    elif 799 < raw < 819:
        label = "SW"
    elif 968 < raw < 988:
        label = "W"
    elif 939 < raw < 959:
        label = "NW"
    else:
        label = "???"
    # NOTE(review): "winDirRaw" looks like a typo for "windDirRaw"; the key
    # is kept as-is because callers may depend on it.
    return {"winDirRaw": raw, "windDirStr": label}