Skip to content

Instantly share code, notes, and snippets.

@solarkennedy
Created January 25, 2026 23:54
Show Gist options
  • Select an option

  • Save solarkennedy/1c8d8eb96da9e5c74dc1eeea86abd853 to your computer and use it in GitHub Desktop.

Select an option

Save solarkennedy/1c8d8eb96da9e5c74dc1eeea86abd853 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Aquaris/XMG OASIS BLE Controller
Reads settings from TCC's user.conf and applies them to the device.
"""
import asyncio
import json
import os
import sys
from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
from typing import Optional

from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError
# Nordic UART Service UUIDs
SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
# Characteristic that every command frame in this script is written to.
TX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
RX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
# Command bytes (placed directly after the leading 0xFE by make_command)
CMD_RESET = 0x19
CMD_FAN = 0x1b
CMD_PUMP = 0x1c
CMD_RGB = 0x1e
# Device model names to look for during scanning
# (find_aquaris matches them as substrings of the advertised device name)
DEVICE_NAMES = ["LCT21001", "LCT22002"]
class RGBState(IntEnum):
    """LED animation modes.

    The integer value is what rgb_on() passes as the last parameter byte of
    the RGB command frame.
    """

    STATIC = 0x00
    BREATHE = 0x01
    RAINBOW = 0x02
    BREATHE_RAINBOW = 0x03
class PumpVoltage(IntEnum):
    """Pump voltage selectors.

    The integer value is what pump_on() passes as the second parameter byte
    of the pump command frame.
    """

    # NOTE(review): member names presumably encode the voltage in volts
    # (V11 = 11 V, ...) — confirm against the device documentation.
    # The numeric codes are not ordered by voltage.
    V11 = 0x00
    V12 = 0x01
    V7 = 0x02
    V8 = 0x03
@dataclass
class AquarisState:
    """Settings to apply to the device: LED colour/mode, fan and pump.

    The field defaults are used whenever the TCC config file, its
    ``aquarisSaveState`` entry, or an individual key is missing.
    """

    # Address of the saved device ("deviceUUID" in the config); None means
    # "no specific device", in which case the first Aquaris found is used.
    device_uuid: Optional[str] = None
    red: int = 255
    green: int = 0
    blue: int = 0
    led_mode: int = RGBState.STATIC
    fan_duty_cycle: int = 50
    pump_duty_cycle: int = 60
    pump_voltage: int = PumpVoltage.V8
    led_on: bool = True
    fan_on: bool = True
    pump_on: bool = True

    @classmethod
    def from_tcc_config(cls, config_path: Optional[Path] = None) -> "AquarisState":
        """Load state from TCC's user.conf file.

        Args:
            config_path: Path of the JSON config file. Defaults to
                ``~/.tcc/user.conf``.

        Returns:
            The saved state, or an all-defaults instance when the file is
            missing, unreadable, not valid JSON, or carries no usable
            ``aquarisSaveState`` entry. A message is printed in each of
            those cases; no exception is raised for a bad config file.
        """
        if config_path is None:
            config_path = Path.home() / ".tcc" / "user.conf"
        config_path = Path(config_path)

        try:
            with open(config_path, encoding="utf-8") as f:
                config = json.load(f)
        except FileNotFoundError:
            print(f"Config file not found: {config_path}")
            return cls()
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Could not read config file {config_path}: {exc}; using defaults")
            return cls()

        if not isinstance(config, dict) or "aquarisSaveState" not in config:
            print("No aquarisSaveState found in config, using defaults")
            return cls()

        # The entry is a JSON document serialized as a string; also accept
        # an already-decoded object so either layout loads.
        raw_state = config["aquarisSaveState"]
        if isinstance(raw_state, (str, bytes, bytearray)):
            try:
                state_json = json.loads(raw_state)
            except ValueError as exc:
                print(f"Invalid aquarisSaveState in config: {exc}; using defaults")
                return cls()
        else:
            state_json = raw_state
        if not isinstance(state_json, dict):
            print("Invalid aquarisSaveState in config, using defaults")
            return cls()

        def pick(key, default):
            # A JSON null is treated like a missing key so it cannot
            # replace a usable default with None.
            value = state_json.get(key)
            return default if value is None else value

        return cls(
            device_uuid=state_json.get("deviceUUID"),
            red=pick("red", 255),
            green=pick("green", 0),
            blue=pick("blue", 0),
            led_mode=pick("ledMode", RGBState.STATIC),
            fan_duty_cycle=pick("fanDutyCycle", 50),
            pump_duty_cycle=pick("pumpDutyCycle", 60),
            pump_voltage=pick("pumpVoltage", PumpVoltage.V8),
            led_on=pick("ledOn", True),
            fan_on=pick("fanOn", True),
            pump_on=pick("pumpOn", True),
        )
def make_command(cmd: int, mode: int, p1: int, p2: int, p3: int, p4: int) -> bytes:
    """Assemble one 8-byte Aquaris command frame.

    Layout: fixed leading byte 0xFE, the command byte, the mode byte, four
    parameter bytes, fixed trailing byte 0xEF. Each argument must fit in a
    single byte (0-255); ``bytes`` raises ``ValueError`` otherwise.
    """
    payload = (cmd, mode, p1, p2, p3, p4)
    return bytes((0xFE, *payload, 0xEF))
def rgb_on(r: int, g: int, b: int, mode: int = RGBState.STATIC) -> bytes:
    """Build the frame that switches the LED on.

    The colour components go in the first three parameter bytes and the
    animation mode in the fourth.
    """
    return make_command(cmd=CMD_RGB, mode=0x01, p1=r, p2=g, p3=b, p4=mode)
def rgb_off() -> bytes:
    """Build the frame that switches the LED off (all parameters zero)."""
    return make_command(cmd=CMD_RGB, mode=0x00, p1=0, p2=0, p3=0, p4=0)
def fan_on(duty_percent: int) -> bytes:
    """Build the frame that switches the fan on.

    ``duty_percent`` is the duty cycle (0-100) and is sent unchanged as the
    first parameter byte.
    """
    return make_command(cmd=CMD_FAN, mode=0x01, p1=duty_percent, p2=0, p3=0, p4=0)
def fan_off() -> bytes:
    """Build the frame that switches the fan off (all parameters zero)."""
    return make_command(cmd=CMD_FAN, mode=0x00, p1=0, p2=0, p3=0, p4=0)
def pump_on(duty_percent: int, voltage: int = PumpVoltage.V8) -> bytes:
    """Build the frame that switches the pump on.

    The duty cycle is sent as the first parameter byte and the voltage
    selector as the second.
    """
    return make_command(cmd=CMD_PUMP, mode=0x01, p1=duty_percent, p2=voltage, p3=0, p4=0)
def pump_off() -> bytes:
    """Build the frame that switches the pump off (all parameters zero)."""
    return make_command(cmd=CMD_PUMP, mode=0x00, p1=0, p2=0, p3=0, p4=0)
def reset_cmd() -> bytes:
    """Build the reset frame (sent on disconnect).

    Mode byte is 0x00 and the first parameter byte is 0x01; the remaining
    parameters are zero.
    """
    return make_command(cmd=CMD_RESET, mode=0x00, p1=0x01, p2=0, p3=0, p4=0)
async def find_aquaris(target_uuid: Optional[str] = None, timeout: float = 10.0) -> Optional[str]:
    """Scan for Aquaris devices and return the address of the one to use.

    Args:
        target_uuid: Address of a known device. When given, only a
            discovered device whose address matches it (case-insensitively)
            is returned; other Aquaris devices are reported but skipped.
        timeout: Scan duration in seconds, passed to
            ``BleakScanner.discover``.

    Returns:
        The address of the matching device, or of the first device whose
        name contains one of ``DEVICE_NAMES`` when no target is given.
        ``None`` if the scan found no suitable device.
    """
    print(f"Scanning for Aquaris devices (timeout: {timeout}s)...")

    # Normalize once instead of on every discovered device.
    wanted = target_uuid.upper() if target_uuid else None
    if wanted:
        print(f"Looking for device: {target_uuid}")

    devices = await BleakScanner.discover(timeout=timeout)
    for d in devices:
        if wanted and d.address.upper() == wanted:
            print(f"Found target device: {d.name} ({d.address})")
            return d.address
        if d.name and any(name in d.name for name in DEVICE_NAMES):
            print(f"Found Aquaris: {d.name} ({d.address})")
            # With a target set, keep looking: only the saved device counts.
            if not wanted:
                return d.address
    return None
async def apply_state(client: BleakClient, state: AquarisState):
    """Write the LED, fan and pump settings from *state* to the device.

    Three commands are written to the TX characteristic in that order, with
    a 0.1 s pause after the LED and after the fan command. A summary line
    is printed after each write.
    """

    async def send(frame: bytes) -> None:
        await client.write_gatt_char(TX_CHAR_UUID, frame)

    print("\nApplying settings...")

    # LED
    if not state.led_on:
        await send(rgb_off())
        print(" LED: OFF")
    else:
        await send(rgb_on(state.red, state.green, state.blue, state.led_mode))
        if state.led_mode <= 3:
            mode_name = RGBState(state.led_mode).name
        else:
            mode_name = str(state.led_mode)
        print(f" LED: ON - RGB({state.red}, {state.green}, {state.blue}) mode={mode_name}")
    await asyncio.sleep(0.1)  # Small delay between commands

    # Fan
    if not state.fan_on:
        await send(fan_off())
        print(" Fan: OFF")
    else:
        await send(fan_on(state.fan_duty_cycle))
        print(f" Fan: ON - {state.fan_duty_cycle}%")
    await asyncio.sleep(0.1)

    # Pump
    if not state.pump_on:
        await send(pump_off())
        print(" Pump: OFF")
    else:
        await send(pump_on(state.pump_duty_cycle, state.pump_voltage))
        if state.pump_voltage <= 3:
            voltage_name = PumpVoltage(state.pump_voltage).name
        else:
            voltage_name = str(state.pump_voltage)
        print(f" Pump: ON - {state.pump_duty_cycle}% @ {voltage_name}")

    print("\nSettings applied successfully!")
async def main():
    """Load the saved state, find the device, apply the settings and hold the connection.

    The connection is kept open until the device disconnects or the user
    presses Ctrl+C. On Ctrl+C the reset command is sent before
    disconnecting. Exits with status 1 when no device is found or a
    connection/Bluetooth error occurs.
    """
    # Load state from TCC config
    state = AquarisState.from_tcc_config()
    print("=" * 50)
    print("Aquaris Controller")
    print("=" * 50)
    print("\nLoaded settings from ~/.tcc/user.conf:")
    print(f" Device UUID: {state.device_uuid}")
    print(f" LED: {'ON' if state.led_on else 'OFF'} - RGB({state.red}, {state.green}, {state.blue})")
    print(f" Fan: {'ON' if state.fan_on else 'OFF'} - {state.fan_duty_cycle}%")
    print(f" Pump: {'ON' if state.pump_on else 'OFF'} - {state.pump_duty_cycle}%")

    # Find device
    address = await find_aquaris(target_uuid=state.device_uuid)
    if not address:
        print("\nERROR: No Aquaris device found!")
        print("Make sure the device is powered on and the blue LED is blinking.")
        sys.exit(1)

    # Connect and apply settings
    print(f"\nConnecting to {address}...")
    try:
        async with BleakClient(address, timeout=10.0) as client:
            print("Connected!")
            await apply_state(client, state)

            # Keep connection alive
            print("\nConnection active. Press Ctrl+C to disconnect...")
            try:
                while client.is_connected:
                    await asyncio.sleep(1)
                print("\nConnection lost!")
            except (KeyboardInterrupt, asyncio.CancelledError):
                # Under asyncio.run() a Ctrl+C reaches this coroutine as a
                # cancellation of the main task, not as KeyboardInterrupt,
                # so both are handled. The cancellation is deliberately not
                # re-raised: this is the top-level task and the user asked
                # for a clean disconnect.
                print("\n\nDisconnecting...")

            # Send reset command before disconnecting, but only while the
            # link is still up: writing to a lost connection would raise.
            if client.is_connected:
                await client.write_gatt_char(TX_CHAR_UUID, reset_cmd())
                print("Sent reset command.")
    except BleakError as e:
        print(f"\nBluetooth error: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"\nError: {e}")
        sys.exit(1)
    print("Disconnected.")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C that main() did not handle itself (e.g. during the scan):
        # exit quietly instead of dumping a traceback. 130 is the
        # conventional exit status for termination by SIGINT.
        print("\nInterrupted.")
        sys.exit(130)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment