Skip to content

Instantly share code, notes, and snippets.

@jedie
Created March 10, 2026 17:26
Show Gist options
  • Select an option

  • Save jedie/03d760504dd5a4a2773b42decf49ef8d to your computer and use it in GitHub Desktop.

Select an option

Save jedie/03d760504dd5a4a2773b42decf49ef8d to your computer and use it in GitHub Desktop.
lora_test2.py
"""
Minimal MicroPython example to test machine.SPI communication with the RFM95W LoRa module on the ESP32 Unphone.
On European frequencies (868 MHz), the RFM95W should be able to receive packets from nearby LoRa devices.
"""
# The description above must be the first statement in the file to count as
# the module docstring; placed after a print() it was just a discarded string.
#
# Copy this file onto the board and open a REPL with:
# mpremote fs cp lora_test2.py :mpos/board/unphone.py && mpremote
print('Starting LoRa test script...')
import sys
import time

import machine
import micropython
import urandom
from micropython import const
# Hardware constraints based on unPhone spin9
# All values are wrapped in const() from the micropython module.
# SPI host passed as `host=` when the shared SPI bus is created further down.
SPI_HOST = const(1)
# SDA / SCL are not referenced anywhere in this script.
# NOTE(review): presumably the board's I2C pins — confirm before relying on them.
SDA = const(3)
SCL = const(4)
# SPI clock / data pins handed to machine.SPI.Bus.
SCK = const(39)
MOSI = const(40)
MISO = const(41)
# Radio chip-select: given to machine.SPI.Device as `cs=` and to LoRa as `cs_pin=`.
LORA_CS = const(44)
# Radio reset line, pulsed low then high by LoRa._reset_radio().
LORA_RESET = const(42)
LORA_IRQ = const(10) # Connected to DIO0 (RxDone, TxDone)
# LORA_DIO1 is declared but not used by the code in this script.
LORA_DIO1 = const(11) # Connected to DIO1 (RxTimeout, CAD)
# RadioHead specific flags
# Bit tested in the 4th header byte to recognise an ACK and set on ACKs sent back.
RH_FLAGS_ACK = const(0x80)
print('defining Payload structure...')
class Payload:
    """One received packet together with its header fields and link quality.

    Attributes mirror the constructor arguments:
        message:      packet body (bytes-like), without the 4-byte header when
                      the receiver parsed one.
        header_to:    destination address byte.
        header_from:  sender address byte.
        header_id:    packet sequence id byte.
        header_flags: flags byte (e.g. the ACK bit).
        rssi:         received signal strength reported by the receiver.
        snr:          signal-to-noise ratio reported by the receiver.
    """

    def __init__(self, message, header_to, header_from, header_id, header_flags, rssi, snr):
        self.message = message
        self.header_to = header_to
        self.header_from = header_from
        self.header_id = header_id
        self.header_flags = header_flags
        self.rssi = rssi
        self.snr = snr

    def __repr__(self):
        # A readable repr makes `print(payload)` useful while debugging on the
        # REPL instead of showing only an object address.
        return (
            'Payload(message={!r}, header_to={!r}, header_from={!r}, header_id={!r}, '
            'header_flags={!r}, rssi={!r}, snr={!r})'
        ).format(
            self.message,
            self.header_to,
            self.header_from,
            self.header_id,
            self.header_flags,
            self.rssi,
            self.snr,
        )
print('defining LoRa class...')


class LoRa:
    """Small register-level driver for an RFM95W (SX127x) LoRa radio on SPI.

    The radio is reset and configured in the constructor, left in continuous
    receive mode, and a rising-edge interrupt on the DIO0 pin schedules
    packet processing. Received packets are handed to ``on_recv`` as
    ``Payload`` objects. With ``radiohead=True`` a 4-byte RadioHead header
    (TO, FROM, ID, FLAGS) is added on transmit and parsed on receive,
    including address filtering and automatic ACKs.
    """

    def __init__(
        self,
        *,
        device: machine.SPI.Device,
        cs_pin: int,
        rst_pin: int,
        irq_pin: int,
        on_recv: callable,
        node_address: int = 1,
        radiohead: bool = True,
        frequency_hz: int = 868_000_000,
    ):
        """Reset and configure the radio, then attach the DIO0 interrupt.

        Args:
            device: SPI device used for all register transfers.
            cs_pin: GPIO number of the chip-select line (driven manually).
            rst_pin: GPIO number of the radio reset line.
            irq_pin: GPIO number wired to the radio's DIO0 output.
            on_recv: callable invoked with a ``Payload`` for every accepted packet.
            node_address: this node's RadioHead address.
            radiohead: add/parse the 4-byte RadioHead header when True.
            frequency_hz: carrier frequency in Hz programmed into the radio.
        """
        print(f'Initialize LoRa with {node_address=}')
        self.device = device
        self.cs = machine.Pin(cs_pin, machine.Pin.OUT, value=1)
        self.rst = machine.Pin(rst_pin, machine.Pin.OUT)
        self.irq_pin = machine.Pin(irq_pin, machine.Pin.IN)
        self.node_address = node_address
        self.radiohead = radiohead
        self.on_recv = on_recv
        self.frequency_hz = frequency_hz
        self._last_header_id = 0
        self.ack_received = False
        self.cad_timeout = 1.0  # stored only; not used by any method of this class
        self.retry_timeout = 0.2  # seconds send_to_wait() waits for an ACK per attempt
        # Bind the handler once: evaluating `self._handle_irq` inside the ISR
        # would create a new bound-method object on every interrupt.
        self._handle_irq_ref = self._handle_irq
        self._reset_radio()
        self._init_radio()
        print('Attaching hardware interrupt to IRQ pin...')
        self.irq_pin.irq(trigger=machine.Pin.IRQ_RISING, handler=self._isr)

    def _isr(self, pin):
        """Pin interrupt handler: defer the real work to the scheduler."""
        # Hardware interrupt handler. Must be fast and allocate no memory.
        try:
            micropython.schedule(self._handle_irq_ref, 0)
        except RuntimeError as e:
            print(f'schedule queue is full? ISR scheduling failed: {e}')
        else:
            print('ISR triggered, scheduled packet processing task.')

    def _reset_radio(self):
        """Pulse the reset line low for 10 ms, then wait 10 ms after release."""
        self.rst.value(0)
        time.sleep_ms(10)
        self.rst.value(1)
        time.sleep_ms(10)

    def _read_reg(self, reg):
        """Return the value of register ``reg`` (MSB of the address cleared = read)."""
        wbuf = bytearray([reg & 0x7F, 0x00])
        rbuf = bytearray(2)
        self.cs.value(0)
        self.device.write_readinto(wbuf, rbuf)
        self.cs.value(1)
        # Byte 0 is clocked in while the address goes out; byte 1 is the value.
        return rbuf[1]

    def _write_reg(self, reg, val):
        """Write ``val`` to register ``reg`` (MSB of the address set = write)."""
        wbuf = bytearray([reg | 0x80, val])
        rbuf = bytearray(2)  # received bytes are discarded
        self.cs.value(0)
        self.device.write_readinto(wbuf, rbuf)
        self.cs.value(1)

    def _init_radio(self):
        """Put the chip in LoRa mode, set frequency/FIFO/DIO mapping, start RX."""
        self._write_reg(0x01, 0x80)  # Sleep
        time.sleep_ms(10)
        self._write_reg(0x01, 0x81)  # Standby
        # Program the carrier frequency; without this the chip keeps its
        # power-on default instead of the intended band.
        # Frf = f_hz * 2**19 / 32 MHz  ==  f_hz * 16384 / 1_000_000
        frf = (self.frequency_hz * 16384) // 1_000_000
        self._write_reg(0x06, (frf >> 16) & 0xFF)  # REG_FRF_MSB
        self._write_reg(0x07, (frf >> 8) & 0xFF)  # REG_FRF_MID
        self._write_reg(0x08, frf & 0xFF)  # REG_FRF_LSB
        # Set base address of FIFO arrays to 0
        self._write_reg(0x0E, 0x00)  # TX_BASE_ADDR
        self._write_reg(0x0F, 0x00)  # RX_BASE_ADDR
        # Set DIO Mapping: DIO0 triggers on RxDone (00)
        self._write_reg(0x40, 0x00)  # REG_DIO_MAPPING_1
        self._write_reg(0x01, 0x85)  # RX Continuous

    def set_mode_rx(self):
        """Switch the radio to continuous receive."""
        self._write_reg(0x01, 0x85)  # LoRa + RX Continuous

    def set_mode_tx(self):
        """Switch the radio to transmit."""
        self._write_reg(0x01, 0x83)  # LoRa + TX

    def set_mode_idle(self):
        """Switch the radio to standby."""
        self._write_reg(0x01, 0x81)  # LoRa + Standby

    def sleep(self):
        """Switch the radio to sleep."""
        self._write_reg(0x01, 0x80)  # LoRa + Sleep

    def wait_packet_sent(self, timeout=2.0):
        """Poll the TxDone flag until set or ``timeout`` seconds have passed.

        Returns:
            True when TxDone was seen (and cleared), False on timeout.
        """
        start = time.ticks_ms()
        while not (self._read_reg(0x12) & 0x08):  # Check TxDone bit
            if time.ticks_diff(time.ticks_ms(), start) > (timeout * 1000):
                return False
            time.sleep_ms(5)
        self._write_reg(0x12, 0x08)  # Clear TxDone flag
        return True

    def send(self, data, header_to=255, header_id=0, header_flags=0):
        """Transmit ``data`` and return to continuous receive.

        Args:
            data: ``str`` (encoded with ``str.encode()``) or bytes-like payload.
            header_to: RadioHead destination address (255 = broadcast).
            header_id: RadioHead packet id.
            header_flags: RadioHead flags byte.

        Returns:
            True if the radio reported TxDone, False if waiting for it timed out.

        Raises:
            ValueError: if the packet (header included) exceeds 255 bytes.
        """
        body = data.encode() if isinstance(data, str) else data
        if self.radiohead:
            # 4-byte RadioHead header: TO, FROM, ID, FLAGS
            payload = bytearray([header_to, self.node_address, header_id, header_flags])
            payload.extend(body)
        else:
            payload = bytearray(body)
        # Validate before touching the radio: the length register is one byte,
        # and failing half-way would leave the chip in standby, not listening.
        if len(payload) > 255:
            raise ValueError('LoRa payload too long: {} bytes (max 255)'.format(len(payload)))

        self.set_mode_idle()
        # Load payload into FIFO
        self._write_reg(0x0D, 0x00)  # Set FIFO ptr to TX base
        for b in payload:
            self._write_reg(0x00, b)
        self._write_reg(0x22, len(payload))  # Set payload length
        self.set_mode_tx()
        sent = self.wait_packet_sent()
        # Always go back to listening, even if TxDone never showed up.
        self.set_mode_rx()
        return sent

    def poll(self):
        """Process a pending received packet, if any, without relying on the IRQ.

        Does nothing when the RxDone flag is not set.
        """
        # NOTE(review): this may run while a scheduled _handle_irq is also
        # pending; the second run finds the flags cleared and returns. Confirm
        # that SPI transfers cannot be interleaved on the target port.
        self._handle_irq(0)

    def send_to_wait(self, data, header_to, header_flags=0, retries=3):
        """Send ``data`` and wait for a RadioHead ACK, retrying on timeout.

        Returns:
            True once a matching ACK arrived; False if ``radiohead`` is
            disabled or no ACK arrived after ``retries`` extra attempts.
        """
        if not self.radiohead:
            return False
        self._last_header_id = (self._last_header_id + 1) % 256
        current_id = self._last_header_id
        for _ in range(retries + 1):
            # Clear the flag before transmitting so an ACK handled right after
            # the send is not wiped out again.
            self.ack_received = False
            self.send(data, header_to, current_id, header_flags)
            # Wait for ACK
            start = time.ticks_ms()
            while time.ticks_diff(time.ticks_ms(), start) < (self.retry_timeout * 1000):
                self.poll()  # Check for incoming ACKs
                if self.ack_received:
                    return True
                time.sleep_ms(10)
        return False

    def _handle_irq(self, _):
        """Read a received packet from the FIFO and dispatch it.

        Runs as a scheduled task (or from ``poll()``). Ignores the call when
        RxDone is not set, drops packets flagged with a payload CRC error,
        applies RadioHead address filtering / ACK handling when enabled and
        finally calls ``on_recv`` with a ``Payload``.
        """
        # Scheduled task running in the main thread
        irq_flags = self._read_reg(0x12)
        # 0x40 is the RxDone flag
        if not (irq_flags & 0x40):
            return
        print('Packet received, processing...')
        self._write_reg(0x12, 0xFF)  # Clear flags
        # 0x20 is the PayloadCrcError flag: the FIFO content is corrupt.
        if irq_flags & 0x20:
            print('Payload CRC error, packet dropped.')
            return

        length = self._read_reg(0x13)
        print(f'Packet length: {length} bytes')
        current_addr = self._read_reg(0x10)
        print(f'Current FIFO address: {current_addr:#04x}')
        self._write_reg(0x0D, current_addr)
        # Burst read FIFO
        wbuf = bytearray(length + 1)
        wbuf[0] = 0x00 & 0x7F  # FIFO register address, read access
        rbuf = bytearray(length + 1)
        self.cs.value(0)
        self.device.write_readinto(wbuf, rbuf)
        self.cs.value(1)
        raw_data = rbuf[1:]  # drop the byte clocked in during the address phase
        print(f'Raw data: {raw_data}')

        # RSSI & SNR calculations
        pkt_snr = self._read_reg(0x19)
        # The SNR register is a signed byte in steps of 0.25 dB.
        snr = (pkt_snr - 256) / 4 if pkt_snr > 127 else pkt_snr / 4
        rssi = self._read_reg(0x1A) - 157
        print(f'RSSI: {rssi} dBm, SNR: {snr} dB')

        if self.radiohead and length >= 4:
            header_to = raw_data[0]
            header_from = raw_data[1]
            header_id = raw_data[2]
            header_flags = raw_data[3]
            message = raw_data[4:]
            # Filter addresses
            if header_to != self.node_address and header_to != 255:
                return
            # Handle ACK
            if header_flags & RH_FLAGS_ACK:
                if header_id == self._last_header_id:
                    self.ack_received = True
                return  # Don't trigger user callback for an ACK
            # Automatically send ACK if requested (and not broadcast)
            if header_to != 255:
                self.send(
                    b'!',
                    header_to=header_from,
                    header_id=header_id,
                    header_flags=RH_FLAGS_ACK,
                )
            payload = Payload(message, header_to, header_from, header_id, header_flags, rssi, snr)
        else:
            # No (complete) RadioHead header: deliver the raw bytes as a broadcast.
            payload = Payload(raw_data, 255, 255, 0, 0, rssi, snr)
        self.on_recv(payload)
# Bring up the SPI bus and register the radio as a device on it.
# Any failure is printed, then the board is hard-reset after 3 seconds.
# NOTE(review): LORA_CS is also driven manually by the LoRa class — confirm
# that handing it to SPI.Device as `cs=` as well does not conflict.
print('shared machine.SPI bus initialization')
try:
    spi_bus = machine.SPI.Bus(
        host=SPI_HOST,
        sck=SCK,
        mosi=MOSI,
        miso=MISO,
    )
    lora_device = machine.SPI.Device(spi_bus=spi_bus, freq=5_000_000, cs=LORA_CS)
except Exception as init_error:
    sys.print_exception(init_error)
    print('Attempting hard reset in 3sec...')
    time.sleep(3)
    machine.reset()
# Define Callback
def on_receive(payload):
    """Print the sender, message and link quality of a received packet.

    Args:
        payload: object exposing ``header_from``, ``message``, ``rssi`` and ``snr``.
    """
    origin_line = f'Received from Node {payload.header_from}: {payload.message}'
    print(origin_line)
    quality_line = f'RSSI: {payload.rssi} dBm, SNR: {payload.snr} dB'
    print(quality_line)
# Create the driver as node 10 with RadioHead framing; construction resets
# the radio and attaches the receive interrupt.
lora = LoRa(
    on_recv=on_receive,
    device=lora_device,
    node_address=10,
    radiohead=True,
    cs_pin=LORA_CS,
    rst_pin=LORA_RESET,
    irq_pin=LORA_IRQ,
)
print('LoRa Node Ready. Listening...')
# Idle forever: the loop only prints a heartbeat once per second.
while True:
    print(f'{time.time()=} Still listening...')
    time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment