Skip to content

Instantly share code, notes, and snippets.

@Jaakkonen
Created June 18, 2024 07:26
Show Gist options
  • Select an option

  • Save Jaakkonen/545bc1471f6976c86f1acbd7dd4c8863 to your computer and use it in GitHub Desktop.

Select an option

Save Jaakkonen/545bc1471f6976c86f1acbd7dd4c8863 to your computer and use it in GitHub Desktop.
Minimal DBus Wire protocol/format implementation and packet creator
"""
Opens a DBus connection and does equivalent of
busctl --user call org.freedesktop.Notifications /org/freedesktop/Notifications org.freedesktop.Notifications Notify 'susssasa{sv}i' "" 0 "ata" "ata" "ata" 1 "kiisu" 1 "hint" u 10 1000
"""
import socket
import struct
import os
from pathlib import Path
import sys
import enum
from typing import TypedDict, Unpack, cast
from dataclasses import dataclass
XDG_RUNTIME_DIR = Path(os.environ.get('XDG_RUNTIME_DIR', '/run/user/1000'))
DBUS_USER_SOCK = XDG_RUNTIME_DIR / 'bus'
"""
$ socat -v -x UNIX-LISTEN:/tmp/bus,fork UNIX-CONNECT:/run/user/1000/bus
> 2024/06/17 15:51:59.000307067 length=48 from=0 to=47
00 41 55 54 48 20 45 58 54 45 52 4e 41 4c 0d 0a .AUTH EXTERNAL..
44 41 54 41 0d 0a DATA..
4e 45 47 4f 54 49 41 54 45 5f 55 4e 49 58 5f 46 NEGOTIATE_UNIX_F
44 0d 0a D..
42 45 47 49 4e 0d 0a BEGIN..
--
< 2024/06/17 15:51:59.000307449 length=58 from=0 to=57
44 41 54 41 0d 0a DATA..
4f 4b 20 30 38 61 61 61 34 33 34 30 66 31 34 37 OK 08aaa4340f147
39 38 62 30 30 30 38 39 39 36 66 61 33 66 34 64 98b0008996fa3f4d
61 35 37 0d 0a a57..
41 47 52 45 45 5f 55 4e 49 58 5f 46 44 0d 0a AGREE_UNIX_FD..
--
> 2024/06/17 15:51:59.000307860 length=128 from=48 to=175
6c 01 00 01 00 00 00 00 01 00 00 00 6d 00 00 00 l...........m...
01 01 6f 00 15 00 00 00 2f 6f 72 67 2f 66 72 65 ..o...../org/fre
65 64 65 73 6b 74 6f 70 2f 44 42 75 73 00 00 00 edesktop/DBus...
03 01 73 00 05 00 00 00 48 65 6c 6c 6f 00 00 00 ..s.....Hello...
02 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free
64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00 desktop.DBus....
06 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free
64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00 desktop.DBus....
--
< 2024/06/17 15:51:59.000308714 length=262 from=58 to=319
6c 02 01 01 0b 00 00 00 ff ff ff ff 3f 00 00 00 l...........?...
05 01 75 00 01 00 00 00 07 01 73 00 14 00 00 00 ..u.......s.....
6f 72 67 2e 66 72 65 65 64 65 73 6b 74 6f 70 2e org.freedesktop.
44 42 75 73 00 00 00 00 06 01 73 00 06 00 00 00 DBus......s.....
3a 31 2e 33 38 36 00 00 08 01 67 00 01 73 00 00 :1.386....g..s..
06 00 00 00 3a 31 2e 33 38 36 00 6c 04 01 01 0b ....:1.386.l....
00 00 00 ff ff ff ff 8f 00 00 00 07 01 73 00 14 .............s..
00 00 00 6f 72 67 2e 66 72 65 65 64 65 73 6b 74 ...org.freedeskt
6f 70 2e 44 42 75 73 00 00 00 00 06 01 73 00 06 op.DBus......s..
00 00 00 3a 31 2e 33 38 36 00 00 01 01 6f 00 15 ...:1.386....o..
00 00 00 2f 6f 72 67 2f 66 72 65 65 64 65 73 6b .../org/freedesk
74 6f 70 2f 44 42 75 73 00 00 00 02 01 73 00 14 top/DBus.....s..
00 00 00 6f 72 67 2e 66 72 65 65 64 65 73 6b 74 ...org.freedeskt
6f 70 2e 44 42 75 73 00 00 00 00 03 01 73 00 0c op.DBus......s..
00 00 00 4e 61 6d 65 41 63 71 75 69 72 65 64 00 ...NameAcquired.
00 00 00 08 01 67 00 01 73 00 00 06 00 00 00 3a .....g..s......:
31 2e 33 38 36 00 1.386.
--
> 2024/06/17 15:51:59.000309976 length=276 from=176 to=451
6c 01 04 01 64 00 00 00 02 00 00 00 9b 00 00 00 l...d...........
01 01 6f 00 1e 00 00 00 2f 6f 72 67 2f 66 72 65 ..o...../org/fre
65 64 65 73 6b 74 6f 70 2f 4e 6f 74 69 66 69 63 edesktop/Notific
61 74 69 6f 6e 73 00 00 03 01 73 00 06 00 00 00 ations....s.....
4e 6f 74 69 66 79 00 00 02 01 73 00 1d 00 00 00 Notify....s.....
6f 72 67 2e 66 72 65 65 64 65 73 6b 74 6f 70 2e org.freedesktop.
4e 6f 74 69 66 69 63 61 74 69 6f 6e 73 00 00 00 Notifications...
06 01 73 00 1d 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free
64 65 73 6b 74 6f 70 2e 4e 6f 74 69 66 69 63 61 desktop.Notifica
74 69 6f 6e 73 00 00 00 08 01 67 00 0d 73 75 73 tions.....g..sus
73 73 61 73 61 7b 73 76 7d 69 00 00 00 00 00 00 ssasa{sv}i......
08 00 00 00 61 70 70 5f 6e 61 6d 65 00 00 00 00 ....app_name....
00 00 00 00 08 00 00 00 61 70 70 5f 69 63 6f 6e ........app_icon
00 00 00 00 05 00 00 00 74 69 74 6c 65 00 00 00 ........title...
04 00 00 00 62 6f 64 79 00 00 00 00 0c 00 00 00 ....body........
07 00 00 00 61 63 74 69 6f 6e 31 00 10 00 00 00 ....action1.....
04 00 00 00 68 69 6e 74 00 01 75 00 0a ....hint..u..
00 00 00 e8 03 00 00 .......
--
< 2024/06/17 15:51:59.000333150 length=68 from=320 to=387
6c 02 01 01 04 00 00 00 7b 01 00 00 2d 00 00 00 l.......{...-...
06 01 73 00 06 00 00 00 3a 31 2e 33 38 36 00 00 ..s.....:1.386..
08 01 67 00 01 75 00 00 05 01 75 00 02 00 00 00 ..g..u....u.....
07 01 73 00 04 00 00 00 3a 31 2e 36 00 00 00 00 ..s.....:1.6....
24 00 00 00 $...
--
< 2024/06/17 15:51:59.000334292 length=171 from=388 to=558
6c 04 01 01 0b 00 00 00 ff ff ff ff 8f 00 00 00 l...............
07 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free
64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00 desktop.DBus....
06 01 73 00 06 00 00 00 3a 31 2e 33 38 36 00 00 ..s.....:1.386..
01 01 6f 00 15 00 00 00 2f 6f 72 67 2f 66 72 65 ..o...../org/fre
65 64 65 73 6b 74 6f 70 2f 44 42 75 73 00 00 00 edesktop/DBus...
02 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65 ..s.....org.free
64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00 desktop.DBus....
03 01 73 00 08 00 00 00 4e 61 6d 65 4c 6f 73 74 ..s.....NameLost
00 00 00 00 00 00 00 00 08 01 67 00 01 73 00 00 ..........g..s..
06 00 00 00 3a 31 2e 33 38 36 00 ....:1.386.
--
2024/06/17 15:51:59 socat[36654] E write(6, 0x62708ab48000, 171): Broken pipe
$ DBUS_SESSION_BUS_ADDRESS=unix:path=/tmp/bus busctl --user call org.freedesktop.Notifications /org/freedesktop/Notifications org.freedesktop.Notifications Notify 'susssasa{sv}i' "app_name" 0 "app_icon" "title" "body" 1 "action1" 1 "hint" u 10 1000
u 36
"""
# Open the socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(DBUS_USER_SOCK))
# Send the authentication message
sock.sendmsg([b'\0']) # This could have extra auth data with it
sock.send(b'AUTH EXTERNAL\r\nDATA\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n')
# Read the response
response = sock.recv(1024)
# Parse the response
rows = response.removesuffix(b'\r\n').split(b'\r\n')
data, ok, fd = rows
assert data == b'DATA'
assert ok.startswith(b'OK ')
assert fd == b'AGREE_UNIX_FD'
guid = ok.removeprefix(b'OK ')
# print(f"GUID: {guid}")
endianness = b'l'
method_type = b'\x01' # METHOD_CALL
flags = b'\x00' # 0x01 = NO_REPLY_EXPECTED, 0x02 = NO_AUTO_START, 0x04 = ALLOW_INTERACTIVE_AUTHORIZATION
protocol_version = b'\x01'
message_body_length = b'\x00\x00\x00\x00' # 0
message_serial = struct.pack('<i', 1) # 1
msg = endianness + method_type + flags + protocol_version + message_body_length + message_serial
header_1_field_code = b'\x01' # Path (object)
header_1_variant_signature_length = b'\x01'
header_1_variant_signature = b'o\x00' # object path
header_1_value = b'/org/freedesktop/DBus\x00' # /org/freedesktop/DBus
header_1_length = struct.pack('<i', len(header_1_value) - 1) # Length minus null terminator.
header_1_nonpadded = header_1_field_code + header_1_variant_signature_length + header_1_variant_signature + header_1_length + header_1_value
# Headers (as struct elements) must be padded to 8 bytes
header_1 = header_1_nonpadded + b'\x00' * (8 - len(header_1_nonpadded) % 8)
header_2_field_code = b'\x03' # Interface
header_2_variant_signature_length = b'\x01'
header_2_variant_signature = b's\x00' # string
header_2_value = b'Hello\x00' # Hello
header_2_length = struct.pack('<i', len(header_2_value) - 1) # Length minus null terminator.
header_2_nonpadded = header_2_field_code + header_2_variant_signature_length + header_2_variant_signature + header_2_length + header_2_value
header_2 = header_2_nonpadded + b'\x00' * (8 - len(header_2_nonpadded) % 8)
header_3_field_code = b'\x02' # Interface
header_3_variant_signature_length = b'\x01'
header_3_variant_signature = b's\x00' # string
header_3_value = b'org.freedesktop.DBus\x00' # org.freedesktop.DBus
header_3_length = struct.pack('<i', len(header_3_value) - 1) # Length minus null terminator.
header_3_nonpadded = header_3_field_code + header_3_variant_signature_length + header_3_variant_signature + header_3_length + header_3_value
header_3 = header_3_nonpadded + b'\x00' * (8 - len(header_3_nonpadded) % 8)
header_4_field_code = b'\x06' # Destination
header_4_variant_signature_length = b'\x01'
header_4_variant_signature = b's\x00' # string
header_4_value = b'org.freedesktop.DBus\x00' # org.freedesktop.DBus
header_4_length = struct.pack('<i', len(header_4_value) - 1) # Length minus null terminator.
header_4_nonpadded = header_4_field_code + header_4_variant_signature_length + header_4_variant_signature + header_4_length + header_4_value
header_4 = header_4_nonpadded + b'\x00' * (8 - len(header_4_nonpadded) % 8)
headers_length = len(header_1) + len(header_2) + len(header_3) + len(header_4_nonpadded)
# next is header field array
array_length = struct.pack('<i', headers_length)
msg += array_length
msg += header_1 + header_2 + header_3 + header_4
# sys.stdout.buffer.write(msg)
sock.send(msg)
response = sock.recv(1024)
extract = 'c'
offset = 0
endianness, = struct.unpack_from(extract, response)
assert endianness == b'l' # The following code assumes the struct is little endian
offset += struct.calcsize(extract)
extract = '<bbbiii'
msgtype, msgflags, protocol_version, body_length, msg_serial, header_length = struct.unpack_from(extract, response, offset)
offset += struct.calcsize(extract)
assert msg_serial == -1
# Next up are the headers. Header types are always just 1 symbol so the header type can be read at once
header_data = response[offset:offset+header_length+1]
offset += header_length + 1
headers = []
header_offset = 0
while True:
header_offset = (header_offset + 7) & ~7
if header_offset >= header_length:
break
extract = '<bbs'
header_type, variant_signature_length, variant = struct.unpack_from(extract, header_data, header_offset)
header_offset += struct.calcsize(extract) + len(variant)
match variant:
case b'o' | b's':
string_length, = struct.unpack_from('<i', header_data, header_offset)
header_offset += struct.calcsize('<i')
value = header_data[header_offset:header_offset+string_length]
header_offset += string_length
case b'g':
string_length, = struct.unpack_from('<b', header_data, header_offset)
header_offset += struct.calcsize('<b')
value = header_data[header_offset:header_offset+string_length]
header_offset += string_length
case b'u':
value, = struct.unpack_from('<I', header_data, header_offset)
header_offset += struct.calcsize('<I')
case _:
raise ValueError(f"Unexpected variant signature: {variant}")
headers.append((header_type, variant, value))
class HeaderType(enum.Enum):
PATH = 1
INTERFACE = 2
MEMBER = 3
ERROR_NAME = 4
REPLY_SERIAL = 5
DESTINATION = 6
SENDER = 7
SIGNATURE = 8
UNIX_FDS = 9
path = None
interface = None # Exists for SIGNAL messages
member = None # Exists for SIGNAL and METHOD_CALL messages
error_name = None # Exists for ERROR messages
reply_serial = None # Exists for METHOD_RETURN and ERROR messages
destination = None # optional
header_data = {
HeaderType.PATH: None,
HeaderType.INTERFACE: None,
HeaderType.MEMBER: None,
HeaderType.ERROR_NAME: None,
HeaderType.REPLY_SERIAL: None,
HeaderType.DESTINATION: None,
HeaderType.SENDER: None,
HeaderType.SIGNATURE: "", # Default assumed no body
HeaderType.UNIX_FDS: 0, # Default assumed 0
}
for header_type, variant, value in headers:
header_data[HeaderType(header_type)] = value
import enum
class SignatureSymbol(enum.Enum):
BYTE = 'y'
BOOLEAN = 'b'
INT16 = 'n'
UINT16 = 'q'
INT32 = 'i'
UINT32 = 'u'
INT64 = 'x'
UINT64 = 't'
DOUBLE = 'd'
STRING = 's'
OBJECT_PATH = 'o'
SIGNATURE = 'g'
UNIX_FD = 'h'
ARRAY = 'a'
VARIANT = 'v'
STRUCT = 'r'
DICT_ENTRY = 'e'
TYPES = set(SignatureSymbol) - {SignatureSymbol.STRUCT, SignatureSymbol.DICT_ENTRY}
TYPE_CHARS = {symbol.value for symbol in TYPES}
BASIC_TYPES = TYPES - {SignatureSymbol.ARRAY, SignatureSymbol.VARIANT}
class ArrayToken:
value: "SignaturePart"
def __init__(self, value):
self.value = value
def __repr__(self):
return f'ArrayToken({self.value.__repr__()})'
class DictToken(tuple):
def __repr__(self):
return f'DictToken{super().__repr__()}'
class StructToken(tuple):
def __repr__(self):
return f'StructToken{super().__repr__()}'
SignatureToken = SignatureSymbol | DictToken | StructToken | tuple["SignatureToken", ...]
def tokenize_signature(signature: str) -> tuple[SignatureToken, ...]:
if not signature:
return ()
if signature[0] == '{':
opened = 1
index = 1
while opened > 0:
if signature[index] == '{':
opened += 1
elif signature[index] == '}':
opened -= 1
index += 1
subsignature: SignatureToken = (DictToken(tokenize_signature(signature[1:index-1])),)
return subsignature + tokenize_signature(signature[index:])
if signature[0] == '}':
raise ValueError('Unexpected "}"')
if signature[0] == '(':
opened = 1
index = 1
while opened > 0:
if signature[index] == '(':
opened += 1
elif signature[index] == ')':
opened -= 1
index += 1
subsignature: SignatureToken = (StructToken(tokenize_signature(signature[1:index-1])),)
return subsignature + tokenize_signature(signature[index:])
if signature[0] == ')':
raise ValueError('Unexpected ")"')
if signature[0] in TYPE_CHARS:
return (SignatureSymbol(signature[0]), *tokenize_signature(signature[1:]))
raise ValueError(f'Unknown signature symbol "{signature[0]}"')
SignaturePart = SignatureSymbol | ArrayToken | DictToken | StructToken
def parse_signature(signature: tuple[SignatureToken, ...] | str) -> tuple[SignaturePart, ...]:
"""
Removes array `a` definitions instead making the next token a array token.
"""
if isinstance(signature, str):
return parse_signature(tokenize_signature(signature))
if not signature:
return ()
if signature[0] == SignatureSymbol.ARRAY:
return (ArrayToken(signature[1]), *parse_signature(signature[2:]))
return (signature[0], *parse_signature(signature[1:]))
# Parse the body according to the signature
body = response[offset:]
signature = parse_signature(header_data[HeaderType.SIGNATURE].decode())
body_offset = 0
resp = []
for part in signature:
match part:
case SignatureSymbol.STRING:
string_length, = struct.unpack_from('<i', body, body_offset)
body_offset += struct.calcsize('<i')
value = body[body_offset:body_offset+string_length]
body_offset += string_length
resp.append(value)
case _:
raise ValueError(f"Unexpected signature: {part}")
class Headers(TypedDict, total=False):
path: bytes
interface: bytes
member: bytes
error_name: bytes
reply_serial: int
destination: bytes
sender: bytes
signature: bytes
unix_fds: int
header_types: dict[str, tuple[HeaderType, bytes]] = {
"path": (HeaderType.PATH, b'o'),
"interface": (HeaderType.INTERFACE, b's'),
"member": (HeaderType.MEMBER, b's'),
"error_name": (HeaderType.ERROR_NAME, b's'),
"reply_serial": (HeaderType.REPLY_SERIAL, b'u'),
"destination": (HeaderType.DESTINATION, b's'),
"sender": (HeaderType.SENDER, b's'),
"signature": (HeaderType.SIGNATURE, b'g'),
"unix_fds": (HeaderType.UNIX_FDS, b'u'),
}
def make_headers(
**args: Unpack[Headers]
) -> tuple[int, bytes]:
"""
Returns header length and the padded headers
"""
out = b''
last_len = 0
for key, value in args.items():
header_type, variant = header_types[key]
match variant:
case b'o' | b's':
value = cast(bytes, value)
# 4 bytes for the length
out += struct.pack('<bbs', header_type.value, 1, variant) + b'\x00' + struct.pack('<i', len(value)) + value + b'\0'
case b'g':
value = cast(bytes, value)
# Only 1 byte for the length
out += struct.pack('<bbs', header_type.value, 1, variant) + b'\x00' + struct.pack('<b', len(value)) + value + b'\0'
case b'u':
value = cast(int, value)
# 4 bytes for the value
out += struct.pack('<bbs', header_type.value, 1, variant) + b'\x00' + struct.pack('<I', value)
last_len = len(out)
out += b'\0' * (-last_len % 8)
# TODO: Figure out if there's a problem with the string 4 byte alignment requirement. This far this has worked.
# Remove the last padding
return last_len, out
def pad_to(body: bytes, alignment: int) -> bytes:
body += b'\0' * (-len(body) % alignment)
return body
def construct_message(
message_serial: int,
body_signature: bytes,
body: bytes,
path: bytes = b'/org/freedesktop/DBus',
member: bytes = b'Hello',
interface: bytes = b'org.freedesktop.DBus',
destination: bytes = b'org.freedesktop.DBus',
) -> bytes:
endianness = b'l'
method_type = b'\x01' # METHOD_CALL
flags = b'\x04' # 0x01 = NO_REPLY_EXPECTED, 0x02 = NO_AUTO_START, 0x04 = ALLOW_INTERACTIVE_AUTHORIZATION
protocol_version = b'\x01'
message_body_length = len(body).to_bytes(4, 'little')
_message_serial = struct.pack('<i', message_serial)
headers_length, headers = make_headers(
path=path,
member=member,
interface=interface,
destination=destination,
signature=body_signature,
)
msg = endianness + method_type + flags + protocol_version + message_body_length + _message_serial + headers_length.to_bytes(4, 'little') + headers
msg = pad_to(msg, 8)
msg += body
return msg
@dataclass
class Variant:
signature: str
value: object
def encode_body(
signature: str,
*args: object
):
body = bytearray()
def pad_to(alignment: int):
nonlocal body
body += b'\0' * (-len(body) % alignment)
def append_part(part: SignaturePart, arg: object):
nonlocal body
match part:
case SignatureSymbol.STRING:
arg = cast(str, arg)
pad_to(4)
body += len(arg).to_bytes(4, 'little') + arg.encode() + b'\0'
# pad_to(4)
case SignatureSymbol.UINT32:
pad_to(4)
arg = cast(int, arg)
body += arg.to_bytes(4, 'little')
pad_to(4)
case SignatureSymbol.UINT16:
pad_to(2)
arg = cast(int, arg)
body += arg.to_bytes(2, 'little')
pad_to(2)
case SignatureSymbol.UINT64:
pad_to(8)
arg = cast(int, arg)
body += arg.to_bytes(8, 'little')
pad_to(8)
case SignatureSymbol.INT32:
pad_to(4)
arg = cast(int, arg)
body += arg.to_bytes(4, 'little', signed=True)
pad_to(4)
case SignatureSymbol.INT16:
pad_to(2)
arg = cast(int, arg)
body += arg.to_bytes(2, 'little', signed=True)
pad_to(2)
case SignatureSymbol.INT64:
pad_to(8)
arg = cast(int, arg)
body += arg.to_bytes(8, 'little', signed=True)
pad_to(8)
case SignatureSymbol.DOUBLE:
pad_to(8)
arg = cast(float, arg)
body += struct.pack('<d', arg)
pad_to(8)
case SignatureSymbol.BOOLEAN:
arg = cast(bool, arg)
body += b'\x01' if arg else b'\x00'
case ArrayToken():
pad_to(4)
arg = cast(tuple, arg)
# Now we need to calculate the length of the array from the current position
body += b'\0\0\0\0' # Placeholder for the length
after_body = len(body)
for item in arg:
append_part(part.value, item)
# Calculate the length of the array
array_length = len(body) - after_body
body[after_body-4:after_body] = array_length.to_bytes(4, 'little')
case StructToken() | DictToken():
pad_to(8)
arg = cast(tuple, arg)
for subpart, item in zip(part, arg):
append_part(subpart, item)
# TODO: Should we pad here?
case SignatureSymbol.VARIANT:
arg = cast(Variant, arg)
sub_signature = arg.signature
sub_value = arg.value
if not isinstance(sub_value, tuple):
sub_value = (sub_value,)
pad_to(1)
body += len(sub_signature).to_bytes(1, 'little') + sub_signature.encode() + b'\0'
for subpart, item in zip(parse_signature(sub_signature), sub_value):
append_part(subpart, item)
case _:
raise ValueError(f"Unexpected signature: {part}")
sig = parse_signature(signature)
assert len(sig) == len(args), f"Signature length mismatch: {len(sig)} != {len(args)}"
for part, arg in zip(sig, args):
append_part(part, arg)
return bytes(body)
msg = construct_message(2, b'susssasa{sv}i',
encode_body('susssasa{sv}i',
"app_name", # s
0, # u
"app_icon", # s
"h4x0r", # s
"greetings from manually crafted packets", # s
("action1",), # as
(("hint", Variant('u', 10)),), # a{sv}
10_000 # i
),
path=b'/org/freedesktop/Notifications',
member=b'Notify',
interface=b'org.freedesktop.Notifications',
destination=b'org.freedesktop.Notifications',
)
# sys.stdout.buffer.write(msg)
sock.send(msg)
resp = sock.recv(1024)
def parse_packet(
response: bytes
):
extract = 'c'
offset = 0
endianness, = struct.unpack_from(extract, response)
assert endianness == b'l' # The following code assumes the struct is little endian
offset += struct.calcsize(extract)
extract = '<bbbiii'
msgtype, msgflags, protocol_version, body_length, msg_serial, header_length = struct.unpack_from(extract, response, offset)
offset += struct.calcsize(extract)
# assert msg_serial == -1
# Next up are the headers. Header types are always just 1 symbol so the header type can be read at once
header_data = response[offset:offset+header_length+1]
offset += header_length + 1
headers = []
header_offset = 0
while True:
header_offset = (header_offset + 7) & ~7
if header_offset >= header_length:
break
extract = '<bbs'
header_type, variant_signature_length, variant = struct.unpack_from(extract, header_data, header_offset)
header_offset += struct.calcsize(extract) + len(variant)
match variant:
case b'o' | b's':
string_length, = struct.unpack_from('<i', header_data, header_offset)
header_offset += struct.calcsize('<i')
value = header_data[header_offset:header_offset+string_length]
header_offset += string_length
case b'g':
string_length, = struct.unpack_from('<b', header_data, header_offset)
header_offset += struct.calcsize('<b')
value = header_data[header_offset:header_offset+string_length]
header_offset += string_length
case b'u':
value, = struct.unpack_from('<I', header_data, header_offset)
header_offset += struct.calcsize('<I')
case _:
raise ValueError(f"Unexpected variant signature: {variant}")
headers.append((header_type, variant, value))
header_data = {
HeaderType.PATH: None,
HeaderType.INTERFACE: None,
HeaderType.MEMBER: None,
HeaderType.ERROR_NAME: None,
HeaderType.REPLY_SERIAL: None,
HeaderType.DESTINATION: None,
HeaderType.SENDER: None,
HeaderType.SIGNATURE: "", # Default assumed no body
HeaderType.UNIX_FDS: 0, # Default assumed 0
}
for header_type, variant, value in headers:
header_data[HeaderType(header_type)] = value
# Parse the body according to the signature
# Body starts at 8 byte alignment
offset = (offset + 7) & ~7
# body = response[offset:]
signature = parse_signature(header_data[HeaderType.SIGNATURE].decode())
def parse_part(part: SignaturePart):
nonlocal response
nonlocal offset
match part:
case SignatureSymbol.STRING:
# Align to 4 bytes
offset = (offset + 3) & ~3
string_length, = struct.unpack_from('<i', response, offset)
offset += struct.calcsize('<i')
value = response[offset:offset+string_length+1]
offset += string_length+1 # Include the null terminator
return value
case SignatureSymbol.UINT32:
offset = (offset + 3) & ~3
value, = struct.unpack_from('<I', response, offset)
offset += struct.calcsize('<I')
return value
case SignatureSymbol.UINT16:
offset = (offset + 1) & ~1
value, = struct.unpack_from('<H', response, offset)
offset += struct.calcsize('<H')
return value
case SignatureSymbol.UINT64:
offset = (offset + 7) & ~7
value, = struct.unpack_from('<Q', response, offset)
offset += struct.calcsize('<Q')
return value
case SignatureSymbol.INT32:
offset = (offset + 3) & ~3
value, = struct.unpack_from('<i', response, offset)
offset += struct.calcsize('<i')
return value
case SignatureSymbol.INT16:
offset = (offset + 1) & ~1
value, = struct.unpack_from('<h', response, offset)
offset += struct.calcsize('<h')
return value
case SignatureSymbol.INT64:
offset = (offset + 7) & ~7
value, = struct.unpack_from('<q', response, offset)
offset += struct.calcsize('<q')
return value
case SignatureSymbol.DOUBLE:
offset = (offset + 7) & ~7
value, = struct.unpack_from('<d', response, offset)
offset += struct.calcsize('<d')
return value
case SignatureSymbol.BOOLEAN:
value, = struct.unpack_from('<?', response, offset)
offset += struct.calcsize('<?')
return value
case ArrayToken():
offset = (offset + 3) & ~3
array_length, = struct.unpack_from('<I', response, offset)
offset += struct.calcsize('<I')
array = []
for _ in range(array_length):
array.append(parse_part(part.value))
return array
case StructToken() | DictToken():
offset = (offset + 7) & ~7
stru = []
for subpart in part:
stru.append(parse_part(subpart))
return stru
out = []
for part in signature:
out.append(parse_part(part))
return out
parse_packet(resp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment