Created
January 25, 2026 23:54
-
-
Save solarkennedy/1c8d8eb96da9e5c74dc1eeea86abd853 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Aquaris/XMG OASIS BLE Controller | |
| Reads settings from TCC's user.conf and applies them to the device. | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from enum import IntEnum | |
| from dataclasses import dataclass | |
| from bleak import BleakClient, BleakScanner | |
| from bleak.exc import BleakError | |
# Nordic UART Service UUIDs. The three identifiers share one 128-bit base and
# differ only in the short ID; commands are written to TX_CHAR_UUID.
_NUS_UUID_TAIL = "-b5a3-f393-e0a9-e50e24dcca9e"
SERVICE_UUID = "6e400001" + _NUS_UUID_TAIL
TX_CHAR_UUID = "6e400002" + _NUS_UUID_TAIL
RX_CHAR_UUID = "6e400003" + _NUS_UUID_TAIL

# Command bytes (second byte of every frame built by make_command).
CMD_RESET = 0x19
CMD_FAN = 0x1B
CMD_PUMP = 0x1C
CMD_RGB = 0x1E

# Substrings of the advertised device name that identify a supported model
# during scanning.
DEVICE_NAMES = [
    "LCT21001",
    "LCT22002",
]
class RGBState(IntEnum):
    """LED animation mode codes (passed as the ``mode`` argument of ``rgb_on``)."""

    STATIC = 0
    BREATHE = 1
    RAINBOW = 2
    BREATHE_RAINBOW = 3
class PumpVoltage(IntEnum):
    """Pump voltage selector codes (passed as the ``voltage`` argument of ``pump_on``).

    Note the codes are not in numeric voltage order.
    """

    V11 = 0
    V12 = 1
    V7 = 2
    V8 = 3
@dataclass
class AquarisState:
    """Settings that get applied to an Aquaris device.

    The field defaults below are also the fallbacks used for any key missing
    from the saved TCC state.
    """

    # Saved BLE address/UUID of the device; None means "any Aquaris device".
    device_uuid: "str | None" = None
    red: int = 255
    green: int = 0
    blue: int = 0
    led_mode: int = RGBState.STATIC
    fan_duty_cycle: int = 50
    pump_duty_cycle: int = 60
    pump_voltage: int = PumpVoltage.V8
    led_on: bool = True
    fan_on: bool = True
    pump_on: bool = True

    @classmethod
    def from_tcc_config(cls, config_path: "Path | str | None" = None) -> "AquarisState":
        """Load state from TCC's user.conf file.

        Args:
            config_path: Location of the config file; defaults to
                ``~/.tcc/user.conf``. A plain string is accepted as well.

        Returns:
            The saved state, or an all-defaults instance when the file is
            missing, unreadable, not valid JSON, or carries no usable
            ``aquarisSaveState`` entry. A message is printed in each of
            those cases; no exception is raised for a bad config file.
        """
        if config_path is None:
            config_path = Path.home() / ".tcc" / "user.conf"
        config_path = Path(config_path)

        # EAFP: open directly instead of exists()+open(), which is race-prone.
        try:
            with open(config_path, encoding="utf-8") as f:
                config = json.load(f)
        except FileNotFoundError:
            print(f"Config file not found: {config_path}")
            return cls()
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Could not read config file {config_path}: {exc}")
            return cls()

        if not isinstance(config, dict) or "aquarisSaveState" not in config:
            print("No aquarisSaveState found in config, using defaults")
            return cls()

        # The saved state is stored as a JSON document encoded inside a JSON
        # string; an already-decoded object is tolerated too.
        raw_state = config["aquarisSaveState"]
        try:
            state_json = raw_state if isinstance(raw_state, dict) else json.loads(raw_state)
        except (TypeError, ValueError) as exc:
            print(f"Invalid aquarisSaveState in config, using defaults: {exc}")
            return cls()
        if not isinstance(state_json, dict):
            print("Invalid aquarisSaveState in config, using defaults")
            return cls()

        return cls(
            device_uuid=state_json.get("deviceUUID"),
            red=state_json.get("red", 255),
            green=state_json.get("green", 0),
            blue=state_json.get("blue", 0),
            led_mode=state_json.get("ledMode", RGBState.STATIC),
            fan_duty_cycle=state_json.get("fanDutyCycle", 50),
            pump_duty_cycle=state_json.get("pumpDutyCycle", 60),
            pump_voltage=state_json.get("pumpVoltage", PumpVoltage.V8),
            led_on=state_json.get("ledOn", True),
            fan_on=state_json.get("fanOn", True),
            pump_on=state_json.get("pumpOn", True),
        )
def make_command(cmd: int, mode: int, p1: int, p2: int, p3: int, p4: int) -> bytes:
    """Assemble one 8-byte Aquaris command frame.

    Frame layout: 0xFE start marker, command byte, mode byte, four
    parameter bytes, 0xEF end marker. Every argument must fit in one byte.
    """
    fields = (cmd, mode, p1, p2, p3, p4)
    return b"\xfe" + bytes(fields) + b"\xef"
def rgb_on(r: int, g: int, b: int, mode: int = RGBState.STATIC) -> bytes:
    """Build the LED-on frame for colour (r, g, b) with the given animation mode."""
    # The frame's mode byte is the on/off flag; the animation mode travels
    # in the last parameter byte.
    return make_command(cmd=CMD_RGB, mode=0x01, p1=r, p2=g, p3=b, p4=mode)
def rgb_off() -> bytes:
    """Build the LED-off frame (all parameter bytes zero)."""
    return make_command(cmd=CMD_RGB, mode=0x00, p1=0, p2=0, p3=0, p4=0)
def fan_on(duty_percent: int) -> bytes:
    """Build the fan-on frame; duty_percent (0-100) goes in the first parameter byte."""
    return make_command(cmd=CMD_FAN, mode=0x01, p1=duty_percent, p2=0, p3=0, p4=0)
def fan_off() -> bytes:
    """Build the fan-off frame (all parameter bytes zero)."""
    return make_command(cmd=CMD_FAN, mode=0x00, p1=0, p2=0, p3=0, p4=0)
def pump_on(duty_percent: int, voltage: int = PumpVoltage.V8) -> bytes:
    """Build the pump-on frame from a duty cycle and a voltage selector code."""
    return make_command(cmd=CMD_PUMP, mode=0x01, p1=duty_percent, p2=voltage, p3=0, p4=0)
def pump_off() -> bytes:
    """Build the pump-off frame (all parameter bytes zero)."""
    return make_command(cmd=CMD_PUMP, mode=0x00, p1=0, p2=0, p3=0, p4=0)
def reset_cmd() -> bytes:
    """Build the reset frame, which is sent just before disconnecting."""
    return make_command(cmd=CMD_RESET, mode=0x00, p1=0x01, p2=0, p3=0, p4=0)
async def find_aquaris(target_uuid: str = None, timeout: float = 10.0) -> "str | None":
    """Scan for an Aquaris device and return its BLE address.

    The scan stops as soon as a matching device is seen instead of always
    waiting for the full timeout.

    Args:
        target_uuid: Address of one specific device to look for, compared
            case-insensitively. When given, only that device is accepted;
            other Aquaris devices seen during the scan are reported but
            not returned.
        timeout: Maximum scan duration in seconds.

    Returns:
        The address of the matching device, or None if none was seen
        before the timeout expired.
    """
    print(f"Scanning for Aquaris devices (timeout: {timeout}s)...")
    wanted = target_uuid.upper() if target_uuid else None
    if wanted:
        print(f"Looking for device: {target_uuid}")

    # The filter runs for every advertisement, so remember which non-target
    # devices were already reported to avoid printing them repeatedly.
    reported = set()

    def is_match(device, _advertisement) -> bool:
        if wanted and device.address.upper() == wanted:
            return True
        if not device.name or not any(name in device.name for name in DEVICE_NAMES):
            return False
        if wanted:
            # An Aquaris, but not the one we were asked for: report, keep scanning.
            if device.address not in reported:
                reported.add(device.address)
                print(f"Found Aquaris: {device.name} ({device.address})")
            return False
        return True

    device = await BleakScanner.find_device_by_filter(is_match, timeout=timeout)
    if device is None:
        return None

    if wanted:
        print(f"Found target device: {device.name} ({device.address})")
    else:
        print(f"Found Aquaris: {device.name} ({device.address})")
    return device.address
async def apply_state(client: BleakClient, state: AquarisState):
    """Push the LED, fan and pump settings in ``state`` to a connected device.

    One command frame is written per subsystem, in the order LED, fan,
    pump, with a short pause after the LED and fan commands.
    """

    async def send(frame: bytes) -> None:
        await client.write_gatt_char(TX_CHAR_UUID, frame)

    print("\nApplying settings...")

    # LED
    if not state.led_on:
        await send(rgb_off())
        print(" LED: OFF")
    else:
        await send(rgb_on(state.red, state.green, state.blue, state.led_mode))
        # Show the symbolic name for known modes, the raw number otherwise.
        if state.led_mode <= 3:
            mode_label = RGBState(state.led_mode).name
        else:
            mode_label = str(state.led_mode)
        print(f" LED: ON - RGB({state.red}, {state.green}, {state.blue}) mode={mode_label}")
    await asyncio.sleep(0.1)  # Small delay between commands

    # Fan
    if not state.fan_on:
        await send(fan_off())
        print(" Fan: OFF")
    else:
        await send(fan_on(state.fan_duty_cycle))
        print(f" Fan: ON - {state.fan_duty_cycle}%")
    await asyncio.sleep(0.1)

    # Pump
    if not state.pump_on:
        await send(pump_off())
        print(" Pump: OFF")
    else:
        await send(pump_on(state.pump_duty_cycle, state.pump_voltage))
        if state.pump_voltage <= 3:
            voltage_label = PumpVoltage(state.pump_voltage).name
        else:
            voltage_label = str(state.pump_voltage)
        print(f" Pump: ON - {state.pump_duty_cycle}% @ {voltage_label}")

    print("\nSettings applied successfully!")
async def main():
    """Load the saved settings, find the device, apply them and hold the link.

    The connection is kept open until it drops or the user presses Ctrl+C.
    On Ctrl+C a reset command is sent before disconnecting. The process
    exits with status 1 when no device is found or on any connection error.
    """
    # Load state from TCC config
    state = AquarisState.from_tcc_config()
    print("=" * 50)
    print("Aquaris Controller")
    print("=" * 50)
    print("\nLoaded settings from ~/.tcc/user.conf:")
    print(f" Device UUID: {state.device_uuid}")
    print(f" LED: {'ON' if state.led_on else 'OFF'} - RGB({state.red}, {state.green}, {state.blue})")
    print(f" Fan: {'ON' if state.fan_on else 'OFF'} - {state.fan_duty_cycle}%")
    print(f" Pump: {'ON' if state.pump_on else 'OFF'} - {state.pump_duty_cycle}%")

    # Find device
    address = await find_aquaris(target_uuid=state.device_uuid)
    if not address:
        print("\nERROR: No Aquaris device found!")
        print("Make sure the device is powered on and the blue LED is blinking.")
        sys.exit(1)

    # Connect and apply settings
    print(f"\nConnecting to {address}...")
    try:
        async with BleakClient(address, timeout=10.0) as client:
            print("Connected!")
            await apply_state(client, state)

            # Keep connection alive
            print("\nConnection active. Press Ctrl+C to disconnect...")
            try:
                while client.is_connected:
                    await asyncio.sleep(1)
                print("\nConnection lost!")
            except (KeyboardInterrupt, asyncio.CancelledError):
                # Under asyncio.run() a Ctrl+C reaches this coroutine as a
                # task cancellation rather than KeyboardInterrupt, so both
                # are handled. The cancellation is deliberately not
                # re-raised: this is the top-level task and the shutdown
                # below is the intended response to it.
                print("\n\nDisconnecting...")
                # Send reset command before disconnecting, if the link is still up
                if client.is_connected:
                    await client.write_gatt_char(TX_CHAR_UUID, reset_cmd())
                    print("Sent reset command.")
    except BleakError as e:
        print(f"\nBluetooth error: {e}")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report anything unexpected and exit non-zero.
        print(f"\nError: {e}")
        sys.exit(1)
    print("Disconnected.")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C while scanning or connecting surfaces here from
        # asyncio.run(); exit quietly instead of dumping a traceback.
        # 130 is the conventional exit status for termination by SIGINT.
        sys.exit(130)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment