Created
November 15, 2025 19:32
-
-
Save jlian/18f98ee679106d552b3f378d77afeb39 to your computer and use it in GitHub Desktop.
Python script to for Raspberry Pi to monitor the HDMI-CEC bus and turn on the the Denon receiver when a playback source is active
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import subprocess | |
| import sys | |
| import time | |
| from datetime import datetime, timedelta | |
| from threading import Thread | |
| # ---- Configuration --------------------------------------------------------- | |
| # Logical address of the TV and the AVR (CEC standard values) | |
| LOGICAL_TV = 0x0 | |
| LOGICAL_AUDIO = 0x5 | |
| # Playback device logical addresses (4, 8, 9, B) per CEC spec | |
| PLAYBACK_LOGICALS = {0x4, 0x8, 0x9, 0xB} | |
| # How long after seeing Active Source we are willing to send a SAMR | |
| PENDING_TIMEOUT_SEC = 0.5 | |
| # If we’ve seen a real Set System Audio Mode (5f:72:01) recently, | |
| # we don’t need to help – Denon is already being handled by something else. | |
| IGNORE_AFTER_REAL_SAM_SEC = 10 | |
| # Toggle this to test behavior without actually sending CEC commands | |
| DRY_RUN = False | |
| # Path to cec-client, adjust if needed | |
| CEC_CLIENT_BIN = "/usr/bin/cec-client" | |
| # --------------------------------------------------------------------------- | |
| def log(prefix, msg): | |
| """Simple timestamped logger.""" | |
| now = datetime.now().strftime("%H:%M:%S") | |
| print(f"[{prefix} {now}] {msg}", flush=True) | |
| class CECWatcher: | |
| """ | |
| Wraps a single long-lived cec-client process. | |
| - Reads stdout to observe TRAFFIC. | |
| - Writes commands to stdin (e.g. 'tx 15:70:00:00'). | |
| """ | |
| def __init__(self, bin_path=CEC_CLIENT_BIN): | |
| self.bin_path = bin_path | |
| self.proc = None | |
| self._start_proc() | |
| # State: last time we saw the Denon declare System Audio Mode | |
| self.last_real_sam = None | |
| # State: pending console wakeup we might help with | |
| self.pending_console = None # (logical_addr, phys_addr, timestamp) | |
| def _start_proc(self): | |
| """ | |
| Spawn cec-client in interactive mode with TRAFFIC logging. | |
| """ | |
| log("INFO", f"Starting cec-client: {self.bin_path}") | |
| # -d 8 = TRAFFIC, no -m so we can transmit | |
| self.proc = subprocess.Popen( | |
| [self.bin_path, "-d", "8"], | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True, | |
| bufsize=1, # line-buffered | |
| ) | |
| def send_command(self, cmd: str): | |
| """ | |
| Send a single command line (e.g. 'tx 15:70:00:00') to cec-client. | |
| """ | |
| if self.proc.poll() is not None: | |
| log("ERROR", "cec-client process has exited, cannot send command.") | |
| return | |
| line = cmd.strip() + "\n" | |
| log("AUTO", f"Sending: {line.strip()}") | |
| try: | |
| self.proc.stdin.write(line) | |
| self.proc.stdin.flush() | |
| except Exception as e: | |
| log("ERROR", f"Failed to write to cec-client stdin: {e}") | |
| def _parse_traffic_line(self, line: str): | |
| """ | |
| Parse a TRAFFIC line like: | |
| 'TRAFFIC: [...] >> bf:82:36:00' | |
| Returns (src_logical, dst_logical, opcode, params_bytes) | |
| or None if parsing fails. | |
| """ | |
| try: | |
| # Only care about lines with '>> ' (received message) | |
| marker = ">> " | |
| if marker not in line: | |
| return None | |
| payload = line.split(marker, 1)[1].strip() | |
| # Example payload: 'bf:82:36:00' | |
| parts = payload.split(":") | |
| if len(parts) < 2: | |
| return None | |
| srcdst = parts[0] | |
| if len(srcdst) != 2: | |
| return None | |
| src = int(srcdst[0], 16) | |
| dst = int(srcdst[1], 16) | |
| opcode = int(parts[1], 16) | |
| params = [int(p, 16) for p in parts[2:]] if len(parts) > 2 else [] | |
| return src, dst, opcode, params | |
| except Exception: | |
| # Anything weird, we just ignore the line | |
| return None | |
| def _handle_message(self, src, dst, opcode, params): | |
| """ | |
| React to parsed CEC messages. | |
| """ | |
| # 1) Track when we see a real Denon Set System Audio Mode (5f:72:01) | |
| if src == LOGICAL_AUDIO and opcode == 0x72: | |
| # params[0] == 0x01 means "system audio mode on" | |
| mode = params[0] if params else None | |
| if mode == 0x01: | |
| self.last_real_sam = datetime.now() | |
| log("INFO", f"Detected Denon Set System Audio Mode (5f:72:01).") | |
| # 2) Watch for Active Source (0x82) from playback/console devices | |
| if opcode == 0x82: | |
| phys_str = ( | |
| f"{params[0]:02x}:{params[1]:02x}" if len(params) >= 2 else "??:??" | |
| ) | |
| if src in PLAYBACK_LOGICALS: | |
| now = datetime.now() | |
| self.pending_console = (src, phys_str, now) | |
| log( | |
| "AUTO", | |
| f"Playback/console at logical {src:X} became Active Source, " | |
| f"phys {phys_str}.", | |
| ) | |
| self._maybe_help_with_system_audio() | |
| else: | |
| # Clear any pending console entries if some other device becomes active | |
| self.pending_console = None | |
| def _maybe_help_with_system_audio(self): | |
| """ | |
| Decide whether to send System Audio Mode Request (SAMR) now. | |
| """ | |
| if not self.pending_console: | |
| return | |
| src, phys_str, t0 = self.pending_console | |
| # Respect timeout: only act immediately after the console's Active Source | |
| elapsed = (datetime.now() - t0).total_seconds() | |
| if elapsed > PENDING_TIMEOUT_SEC: | |
| log( | |
| "AUTO", | |
| f"Pending console Active Source {phys_str} is too old " | |
| f"({elapsed:.2f}s, limit {PENDING_TIMEOUT_SEC}s). Skipping.", | |
| ) | |
| self.pending_console = None | |
| return | |
| # If we saw the Denon set System Audio Mode recently (e.g. Apple TV path), | |
| # don't do anything – the system is already configured. | |
| if self.last_real_sam: | |
| since_sam = (datetime.now() - self.last_real_sam).total_seconds() | |
| if since_sam < IGNORE_AFTER_REAL_SAM_SEC: | |
| log( | |
| "AUTO", | |
| "Recently saw real Set System Audio Mode from Denon; " | |
| "assuming audio is already handled. Skipping.", | |
| ) | |
| self.pending_console = None | |
| return | |
| # At this point, we think the console became Active Source and | |
| # nobody has taken care of system audio yet. Time to help. | |
| cmd = "tx 15:70:00:00" # TV (1) -> Audio (5): System Audio Mode Request | |
| if DRY_RUN: | |
| log("AUTO", f"[DRY RUN] Would send: {cmd}") | |
| else: | |
| self.send_command(cmd) | |
| # Clear pending state so we don't spam the bus | |
| self.pending_console = None | |
| def run(self): | |
| """ | |
| Main loop: read cec-client output line by line and handle traffic. | |
| """ | |
| log("INFO", "Starting CEC auto-audio loop.") | |
| log("INFO", f"DRY_RUN = {DRY_RUN}") | |
| log("INFO", "Monitoring for:") | |
| log("INFO", " - Denon Set System Audio Mode (5f:72:01)") | |
| log( | |
| "INFO", | |
| " - Active Source from any Playback device (4/8/9/B), " | |
| "then optionally sending System Audio Mode Request (15:70:00:00)", | |
| ) | |
| # cec-client dumps both logs and our own echo to a single stream (stdout). | |
| for raw_line in self.proc.stdout: | |
| line = raw_line.rstrip("\n") | |
| # Pass through all cec-client lines to journald for debugging | |
| # (or comment this out if you want it quieter) | |
| print(line, flush=True) | |
| # Only parse TRAFFIC lines | |
| if "TRAFFIC:" not in line: | |
| continue | |
| parsed = self._parse_traffic_line(line) | |
| if parsed is None: | |
| continue | |
| src, dst, opcode, params = parsed | |
| self._handle_message(src, dst, opcode, params) | |
| # If we ever break out of the loop, cec-client probably died. | |
| log("ERROR", "cec-client stdout ended; exiting watcher.") | |
| def main(): | |
| watcher = CECWatcher() | |
| watcher.run() | |
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except KeyboardInterrupt: | |
| log("INFO", "Interrupted, exiting.") | |
| sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment