This script requires Python 3.7 or greater.
pip install bleak
pip install crcmod
python3 research_logger.py --scan
python3 research_logger.py --log a-log-file.log 00:11:22:33:44:55
| import argparse | |
| import asyncio | |
| import base64 | |
| import itertools | |
| import json | |
| import re | |
| import struct | |
| import textwrap | |
| import time | |
| import crcmod.predefined | |
| from bleak import BleakClient, BleakScanner, BleakError | |
# Checksum function built from crcmod's predefined 'modbus' CRC definition.
# It signs outgoing command frames and validates the trailer of each reply.
modbus_crc = crcmod.predefined.mkCrcFun('modbus')
class DeviceCommand:
    """Base class for a raw command frame that is sent to the device.

    Subclasses build the frame bytes and report how many bytes of reply
    the command is expected to produce.
    """

    def __init__(self, cmd: bytes):
        """Store the already-encoded command frame."""
        self.cmd = cmd

    def response_size(self) -> int:
        """Return the expected response size in bytes.

        Raises:
            NotImplementedError: always in the base class; subclasses must
                override this with the size of their reply.
        """
        # Returning None here (as a bare ``pass`` does) would make every
        # length comparison against the reply silently fail.
        raise NotImplementedError(
            f'{type(self).__name__} must implement response_size()')

    def __iter__(self):
        """Iterate over the frame's byte values so that ``bytes(cmd)`` works."""
        return iter(self.cmd)
class QueryRangeCommand(DeviceCommand):
    """Command frame that queries ``length`` fields starting at ``offset`` on ``page``."""

    def __init__(self, page: int, offset: int, length: int):
        self.page = page
        self.offset = offset
        self.length = length

        # Frame layout (8 bytes):
        #   standard prefix (1), range query command (3), page, offset,
        #   big-endian 16-bit length, little-endian 16-bit CRC of the
        #   preceding six bytes.
        frame = bytearray(struct.pack('!BBBBH', 1, 3, page, offset, length))
        frame += struct.pack('<H', modbus_crc(frame))
        super().__init__(frame)

    def response_size(self):
        """Return the reply length: header, two bytes per field, then the CRC."""
        header_bytes = 3
        crc_bytes = 2
        # each returned field is actually 2 bytes
        return header_bytes + 2 * self.length + crc_bytes

    def __repr__(self):
        return f'QueryRangeCommand(page={self.page:#04x}, offset={self.offset:#04x}, length={self.length:#04x})'
class ParseError(Exception):
    """Raised when a device reply fails validation, e.g. a checksum mismatch."""
class BadConnectionError(Exception):
    """Signals that the connection is unusable; triggers a re-connect."""
class BluetoothPowerStation:
    """Sends queued commands to a device over BLE and collects the replies.

    Commands are queued with :meth:`perform` / :meth:`perform_nowait` and
    processed one at a time by :meth:`run`, which also connects and
    re-connects as needed.  Replies arrive as notifications; they are
    accumulated until they reach the length the current command expects
    and are then checked against their trailing CRC.
    """

    RESPONSE_TIMEOUT = 5  # seconds to wait for a complete reply
    MAX_RETRIES = 5       # attempts per command before giving up on the link
    RECONNECT_DELAY = 1   # seconds to wait after a failed connection
    WRITE_UUID = '0000ff02-0000-1000-8000-00805f9b34fb'
    NOTIFY_UUID = '0000ff01-0000-1000-8000-00805f9b34fb'

    current_command: DeviceCommand
    notify_future: asyncio.Future  # None until the first command is sent
    notify_data: bytearray

    def __init__(self, address: str):
        """Create a client for ``address``; must be called inside a running loop."""
        self.address = address
        self.client = BleakClient(address)
        self.command_queue = asyncio.Queue()
        self.current_command = None
        self.notify_future = None
        self.notify_data = bytearray()
        self.loop = asyncio.get_running_loop()

    @property
    def is_connected(self):
        """Whether the underlying Bleak client reports a live connection."""
        return self.client.is_connected

    async def perform(self, cmd: DeviceCommand):
        """Queue ``cmd`` and return a future for its outcome.

        The future resolves to the raw reply bytes, or raises the error that
        prevented the command from completing.
        """
        future = self.loop.create_future()
        await self.command_queue.put((cmd, future))
        return future

    async def perform_nowait(self, cmd: DeviceCommand):
        """Queue ``cmd`` without tracking its result."""
        await self.command_queue.put((cmd, None))

    async def run(self):
        """Connect, process queued commands, and re-connect forever."""
        while True:
            try:
                await self.client.connect()
                await self.client.start_notify(self.NOTIFY_UUID, self._notification_handler)
                await self._perform_commands(self.client)
            except (BleakError, asyncio.TimeoutError, BadConnectionError):
                # Something went wrong somewhere; drop the link and try again
                failed = True
            else:
                failed = False
            finally:
                await self._disconnect_quietly()
            if failed:
                # Back off so a persistently failing adapter or device does
                # not turn this into a busy loop.
                await asyncio.sleep(self.RECONNECT_DELAY)

    async def _disconnect_quietly(self):
        """Disconnect, ignoring Bluetooth errors raised while doing so."""
        try:
            await self.client.disconnect()
        except (BleakError, asyncio.TimeoutError):
            # A failed disconnect must not terminate the reconnect loop.
            pass

    @staticmethod
    def _settle(future, *, result=None, error=None):
        """Resolve ``future`` unless it is absent or already done."""
        if future is None or future.done():
            return
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)

    async def _perform_commands(self, client):
        """Process queued commands while ``client`` stays connected.

        Raises:
            BadConnectionError: when the link misbehaves or a command runs
                out of retries; the caller is expected to re-connect.
        """
        while client.is_connected:
            cmd, cmd_future = await self.command_queue.get()
            try:
                await self._perform_with_retries(client, cmd, cmd_future)
            finally:
                # Exactly one task_done() per dequeued command, on every exit
                self.command_queue.task_done()

    async def _perform_with_retries(self, client, cmd, cmd_future):
        """Send ``cmd`` up to MAX_RETRIES times and settle ``cmd_future``."""
        retries = 0
        while retries < self.MAX_RETRIES:
            try:
                # Prepare to make request
                self.current_command = cmd
                self.notify_future = self.loop.create_future()
                self.notify_data = bytearray()

                # Make request. The command object is only iterable, so it
                # is converted to real bytes for the Bluetooth backend.
                await client.write_gatt_char(self.WRITE_UUID, bytes(cmd), True)

                # Wait for response
                res = await asyncio.wait_for(self.notify_future, timeout=self.RESPONSE_TIMEOUT)
            except ParseError:
                # For safety, wait the full timeout before retrying again
                retries += 1
                await asyncio.sleep(self.RESPONSE_TIMEOUT)
            except asyncio.TimeoutError:
                retries += 1
            except BleakError as err:
                # Don't retry
                self._settle(cmd_future, error=err)
                return
            except BadConnectionError as err:
                # Exit command loop
                self._settle(cmd_future, error=err)
                raise
            else:
                # Success!
                # TODO: Parse result
                self._settle(cmd_future, result=res)
                return

        err = BadConnectionError('too many retries')
        self._settle(cmd_future, error=err)
        raise err

    def _notification_handler(self, _sender: int, data: bytearray):
        """Accumulate notification data and complete the pending reply future."""
        # Ignore notifications we don't expect
        if not self.notify_future or self.notify_future.done():
            return

        # If something went wrong, we might get weird data.
        if data == b'AT+NAME?\r' or data == b'AT+ADV?\r':
            self.notify_future.set_exception(BadConnectionError('Got AT+ notification'))
            return

        # Save data
        self.notify_data.extend(data)

        # Check if we're done reading the data we expected
        if len(self.notify_data) == self.current_command.response_size():
            # Validate the CRC: the last two bytes are a little-endian
            # checksum of everything before them.
            crc = modbus_crc(self.notify_data[0:-2]).to_bytes(2, byteorder='little')
            if self.notify_data[-2:] == crc:
                self.notify_future.set_result(self.notify_data)
            else:
                self.notify_future.set_exception(ParseError('Failed checksum'))
async def scan():
    """Discover nearby BLE devices and print the ones named like a Bluetti unit."""
    print('Scanning....')
    devices = await BleakScanner.discover()
    if not devices:
        print('0 devices found - this is a likely sign that something went wrong')
        return

    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    name_pattern = re.compile(r'^(AC200M|AC300|EP500P|EP500)\d+$')
    for d in devices:
        # A discovered device may have no name; skip it instead of passing
        # None to match(), which would raise TypeError.
        if d.name and name_pattern.match(d.name):
            print(f'Found {d.name}: address {d.address}')
def log_packet(output, data, command):
    """Append one JSON line describing a reply and the command that produced it.

    Args:
        output: Writable text stream that receives the line.
        data: Raw reply bytes; stored base64-encoded.
        command: The command that was sent; converted with ``bytes()`` and
            stored base64-encoded.
    """
    def _b64(raw):
        return base64.b64encode(raw).decode('ascii')

    timestamp = time.strftime('%Y-%m-%d %H:%M:%S %z', time.localtime())
    record = {'type': 'client', 'time': timestamp}
    record['data'] = _b64(data)
    record['command'] = _b64(bytes(command))
    output.write(f'{json.dumps(record)}\n')
async def log(address, path):
    """Poll the device at ``address`` in a loop and append each reply to ``path``.

    Args:
        address: Address of the device to connect to.
        path: Log file to append to; one JSON line is written per reply.
    """
    print(f'Connecting to {address}...')
    device = BluetoothPowerStation(address)
    # Keep a reference to the task: the event loop only holds a weak one,
    # and without it a failure of the connection loop would go unnoticed.
    run_task = asyncio.get_running_loop().create_task(device.run())

    commands = [
        QueryRangeCommand(0x00, 0x00, 0x46),
        QueryRangeCommand(0x00, 0x46, 0x42),
        QueryRangeCommand(0x00, 0x88, 0x4a),
        QueryRangeCommand(0x0B, 0xB9, 0x3D),
    ]

    try:
        with open(path, 'a') as log_file:
            for command in itertools.cycle(commands):
                if run_task.done():
                    # run() loops forever, so a finished task means it
                    # failed; result() re-raises that failure here.
                    run_task.result()
                    return

                if not device.is_connected:
                    await asyncio.sleep(1)
                    continue

                result_future = await device.perform(command)
                try:
                    result = await result_future
                except ParseError:
                    print('Got a parse exception...')
                except BadConnectionError as err:
                    print(f'Needed to disconnect due to error: {err}')
                except BleakError as err:
                    # Bluetooth errors are delivered through the future too;
                    # report them and keep logging rather than crash.
                    print(f'Bluetooth error: {err}')
                else:
                    log_packet(log_file, result, command)
    finally:
        # Stop the background connection loop when logging ends
        run_task.cancel()
# Command-line entry point: --scan lists devices, --log PATH ADDRESS logs one.
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description='Scans for Bluetti devices and logs information',
    epilog=textwrap.dedent("""\
        To use, run the scanner first:
        %(prog)s --scan
        Once you have found your device you can run the logger:
        %(prog)s --log log-file.log 00:11:22:33:44:55
        """))
parser.add_argument(
    '--scan',
    action='store_true',
    help='Scans for devices and prints out addresses')
parser.add_argument(
    '--log',
    metavar='PATH',
    help='Connect and log data for the device to the given file')
parser.add_argument(
    'address',
    metavar='ADDRESS',
    nargs='?',
    help='The device MAC to connect to for logging')
args = parser.parse_args()

if args.scan:
    asyncio.run(scan())
elif args.log:
    if not args.address:
        # ADDRESS is optional for --scan but logging cannot work without it;
        # fail with a usage error instead of passing None to the client.
        parser.error('--log requires the ADDRESS of the device to connect to')
    asyncio.run(log(args.address, args.log))
else:
    parser.print_help()