Skip to content

Instantly share code, notes, and snippets.

@georgelviv
Created June 9, 2024 11:03
Show Gist options
  • Select an option

  • Save georgelviv/a28eac491ca5b7beeb2246afeca1e4ba to your computer and use it in GitHub Desktop.

Select an option

Save georgelviv/a28eac491ca5b7beeb2246afeca1e4ba to your computer and use it in GitHub Desktop.
Python library for a LoRa module, based on the LoRa HAT code and adapted for the Raspberry Pi 5
from serial import Serial
from glob import glob
from gpiozero import DigitalOutputDevice
import time
import json
# Lookup table: data-rate name (string) -> integer code.
# The code is the position of the rate in the tuple below; the class in this
# file stores it in the low three bits of REG0.
# NOTE(review): the names look like rates in bits per second -- confirm
# against the module datasheet.
data_rate_bits = {
    rate: code
    for code, rate in enumerate(
        ('300', '1200', '2400', '4800', '9600', '19200', '38400', '62500')
    )
}
# Lookup table: transmitting-power name (string) -> integer code.
# The code is the position of the name in the tuple below; the class in this
# file stores it in the low two bits of REG1.
# NOTE(review): the names are presumably power levels in dBm -- confirm
# against the module datasheet.
transmitting_power_bits = {
    power: code for code, power in enumerate(('30', '27', '24', '21'))
}
def get_key_from_value(dictionary, target_value):
    """Return the first key of *dictionary* whose value equals *target_value*.

    Keys are examined in the dictionary's iteration order. ``None`` is
    returned when no value matches.
    """
    matching_keys = (
        candidate
        for candidate, stored in dictionary.items()
        if stored == target_value
    )
    return next(matching_keys, None)
def process_json_messages(input_string, extension=None):
    """Parse a string holding zero or more concatenated JSON documents.

    Documents may follow each other directly or be separated by whitespace;
    whitespace before the first document is skipped as well. Parsing stops
    silently at the first position that does not start a valid JSON value,
    and everything decoded up to that point is returned.

    Args:
        input_string: Text containing the concatenated JSON documents.
        extension: Optional mapping merged (via ``dict.update``) into every
            decoded JSON *object*. Decoded values that are not objects
            (arrays, numbers, strings, ...) are returned unchanged.

    Returns:
        A list of the decoded values, in input order.
    """
    decoder = json.JSONDecoder()
    json_messages = []
    length = len(input_string)
    start = 0
    while True:
        # raw_decode() does not skip leading whitespace on its own, so do it
        # here both before the first document and between documents.
        while start < length and input_string[start].isspace():
            start += 1
        if start >= length:
            break
        try:
            # Decode in place from `start`; the returned index is absolute,
            # which avoids re-slicing the remaining text for every document.
            obj, start = decoder.raw_decode(input_string, start)
        except json.JSONDecodeError:
            # No further valid JSON value: keep what was decoded so far.
            break
        if extension and isinstance(obj, dict):
            obj.update(extension)
        json_messages.append(obj)
    return json_messages
class sx1262:
    """Driver for a LoRa module reached over a serial port plus two mode pins.

    The serial port is opened at 9600 baud. Two GPIO outputs (M0 and M1)
    select the module's operating mode: ``configuration_mode`` drives M0 low
    and M1 high, ``normal_mode`` drives both low.

    Progress and error information is reported with ``print``.
    """

    def __init__(self, port, m0_pin=22, m1_pin=27):
        """Open the serial port and claim the two mode pins.

        Args:
            port: Serial device name passed to ``Serial``.
            m0_pin: GPIO number of the M0 mode pin.
            m1_pin: GPIO number of the M1 mode pin.

        Raises:
            Whatever ``Serial`` or ``DigitalOutputDevice`` raise. If claiming
            a pin fails, everything acquired so far is released first.
        """
        self.serial_port = port
        self.ser = Serial(self.serial_port, 9600)
        print('Init SX1262')
        time.sleep(0.1)
        self.m0 = None
        self.m1 = None
        try:
            self.m0 = DigitalOutputDevice(m0_pin)
            self.m1 = DigitalOutputDevice(m1_pin)
        except Exception:
            # Do not leak the open serial port (or the first pin) when GPIO
            # setup fails half-way.
            self.release()
            raise

    def configuration_mode(self):
        """Switch to configuration mode (M0 low, M1 high) and let it settle."""
        self.m0.off()
        self.m1.on()
        time.sleep(0.1)

    def normal_mode(self):
        """Switch to normal mode (M0 low, M1 low) and let it settle."""
        self.m0.off()
        self.m1.off()
        time.sleep(0.1)

    def read_register(self):
        """Read the first five configuration registers from the module.

        Sends the command ``C1 00 05`` in configuration mode and expects a
        three-byte header followed by ADDH, ADDL, NETID, REG0 and REG1.

        Returns:
            A dict with the keys ``address``, ``network_id``, ``data_rate``
            and ``transmitting_power``, or ``None`` when the module does not
            answer or the answer is malformed or too short.
        """
        self.configuration_mode()
        read_register_config = [0xC1, 0x00, 0x05]
        self.ser.reset_input_buffer()
        self.ser.write(bytes(read_register_config))
        time.sleep(0.2)
        buffer_size = self.ser.in_waiting
        response = self.ser.read(buffer_size)
        print(f'Read Config: ({buffer_size} bytes)')
        if buffer_size == 0:
            print('No response')
            return None
        if len(response) < 3:
            # Too short to even contain the header.
            print('Wrong format')
            return None
        print(f'Header:{hex(response[0])} - {hex(response[1])} - {hex(response[2])}')
        if response[0] == 0xff and response[1] == 0xff and response[2] == 0xff:
            print('Wrong format')
            return None
        payload = response[3:]
        if len(payload) < 5:
            # A truncated reply would otherwise raise IndexError below.
            print('Wrong format')
            return None
        addh, addl, netid, reg0, reg1 = payload[:5]
        print('Configuration:')
        print(f'ADDH={bin(addh)}\nADDL={bin(addl)}\nNETID={bin(netid)}')
        print(f'REG0={bin(reg0)}\nREG1={bin(reg1)}\n')
        return {
            'address': (addh << 8) + addl,
            'network_id': netid,
            # The data rate lives in the low three bits of REG0, the
            # transmitting power in the low two bits of REG1.
            'data_rate': get_key_from_value(data_rate_bits, reg0 & 7),
            'transmitting_power': get_key_from_value(transmitting_power_bits, reg1 & 3),
        }

    def set_register(self, address=0, net_id=10, freq=868, data_rate='2400',
                     transmitting_power='30'):
        """Write address, network id, data rate and power to the module.

        Args:
            address: 16-bit module address; split into ADDH and ADDL.
            net_id: Network id; only the low 8 bits are used.
            freq: Currently unused -- no register derived from it is written.
            data_rate: Key of ``data_rate_bits`` (string, or an int whose
                ``str()`` is such a key).
            transmitting_power: Key of ``transmitting_power_bits`` (string,
                or an int whose ``str()`` is such a key).

        Raises:
            KeyError: If ``data_rate`` or ``transmitting_power`` is unknown.
                This is checked before the module is touched.
            SystemExit: With code 1 if the module answers with anything but a
                ``0xC1`` header.
        """
        # Validate the arguments before switching the module's mode.
        data_rate_bit = data_rate_bits[str(data_rate)]
        transmitting_power_bit = transmitting_power_bits[str(transmitting_power)]
        self.configuration_mode()
        ADDH = address >> 8 & 0xff
        ADDL = address & 0xff
        NETID = net_id & 0xff
        # The upper bits of REG0/REG1 are fixed constants.
        # NOTE(review): presumably serial-port and packet/RSSI options from
        # the module datasheet -- confirm their exact meaning.
        REG0 = (0b01100 << 3) + data_rate_bit
        REG1 = (0b001000 << 2) + transmitting_power_bit
        set_config_register = [0xC0, 0x00, 0x05, ADDH, ADDL, NETID, REG0, REG1]
        self.ser.reset_input_buffer()
        self.ser.write(bytes(set_config_register))
        time.sleep(0.2)
        r_size = self.ser.in_waiting
        if not r_size:
            print('Error to read from serial')
            return
        response = self.ser.read(r_size)
        if response and response[0] == 0xC1:
            print('\nConfiguration is set:')
            print(f'ADDH={bin(ADDH)}\nADDL={bin(ADDL)}\nNETID={NETID}')
            print(f'REG0={bin(REG0)}\nREG1={bin(REG1)}\n')
        else:
            print(f'Error to set temp configuration, size={r_size} response={response}')
            # Same effect as the interactive-only exit() helper, but it does
            # not depend on the `site` module being loaded.
            raise SystemExit(1)

    def send(self, data):
        """Write *data* (bytes) to the module in normal mode."""
        self.normal_mode()
        self.ser.write(data)
        time.sleep(0.1)

    def receive(self, cb):
        """Poll once for incoming data and hand the parsed messages to *cb*.

        If bytes are waiting, waits two seconds for the rest of the
        transmission, decodes everything up to the last ``}`` as UTF-8
        (undecodable bytes are dropped), parses it as concatenated JSON
        documents, adds an ``"rssi"`` entry to each JSON object and calls
        ``cb`` with the resulting list. Nothing happens when no data is
        waiting.

        Args:
            cb: Callable taking one argument, the list of parsed messages.
        """
        self.normal_mode()
        if self.ser.in_waiting > 0:
            time.sleep(2)
            response = self.ser.read(self.ser.in_waiting)
            d = response.decode('utf-8', errors='ignore')
            # Drop trailing bytes after the last complete JSON object; with
            # no '}' at all rfind() yields -1 and the message becomes ''.
            last_brace_index = d.rfind('}')
            msg = d[:last_brace_index + 1]
            rssi = self.get_channel_rssi()
            rssi_extension = {"rssi": rssi}
            parsed_json = process_json_messages(msg, extension=rssi_extension)
            cb(parsed_json)

    def get_channel_rssi(self):
        """Query the module for the channel RSSI.

        Sends ``C0 C1 C2 C3 00 02`` in normal mode and expects a reply that
        starts with ``C1 00 02`` followed by the value byte.

        Returns:
            ``256 - value_byte`` on success, otherwise ``None`` (after
            printing a failure message).
            NOTE(review): presumably the magnitude of a negative dBm
            reading -- confirm against the module datasheet.
        """
        self.normal_mode()
        time.sleep(0.1)
        self.ser.reset_input_buffer()
        self.ser.write(bytes([0xC0, 0xC1, 0xC2, 0xC3, 0x00, 0x02]))
        time.sleep(0.5)
        re_temp = bytes(5)
        if self.ser.in_waiting > 0:
            time.sleep(0.1)
            re_temp = self.ser.read(self.ser.in_waiting)
        # The length check guards against a truncated reply, which would
        # otherwise raise IndexError.
        if len(re_temp) >= 4 and re_temp[:3] == b'\xc1\x00\x02':
            return 256 - re_temp[3]
        print('receive rssi value fail')
        return None

    def release(self):
        """Close both mode pins and the serial port.

        Every resource is closed even if closing an earlier one fails; the
        first error encountered is re-raised afterwards.
        """
        first_error = None
        for resource in (self.m0, self.m1, self.ser):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as err:
                if first_error is None:
                    first_error = err
        if first_error is not None:
            raise first_error
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment