Created
June 18, 2024 07:26
-
-
Save Jaakkonen/545bc1471f6976c86f1acbd7dd4c8863 to your computer and use it in GitHub Desktop.
Minimal DBus Wire protocol/format implementation and packet creator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Opens a D-Bus session bus connection and does the equivalent of | |
| busctl --user call org.freedesktop.Notifications /org/freedesktop/Notifications org.freedesktop.Notifications Notify 'susssasa{sv}i' "" 0 "ata" "ata" "ata" 1 "kiisu" 1 "hint" u 10 1000 | |
| """ | |
| import socket | |
| import struct | |
| import os | |
| from pathlib import Path | |
| import sys | |
| import enum | |
| from typing import TypedDict, Unpack, cast | |
| from dataclasses import dataclass | |
# Per-user runtime directory. When XDG_RUNTIME_DIR is unset (or empty) fall
# back to /run/user/<uid> for the *current* user instead of assuming uid 1000.
XDG_RUNTIME_DIR = Path(os.environ.get('XDG_RUNTIME_DIR') or f'/run/user/{os.getuid()}')
# Unix socket of the session bus inside the runtime directory.
DBUS_USER_SOCK = XDG_RUNTIME_DIR / 'bus'
| """ | |
| $ socat -v -x UNIX-LISTEN:/tmp/bus,fork UNIX-CONNECT:/run/user/1000/bus | |
| > 2024/06/17 15:51:59.000307067 length=48 from=0 to=47 | |
| 00 41 55 54 48 20 45 58 54 45 52 4e 41 4c 0d 0a .AUTH EXTERNAL.. | |
| 44 41 54 41 0d 0a DATA.. | |
| 4e 45 47 4f 54 49 41 54 45 5f 55 4e 49 58 5f 46 NEGOTIATE_UNIX_F | |
| 44 0d 0a D.. | |
| 42 45 47 49 4e 0d 0a BEGIN.. | |
| -- | |
| < 2024/06/17 15:51:59.000307449 length=58 from=0 to=57 | |
| 44 41 54 41 0d 0a DATA.. | |
| 4f 4b 20 30 38 61 61 61 34 33 34 30 66 31 34 37 OK 08aaa4340f147 | |
| 39 38 62 30 30 30 38 39 39 36 66 61 33 66 34 64 98b0008996fa3f4d | |
| 61 35 37 0d 0a a57.. | |
| 41 47 52 45 45 5f 55 4e 49 58 5f 46 44 0d 0a AGREE_UNIX_FD.. | |
| -- | |
| > 2024/06/17 15:51:59.000307860 length=128 from=48 to=175 | |
| 6c 01 00 01 00 00 00 00 01 00 00 00 6d 00 00 00 l...........m... | |
| 01 01 6f 00 15 00 00 00 2f 6f 72 67 2f 66 72 65 ..o...../org/fre | |
| 65 64 65 73 6b 74 6f 70 2f 44 42 75 73 00 00 00 edesktop/DBus... | |
| 03 01 73 00 05 00 00 00 48 65 6c 6c 6f 00 00 00 ..s.....Hello... | |
| 02 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free | |
| 64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00 desktop.DBus.... | |
| 06 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free | |
| 64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00 desktop.DBus.... | |
| -- | |
| < 2024/06/17 15:51:59.000308714 length=262 from=58 to=319 | |
| 6c 02 01 01 0b 00 00 00 ff ff ff ff 3f 00 00 00 l...........?... | |
| 05 01 75 00 01 00 00 00 07 01 73 00 14 00 00 00 ..u.......s..... | |
| 6f 72 67 2e 66 72 65 65 64 65 73 6b 74 6f 70 2e org.freedesktop. | |
| 44 42 75 73 00 00 00 00 06 01 73 00 06 00 00 00 DBus......s..... | |
| 3a 31 2e 33 38 36 00 00 08 01 67 00 01 73 00 00 :1.386....g..s.. | |
| 06 00 00 00 3a 31 2e 33 38 36 00 6c 04 01 01 0b ....:1.386.l.... | |
| 00 00 00 ff ff ff ff 8f 00 00 00 07 01 73 00 14 .............s.. | |
| 00 00 00 6f 72 67 2e 66 72 65 65 64 65 73 6b 74 ...org.freedeskt | |
| 6f 70 2e 44 42 75 73 00 00 00 00 06 01 73 00 06 op.DBus......s.. | |
| 00 00 00 3a 31 2e 33 38 36 00 00 01 01 6f 00 15 ...:1.386....o.. | |
| 00 00 00 2f 6f 72 67 2f 66 72 65 65 64 65 73 6b .../org/freedesk | |
| 74 6f 70 2f 44 42 75 73 00 00 00 02 01 73 00 14 top/DBus.....s.. | |
| 00 00 00 6f 72 67 2e 66 72 65 65 64 65 73 6b 74 ...org.freedeskt | |
| 6f 70 2e 44 42 75 73 00 00 00 00 03 01 73 00 0c op.DBus......s.. | |
| 00 00 00 4e 61 6d 65 41 63 71 75 69 72 65 64 00 ...NameAcquired. | |
| 00 00 00 08 01 67 00 01 73 00 00 06 00 00 00 3a .....g..s......: | |
| 31 2e 33 38 36 00 1.386. | |
| -- | |
| > 2024/06/17 15:51:59.000309976 length=276 from=176 to=451 | |
| 6c 01 04 01 64 00 00 00 02 00 00 00 9b 00 00 00 l...d........... | |
| 01 01 6f 00 1e 00 00 00 2f 6f 72 67 2f 66 72 65 ..o...../org/fre | |
| 65 64 65 73 6b 74 6f 70 2f 4e 6f 74 69 66 69 63 edesktop/Notific | |
| 61 74 69 6f 6e 73 00 00 03 01 73 00 06 00 00 00 ations....s..... | |
| 4e 6f 74 69 66 79 00 00 02 01 73 00 1d 00 00 00 Notify....s..... | |
| 6f 72 67 2e 66 72 65 65 64 65 73 6b 74 6f 70 2e org.freedesktop. | |
| 4e 6f 74 69 66 69 63 61 74 69 6f 6e 73 00 00 00 Notifications... | |
| 06 01 73 00 1d 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free | |
| 64 65 73 6b 74 6f 70 2e 4e 6f 74 69 66 69 63 61 desktop.Notifica | |
| 74 69 6f 6e 73 00 00 00 08 01 67 00 0d 73 75 73 tions.....g..sus | |
| 73 73 61 73 61 7b 73 76 7d 69 00 00 00 00 00 00 ssasa{sv}i...... | |
| 08 00 00 00 61 70 70 5f 6e 61 6d 65 00 00 00 00 ....app_name.... | |
| 00 00 00 00 08 00 00 00 61 70 70 5f 69 63 6f 6e ........app_icon | |
| 00 00 00 00 05 00 00 00 74 69 74 6c 65 00 00 00 ........title... | |
| 04 00 00 00 62 6f 64 79 00 00 00 00 0c 00 00 00 ....body........ | |
| 07 00 00 00 61 63 74 69 6f 6e 31 00 10 00 00 00 ....action1..... | |
| 04 00 00 00 68 69 6e 74 00 01 75 00 0a ....hint..u.. | |
| 00 00 00 e8 03 00 00 ....... | |
| -- | |
| < 2024/06/17 15:51:59.000333150 length=68 from=320 to=387 | |
| 6c 02 01 01 04 00 00 00 7b 01 00 00 2d 00 00 00 l.......{...-... | |
| 06 01 73 00 06 00 00 00 3a 31 2e 33 38 36 00 00 ..s.....:1.386.. | |
| 08 01 67 00 01 75 00 00 05 01 75 00 02 00 00 00 ..g..u....u..... | |
| 07 01 73 00 04 00 00 00 3a 31 2e 36 00 00 00 00 ..s.....:1.6.... | |
| 24 00 00 00 $... | |
| -- | |
| < 2024/06/17 15:51:59.000334292 length=171 from=388 to=558 | |
| 6c 04 01 01 0b 00 00 00 ff ff ff ff 8f 00 00 00 l............... | |
| 07 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free | |
| 64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00 desktop.DBus.... | |
| 06 01 73 00 06 00 00 00 3a 31 2e 33 38 36 00 00 ..s.....:1.386.. | |
| 01 01 6f 00 15 00 00 00 2f 6f 72 67 2f 66 72 65 ..o...../org/fre | |
| 65 64 65 73 6b 74 6f 70 2f 44 42 75 73 00 00 00 edesktop/DBus... | |
| 02 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free | |
| 64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00 desktop.DBus.... | |
| 03 01 73 00 08 00 00 00 4e 61 6d 65 4c 6f 73 74 ..s.....NameLost | |
| 00 00 00 00 00 00 00 00 08 01 67 00 01 73 00 00 ..........g..s.. | |
| 06 00 00 00 3a 31 2e 33 38 36 00 ....:1.386. | |
| -- | |
| 2024/06/17 15:51:59 socat[36654] E write(6, 0x62708ab48000, 171): Broken pipe | |
| $ DBUS_SESSION_BUS_ADDRESS=unix:path=/tmp/bus busctl --user call org.freedesktop.Notifications /org/freedesktop/Notifications org.freedesktop.Notifications Notify 'susssasa{sv}i' "app_name" 0 "app_icon" "title" "body" 1 "action1" 1 "hint" u 10 1000 | |
| u 36 | |
| """ | |
# Open the socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(DBUS_USER_SOCK))
# Send the authentication message. The handshake starts with a single NUL
# byte, followed by the line-based commands (all pipelined in one write).
sock.sendmsg([b'\0'])  # This could have extra auth data with it
# sendall() keeps writing until the whole handshake has been handed to the
# kernel; plain send() is allowed to stop after a partial write.
sock.sendall(b'AUTH EXTERNAL\r\nDATA\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n')
# Read the response
response = sock.recv(1024)
# Parse the response: one reply line per command except BEGIN.
rows = response.removesuffix(b'\r\n').split(b'\r\n')
if len(rows) != 3:
    raise ConnectionError(f"Unexpected authentication response: {response!r}")
data, ok, fd = rows
# Explicit checks instead of assert, so they still run under `python -O`.
if data != b'DATA' or not ok.startswith(b'OK ') or fd != b'AGREE_UNIX_FD':
    raise ConnectionError(f"Authentication was not accepted: {response!r}")
# The server GUID follows "OK "; it is not needed for the rest of the script.
guid = ok.removeprefix(b'OK ')
# Hand-assembled org.freedesktop.DBus.Hello call, which must be the first
# message on a new connection.
# Fixed part of the header: endianness, type, flags, version, body length, serial.
endianness = b'l'
method_type = b'\x01'  # METHOD_CALL
flags = b'\x00'  # 0x01 = NO_REPLY_EXPECTED, 0x02 = NO_AUTO_START, 0x04 = ALLOW_INTERACTIVE_AUTHORIZATION
protocol_version = b'\x01'
message_body_length = struct.pack('<I', 0)  # Hello carries no body
message_serial = struct.pack('<I', 1)
msg = endianness + method_type + flags + protocol_version + message_body_length + message_serial

# (field code, variant type code, value) for every header field, in wire order.
hello_fields = (
    (b'\x01', b'o', b'/org/freedesktop/DBus'),  # Path (object)
    (b'\x03', b's', b'Hello'),                  # Member
    (b'\x02', b's', b'org.freedesktop.DBus'),   # Interface
    (b'\x06', b's', b'org.freedesktop.DBus'),   # Destination
)
encoded_fields = [
    # code, signature length (1), signature + NUL, string length, string + NUL
    field_code + b'\x01' + type_code + b'\x00'
    + struct.pack('<I', len(field_value)) + field_value + b'\x00'
    for field_code, type_code, field_value in hello_fields
]
# Headers (as struct elements) must be padded to 8 bytes. `-len % 8` adds
# nothing when a field is already aligned (the previous `8 - len % 8` would
# have inserted 8 stray bytes in that case).
padded_fields = [field + b'\x00' * (-len(field) % 8) for field in encoded_fields]
# The array length covers everything up to the end of the last field, i.e.
# it excludes the padding that follows the final field.
headers_length = sum(len(field) for field in padded_fields[:-1]) + len(encoded_fields[-1])
# next is header field array
array_length = struct.pack('<I', headers_length)
msg += array_length
msg += b''.join(padded_fields)
sock.sendall(msg)
| response = sock.recv(1024) | |
| extract = 'c' | |
| offset = 0 | |
| endianness, = struct.unpack_from(extract, response) | |
| assert endianness == b'l' # The following code assumes the struct is little endian | |
| offset += struct.calcsize(extract) | |
| extract = '<bbbiii' | |
| msgtype, msgflags, protocol_version, body_length, msg_serial, header_length = struct.unpack_from(extract, response, offset) | |
| offset += struct.calcsize(extract) | |
| assert msg_serial == -1 | |
| # Next up are the headers. Header types are always just 1 symbol so the header type can be read at once | |
| header_data = response[offset:offset+header_length+1] | |
| offset += header_length + 1 | |
| headers = [] | |
| header_offset = 0 | |
| while True: | |
| header_offset = (header_offset + 7) & ~7 | |
| if header_offset >= header_length: | |
| break | |
| extract = '<bbs' | |
| header_type, variant_signature_length, variant = struct.unpack_from(extract, header_data, header_offset) | |
| header_offset += struct.calcsize(extract) + len(variant) | |
| match variant: | |
| case b'o' | b's': | |
| string_length, = struct.unpack_from('<i', header_data, header_offset) | |
| header_offset += struct.calcsize('<i') | |
| value = header_data[header_offset:header_offset+string_length] | |
| header_offset += string_length | |
| case b'g': | |
| string_length, = struct.unpack_from('<b', header_data, header_offset) | |
| header_offset += struct.calcsize('<b') | |
| value = header_data[header_offset:header_offset+string_length] | |
| header_offset += string_length | |
| case b'u': | |
| value, = struct.unpack_from('<I', header_data, header_offset) | |
| header_offset += struct.calcsize('<I') | |
| case _: | |
| raise ValueError(f"Unexpected variant signature: {variant}") | |
| headers.append((header_type, variant, value)) | |
@enum.unique
class HeaderType(enum.Enum):
    """Numeric codes identifying the fields of a message header."""

    PATH = 1
    INTERFACE = 2
    MEMBER = 3
    ERROR_NAME = 4
    REPLY_SERIAL = 5
    DESTINATION = 6
    SENDER = 7
    SIGNATURE = 8
    UNIX_FDS = 9
# Placeholders kept from the original script; nothing below reads them.
path = None
interface = None  # Exists for SIGNAL messages
member = None  # Exists for SIGNAL and METHOD_CALL messages
error_name = None  # Exists for ERROR messages
reply_serial = None  # Exists for METHOD_RETURN and ERROR messages
destination = None  # optional
# Header fields of the received message, keyed by field code, pre-filled with
# the value each field has when it is absent from the message.
header_data = {
    HeaderType.PATH: None,
    HeaderType.INTERFACE: None,
    HeaderType.MEMBER: None,
    HeaderType.ERROR_NAME: None,
    HeaderType.REPLY_SERIAL: None,
    HeaderType.DESTINATION: None,
    HeaderType.SENDER: None,
    # Default assumed no body. Must be bytes like a parsed value, because
    # the body parser calls .decode() on it.
    HeaderType.SIGNATURE: b"",
    HeaderType.UNIX_FDS: 0,  # Default assumed 0
}
_known_header_codes = {field.value: field for field in HeaderType}
for header_type, variant, value in headers:
    # Field codes this script does not know are skipped instead of raising.
    if header_type in _known_header_codes:
        header_data[_known_header_codes[header_type]] = value
| import enum | |
@enum.unique
class SignatureSymbol(enum.Enum):
    """Single-character type codes that can appear in a type signature."""

    # Fixed-size values
    BYTE = 'y'
    BOOLEAN = 'b'
    INT16 = 'n'
    UINT16 = 'q'
    INT32 = 'i'
    UINT32 = 'u'
    INT64 = 'x'
    UINT64 = 't'
    DOUBLE = 'd'
    # String-like values
    STRING = 's'
    OBJECT_PATH = 'o'
    SIGNATURE = 'g'
    UNIX_FD = 'h'
    # Containers
    ARRAY = 'a'
    VARIANT = 'v'
    STRUCT = 'r'
    DICT_ENTRY = 'e'
# Symbols that can be written as a single character in a signature string.
# STRUCT and DICT_ENTRY are left out: tokenize_signature recognises those
# through their bracket pairs instead.
TYPES = {
    symbol
    for symbol in SignatureSymbol
    if symbol not in (SignatureSymbol.STRUCT, SignatureSymbol.DICT_ENTRY)
}
# The same set, as raw signature characters.
TYPE_CHARS = set(symbol.value for symbol in TYPES)
# Everything in TYPES that is not a container.
BASIC_TYPES = TYPES.difference({SignatureSymbol.ARRAY, SignatureSymbol.VARIANT})
class ArrayToken:
    """Signature node for an array; ``value`` is the element type.

    Instances compare and hash by their element type, so parsed signatures
    can be compared with ``==`` just like the tuple-based ``DictToken`` and
    ``StructToken`` nodes.
    """

    value: "SignaturePart"

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f'ArrayToken({self.value!r})'

    def __eq__(self, other):
        if not isinstance(other, ArrayToken):
            return NotImplemented
        return self.value == other.value

    def __hash__(self):
        return hash((ArrayToken, self.value))
class DictToken(tuple):
    """Tuple of the types found between ``{`` and ``}`` in a signature."""

    def __repr__(self):
        # Plain tuple repr, prefixed with the class name.
        return 'DictToken' + tuple.__repr__(self)
class StructToken(tuple):
    """Tuple of the types found between ``(`` and ``)`` in a signature."""

    def __repr__(self):
        # Plain tuple repr, prefixed with the class name.
        return 'StructToken' + tuple.__repr__(self)
SignatureToken = SignatureSymbol | DictToken | StructToken | tuple["SignatureToken", ...]


def tokenize_signature(signature: str) -> tuple[SignatureToken, ...]:
    """Split a signature string into tokens.

    Single type characters become ``SignatureSymbol`` members, ``{...}``
    becomes a ``DictToken`` and ``(...)`` a ``StructToken``, each holding the
    tokens of its contents. Array markers (``a``) are returned as plain
    ``SignatureSymbol.ARRAY`` tokens; ``parse_signature`` attaches them to
    their element type.

    Raises:
        ValueError: on an unknown character or on unbalanced or interleaved
            brackets.
    """
    closers = {'{': '}', '(': ')'}
    wrappers = {'{': DictToken, '(': StructToken}

    def closing_index(start: int) -> int:
        # Index of the bracket matching the opener at `start`. Both bracket
        # kinds are tracked so that interleaving such as "{(})" is rejected.
        expected = []
        for position in range(start, len(signature)):
            char = signature[position]
            if char in closers:
                expected.append(closers[char])
            elif char in ('}', ')'):
                if char != expected.pop():
                    raise ValueError(f'Unexpected "{char}"')
                if not expected:
                    return position
        raise ValueError(f'Unterminated "{signature[start]}"')

    tokens = []
    index = 0
    while index < len(signature):
        char = signature[index]
        if char in closers:
            end = closing_index(index)
            tokens.append(wrappers[char](tokenize_signature(signature[index + 1:end])))
            index = end + 1
        elif char in ('}', ')'):
            raise ValueError(f'Unexpected "{char}"')
        elif char in TYPE_CHARS:
            tokens.append(SignatureSymbol(char))
            index += 1
        else:
            raise ValueError(f'Unknown signature symbol "{char}"')
    return tuple(tokens)
SignaturePart = SignatureSymbol | ArrayToken | DictToken | StructToken


def parse_signature(signature: tuple[SignatureToken, ...] | str) -> tuple[SignaturePart, ...]:
    """Turn a signature into a tuple of complete types.

    Removes array `a` definitions, instead wrapping the complete type that
    follows in an ``ArrayToken``. This is applied at every nesting level:
    ``aas`` becomes an array of arrays, and arrays inside structs or dict
    entries are converted as well.

    Args:
        signature: a signature string, or the tokens produced by
            ``tokenize_signature``.

    Raises:
        ValueError: if an array marker is not followed by an element type
            (plus whatever ``tokenize_signature`` raises for a string input).
    """
    if isinstance(signature, str):
        signature = tokenize_signature(signature)

    def parse_one(index: int) -> tuple[SignaturePart, int]:
        # Returns the complete type starting at `index` and the index after it.
        if index >= len(signature):
            raise ValueError('Array marker "a" is not followed by an element type')
        token = signature[index]
        if token == SignatureSymbol.ARRAY:
            element, next_index = parse_one(index + 1)
            return ArrayToken(element), next_index
        if isinstance(token, (DictToken, StructToken)):
            # Rebuild the container with its contents parsed too.
            return type(token)(parse_signature(tuple(token))), index + 1
        return token, index + 1

    parts = []
    index = 0
    while index < len(signature):
        part, index = parse_one(index)
        parts.append(part)
    return tuple(parts)
| # Parse the body according to the signature | |
| body = response[offset:] | |
| signature = parse_signature(header_data[HeaderType.SIGNATURE].decode()) | |
| body_offset = 0 | |
| resp = [] | |
| for part in signature: | |
| match part: | |
| case SignatureSymbol.STRING: | |
| string_length, = struct.unpack_from('<i', body, body_offset) | |
| body_offset += struct.calcsize('<i') | |
| value = body[body_offset:body_offset+string_length] | |
| body_offset += string_length | |
| resp.append(value) | |
| case _: | |
| raise ValueError(f"Unexpected signature: {part}") | |
class Headers(TypedDict, total=False):
    """Keyword arguments accepted by ``make_headers``; every key is optional."""

    # String-like fields are passed as raw bytes.
    path: bytes
    interface: bytes
    member: bytes
    error_name: bytes
    destination: bytes
    sender: bytes
    signature: bytes
    # Numeric fields.
    reply_serial: int
    unix_fds: int
# Maps each make_headers() keyword to its header field code and the
# one-character type code of the variant that carries its value.
header_types: dict[str, tuple[HeaderType, bytes]] = {
    keyword: (field, type_code)
    for keyword, field, type_code in (
        ("path", HeaderType.PATH, b'o'),
        ("interface", HeaderType.INTERFACE, b's'),
        ("member", HeaderType.MEMBER, b's'),
        ("error_name", HeaderType.ERROR_NAME, b's'),
        ("reply_serial", HeaderType.REPLY_SERIAL, b'u'),
        ("destination", HeaderType.DESTINATION, b's'),
        ("sender", HeaderType.SENDER, b's'),
        ("signature", HeaderType.SIGNATURE, b'g'),
        ("unix_fds", HeaderType.UNIX_FDS, b'u'),
    )
}
| def make_headers( | |
| **args: Unpack[Headers] | |
| ) -> tuple[int, bytes]: | |
| """ | |
| Returns header length and the padded headers | |
| """ | |
| out = b'' | |
| last_len = 0 | |
| for key, value in args.items(): | |
| header_type, variant = header_types[key] | |
| match variant: | |
| case b'o' | b's': | |
| value = cast(bytes, value) | |
| # 4 bytes for the length | |
| out += struct.pack('<bbs', header_type.value, 1, variant) + b'\x00' + struct.pack('<i', len(value)) + value + b'\0' | |
| case b'g': | |
| value = cast(bytes, value) | |
| # Only 1 byte for the length | |
| out += struct.pack('<bbs', header_type.value, 1, variant) + b'\x00' + struct.pack('<b', len(value)) + value + b'\0' | |
| case b'u': | |
| value = cast(int, value) | |
| # 4 bytes for the value | |
| out += struct.pack('<bbs', header_type.value, 1, variant) + b'\x00' + struct.pack('<I', value) | |
| last_len = len(out) | |
| out += b'\0' * (-last_len % 8) | |
| # TODO: Figure out if there's a problem with the string 4 byte alignment requirement. This far this has worked. | |
| # Remove the last padding | |
| return last_len, out | |
def pad_to(body: bytes, alignment: int) -> bytes:
    """Return ``body`` extended with NUL bytes to a multiple of ``alignment``."""
    remainder = len(body) % alignment
    if remainder:
        body += bytes(alignment - remainder)
    return body
def construct_message(
    message_serial: int,
    body_signature: bytes,
    body: bytes,
    path: bytes = b'/org/freedesktop/DBus',
    member: bytes = b'Hello',
    interface: bytes = b'org.freedesktop.DBus',
    destination: bytes = b'org.freedesktop.DBus',
) -> bytes:
    """Build a complete little-endian METHOD_CALL message.

    Args:
        message_serial: serial number of the message (unsigned 32-bit).
        body_signature: signature describing ``body``.
        body: already encoded body, e.g. from ``encode_body``.
        path, member, interface, destination: values of the matching header
            fields; the defaults address ``org.freedesktop.DBus.Hello``.

    Returns:
        The fixed header, the header fields padded to 8 bytes, then the body.
    """
    headers_length, headers = make_headers(
        path=path,
        member=member,
        interface=interface,
        destination=destination,
        signature=body_signature,
    )
    fixed_header = struct.pack(
        '<cBBBIII',
        b'l',            # little endian
        0x01,            # METHOD_CALL
        0x04,            # 0x01 = NO_REPLY_EXPECTED, 0x02 = NO_AUTO_START, 0x04 = ALLOW_INTERACTIVE_AUTHORIZATION
        0x01,            # protocol version
        len(body),
        message_serial,  # packed unsigned, so serials above 2**31 - 1 work
        headers_length,
    )
    # The body must start on an 8 byte boundary.
    msg = pad_to(fixed_header + headers, 8)
    return msg + body
@dataclass
class Variant:
    """A value paired with the signature that describes its type."""

    # Signature string of the wrapped value, e.g. 'u'.
    signature: str
    # The wrapped value itself.
    value: object
| def encode_body( | |
| signature: str, | |
| *args: object | |
| ): | |
| body = bytearray() | |
| def pad_to(alignment: int): | |
| nonlocal body | |
| body += b'\0' * (-len(body) % alignment) | |
| def append_part(part: SignaturePart, arg: object): | |
| nonlocal body | |
| match part: | |
| case SignatureSymbol.STRING: | |
| arg = cast(str, arg) | |
| pad_to(4) | |
| body += len(arg).to_bytes(4, 'little') + arg.encode() + b'\0' | |
| # pad_to(4) | |
| case SignatureSymbol.UINT32: | |
| pad_to(4) | |
| arg = cast(int, arg) | |
| body += arg.to_bytes(4, 'little') | |
| pad_to(4) | |
| case SignatureSymbol.UINT16: | |
| pad_to(2) | |
| arg = cast(int, arg) | |
| body += arg.to_bytes(2, 'little') | |
| pad_to(2) | |
| case SignatureSymbol.UINT64: | |
| pad_to(8) | |
| arg = cast(int, arg) | |
| body += arg.to_bytes(8, 'little') | |
| pad_to(8) | |
| case SignatureSymbol.INT32: | |
| pad_to(4) | |
| arg = cast(int, arg) | |
| body += arg.to_bytes(4, 'little', signed=True) | |
| pad_to(4) | |
| case SignatureSymbol.INT16: | |
| pad_to(2) | |
| arg = cast(int, arg) | |
| body += arg.to_bytes(2, 'little', signed=True) | |
| pad_to(2) | |
| case SignatureSymbol.INT64: | |
| pad_to(8) | |
| arg = cast(int, arg) | |
| body += arg.to_bytes(8, 'little', signed=True) | |
| pad_to(8) | |
| case SignatureSymbol.DOUBLE: | |
| pad_to(8) | |
| arg = cast(float, arg) | |
| body += struct.pack('<d', arg) | |
| pad_to(8) | |
| case SignatureSymbol.BOOLEAN: | |
| arg = cast(bool, arg) | |
| body += b'\x01' if arg else b'\x00' | |
| case ArrayToken(): | |
| pad_to(4) | |
| arg = cast(tuple, arg) | |
| # Now we need to calculate the length of the array from the current position | |
| body += b'\0\0\0\0' # Placeholder for the length | |
| after_body = len(body) | |
| for item in arg: | |
| append_part(part.value, item) | |
| # Calculate the length of the array | |
| array_length = len(body) - after_body | |
| body[after_body-4:after_body] = array_length.to_bytes(4, 'little') | |
| case StructToken() | DictToken(): | |
| pad_to(8) | |
| arg = cast(tuple, arg) | |
| for subpart, item in zip(part, arg): | |
| append_part(subpart, item) | |
| # TODO: Should we pad here? | |
| case SignatureSymbol.VARIANT: | |
| arg = cast(Variant, arg) | |
| sub_signature = arg.signature | |
| sub_value = arg.value | |
| if not isinstance(sub_value, tuple): | |
| sub_value = (sub_value,) | |
| pad_to(1) | |
| body += len(sub_signature).to_bytes(1, 'little') + sub_signature.encode() + b'\0' | |
| for subpart, item in zip(parse_signature(sub_signature), sub_value): | |
| append_part(subpart, item) | |
| case _: | |
| raise ValueError(f"Unexpected signature: {part}") | |
| sig = parse_signature(signature) | |
| assert len(sig) == len(args), f"Signature length mismatch: {len(sig)} != {len(args)}" | |
| for part, arg in zip(sig, args): | |
| append_part(part, arg) | |
| return bytes(body) | |
# Body of the org.freedesktop.Notifications.Notify call; one value per
# complete type in the signature.
_notify_body = encode_body(
    'susssasa{sv}i',
    "app_name",  # s
    0,  # u
    "app_icon",  # s
    "h4x0r",  # s
    "greetings from manually crafted packets",  # s
    ("action1",),  # as
    (("hint", Variant('u', 10)),),  # a{sv}
    10_000,  # i
)
# Serial 2: serial 1 was used by the Hello call.
msg = construct_message(
    2,
    b'susssasa{sv}i',
    _notify_body,
    path=b'/org/freedesktop/Notifications',
    member=b'Notify',
    interface=b'org.freedesktop.Notifications',
    destination=b'org.freedesktop.Notifications',
)
sock.send(msg)
resp = sock.recv(1024)
| def parse_packet( | |
| response: bytes | |
| ): | |
| extract = 'c' | |
| offset = 0 | |
| endianness, = struct.unpack_from(extract, response) | |
| assert endianness == b'l' # The following code assumes the struct is little endian | |
| offset += struct.calcsize(extract) | |
| extract = '<bbbiii' | |
| msgtype, msgflags, protocol_version, body_length, msg_serial, header_length = struct.unpack_from(extract, response, offset) | |
| offset += struct.calcsize(extract) | |
| # assert msg_serial == -1 | |
| # Next up are the headers. Header types are always just 1 symbol so the header type can be read at once | |
| header_data = response[offset:offset+header_length+1] | |
| offset += header_length + 1 | |
| headers = [] | |
| header_offset = 0 | |
| while True: | |
| header_offset = (header_offset + 7) & ~7 | |
| if header_offset >= header_length: | |
| break | |
| extract = '<bbs' | |
| header_type, variant_signature_length, variant = struct.unpack_from(extract, header_data, header_offset) | |
| header_offset += struct.calcsize(extract) + len(variant) | |
| match variant: | |
| case b'o' | b's': | |
| string_length, = struct.unpack_from('<i', header_data, header_offset) | |
| header_offset += struct.calcsize('<i') | |
| value = header_data[header_offset:header_offset+string_length] | |
| header_offset += string_length | |
| case b'g': | |
| string_length, = struct.unpack_from('<b', header_data, header_offset) | |
| header_offset += struct.calcsize('<b') | |
| value = header_data[header_offset:header_offset+string_length] | |
| header_offset += string_length | |
| case b'u': | |
| value, = struct.unpack_from('<I', header_data, header_offset) | |
| header_offset += struct.calcsize('<I') | |
| case _: | |
| raise ValueError(f"Unexpected variant signature: {variant}") | |
| headers.append((header_type, variant, value)) | |
| header_data = { | |
| HeaderType.PATH: None, | |
| HeaderType.INTERFACE: None, | |
| HeaderType.MEMBER: None, | |
| HeaderType.ERROR_NAME: None, | |
| HeaderType.REPLY_SERIAL: None, | |
| HeaderType.DESTINATION: None, | |
| HeaderType.SENDER: None, | |
| HeaderType.SIGNATURE: "", # Default assumed no body | |
| HeaderType.UNIX_FDS: 0, # Default assumed 0 | |
| } | |
| for header_type, variant, value in headers: | |
| header_data[HeaderType(header_type)] = value | |
| # Parse the body according to the signature | |
| # Body starts at 8 byte alignment | |
| offset = (offset + 7) & ~7 | |
| # body = response[offset:] | |
| signature = parse_signature(header_data[HeaderType.SIGNATURE].decode()) | |
| def parse_part(part: SignaturePart): | |
| nonlocal response | |
| nonlocal offset | |
| match part: | |
| case SignatureSymbol.STRING: | |
| # Align to 4 bytes | |
| offset = (offset + 3) & ~3 | |
| string_length, = struct.unpack_from('<i', response, offset) | |
| offset += struct.calcsize('<i') | |
| value = response[offset:offset+string_length+1] | |
| offset += string_length+1 # Include the null terminator | |
| return value | |
| case SignatureSymbol.UINT32: | |
| offset = (offset + 3) & ~3 | |
| value, = struct.unpack_from('<I', response, offset) | |
| offset += struct.calcsize('<I') | |
| return value | |
| case SignatureSymbol.UINT16: | |
| offset = (offset + 1) & ~1 | |
| value, = struct.unpack_from('<H', response, offset) | |
| offset += struct.calcsize('<H') | |
| return value | |
| case SignatureSymbol.UINT64: | |
| offset = (offset + 7) & ~7 | |
| value, = struct.unpack_from('<Q', response, offset) | |
| offset += struct.calcsize('<Q') | |
| return value | |
| case SignatureSymbol.INT32: | |
| offset = (offset + 3) & ~3 | |
| value, = struct.unpack_from('<i', response, offset) | |
| offset += struct.calcsize('<i') | |
| return value | |
| case SignatureSymbol.INT16: | |
| offset = (offset + 1) & ~1 | |
| value, = struct.unpack_from('<h', response, offset) | |
| offset += struct.calcsize('<h') | |
| return value | |
| case SignatureSymbol.INT64: | |
| offset = (offset + 7) & ~7 | |
| value, = struct.unpack_from('<q', response, offset) | |
| offset += struct.calcsize('<q') | |
| return value | |
| case SignatureSymbol.DOUBLE: | |
| offset = (offset + 7) & ~7 | |
| value, = struct.unpack_from('<d', response, offset) | |
| offset += struct.calcsize('<d') | |
| return value | |
| case SignatureSymbol.BOOLEAN: | |
| value, = struct.unpack_from('<?', response, offset) | |
| offset += struct.calcsize('<?') | |
| return value | |
| case ArrayToken(): | |
| offset = (offset + 3) & ~3 | |
| array_length, = struct.unpack_from('<I', response, offset) | |
| offset += struct.calcsize('<I') | |
| array = [] | |
| for _ in range(array_length): | |
| array.append(parse_part(part.value)) | |
| return array | |
| case StructToken() | DictToken(): | |
| offset = (offset + 7) & ~7 | |
| stru = [] | |
| for subpart in part: | |
| stru.append(parse_part(subpart)) | |
| return stru | |
| out = [] | |
| for part in signature: | |
| out.append(parse_part(part)) | |
| return out | |
try:
    # Decode the reply to the Notify call; the result is not used further.
    parse_packet(resp)
finally:
    # The script is done with the bus, so release the connection explicitly.
    sock.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment