|
import logging
import os
import select
import struct
import time
from subprocess import DEVNULL, PIPE, Popen
from threading import Event as ThreadEvent
from threading import Thread
from typing import Tuple, Union
|
# All the device buttons are part of event0, which appears as a keyboard:
#   buttons along the edge are: 1, 2, 3, 4, m
#   next to the knob: ESC
#   knob click: Enter
# Turning the knob is a separate device, event1, which also appears as an
# input device:
#   turning the knob corresponds to the left and right arrow keys
|
|
|
|
|
# Input device nodes: event0 carries the buttons, event1 the knob rotation
DEV_BUTTONS, DEV_KNOB = '/dev/input/event0', '/dev/input/event1'

# event0 keycode -> name of the button that produced it
BUTTONS_CODE_MAP = dict((
    (2, '1'),       # buttons along the edge
    (3, '2'),
    (4, '3'),
    (5, '4'),
    (50, 'm'),
    (28, 'ENTER'),  # knob click
    (1, 'ESC'),     # button next to the knob
))
|
|
|
# https://github.com/torvalds/linux/blob/v5.5-rc5/include/uapi/linux/input.h#L28
# Layout of one record read from the device:
# long int, long int, unsigned short, unsigned short, unsigned int
EVENT_FORMAT = 'llHHI'
EVENT_SIZE = struct.calcsize(EVENT_FORMAT)

# event1 always reports keycode 6 when the knob is turned; the value field
# carries the direction. A left turn is really -1, but because the value is
# unpacked as an unsigned int it wraps around to the largest 32-bit value.
KNOB_RIGHT = 1
KNOB_LEFT = 0xFFFFFFFF
|
|
|
# One shared format for every handler: timestamp, level, source location, message
logformat = logging.Formatter(
    '%(created)f %(levelname)s [%(filename)s:%(lineno)d]: %(message)s')
logger = logging.getLogger('buttons')
logger.setLevel(logging.DEBUG)


def _attach_handler(handler):
    """
    Configure a handler at DEBUG level with the shared format, register it
    on the 'buttons' logger and hand it back
    """
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logformat)
    logger.addHandler(handler)
    return handler


# Log to a file as well as to the default stream of StreamHandler
fh = _attach_handler(logging.FileHandler('/var/log/buttons.log'))
ch = _attach_handler(logging.StreamHandler())
|
|
|
|
|
def translate_event(
        etype: int, code: int, value: int) -> Union[str, Tuple[str, str]]:
    """
    Translate combination of type, code, value into the button pressed and
    the xdotool command that should be issued for it

    Args:
        etype: event type; 1 is treated as a button event, 2 as a knob event
        code: event code; a key of BUTTONS_CODE_MAP for buttons, 6 for the knob
        value: event value; 1 (press) or 0 (release) for buttons,
            KNOB_RIGHT or KNOB_LEFT for the knob

    Returns:
        A (button name, xdotool command) tuple for a recognised event. The
        command is "keydown" or "keyup" for buttons and "key" for knob turns.
        Every other event yields the bare string 'UNKNOWN' (kept as a string
        for backward compatibility).
    """
    if etype == 1 and code in BUTTONS_CODE_MAP:
        # Button event: only an explicit press (1) or release (0) is reported.
        # Other values (the kernel uses 2 for autorepeat while a key is held)
        # fall through to 'UNKNOWN' rather than being mistaken for a release,
        # which would let go of a key the user is still holding.
        if value == 1:
            return BUTTONS_CODE_MAP[code], "keydown"
        if value == 0:
            return BUTTONS_CODE_MAP[code], "keyup"
    elif etype == 2 and code == 6:
        # knob turn: the value encodes the direction
        if value == KNOB_RIGHT:
            return 'RIGHT', "key"
        if value == KNOB_LEFT:
            return 'LEFT', "key"
    return 'UNKNOWN'
|
|
|
|
|
|
|
|
|
class xdotoolContext():
    """
    Context for handling a subprocess shell to send button events as keyboard input with xdotool cmds

    Mapped for https://wadcmd.com/ default keybinds with a couple buttons mapped to just numbers

    Instance attributes:
        button_mapping: button name -> key name handed to xdotool
        process: the long-lived bash process whose stdin receives the
            xdotool command lines
    """

    # How long a knob step is held down before it is released again.
    # debounce on knob is 20ms :(
    # https://github.com/spsgsb/kernel-common/commit/dabe814e3af18e9574ac9afb8033d303e43b006f
    # 18ms feels better for input, presumably due to python overhead, etc
    KNOB_HOLD_SECONDS = 0.018

    def __init__(self) -> None:
        self.button_mapping = {
            "1": "Up",
            "2": "Down",
            "3": "3",
            "4": "4",
            "m": "Shift_L",
            "ESC": "Space",
            "LEFT": "Left",
            "RIGHT": "Right",
            "ENTER": "Control_L",
        }
        self.setup_xdotool_subprocess()

    def setup_xdotool_subprocess(self):
        """
        Start the bash process that executes the xdotool commands and point
        it at display :0
        """
        self.process = Popen(
            # Run bash directly; shell=True would only wrap it in another shell
            ["/bin/bash"],
            stdin=PIPE,
            # Nothing ever reads the shell's output. Leaving these as pipes
            # would let the shell block once a pipe buffer fills up, so the
            # output is discarded instead.
            stdout=DEVNULL,
            stderr=DEVNULL,
            text=True,  # Use text mode to avoid dealing with binary data
        )
        self.process.stdin.write('export DISPLAY=:0\n')
        self.process.stdin.flush()

    def send_xdotool_key(self, key, cmd):
        """
        Write one "xdotool <cmd> <key>" line to the shell and flush it

        Args:
            key: key name to pass to xdotool
            cmd: xdotool sub-command, e.g. "keydown" or "keyup"
        """
        self.process.stdin.write(f"xdotool {cmd} {key}\n")
        self.process.stdin.flush()

    def handle_button(self, key_event):
        """
        Send the keyboard input that corresponds to one translated event

        Args:
            key_event: (button name, xdotool command) pair; the button name
                must be a key of button_mapping

        Raises:
            KeyError: if the button name is not in button_mapping
        """
        button, cmd = key_event[0], key_event[1]
        key = self.button_mapping[button]  # get keyboard key

        if button in ("LEFT", "RIGHT"):
            # A knob step has no release event of its own, so emulate a short
            # key press: hold the key briefly, then release it.
            self.send_xdotool_key(key, "keydown")
            time.sleep(self.KNOB_HOLD_SECONDS)
            self.send_xdotool_key(key, "keyup")
        else:
            self.send_xdotool_key(key, cmd)
|
|
|
|
|
class EventListener():
    """
    Listen to a specific /dev/eventX and call handle_button

    Creating an instance immediately starts a daemon thread that reads
    events from the device until stop() is called or the device stops
    delivering data.
    """

    # Longest time listen() waits for input before re-checking the stop flag;
    # this bounds how long stop() can block.
    STOP_POLL_SECONDS = 0.5

    def __init__(self, device: str) -> None:
        self.device = device
        self.stopper = ThreadEvent()
        self.thread: Thread = None
        self.start()

    def start(self):
        """
        Start listening thread
        """
        logger.info(f'Starting listener for {self.device}')
        # Reset the flag so a listener that was stopped can be started again
        self.stopper.clear()
        self.thread = Thread(target=self.listen, daemon=True)
        self.thread.start()

    def stop(self):
        """
        Stop listening thread and wait for it to finish
        """
        logger.info(f'Stopping listener for {self.device}')
        self.stopper.set()
        self.thread.join()

    def listen(self):
        """
        To run in thread, listen for events and call handle_button if applicable
        """
        # Unbuffered, so that select() on the descriptor reflects exactly the
        # data that has not been read yet.
        with open(self.device, "rb", buffering=0) as in_file:
            xdotool_handler = xdotoolContext()
            while not self.stopper.is_set():
                # Wait with a timeout instead of blocking in read(); otherwise
                # a stop request would go unnoticed until the next event.
                readable, _, _ = select.select(
                    [in_file], [], [], self.STOP_POLL_SECONDS)
                if not readable:
                    continue

                event = in_file.read(EVENT_SIZE)
                if not event:
                    # end of stream: the device delivers nothing more
                    break
                if len(event) < EVENT_SIZE:
                    # an incomplete record cannot be unpacked
                    logger.warning(
                        f'Short read from {self.device}, stopping listener')
                    break

                (_sec, _usec, etype, code, value) = struct.unpack(
                    EVENT_FORMAT, event)
                key_event = translate_event(etype, code, value)
                # translate_event gives the bare string 'UNKNOWN' for events
                # it does not map; only (name, command) tuples are forwarded.
                if not isinstance(key_event, tuple):
                    continue
                if key_event[0] in xdotool_handler.button_mapping:
                    xdotool_handler.handle_button(key_event)
|
|
|
|
|
if __name__ == '__main__':
    logger.info('Starting buttons listeners')

    # Each listener starts its own daemon thread as soon as it is created
    for device_path in (DEV_BUTTONS, DEV_KNOB):
        EventListener(device_path)

    # The listeners are daemon threads, so the main thread has to stay alive
    # for them to keep running.
    while True:
        time.sleep(1)