Skip to content

Instantly share code, notes, and snippets.

@jlian
Created November 15, 2025 19:32
Show Gist options
  • Select an option

  • Save jlian/18f98ee679106d552b3f378d77afeb39 to your computer and use it in GitHub Desktop.

Select an option

Save jlian/18f98ee679106d552b3f378d77afeb39 to your computer and use it in GitHub Desktop.
Python script to for Raspberry Pi to monitor the HDMI-CEC bus and turn on the the Denon receiver when a playback source is active
#!/usr/bin/env python3
import subprocess
import sys
import time
from datetime import datetime, timedelta
from threading import Thread
# ---- Configuration ---------------------------------------------------------
# Logical address of the TV and the AVR (CEC standard values)
LOGICAL_TV = 0x0
LOGICAL_AUDIO = 0x5
# Playback device logical addresses (4, 8, 9, B) per CEC spec
PLAYBACK_LOGICALS = {0x4, 0x8, 0x9, 0xB}
# How long after seeing Active Source we are willing to send a SAMR
PENDING_TIMEOUT_SEC = 0.5
# If we’ve seen a real Set System Audio Mode (5f:72:01) recently,
# we don’t need to help – Denon is already being handled by something else.
IGNORE_AFTER_REAL_SAM_SEC = 10
# Toggle this to test behavior without actually sending CEC commands
DRY_RUN = False
# Path to cec-client, adjust if needed
CEC_CLIENT_BIN = "/usr/bin/cec-client"
# ---------------------------------------------------------------------------
def log(prefix, msg):
"""Simple timestamped logger."""
now = datetime.now().strftime("%H:%M:%S")
print(f"[{prefix} {now}] {msg}", flush=True)
class CECWatcher:
"""
Wraps a single long-lived cec-client process.
- Reads stdout to observe TRAFFIC.
- Writes commands to stdin (e.g. 'tx 15:70:00:00').
"""
def __init__(self, bin_path=CEC_CLIENT_BIN):
self.bin_path = bin_path
self.proc = None
self._start_proc()
# State: last time we saw the Denon declare System Audio Mode
self.last_real_sam = None
# State: pending console wakeup we might help with
self.pending_console = None # (logical_addr, phys_addr, timestamp)
def _start_proc(self):
"""
Spawn cec-client in interactive mode with TRAFFIC logging.
"""
log("INFO", f"Starting cec-client: {self.bin_path}")
# -d 8 = TRAFFIC, no -m so we can transmit
self.proc = subprocess.Popen(
[self.bin_path, "-d", "8"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1, # line-buffered
)
def send_command(self, cmd: str):
"""
Send a single command line (e.g. 'tx 15:70:00:00') to cec-client.
"""
if self.proc.poll() is not None:
log("ERROR", "cec-client process has exited, cannot send command.")
return
line = cmd.strip() + "\n"
log("AUTO", f"Sending: {line.strip()}")
try:
self.proc.stdin.write(line)
self.proc.stdin.flush()
except Exception as e:
log("ERROR", f"Failed to write to cec-client stdin: {e}")
def _parse_traffic_line(self, line: str):
"""
Parse a TRAFFIC line like:
'TRAFFIC: [...] >> bf:82:36:00'
Returns (src_logical, dst_logical, opcode, params_bytes)
or None if parsing fails.
"""
try:
# Only care about lines with '>> ' (received message)
marker = ">> "
if marker not in line:
return None
payload = line.split(marker, 1)[1].strip()
# Example payload: 'bf:82:36:00'
parts = payload.split(":")
if len(parts) < 2:
return None
srcdst = parts[0]
if len(srcdst) != 2:
return None
src = int(srcdst[0], 16)
dst = int(srcdst[1], 16)
opcode = int(parts[1], 16)
params = [int(p, 16) for p in parts[2:]] if len(parts) > 2 else []
return src, dst, opcode, params
except Exception:
# Anything weird, we just ignore the line
return None
def _handle_message(self, src, dst, opcode, params):
"""
React to parsed CEC messages.
"""
# 1) Track when we see a real Denon Set System Audio Mode (5f:72:01)
if src == LOGICAL_AUDIO and opcode == 0x72:
# params[0] == 0x01 means "system audio mode on"
mode = params[0] if params else None
if mode == 0x01:
self.last_real_sam = datetime.now()
log("INFO", f"Detected Denon Set System Audio Mode (5f:72:01).")
# 2) Watch for Active Source (0x82) from playback/console devices
if opcode == 0x82:
phys_str = (
f"{params[0]:02x}:{params[1]:02x}" if len(params) >= 2 else "??:??"
)
if src in PLAYBACK_LOGICALS:
now = datetime.now()
self.pending_console = (src, phys_str, now)
log(
"AUTO",
f"Playback/console at logical {src:X} became Active Source, "
f"phys {phys_str}.",
)
self._maybe_help_with_system_audio()
else:
# Clear any pending console entries if some other device becomes active
self.pending_console = None
def _maybe_help_with_system_audio(self):
"""
Decide whether to send System Audio Mode Request (SAMR) now.
"""
if not self.pending_console:
return
src, phys_str, t0 = self.pending_console
# Respect timeout: only act immediately after the console's Active Source
elapsed = (datetime.now() - t0).total_seconds()
if elapsed > PENDING_TIMEOUT_SEC:
log(
"AUTO",
f"Pending console Active Source {phys_str} is too old "
f"({elapsed:.2f}s, limit {PENDING_TIMEOUT_SEC}s). Skipping.",
)
self.pending_console = None
return
# If we saw the Denon set System Audio Mode recently (e.g. Apple TV path),
# don't do anything – the system is already configured.
if self.last_real_sam:
since_sam = (datetime.now() - self.last_real_sam).total_seconds()
if since_sam < IGNORE_AFTER_REAL_SAM_SEC:
log(
"AUTO",
"Recently saw real Set System Audio Mode from Denon; "
"assuming audio is already handled. Skipping.",
)
self.pending_console = None
return
# At this point, we think the console became Active Source and
# nobody has taken care of system audio yet. Time to help.
cmd = "tx 15:70:00:00" # TV (1) -> Audio (5): System Audio Mode Request
if DRY_RUN:
log("AUTO", f"[DRY RUN] Would send: {cmd}")
else:
self.send_command(cmd)
# Clear pending state so we don't spam the bus
self.pending_console = None
def run(self):
"""
Main loop: read cec-client output line by line and handle traffic.
"""
log("INFO", "Starting CEC auto-audio loop.")
log("INFO", f"DRY_RUN = {DRY_RUN}")
log("INFO", "Monitoring for:")
log("INFO", " - Denon Set System Audio Mode (5f:72:01)")
log(
"INFO",
" - Active Source from any Playback device (4/8/9/B), "
"then optionally sending System Audio Mode Request (15:70:00:00)",
)
# cec-client dumps both logs and our own echo to a single stream (stdout).
for raw_line in self.proc.stdout:
line = raw_line.rstrip("\n")
# Pass through all cec-client lines to journald for debugging
# (or comment this out if you want it quieter)
print(line, flush=True)
# Only parse TRAFFIC lines
if "TRAFFIC:" not in line:
continue
parsed = self._parse_traffic_line(line)
if parsed is None:
continue
src, dst, opcode, params = parsed
self._handle_message(src, dst, opcode, params)
# If we ever break out of the loop, cec-client probably died.
log("ERROR", "cec-client stdout ended; exiting watcher.")
def main():
watcher = CECWatcher()
watcher.run()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
log("INFO", "Interrupted, exiting.")
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment