Created
March 10, 2026 17:26
-
-
Save jedie/03d760504dd5a4a2773b42decf49ef8d to your computer and use it in GitHub Desktop.
lora_test2.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| print('Starting LoRa test script...') | |
| """ | |
| Minimal MicroPython example to test machine.SPI communication with the RFM95W LoRa module on the ESP32 Unphone. | |
| On European frequencies (868 MHz), the RFM95W should be able to receive packets from nearby LoRa devices. | |
| """ | |
| # mpremote fs cp lora_test2.py :mpos/board/unphone.py && mpremote | |
| import sys | |
| import machine | |
| from micropython import const | |
| import time | |
| import urandom | |
| # Hardware constraints based on unPhone spin9 | |
| SPI_HOST = const(1) | |
| SDA = const(3) | |
| SCL = const(4) | |
| SCK = const(39) | |
| MOSI = const(40) | |
| MISO = const(41) | |
| LORA_CS = const(44) | |
| LORA_RESET = const(42) | |
| LORA_IRQ = const(10) # Connected to DIO0 (RxDone, TxDone) | |
| LORA_DIO1 = const(11) # Connected to DIO1 (RxTimeout, CAD) | |
| # RadioHead specific flags | |
| RH_FLAGS_ACK = const(0x80) | |
| print('defining Payload structure...') | |
| class Payload: | |
| def __init__(self, message, header_to, header_from, header_id, header_flags, rssi, snr): | |
| self.message = message | |
| self.header_to = header_to | |
| self.header_from = header_from | |
| self.header_id = header_id | |
| self.header_flags = header_flags | |
| self.rssi = rssi | |
| self.snr = snr | |
| print('defining LoRa class...') | |
| class LoRa: | |
| def __init__( | |
| self, | |
| *, | |
| device: machine.SPI.Device, | |
| cs_pin: int, | |
| rst_pin: int, | |
| irq_pin: int, | |
| on_recv: callable, | |
| node_address: int = 1, | |
| radiohead: bool = True, | |
| ): | |
| print(f'Initialize LoRa with {node_address=}') | |
| self.device = device | |
| self.cs = machine.Pin(cs_pin, machine.Pin.OUT, value=1) | |
| self.rst = machine.Pin(rst_pin, machine.Pin.OUT) | |
| self.irq_pin = machine.Pin(irq_pin, machine.Pin.IN) | |
| self.node_address = node_address | |
| self.radiohead = radiohead | |
| self.on_recv = on_recv | |
| self._last_header_id = 0 | |
| self.ack_received = False | |
| self.cad_timeout = 1.0 | |
| self.retry_timeout = 0.2 | |
| self._reset_radio() | |
| self._init_radio() | |
| print('Attaching hardware interrupt to IRQ pin...') | |
| self.irq_pin.irq(trigger=machine.Pin.IRQ_RISING, handler=self._isr) | |
| def _isr(self, pin): | |
| # Hardware interrupt handler. Must be fast and allocate no memory. | |
| try: | |
| micropython.schedule(self._handle_irq, 0) | |
| except RuntimeError as e: | |
| print(f'schedule queue is full? ISR scheduling failed: {e}') | |
| else: | |
| print('ISR triggered, scheduled packet processing task.') | |
| def _reset_radio(self): | |
| self.rst.value(0) | |
| time.sleep_ms(10) | |
| self.rst.value(1) | |
| time.sleep_ms(10) | |
| def _read_reg(self, reg): | |
| wbuf = bytearray([reg & 0x7F, 0x00]) | |
| rbuf = bytearray(2) | |
| self.cs.value(0) | |
| self.device.write_readinto(wbuf, rbuf) | |
| self.cs.value(1) | |
| return rbuf[1] | |
| def _write_reg(self, reg, val): | |
| wbuf = bytearray([reg | 0x80, val]) | |
| rbuf = bytearray(2) | |
| self.cs.value(0) | |
| self.device.write_readinto(wbuf, rbuf) | |
| self.cs.value(1) | |
| def _init_radio(self): | |
| self._write_reg(0x01, 0x80) # Sleep | |
| time.sleep_ms(10) | |
| self._write_reg(0x01, 0x81) # Standby | |
| # Set base address of FIFO arrays to 0 | |
| self._write_reg(0x0E, 0x00) # TX_BASE_ADDR | |
| self._write_reg(0x0F, 0x00) # RX_BASE_ADDR | |
| # Set DIO Mapping: DIO0 triggers on RxDone (00) | |
| self._write_reg(0x40, 0x00) # REG_DIO_MAPPING_1 | |
| self._write_reg(0x01, 0x85) # RX Continuous | |
| def set_mode_rx(self): | |
| self._write_reg(0x01, 0x85) # LoRa + RX Continuous | |
| def set_mode_tx(self): | |
| self._write_reg(0x01, 0x83) # LoRa + TX | |
| def set_mode_idle(self): | |
| self._write_reg(0x01, 0x81) # LoRa + Standby | |
| def sleep(self): | |
| self._write_reg(0x01, 0x80) # LoRa + Sleep | |
| def wait_packet_sent(self, timeout=2.0): | |
| start = time.ticks_ms() | |
| while not (self._read_reg(0x12) & 0x08): # Check TxDone bit | |
| if time.ticks_diff(time.ticks_ms(), start) > (timeout * 1000): | |
| return False | |
| time.sleep_ms(5) | |
| self._write_reg(0x12, 0x08) # Clear TxDone flag | |
| return True | |
| def send(self, data, header_to=255, header_id=0, header_flags=0): | |
| self.set_mode_idle() | |
| if self.radiohead: | |
| # 4-byte RadioHead header: TO, FROM, ID, FLAGS | |
| payload = bytearray([header_to, self.node_address, header_id, header_flags]) | |
| if isinstance(data, str): | |
| payload.extend(data.encode()) | |
| else: | |
| payload.extend(data) | |
| else: | |
| payload = bytearray(data) if not isinstance(data, str) else bytearray(data.encode()) | |
| # Load payload into FIFO | |
| self._write_reg(0x0D, 0x00) # Set FIFO ptr to TX base | |
| for b in payload: | |
| self._write_reg(0x00, b) | |
| self._write_reg(0x22, len(payload)) # Set payload length | |
| self.set_mode_tx() | |
| self.wait_packet_sent() | |
| self.set_mode_rx() | |
| return True | |
| def send_to_wait(self, data, header_to, header_flags=0, retries=3): | |
| if not self.radiohead: | |
| return False | |
| self._last_header_id = (self._last_header_id + 1) % 256 | |
| current_id = self._last_header_id | |
| for attempt in range(retries + 1): | |
| self.send(data, header_to, current_id, header_flags) | |
| self.ack_received = False | |
| # Wait for ACK | |
| start = time.ticks_ms() | |
| while time.ticks_diff(time.ticks_ms(), start) < (self.retry_timeout * 1000): | |
| self.poll() # Check for incoming ACKs | |
| if self.ack_received: | |
| return True | |
| time.sleep_ms(10) | |
| return False | |
| def _handle_irq(self, _): | |
| # Scheduled task running in the main thread | |
| irq_flags = self._read_reg(0x12) | |
| # 0x40 is the RxDone flag | |
| if irq_flags & 0x40: | |
| print('Packet received, processing...') | |
| self._write_reg(0x12, 0xFF) # Clear flags | |
| length = self._read_reg(0x13) | |
| print(f'Packet length: {length} bytes') | |
| current_addr = self._read_reg(0x10) | |
| print(f'Current FIFO address: {current_addr:#04x}') | |
| self._write_reg(0x0D, current_addr) | |
| # Burst read FIFO | |
| wbuf = bytearray(length + 1) | |
| wbuf[0] = 0x00 & 0x7F | |
| rbuf = bytearray(length + 1) | |
| self.cs.value(0) | |
| self.device.write_readinto(wbuf, rbuf) | |
| self.cs.value(1) | |
| raw_data = rbuf[1:] | |
| print(f'Raw data: {raw_data}') | |
| # RSSI & SNR calculations | |
| pkt_snr = self._read_reg(0x19) | |
| snr = (pkt_snr - 256) / 4 if pkt_snr > 127 else pkt_snr / 4 | |
| rssi = self._read_reg(0x1A) - 157 | |
| print(f'RSSI: {rssi} dBm, SNR: {snr} dB') | |
| if self.radiohead and length >= 4: | |
| header_to = raw_data[0] | |
| header_from = raw_data[1] | |
| header_id = raw_data[2] | |
| header_flags = raw_data[3] | |
| message = raw_data[4:] | |
| # Filter addresses | |
| if header_to != self.node_address and header_to != 255: | |
| return | |
| # Handle ACK | |
| if header_flags & RH_FLAGS_ACK: | |
| if header_id == self._last_header_id: | |
| self.ack_received = True | |
| return # Don't trigger user callback for an ACK | |
| # Automatically send ACK if requested (and not broadcast) | |
| if header_to != 255: | |
| self.send(b'!', header_to=header_from, header_id=header_id, header_flags=RH_FLAGS_ACK) | |
| payload = Payload(message, header_to, header_from, header_id, header_flags, rssi, snr) | |
| else: | |
| payload = Payload(raw_data, 255, 255, 0, 0, rssi, snr) | |
| self.on_recv(payload) | |
| print('shared machine.SPI bus initialization') | |
| try: | |
| spi_bus = machine.SPI.Bus(host=SPI_HOST, sck=SCK, mosi=MOSI, miso=MISO) | |
| lora_device = machine.SPI.Device( | |
| spi_bus=spi_bus, | |
| freq=5_000_000, | |
| cs=LORA_CS, | |
| ) | |
| except Exception as e: | |
| sys.print_exception(e) | |
| print('Attempting hard reset in 3sec...') | |
| time.sleep(3) | |
| machine.reset() | |
| # Define Callback | |
| def on_receive(payload): | |
| print(f'Received from Node {payload.header_from}: {payload.message}') | |
| print(f'RSSI: {payload.rssi} dBm, SNR: {payload.snr} dB') | |
| lora = LoRa( | |
| device=lora_device, | |
| cs_pin=LORA_CS, | |
| rst_pin=LORA_RESET, | |
| irq_pin=LORA_IRQ, | |
| node_address=10, | |
| radiohead=True, | |
| on_recv=on_receive, | |
| ) | |
| print('LoRa Node Ready. Listening...') | |
| while True: | |
| print(f'{time.time()=} Still listening...') | |
| time.sleep(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment