Skip to content

Instantly share code, notes, and snippets.

@nosoop
Last active July 31, 2025 07:50
Show Gist options
  • Select an option

  • Save nosoop/796d5698edb1fb129b1b4b838d1d09c2 to your computer and use it in GitHub Desktop.

Select an option

Save nosoop/796d5698edb1fb129b1b4b838d1d09c2 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import asyncio
import base64
import contextlib
import dataclasses
import hashlib
import io
import itertools
import lzma
import operator
import pathlib
import struct
import typing
import zipfile
import zlib
from contextvars import ContextVar
from typing import AsyncIterable, Iterable

import cryptography.hazmat.primitives.ciphers as crypto
import httpx
import protobug
import pyzstd
import vdf
import websockets.asyncio.client as ws_client

import fume.pb.cdn.content_manifest as contentmsg
import fume.pb.steam as pbmsg
from fume.enum.emsg import EMsg
# Bit OR-ed into the message ID of outgoing protobuf packets and tested on incoming packets
# to tell protobuf-encoded messages apart from other encodings.
_PROTOBUF_MASK = 0x80000000
# Packet prefix: two little-endian unsigned 32-bit integers. For protobuf packets the second
# value is the length of the protobuf header that follows the prefix.
_MSG_HEADER = struct.Struct("<II") # (id: int, len: int)
@dataclasses.dataclass
class SteamProtobufMessagePacket:
    """
    A protobuf-encoded Steam message paired with its (optional) protobuf header.
    """

    header: pbmsg.CMsgProtoBufHeader | None
    body: pbmsg.SteamProtobufMessage

    def dumps(self) -> bytes:
        """
        Serializes the packet: the masked message ID and the header length, followed by the
        encoded header (empty when there is none) and the encoded body.
        """
        encoded_header = b"" if not self.header else protobug.dumps(self.header)
        masked_id = _PROTOBUF_MASK | (self.body.msg_id or 0)
        prefix = struct.pack("<II", masked_id, len(encoded_header))
        return b"".join((prefix, encoded_header, self.body.dumps()))
@protobug.message
class CMsgServiceMethodCallFromClient(pbmsg.SteamProtobufMessage, msg_id=151):
    # Base class for service-method request bodies; declared with message ID 151.
    # NOTE(review): the no-op override below presumably keeps subclasses from going through
    # the parent's ``__init_subclass__`` handling (e.g. having to pass their own ``msg_id``)
    # - confirm against fume's SteamProtobufMessage.
    def __init_subclass__(cls, **kwargs):
        pass
@protobug.message
class CMsgServiceMethodResponse(pbmsg.SteamProtobufMessage, msg_id=147):
    # Message declared with ID 147; it defines no fields of its own.
    pass
@protobug.message
class CContentServerDirectory_GetManifestRequestCode_Request(CMsgServiceMethodCallFromClient):
    # Request body sent with the "ContentServerDirectory.GetManifestRequestCode#1" job name
    # to obtain the request code used when downloading a depot manifest.
    # All fields are optional and default to None.
    # TODO how to implement service calls?
    app_id: protobug.UInt32 | None = protobug.field(1, default=None)
    depot_id: protobug.UInt32 | None = protobug.field(2, default=None)
    manifest_id: protobug.UInt64 | None = protobug.field(3, default=None)
    app_branch: protobug.String | None = protobug.field(4, default=None)
    branch_password_hash: protobug.String | None = protobug.field(5, default=None)
@protobug.message
class CContentServerDirectory_GetManifestRequestCode_Response(
    pbmsg.SteamProtobufMessage, msg_id=147
):
    # Response body carrying the manifest request code.
    # NOTE(review): declared with the same msg_id (147) as CMsgServiceMethodResponse;
    # presumably the later declaration is the one used when decoding - confirm against
    # fume's message registry.
    manifest_request_code: protobug.UInt64 | None = protobug.field(1, default=None)
def _recv_messages(payload: bytes) -> Iterable[SteamProtobufMessagePacket]:
    """
    Creates Steam message packet instances from received byte payloads.

    ``CMsgMulti`` containers are unpacked (and decompressed when needed) recursively, so
    only the messages they contain are yielded, never the container itself.

    :param payload: One raw packet, starting with the ``(id, len)`` prefix.
    :raises NotImplementedError: if the packet is not protobuf-encoded.
    :raises ValueError: if a ``CMsgMulti`` body does not match its declared unpacked size,
        or an embedded message claims more bytes than the body holds.
    """
    msg_id, msglen = _MSG_HEADER.unpack_from(payload)
    if not msg_id & _PROTOBUF_MASK:
        raise NotImplementedError(f"Non-protobuf message {msg_id} not implemented")

    # for protobufs, the msglen indicates the length of the protobuf header
    # the rest of the payload is the body
    body_start = _MSG_HEADER.size + msglen
    header = protobug.loads(payload[_MSG_HEADER.size : body_start], pbmsg.CMsgProtoBufHeader)

    # returns the appropriate message
    message = pbmsg.SteamMessage.loads(msg_id, payload[body_start:])
    if message.msg_id != EMsg.Multi:
        yield SteamProtobufMessagePacket(header, message)
        return

    # special case for multi: unpack each message and yield its contents
    message = typing.cast(pbmsg.CMsgMulti, message)
    if not message.message_body:
        return

    # equivalent to gzip.decompress, but faster since only one member is present
    unpacked_body = (
        zlib.decompress(message.message_body, wbits=31)
        if message.size_unzipped
        else message.message_body
    )
    total = len(unpacked_body)
    if message.size_unzipped and total != message.size_unzipped:
        raise ValueError(
            "CMsgMulti unpacked message size mismatch "
            f"(expected: {message.size_unzipped}, received: {total})"
        )

    # Walk the body by offset rather than re-slicing the remaining tail after every
    # message, which would copy the rest of the buffer on each iteration.
    offset = 0
    while offset < total:
        (size,) = struct.unpack_from("<I", unpacked_body, offset)
        start = offset + 4
        end = start + size
        if end > total:
            raise ValueError(
                "CMsgMulti embedded message is truncated "
                f"(needs {size} bytes at offset {start}, body is {total} bytes)"
            )
        yield from _recv_messages(unpacked_body[start:end])
        offset = end
SteamMessageQueue = asyncio.Queue[SteamProtobufMessagePacket]

_send_queue: SteamMessageQueue = asyncio.Queue()
""" Outgoing messages waiting to be written to Steam. """

_send_queue_listeners: set[SteamMessageQueue] = set()
"""
Queues that receive a copy of every outgoing message.
This exists mostly as a debugging aid; ordinary clients have no use for it.
"""

_recv_queue_listeners: set[SteamMessageQueue] = set()
""" Queues that receive a copy of every received message. """

_jobid_counter = (n % 0xFFFFFFFF for n in itertools.count())
""" Source of job IDs; take the next one with ``next(_jobid_counter)``. """

_logon_ctx: ContextVar[pbmsg.CMsgProtoBufHeader] = ContextVar("logon_ctx")
""" Header describing the current logon session. """
async def _msg_send_task(ws: ws_client.ClientConnection) -> None:
    """
    Drains the send queue forever, writing each message to the connection.

    Every registered send listener is handed the message before it is written.
    """
    while True:
        outgoing = await _send_queue.get()
        for listener in _send_queue_listeners:
            listener.put_nowait(outgoing)
        wire_bytes = outgoing.dumps()
        await ws.send(wire_bytes)
def _msg_send_put(msg: SteamProtobufMessagePacket) -> None:
    """
    Queues the message for sending and returns immediately, leaving the caller free to set
    up a wait for whatever response it expects.

    When a logon session is active, the message header (created if missing) is stamped
    with that session's Steam ID and client session ID first.
    """
    session: pbmsg.CMsgProtoBufHeader | None = _logon_ctx.get(None)
    if session:
        header = msg.header or pbmsg.CMsgProtoBufHeader()
        header.steamid = session.steamid
        header.client_sessionid = session.client_sessionid
        msg.header = header
    _send_queue.put_nowait(msg)
def _msg_send_put_job(msg: SteamProtobufMessagePacket) -> int:
    """
    Tags the message with a freshly allocated job ID, queues it for sending, and returns
    that ID to the caller. A header is created when the message has none.
    """
    job_id = next(_jobid_counter)
    header = msg.header if msg.header else pbmsg.CMsgProtoBufHeader()
    header.jobid_source = job_id
    msg.header = header
    _msg_send_put(msg)
    return job_id
async def _msg_recv_task(ws: ws_client.ClientConnection) -> None:
    """
    Reads payloads from the connection forever, parses them into packets, and hands every
    packet to each subscribed receive listener.
    """
    while True:
        raw = await ws.recv()
        for packet in _recv_messages(raw):
            for listener in _recv_queue_listeners:
                listener.put_nowait(packet)
async def _msg_recv_stream() -> AsyncIterable[SteamProtobufMessagePacket]:
    """
    Yields every packet received while the subscription is open.

    A private queue is registered as a receive listener for the lifetime of the generator
    and unregistered again when the generator is closed.
    """
    inbox: SteamMessageQueue = asyncio.Queue()
    _recv_queue_listeners.add(inbox)
    try:
        while True:
            packet = await inbox.get()
            yield packet
            inbox.task_done()
    finally:
        _recv_queue_listeners.remove(inbox)
async def _msg_wait_for(emsg: EMsg, jobid: int | None = None) -> SteamProtobufMessagePacket:
    """
    Waits for a matching message from the received packet stream and returns it.
    Use with ``asyncio.wait_for`` if you want to limit the time waiting.

    :param emsg: Message ID the packet body must carry.
    :param jobid: When given, the packet header's ``jobid_target`` must equal this value;
        packets without a header never match in that case.
    :return: The first matching packet.
    """
    # Close the stream explicitly: returning out of a bare ``async for`` leaves the
    # generator suspended, so its listener queue would stay registered (and keep filling
    # up) until the generator happens to be finalized.
    async with contextlib.aclosing(_msg_recv_stream()) as stream:
        async for msg in stream:
            if msg.body.msg_id != emsg:
                continue
            if jobid is None:
                return msg
            if msg.header is not None and msg.header.jobid_target == jobid:
                return msg
    raise AssertionError("Unreachable code")
async def _msg_wait_pics_response() -> AsyncIterable[SteamProtobufMessagePacket]:
    """
    Yields every part of the next PICS product info response.

    The first ``ClientPICSProductInfoResponse`` seen fixes the job ID; later responses
    with a different ``jobid_target`` are ignored. Iteration ends after yielding the part
    whose ``response_pending`` flag is unset, including when that is the very first part.
    """
    job_target = None
    job_locked = False
    # A single subscription is used for the whole exchange so that no part can slip
    # through between receiving the first message and listening for the rest.
    async with contextlib.aclosing(_msg_recv_stream()) as stream:
        async for msg in stream:
            if msg.body.msg_id != EMsg.ClientPICSProductInfoResponse:
                continue
            target = msg.header.jobid_target if msg.header is not None else None
            if not job_locked:
                job_target, job_locked = target, True
            elif target != job_target:
                continue
            yield msg
            resp = typing.cast(pbmsg.CMsgClientPICSProductInfoResponse, msg.body)
            if not resp.response_pending:
                return
def _decompress_chunk(chunk_data: bytes, depot_id: int, chunk_id: str) -> bytes:
    """
    Unpacks one decrypted depot chunk.

    Three containers are recognised by their leading bytes: ``VZ`` (raw LZMA1), ``VSZ``
    (zstd) and, for anything else, a zip archive whose first member is the chunk.

    :param chunk_data: Decrypted chunk bytes.
    :param depot_id: Depot the chunk belongs to; only used in error messages.
    :param chunk_id: Hex chunk ID; only used in error messages.
    :return: The decompressed chunk contents.
    :raises ValueError: on a bad footer/version, a checksum mismatch, or an unknown
        container.
    """
    if chunk_data[:2] == b"VZ":
        if chunk_data[-2:] != b"zv":
            raise ValueError("VZ: incorrect footer")
        if chunk_data[2:3] != b"a":
            raise ValueError("VZ: incorrect version")
        lzma_filter = lzma._decode_filter_properties(lzma.FILTER_LZMA1, chunk_data[7:12])
        decompressor = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[lzma_filter])
        checksum, size = struct.unpack("<II", chunk_data[-10:-2])
        unpacked = decompressor.decompress(chunk_data[12:-9])[:size]
        if zlib.crc32(unpacked) != checksum:
            raise ValueError("VZ: checksum mismatch")
        return unpacked

    if chunk_data[:3] == b"VSZ":
        if chunk_data[-3:] != b"zsv":
            raise ValueError("VSZ: incorrect footer")
        if chunk_data[3:4] != b"a":
            raise ValueError("VSZ: incorrect version")
        (crc_head,) = struct.unpack_from("<I", chunk_data, 4)
        crc_tail, size = struct.unpack_from("<II", chunk_data, -15)
        unpacked = pyzstd.decompress(chunk_data[8:-15])[:size]
        # All three checksums must agree. A chained ``a != b != c`` means
        # ``a != b and b != c``, which can never fire when the two stored CRCs are equal,
        # so corrupted data would pass unnoticed.
        if crc_head != crc_tail or zlib.crc32(unpacked) != crc_tail:
            raise ValueError("VSZ: checksum mismatch")
        return unpacked

    try:
        with zipfile.ZipFile(io.BytesIO(chunk_data)) as zf:
            return zf.read(zf.filelist[0])
    except zipfile.BadZipFile as exc:
        prefix = chunk_data[:4]
        raise ValueError(
            f"Unexpected chunk {prefix = }; ({depot_id = }, {chunk_id = })"
        ) from exc


async def _process_app(app_pics_payload: dict) -> None:
    """
    Given an appinfo structure, downloads the manifest and its contents in turn.

    For every numeric depot that lists manifests, the public manifest is fetched and
    parsed, the depot key is requested, and each mapped file ending in ``.dll``, ``.so``
    or ``.inf`` is downloaded chunk by chunk into ``./<app_id>/``. Files already on disk
    with a matching SHA-1 are skipped.

    :param app_pics_payload: Parsed appinfo; read via ``["appinfo"]["appid"]`` and
        ``["appinfo"]["depots"]``.
    :raises ValueError: if a chunk cannot be decoded or a written file fails its hash.
    """
    app_id = int(app_pics_payload["appinfo"]["appid"])
    depot_public_gids = {
        int(depot_id): int(depot["manifests"]["public"]["gid"])
        for depot_id, depot in app_pics_payload["appinfo"]["depots"].items()
        if depot_id.isdigit()
        and "manifests" in depot  # ignore top-level keys 'branches', 'overridescddb'
    }

    # implementation of ValvePython/steam symmetric_decrypt using cryptography module
    def symmetric_decrypt(ciphertext: bytes, key: bytes) -> bytes:
        """Decrypts AES data whose first block is the ECB-encrypted CBC IV."""
        iv_decryptor = crypto.Cipher(crypto.algorithms.AES(key), crypto.modes.ECB()).decryptor()
        iv = iv_decryptor.update(ciphertext[:16])
        cbc_decryptor = crypto.Cipher(crypto.algorithms.AES(key), crypto.modes.CBC(iv)).decryptor()
        result = cbc_decryptor.update(ciphertext[16:])
        # result is padded with repeated bytes indicating amount to discard
        return result[: -result[-1]]

    @dataclasses.dataclass
    class Manifest:
        payload: contentmsg.ContentManifestPayload
        metadata: contentmsg.ContentManifestMetadata
        signature: contentmsg.ContentManifestSignature

        def decrypt_filenames(self, depot_key: bytes) -> None:
            """Decrypts file names and link targets in place; no-op if already plain."""
            if not self.metadata.filenames_encrypted:
                return
            # Operate on this instance's mappings rather than on whatever ``manifest``
            # variable happens to exist in the enclosing scope.
            for fm in self.payload.mappings:
                # decrypted filenames are padded by repeated bytes corresponding to the
                # amount to truncate
                fm.filename = symmetric_decrypt(base64.b64decode(fm.filename), depot_key)
                if fm.linktarget:
                    fm.linktarget = symmetric_decrypt(
                        base64.b64decode(fm.linktarget), depot_key
                    )
            self.metadata.filenames_encrypted = False

    def parse_manifest(manifest_data: bytes) -> Manifest:
        """Parses the zipped manifest: payload, metadata and signature, in that order."""
        with zipfile.ZipFile(io.BytesIO(manifest_data)) as zf:
            payload = zf.read(zf.filelist[0])
        section_types = (
            contentmsg.ContentManifestPayload,
            contentmsg.ContentManifestMetadata,
            contentmsg.ContentManifestSignature,
        )
        sections = []
        offset = 0
        for section_type in section_types:
            # each section is prefixed like a packet; only the length is needed here
            _section_id, section_len = _MSG_HEADER.unpack_from(payload, offset)
            start = offset + _MSG_HEADER.size
            offset = start + section_len
            sections.append(protobug.loads(payload[start:offset], section_type))
        return Manifest(*sections)

    cdn_depot_url = "https://google2.cdn.steampipe.steamcontent.com/depot/"
    wanted_suffixes = (".dll", ".so", ".inf")

    async with httpx.AsyncClient() as client:
        for depot_id, manifest_gid in depot_public_gids.items():
            call = CContentServerDirectory_GetManifestRequestCode_Request(
                app_id=app_id,
                depot_id=depot_id,
                manifest_id=manifest_gid,
            )
            content_manifest_code_request = SteamProtobufMessagePacket(
                pbmsg.CMsgProtoBufHeader(
                    target_job_name="ContentServerDirectory.GetManifestRequestCode#1",
                ),
                call,
            )
            content_manifest_code_jobid = _msg_send_put_job(content_manifest_code_request)
            # 147 is the message ID the response class in this file is declared with
            content_manifest_code_response = await _msg_wait_for(
                147, jobid=content_manifest_code_jobid
            )
            print(content_manifest_code_response)
            manifest_request_code = content_manifest_code_response.body.manifest_request_code

            content_manifest_web_request = await client.get(
                f"{cdn_depot_url}{depot_id}/manifest/{manifest_gid}/5/{manifest_request_code}"
            )
            # fail with the HTTP status instead of an opaque zip error further down
            content_manifest_web_request.raise_for_status()
            manifest = parse_manifest(content_manifest_web_request.content)

            depot_decryption_key_job = _msg_send_put_job(
                SteamProtobufMessagePacket(
                    None,
                    pbmsg.CMsgClientGetDepotDecryptionKey(depot_id=depot_id, app_id=app_id),
                )
            )
            depot_decryption_key_response = await _msg_wait_for(
                EMsg.ClientGetDepotDecryptionKeyResponse, jobid=depot_decryption_key_job
            )
            depot_key = depot_decryption_key_response.body.depot_encryption_key

            output_basepath = pathlib.Path() / str(app_id)
            manifest.decrypt_filenames(depot_key)
            for fm in manifest.payload.mappings:
                # names are bytes after decryption; accept already-decoded names as well
                filename = fm.filename
                if isinstance(filename, (bytes, bytearray)):
                    filename = filename.decode("utf8")
                output_path = output_basepath / filename.rstrip("\x00")
                if output_path.suffix not in wanted_suffixes:
                    continue
                if (
                    output_path.exists()
                    and hashlib.sha1(output_path.read_bytes()).digest() == fm.sha_content
                ):
                    continue
                print(fm)
                output_path.parent.mkdir(parents=True, exist_ok=True)
                with output_path.open("wb") as outfile:
                    # protobug(?) does not sort chunks by offset
                    for chunk in sorted(fm.chunks, key=operator.attrgetter("offset")):
                        chunk_id = chunk.sha.hex()
                        while True:
                            # the CDN may return an error; retry until it doesn't
                            try:
                                chunk_req = await client.get(
                                    f"{cdn_depot_url}{depot_id}/chunk/{chunk_id}"
                                )
                                chunk_req.raise_for_status()
                                break
                            except (httpx.HTTPStatusError, httpx.TimeoutException):
                                await asyncio.sleep(10)
                        chunk_data = _decompress_chunk(
                            symmetric_decrypt(chunk_req.content, depot_key),
                            depot_id,
                            chunk_id,
                        )
                        await asyncio.to_thread(outfile.write, chunk_data)
                # verify only once the file is closed, so everything written is on disk;
                # an all-zero hash means the manifest carries no checksum to compare with
                if (
                    int.from_bytes(fm.sha_content)
                    and hashlib.sha1(output_path.read_bytes()).digest() != fm.sha_content
                ):
                    raise ValueError("hash failed")
async def main() -> None:
    """
    Connects to a Steam websocket endpoint, logs on, requests product info for a fixed
    list of apps, processes every returned app concurrently, then logs off and stops the
    send/receive tasks.

    :raises RuntimeError: if the logon response has no header or reports a result other
        than 1.
    """
    app_ids = (
        232250,
        222860,
        232330,
        222840,
        232290,
        317670,
        17505,
        17575,
        1136190,
        295230,
        228780,
    )
    async with (
        ws_client.connect("wss://cmp1-lax1.steamserver.net:27018/cmsocket/") as ws,
        asyncio.TaskGroup() as tg,
    ):
        send_task = tg.create_task(_msg_send_task(ws))
        recv_task = tg.create_task(_msg_recv_task(ws))

        logon_header = pbmsg.CMsgProtoBufHeader(steamid=0x01A00000_00000000)
        logon_request = pbmsg.CMsgClientLogon(
            protocol_version=65580,
            client_package_version=1561159470,
        )
        _msg_send_put(SteamProtobufMessagePacket(logon_header, logon_request))

        logon = await _msg_wait_for(EMsg.ClientLogonResponse)
        print(f"{logon = }")
        if not logon.header:
            raise RuntimeError("No header in response")
        logon_body = typing.cast(pbmsg.CMsgClientLogonResponse, logon.body)
        if logon_body.eresult != 1:
            raise RuntimeError("Failed login")

        # note our login session to be used by subsequent tasks
        _logon_ctx.set(logon.header)

        requested_apps = []
        for appid in app_ids:
            requested_apps.append(
                pbmsg.CMsgClientPICSProductInfoRequest.AppInfo(appid=appid, access_token=0)
            )
        _requested_packages = [
            pbmsg.CMsgClientPICSProductInfoRequest.PackageInfo(packageid=17906, access_token=0)
        ]

        pics_request = pbmsg.CMsgClientPICSProductInfoRequest(
            apps=requested_apps,
            meta_data_only=False,
            num_prev_failed=0,
        )
        pics_job = _msg_send_put_job(SteamProtobufMessagePacket(None, pics_request))
        pics_changes = await _msg_wait_for(EMsg.ClientPICSProductInfoResponse, jobid=pics_job)

        # one task per returned app; the group waits for all of them before moving on
        async with asyncio.TaskGroup() as tg_app:
            for app in pics_changes.body.apps:
                appinfo_text = app.buffer.decode("utf8").rstrip("\x00")
                payload = vdf.loads(appinfo_text)
                tg_app.create_task(_process_app(payload))

        _msg_send_put(SteamProtobufMessagePacket(None, pbmsg.CMsgClientLogOff()))

        # we should be putting the queue into shutdown, but we need 3.13 for that...
        await asyncio.sleep(0)
        recv_task.cancel("logging off")
        send_task.cancel("logging off")
# Run the client only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment