Created
June 9, 2024 11:03
-
-
Save georgelviv/a28eac491ca5b7beeb2246afeca1e4ba to your computer and use it in GitHub Desktop.
Python library for a LoRa module, based on the LoRa HAT code and adapted for the Raspberry Pi 5
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from serial import Serial | |
| from glob import glob | |
| from gpiozero import DigitalOutputDevice | |
| import time | |
| import json | |
# Lookup table: data-rate setting (string key) -> integer code stored in the
# low three bits of REG0 (see `reg0 & 7` / `REG0` in the sx1262 class).
# The codes are consecutive integers starting at 0, in ascending rate order.
# NOTE(review): the keys are presumably rates in bits per second - confirm
# against the module datasheet.
data_rate_bits = {
    rate: code
    for code, rate in enumerate(
        ('300', '1200', '2400', '4800', '9600', '19200', '38400', '62500')
    )
}
# Lookup table: transmitting-power setting (string key) -> integer code stored
# in the low two bits of REG1 (see `reg1 & 3` / `REG1` in the sx1262 class).
# Codes count up from 0 as the power setting goes down.
# NOTE(review): the keys are presumably power levels in dBm - confirm against
# the module datasheet.
transmitting_power_bits = dict(zip(('30', '27', '24', '21'), range(4)))
def get_key_from_value(dictionary, target_value):
    """Reverse lookup: return the first key mapped to *target_value*.

    Keys are examined in the dictionary's iteration order and compared with
    ``==``. Returns ``None`` when no value matches.
    """
    matching_keys = (
        key for key, value in dictionary.items() if value == target_value
    )
    return next(matching_keys, None)
def process_json_messages(input_string, extension=None):
    """Decode a run of concatenated JSON documents from *input_string*.

    Documents may follow each other directly or be separated by whitespace.
    Decoding stops silently at the first position that does not start a valid
    JSON value, and everything decoded up to that point is returned.

    Args:
        input_string: Text holding zero or more JSON documents.
        extension: Optional mapping merged (via ``dict.update``) into every
            decoded JSON *object*. Decoded values that are not objects
            (arrays, numbers, strings, ...) are returned unchanged.

    Returns:
        A list of the decoded values, in order of appearance.
    """
    decoder = json.JSONDecoder()
    length = len(input_string)
    json_messages = []
    start = 0
    while True:
        # raw_decode() does not accept leading whitespace, so skip it here;
        # this covers both the start of the input and gaps between documents.
        while start < length and input_string[start].isspace():
            start += 1
        if start >= length:
            break
        try:
            # Decode in place from `start`; the returned index is absolute,
            # which avoids re-slicing the remaining text on every iteration.
            obj, start = decoder.raw_decode(input_string, start)
        except json.JSONDecodeError:
            # No further valid JSON document: keep what was decoded so far.
            break
        if extension and isinstance(obj, dict):
            obj.update(extension)
        json_messages.append(obj)
    return json_messages
class sx1262:
    """Driver for an SX1262 LoRa module attached over a serial port.

    The module is controlled through a serial connection (opened at 9600
    baud) and two GPIO output pins, M0 and M1, which select its operating
    mode: M0 low / M1 high is used for configuration, M0 low / M1 low for
    normal transmit/receive operation.
    """

    # A register read reply is a 3-byte header followed by the 5 register
    # bytes (ADDH, ADDL, NETID, REG0, REG1) that read_register() decodes.
    _CONFIG_REPLY_LEN = 8
    # Header expected at the start of a channel-RSSI reply.
    _RSSI_REPLY_HEADER = bytes([0xC1, 0x00, 0x02])

    def __init__(self, port, m0_pin=22, m1_pin=27):
        """Open the serial port and claim the two mode-select GPIO pins.

        Args:
            port: Serial device name passed to ``Serial``.
            m0_pin: GPIO pin number wired to the module's M0 input.
            m1_pin: GPIO pin number wired to the module's M1 input.
        """
        self.serial_port = port
        self.ser = Serial(self.serial_port, 9600)
        print('Init SX1262')
        time.sleep(0.1)
        # If claiming a GPIO pin fails, release what was already acquired so
        # the serial port (and first pin) are not leaked, then re-raise.
        try:
            self.m0 = DigitalOutputDevice(m0_pin)
            try:
                self.m1 = DigitalOutputDevice(m1_pin)
            except Exception:
                self.m0.close()
                raise
        except Exception:
            self.ser.close()
            raise

    def configuration_mode(self):
        """Switch the module to configuration mode (M0 low, M1 high)."""
        self.m0.off()
        self.m1.on()
        # Give the module time to settle after the mode change.
        time.sleep(0.1)

    def normal_mode(self):
        """Switch the module to normal mode (M0 low, M1 low)."""
        self.m0.off()
        self.m1.off()
        # Give the module time to settle after the mode change.
        time.sleep(0.1)

    def read_register(self):
        """Read the module configuration registers and decode them.

        Leaves the module in configuration mode.

        Returns:
            A dict with keys ``'address'``, ``'network_id'``, ``'data_rate'``
            and ``'transmitting_power'`` (the latter two are keys of
            ``data_rate_bits`` / ``transmitting_power_bits``, or ``None`` if
            the register code is unknown). ``None`` is returned when the
            module does not answer or the reply is malformed.
        """
        self.configuration_mode()
        # Command: read 5 registers starting at register address 0x00.
        read_register_config = [0xC1, 0x00, 0x05]
        self.ser.reset_input_buffer()
        self.ser.write(bytes(read_register_config))
        time.sleep(0.2)
        buffer_size = self.ser.in_waiting
        response = self.ser.read(buffer_size)
        print(f'Read Config: ({buffer_size} bytes)')
        if buffer_size == 0:
            print('No response')
            return None
        # A truncated reply cannot be decoded; bail out instead of raising
        # IndexError while indexing the header/payload below.
        if len(response) < self._CONFIG_REPLY_LEN:
            print('Wrong format')
            return None
        print(f'Header:{hex(response[0])} - {hex(response[1])} - {hex(response[2])}')
        if response[0] == 0xff and response[1] == 0xff and response[2] == 0xff:
            print('Wrong format')
            return None
        addh, addl, netid, reg0, reg1 = response[3:8]
        print('Configuration:')
        print(f'ADDH={bin(addh)}\nADDL={bin(addl)}\nNETID={bin(netid)}')
        print(f'REG0={bin(reg0)}\nREG1={bin(reg1)}\n')
        return {
            'address': (addh << 8) + addl,
            'network_id': netid,
            # Data rate lives in the low 3 bits of REG0.
            'data_rate': get_key_from_value(data_rate_bits, reg0 & 7),
            # Transmitting power lives in the low 2 bits of REG1.
            'transmitting_power': get_key_from_value(
                transmitting_power_bits, reg1 & 3),
        }

    def set_register(self, address=0, net_id=10, freq=868, data_rate='2400',
                     transmitting_power='30'):
        """Write address, network id, data rate and power to the module.

        Leaves the module in configuration mode.

        Args:
            address: 16-bit module address (split into ADDH/ADDL).
            net_id: Network id; only the low 8 bits are used.
            freq: Accepted for interface compatibility but currently unused
                by this method.
            data_rate: Key of ``data_rate_bits``.
            transmitting_power: Key of ``transmitting_power_bits``.

        Raises:
            KeyError: If ``data_rate`` or ``transmitting_power`` is not a
                supported key.
            SystemExit: With code 1 if the module rejects the configuration.
        """
        # Validate the arguments before touching the hardware so that a bad
        # value does not leave the module switched into configuration mode.
        data_rate_bit = data_rate_bits[data_rate]
        transmitting_power_bit = transmitting_power_bits[transmitting_power]
        self.configuration_mode()
        ADDH = address >> 8 & 0xff
        ADDL = address & 0xff
        NETID = net_id & 0xff
        # The upper bits of REG0/REG1 are fixed constants here.
        # NOTE(review): presumably UART/parity and packet options - confirm
        # against the module datasheet.
        REG0 = (0b01100 << 3) + data_rate_bit
        REG1 = (0b001000 << 2) + transmitting_power_bit
        # Command: write 5 registers starting at register address 0x00.
        set_config_register = [0xC0, 0x00, 0x05, ADDH, ADDL, NETID, REG0, REG1]
        self.ser.reset_input_buffer()
        self.ser.write(bytes(set_config_register))
        time.sleep(0.2)
        r_size = self.ser.in_waiting
        if not r_size:
            print('Error to read from serial')
            return
        response = self.ser.read(r_size)
        if response[0] == 0xC1:
            print('\nConfiguration is set:')
            print(f'ADDH={bin(ADDH)}\nADDL={bin(ADDL)}\nNETID={NETID}')
            print(f'REG0={bin(REG0)}\nREG1={bin(REG1)}\n')
        else:
            print(f'Error to set temp configuration, size={r_size} response={response}')
            # Same effect as the interactive exit(1) helper, without relying
            # on the `site` module having injected it into builtins.
            raise SystemExit(1)

    def send(self, data):
        """Switch to normal mode and write *data* (bytes) to the module."""
        self.normal_mode()
        self.ser.write(data)
        time.sleep(0.1)

    def receive(self, cb):
        """Poll for incoming data and pass decoded JSON messages to *cb*.

        Does nothing when no bytes are waiting. Otherwise waits for the rest
        of the transmission, decodes it as UTF-8 (undecodable bytes are
        dropped), extracts the JSON documents, adds an ``"rssi"`` entry to
        each JSON object and calls ``cb`` once with the resulting list (which
        may be empty).

        Args:
            cb: Callable taking one argument, the list of decoded messages.
        """
        self.normal_mode()
        if self.ser.in_waiting > 0:
            # Wait so the whole transmission can arrive before reading.
            time.sleep(2)
            response = self.ser.read(self.ser.in_waiting)
            d = response.decode('utf-8', errors='ignore')
            # Keep only the span from the first '{' to the last '}' so noise
            # before or after the JSON payload does not break decoding.
            first_brace_index = d.find('{')
            last_brace_index = d.rfind('}')
            if first_brace_index == -1:
                msg = ''
            else:
                msg = d[first_brace_index:last_brace_index + 1]
            rssi = self.get_channel_rssi()
            rssi_extension = {"rssi": rssi}
            parsed_json = process_json_messages(msg, extension=rssi_extension)
            cb(parsed_json)

    def get_channel_rssi(self):
        """Query the module for the channel RSSI reading.

        Note that the serial input buffer is discarded before the query.

        Returns:
            ``256 - <raw RSSI byte>`` from the module's reply, or ``None``
            when no valid reply was received.
        """
        self.normal_mode()
        time.sleep(0.1)
        self.ser.reset_input_buffer()
        self.ser.write(bytes([0xC0, 0xC1, 0xC2, 0xC3, 0x00, 0x02]))
        time.sleep(0.5)
        reply = b''
        if self.ser.in_waiting > 0:
            time.sleep(0.1)
            reply = self.ser.read(self.ser.in_waiting)
        # Require the full header plus the RSSI byte; a short or missing
        # reply is reported as a failure instead of raising IndexError.
        if len(reply) >= 4 and reply[:3] == self._RSSI_REPLY_HEADER:
            return 256 - reply[3]
        print('receive rssi value fail')
        return None

    def release(self):
        """Release both GPIO pins and close the serial port."""
        self.m0.close()
        self.m1.close()
        self.ser.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment