Skip to content

Instantly share code, notes, and snippets.

@Kobzol
Created December 2, 2019 14:12
Show Gist options
  • Select an option

  • Save Kobzol/e623c986f10760355eae699fd35c7707 to your computer and use it in GitHub Desktop.

Select an option

Save Kobzol/e623c986f10760355eae699fd35c7707 to your computer and use it in GitHub Desktop.
Dask distributed packet parser
import msgpack
# NOTE(review): BYTES is not read or written by any of the visible code; it
# looks like leftover debugging state. Kept in case something imports it.
BYTES = list()
def _read_exact(stream, size, max_piece=1 << 20):
    """Return exactly *size* bytes read from *stream*, or raise EOFError.

    Reads in pieces of at most *max_piece* bytes, so a corrupt, huge length
    field fails with EOFError once the data runs out. Otherwise the stream
    would be asked for an enormous buffer in a single read() call.
    """
    buf = bytearray()
    while len(buf) < size:
        piece = stream.read(min(size - len(buf), max_piece))
        if not piece:
            raise EOFError(
                f"stream ended after {len(buf)} of {size} expected bytes"
            )
        buf += piece
    return bytes(buf)


def parse_messages(stream, *, verbose=True):
    """Read one frame from *stream* and decode its msgpack chunks.

    Frame layout, as read here:
      * 8 bytes: chunk count N (unsigned, little-endian)
      * N x 8 bytes: byte length of each chunk (unsigned, little-endian)
      * the N chunk payloads, back to back, in the same order

    Every non-empty chunk is decoded with
    ``msgpack.unpackb(data, use_list=False)``, so msgpack arrays come back as
    tuples. Zero-length chunks are skipped.

    Args:
        stream: binary file-like object providing ``read(n)``.
        verbose: when true (the default), print each decoded chunk's
            ``index size chunk_count`` line followed by the decoded object.

    Returns:
        List of decoded objects for the non-empty chunks, in frame order.
        An empty list is returned when fewer than 8 bytes remain (end of
        stream). A frame with no non-empty chunks also yields an empty list.

    Raises:
        EOFError: if the stream ends partway through the size table or a
            chunk payload.
    """
    header = stream.read(8)
    if len(header) < 8:
        # End of stream, or a partial trailing header: no further frames.
        return []
    chunk_count = int.from_bytes(header, "little", signed=False)

    # The whole size table precedes the payloads, so read it first.
    # _read_exact makes a truncated table (or a corrupt, huge chunk_count)
    # stop at end of data, instead of recording bogus zero sizes forever.
    chunk_sizes = [
        int.from_bytes(_read_exact(stream, 8), "little", signed=False)
        for _ in range(chunk_count)
    ]

    messages = []
    for index, size in enumerate(chunk_sizes):
        if not size:
            continue  # zero-length chunks carry nothing to decode
        data = _read_exact(stream, size)
        if verbose:
            print(index, size, len(chunk_sizes))
        parsed = msgpack.unpackb(data, use_list=False)
        if verbose:
            print(parsed)
        messages.append(parsed)
    return messages
# Parse every frame in the capture file. Loop until real end-of-file rather
# than until an empty result: parse_messages() also returns [] for a frame
# whose chunks are all zero-length, and stopping there would silently skip
# the rest of the file. parse_messages() prints each decoded chunk itself.
# open(..., "rb") returns a BufferedReader; peek(1) is b"" only at EOF.
with open("stream.bin", "rb") as f:
    while f.peek(1):
        parse_messages(f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment