Last active
July 31, 2025 07:50
-
-
Save nosoop/796d5698edb1fb129b1b4b838d1d09c2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| import asyncio | |
| import base64 | |
| import dataclasses | |
| import hashlib | |
| import itertools | |
| import io | |
| import lzma | |
| import operator | |
| import pathlib | |
| import struct | |
| import typing | |
| import zipfile | |
| import zlib | |
| from contextvars import ContextVar | |
| from typing import AsyncIterable, Iterable | |
| import httpx | |
| import vdf | |
| import pyzstd | |
| import websockets.asyncio.client as ws_client | |
| import cryptography.hazmat.primitives.ciphers as crypto | |
| import fume.pb.steam as pbmsg | |
| import fume.pb.cdn.content_manifest as contentmsg | |
| from fume.enum.emsg import EMsg | |
| import protobug | |
# Flag bit OR-ed into (and tested on) the message ID field of protobuf-encoded packets.
_PROTOBUF_MASK = 0x80000000
# Fixed-size packet prefix: two little-endian unsigned 32-bit integers.
_MSG_HEADER = struct.Struct("<II")  # (id: int, len: int)
@dataclasses.dataclass
class SteamProtobufMessagePacket:
    """
    Steam Protobuf message packet. This includes the header.
    """

    # optional protobuf header; serialized as zero bytes when absent
    header: pbmsg.CMsgProtoBufHeader | None
    # message body; supplies the message ID written into the packet prefix
    body: pbmsg.SteamProtobufMessage

    def dumps(self) -> bytes:
        """
        Serializes the packet to its wire form.

        The layout is the fixed ``_MSG_HEADER`` prefix (message ID with the protobuf flag set,
        followed by the length of the serialized header), then the serialized header, then
        the serialized body.
        """
        header_bytes = protobug.dumps(self.header) if self.header else b""
        # a body without a message ID is sent as ID 0 with only the protobuf flag set
        flagged_msg_id = (self.body.msg_id or 0) | _PROTOBUF_MASK
        return (
            _MSG_HEADER.pack(flagged_msg_id, len(header_bytes))
            + header_bytes
            + self.body.dumps()
        )
@protobug.message
class CMsgServiceMethodCallFromClient(pbmsg.SteamProtobufMessage, msg_id=151):
    # Base class for service method call requests, declared with msg_id=151.

    def __init_subclass__(cls, **kwargs):
        # Accepts and ignores any class keyword arguments and does not chain to the parent
        # hook; request types below subclass this without passing their own msg_id.
        # NOTE(review): presumably this stops each subclass from being registered again by
        # SteamProtobufMessage -- confirm against the fume implementation.
        pass
@protobug.message
class CMsgServiceMethodResponse(pbmsg.SteamProtobufMessage, msg_id=147):
    # Service method response message, declared with msg_id=147; no fields are defined here.
    pass
@protobug.message
class CContentServerDirectory_GetManifestRequestCode_Request(CMsgServiceMethodCallFromClient):
    # Request body for the "ContentServerDirectory.GetManifestRequestCode#1" service call
    # issued by _process_app; all fields are optional and default to None.
    # TODO how to implement service calls?
    app_id: protobug.UInt32 | None = protobug.field(1, default=None)
    depot_id: protobug.UInt32 | None = protobug.field(2, default=None)
    manifest_id: protobug.UInt64 | None = protobug.field(3, default=None)
    app_branch: protobug.String | None = protobug.field(4, default=None)
    branch_password_hash: protobug.String | None = protobug.field(5, default=None)
@protobug.message
class CContentServerDirectory_GetManifestRequestCode_Response(
    pbmsg.SteamProtobufMessage, msg_id=147
):
    # Response body carrying the manifest request code that _process_app places in the CDN
    # manifest URL.
    # NOTE(review): declared with the same msg_id (147) as CMsgServiceMethodResponse;
    # presumably the later declaration is the one used when decoding -- confirm.
    manifest_request_code: protobug.UInt64 | None = protobug.field(1, default=None)
def _recv_messages(payload: bytes) -> Iterable[SteamProtobufMessagePacket]:
    """
    Decodes a received byte payload into Steam message packets.

    A ``Multi`` message is a container: its (optionally gzip-compressed) body is a sequence
    of length-prefixed payloads, each of which is decoded recursively and yielded in order.
    Any other protobuf message is yielded as a single packet.  Non-protobuf payloads raise
    ``NotImplementedError``.
    """
    msg_id, header_len = _MSG_HEADER.unpack_from(payload)
    if not msg_id & _PROTOBUF_MASK:
        raise NotImplementedError(f"Non-protobuf message {msg_id} not implemented")

    # for protobufs the length field covers the protobuf header only; the body is the rest
    body_start = _MSG_HEADER.size + header_len
    header = protobug.loads(payload[_MSG_HEADER.size : body_start], pbmsg.CMsgProtoBufHeader)
    # resolves to the appropriate message type
    message = pbmsg.SteamMessage.loads(msg_id, payload[body_start:])

    if message.msg_id != EMsg.Multi:
        yield SteamProtobufMessagePacket(header, message)
        return

    # special case for multi: unpack each contained message and yield its contents
    multi = typing.cast(pbmsg.CMsgMulti, message)
    if not multi.message_body:
        return

    if multi.size_unzipped:
        # equivalent to gzip.decompress, but faster since only one member is present
        unpacked = zlib.decompress(multi.message_body, wbits=31)
        received_size = len(unpacked)
        if received_size != multi.size_unzipped:
            raise Exception(
                "CMsgMulti unpacked message size mismatch "
                f"(expected: {multi.size_unzipped}, received: {received_size})"
            )
    else:
        unpacked = multi.message_body

    cursor = 0
    total = len(unpacked)
    while cursor < total:
        (size,) = struct.unpack_from("<I", unpacked, cursor)
        cursor += 4
        yield from _recv_messages(unpacked[cursor : cursor + size])
        cursor += size
# Queue type used to pass Steam packets between tasks.
SteamMessageQueue = asyncio.Queue[SteamProtobufMessagePacket]
_send_queue: SteamMessageQueue = asyncio.Queue()
""" Queue containing messages to be sent to Steam. """
_send_queue_listeners: set[SteamMessageQueue] = set()
"""
Set containing subscribed listeners to the sending message stream.
Note that this is mainly present for debugging purposes; clients don't need this.
"""
_recv_queue_listeners: set[SteamMessageQueue] = set()
""" Set containing subscribed listeners to the received message stream. """
# counts up from 0; the modulo keeps values within 0..0xFFFFFFFE
_jobid_counter = map(lambda n: n % 0xFFFFFFFF, itertools.count())
""" Allocate job IDs by calling ``next(_jobid_counter)``. """
_logon_ctx: ContextVar[pbmsg.CMsgProtoBufHeader] = ContextVar("logon_ctx")
""" The current logon session context. """
async def _msg_send_task(ws: ws_client.ClientConnection) -> None:
    """
    Drains the send queue forever, writing each packet to the websocket.

    Every packet is first copied to the queues in ``_send_queue_listeners``.
    """
    while True:
        outgoing = await _send_queue.get()
        # mirror to any subscribers before the packet goes on the wire
        for listener in _send_queue_listeners:
            listener.put_nowait(outgoing)
        wire_bytes = outgoing.dumps()
        await ws.send(wire_bytes)
def _msg_send_put(msg: SteamProtobufMessagePacket) -> None:
    """
    Queues a packet for sending and returns immediately, so the caller can start waiting for
    whatever response it expects.

    When a logon session is recorded in ``_logon_ctx``, the packet header (created if
    missing) is stamped with that session's ``steamid`` and ``client_sessionid``.
    """
    session: pbmsg.CMsgProtoBufHeader | None = _logon_ctx.get(None)
    if session:
        msg.header = msg.header or pbmsg.CMsgProtoBufHeader()
        target_header = msg.header
        target_header.steamid = session.steamid
        target_header.client_sessionid = session.client_sessionid
    _send_queue.put_nowait(msg)
def _msg_send_put_job(msg: SteamProtobufMessagePacket) -> int:
    """
    Queues a packet tagged with a freshly allocated job ID and returns that ID.

    The ID is stored in the header's ``jobid_source`` field; a header is created when the
    packet has none.
    """
    msg.header = msg.header or pbmsg.CMsgProtoBufHeader()
    new_jobid = next(_jobid_counter)
    msg.header.jobid_source = new_jobid
    _msg_send_put(msg)
    return new_jobid
async def _msg_recv_task(ws: ws_client.ClientConnection) -> None:
    """
    Reads payloads from the websocket forever, decodes them into packets and copies every
    packet to each queue in ``_recv_queue_listeners``.
    """
    while True:
        raw_payload = await ws.recv()
        for packet in _recv_messages(raw_payload):
            for listener in _recv_queue_listeners:
                listener.put_nowait(packet)
async def _msg_recv_stream() -> AsyncIterable[SteamProtobufMessagePacket]:
    """
    Subscribes to the received packet stream and yields every packet that arrives.

    The subscription queue is registered when iteration starts and unregistered when the
    generator is closed or fails.
    """
    inbox: SteamMessageQueue = asyncio.Queue()
    _recv_queue_listeners.add(inbox)
    try:
        while True:
            packet = await inbox.get()
            yield packet
            inbox.task_done()
    finally:
        _recv_queue_listeners.remove(inbox)
async def _msg_wait_for(emsg: EMsg, jobid: int | None = None) -> SteamProtobufMessagePacket:
    """
    Waits for a matching message from the received packet stream and returns it.
    Use with ``asyncio.wait_for`` if you want to limit the time waiting.

    A packet matches when its body has message ID ``emsg`` and, if ``jobid`` is given, its
    header's ``jobid_target`` equals ``jobid``.  Packets without a header never match a
    specific job ID.
    """
    stream = _msg_recv_stream()
    try:
        async for msg in stream:
            if msg.body.msg_id != emsg:
                continue
            if jobid is None:
                return msg
            if msg.header is not None and msg.header.jobid_target == jobid:
                return msg
    finally:
        # Close the subscription now rather than leaving it to async-generator finalization;
        # until it is closed its queue stays registered and keeps collecting packets.
        await stream.aclose()
    raise AssertionError("Unreachable code")
async def _msg_wait_pics_response() -> AsyncIterable[SteamProtobufMessagePacket]:
    """
    Yields every message of the next PICS product info response.

    The first ``ClientPICSProductInfoResponse`` seen fixes the job being followed; later
    responses are yielded only when their ``jobid_target`` matches it.  Iteration ends after
    yielding a response whose ``response_pending`` is falsy -- including the first one, so a
    single-message response does not leave the caller waiting forever.

    One subscription is held for the whole exchange so that no response arriving between
    the first message and its continuations can be missed.
    """
    first_msg: SteamProtobufMessagePacket | None = None
    stream = _msg_recv_stream()
    try:
        async for msg in stream:
            if msg.body.msg_id != EMsg.ClientPICSProductInfoResponse:
                continue
            if first_msg is None:
                first_msg = msg
            elif msg.header.jobid_target != first_msg.header.jobid_target:
                continue
            yield msg
            resp = typing.cast(pbmsg.CMsgClientPICSProductInfoResponse, msg.body)
            if not resp.response_pending:
                return
    finally:
        # unregister the subscription queue as soon as we are done with it
        await stream.aclose()
def _symmetric_decrypt(ciphertext: bytes, key: bytes) -> bytes:
    """
    Decrypts a blob encrypted with Steam's symmetric scheme (implementation of
    ValvePython/steam ``symmetric_decrypt`` using the ``cryptography`` module).

    The first 16 bytes are the AES-ECB encrypted IV; the rest is AES-CBC encrypted with that
    IV.  The plaintext ends in padding bytes whose value is the number of bytes to discard.
    """
    iv_decryptor = crypto.Cipher(crypto.algorithms.AES(key), crypto.modes.ECB()).decryptor()
    iv = iv_decryptor.update(ciphertext[:16])
    body_decryptor = crypto.Cipher(crypto.algorithms.AES(key), crypto.modes.CBC(iv)).decryptor()
    result = body_decryptor.update(ciphertext[16:])
    # result is padded with repeated bytes indicating amount to discard
    return result[: -result[-1]]


@dataclasses.dataclass
class _Manifest:
    """The three sections of a downloaded depot manifest."""

    payload: contentmsg.ContentManifestPayload
    metadata: contentmsg.ContentManifestMetadata
    signature: contentmsg.ContentManifestSignature

    def decrypt_filenames(self, depot_key: bytes) -> None:
        """
        Decrypts the filename and link target of every file mapping in place.

        Does nothing when the metadata says the filenames are not encrypted.  Decrypted
        values are stored as ``bytes`` and the metadata flag is cleared afterwards.
        """
        if not self.metadata.filenames_encrypted:
            return
        for fm in self.payload.mappings:
            fm.filename = _symmetric_decrypt(base64.b64decode(fm.filename), depot_key)
            if fm.linktarget:
                fm.linktarget = _symmetric_decrypt(base64.b64decode(fm.linktarget), depot_key)
        self.metadata.filenames_encrypted = False


def _parse_manifest(manifest_data: bytes) -> _Manifest:
    """
    Parses a zipped depot manifest into its payload, metadata and signature sections.

    The first archive member holds three consecutive sections, each prefixed by a
    ``_MSG_HEADER`` whose second field is the length of the protobuf that follows.
    """
    with zipfile.ZipFile(io.BytesIO(manifest_data)) as zf:
        raw = zf.read(zf.filelist[0])
    section_types = (
        contentmsg.ContentManifestPayload,
        contentmsg.ContentManifestMetadata,
        contentmsg.ContentManifestSignature,
    )
    sections = []
    offset = 0
    for section_type in section_types:
        _section_id, length = _MSG_HEADER.unpack_from(raw, offset)
        start = offset + _MSG_HEADER.size
        sections.append(protobug.loads(raw[start : start + length], section_type))
        offset = start + length
    return _Manifest(*sections)


def _unpack_chunk(chunk_data: bytes, depot_id: int, chunk_id: str) -> bytes:
    """
    Decompresses a decrypted depot chunk.

    Handles ``VZ`` (raw LZMA), ``VSZ`` (Zstandard) and zip containers.  Raises ``ValueError``
    on a bad footer, version or checksum, or when the container is not recognized;
    ``depot_id`` and ``chunk_id`` are only used in that last error message.
    """
    if chunk_data[:2] == b"VZ":
        if chunk_data[-2:] != b"zv":
            raise ValueError("VZ: incorrect footer")
        if chunk_data[2:3] != b"a":
            raise ValueError("VZ: incorrect version")
        lzma_filter = lzma._decode_filter_properties(lzma.FILTER_LZMA1, chunk_data[7:12])
        decompressor = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[lzma_filter])
        checksum, size = struct.unpack("<II", chunk_data[-10:-2])
        # NOTE(review): the compressed slice ends at -9 although the footer read above is
        # 10 bytes; this looks deliberate (the output is truncated to `size`) and appears
        # to mirror ValvePython/steam -- confirm before changing.
        unpacked = decompressor.decompress(chunk_data[12:-9])[:size]
        if zlib.crc32(unpacked) != checksum:
            raise ValueError("VZ: checksum mismatch")
        return unpacked

    if chunk_data[:3] == b"VSZ":
        if chunk_data[-3:] != b"zsv":
            raise ValueError("VSZ: incorrect footer")
        if chunk_data[3:4] != b"a":
            raise ValueError("VSZ: incorrect version")
        (crc_head,) = struct.unpack_from("<I", chunk_data, 4)
        crc_tail, size = struct.unpack_from("<II", chunk_data, -15)
        unpacked = pyzstd.decompress(chunk_data[8:-15])[:size]
        # The data must match a stored checksum.  (A chained `a != b != c` comparison would
        # only fail when the header and footer values also differ from each other, letting
        # corrupt data through whenever the two stored copies agree.)
        if zlib.crc32(unpacked) not in (crc_head, crc_tail):
            raise ValueError("VSZ: checksum mismatch")
        return unpacked

    try:
        with zipfile.ZipFile(io.BytesIO(chunk_data)) as zf:
            return zf.read(zf.filelist[0])
    except zipfile.BadZipFile as err:
        prefix = chunk_data[:4]
        raise ValueError(
            f"Unexpected chunk {prefix = }; ({depot_id = }, {chunk_id = })"
        ) from err


async def _process_app(app_pics_payload: dict) -> None:
    """
    Given an appinfo structure, downloads the manifest and its contents in turn.

    For every numeric depot that has a ``manifests`` entry, this requests a manifest request
    code and the depot key over the Steam connection, downloads and parses the public
    manifest from the CDN, and writes each ``.dll`` / ``.so`` / ``.inf`` file below a
    directory named after the app ID.  Files whose SHA-1 already matches are skipped.

    Raises ``ValueError`` when a chunk cannot be unpacked or a written file fails its hash
    check, and ``httpx.HTTPStatusError`` when the manifest download fails.
    """
    appinfo = app_pics_payload["appinfo"]
    app_id = int(appinfo["appid"])
    depot_public_gids = {
        int(depot_id): int(depot["manifests"]["public"]["gid"])
        for depot_id, depot in appinfo["depots"].items()
        if depot_id.isdigit()
        and "manifests" in depot  # ignore top-level keys 'branches', 'overridescddb'
    }
    cdn_depot_url = "https://google2.cdn.steampipe.steamcontent.com/depot"
    output_basepath = pathlib.Path() / str(app_id)

    async with httpx.AsyncClient() as client:
        for depot_id, manifest_gid in depot_public_gids.items():
            call = CContentServerDirectory_GetManifestRequestCode_Request(
                app_id=app_id,
                depot_id=depot_id,
                manifest_id=manifest_gid,
            )
            content_manifest_code_request = SteamProtobufMessagePacket(
                pbmsg.CMsgProtoBufHeader(
                    target_job_name="ContentServerDirectory.GetManifestRequestCode#1",
                ),
                call,
            )
            content_manifest_code_jobid = _msg_send_put_job(content_manifest_code_request)
            # 147 is the msg_id the service method response classes are declared with
            content_manifest_code_response = await _msg_wait_for(
                147, jobid=content_manifest_code_jobid
            )
            print(content_manifest_code_response)
            manifest_request_code = content_manifest_code_response.body.manifest_request_code

            content_manifest_web_request = await client.get(
                f"{cdn_depot_url}/{depot_id}/manifest/{manifest_gid}/5/{manifest_request_code}"
            )
            # fail with the HTTP error itself instead of a confusing BadZipFile later on
            content_manifest_web_request.raise_for_status()
            manifest = _parse_manifest(content_manifest_web_request.content)

            depot_decryption_key_job = _msg_send_put_job(
                SteamProtobufMessagePacket(
                    None,
                    pbmsg.CMsgClientGetDepotDecryptionKey(depot_id=depot_id, app_id=app_id),
                )
            )
            depot_decryption_key_response = await _msg_wait_for(
                EMsg.ClientGetDepotDecryptionKeyResponse, jobid=depot_decryption_key_job
            )
            depot_key = depot_decryption_key_response.body.depot_encryption_key

            manifest.decrypt_filenames(depot_key)
            for fm in manifest.payload.mappings:
                # decrypted filenames are bytes; names that were never encrypted are left
                # untouched by decrypt_filenames, so accept text as well
                filename = fm.filename
                if isinstance(filename, bytes):
                    filename = filename.decode("utf8")
                output_path = output_basepath / filename.rstrip("\x00")
                if output_path.suffix not in (".dll", ".so", ".inf"):
                    continue
                if (
                    output_path.exists()
                    and hashlib.sha1(output_path.read_bytes()).digest() == fm.sha_content
                ):
                    continue
                print(fm)
                output_path.parent.mkdir(parents=True, exist_ok=True)
                with output_path.open("wb") as outfile:
                    # protobug(?) does not sort chunks by offset
                    for c in sorted(fm.chunks, key=operator.attrgetter("offset")):
                        chunk_id = c.sha.hex()
                        chunk_url = f"{cdn_depot_url}/{depot_id}/chunk/{chunk_id}"
                        while True:
                            # the CDN may return an error; retry until it doesn't
                            try:
                                chunk_req = await client.get(chunk_url)
                                chunk_req.raise_for_status()
                                break
                            except (httpx.HTTPStatusError, httpx.TimeoutException):
                                await asyncio.sleep(10)
                        chunk_data = _unpack_chunk(
                            _symmetric_decrypt(chunk_req.content, depot_key),
                            depot_id,
                            chunk_id,
                        )
                        await asyncio.to_thread(outfile.write, chunk_data)
                # verify the written file, unless the manifest hash is all zero bytes
                if (
                    any(fm.sha_content)
                    and hashlib.sha1(output_path.read_bytes()).digest() != fm.sha_content
                ):
                    raise ValueError("hash failed")
async def main() -> None:
    """
    Connects to a Steam CM websocket, logs on with the steamid set in the logon header
    below, requests product info for a fixed list of apps and processes each app that
    comes back.

    Raises ``RuntimeError`` when the logon response has no header or reports failure.
    """
    async with (
        ws_client.connect("wss://cmp1-lax1.steamserver.net:27018/cmsocket/") as ws,
        asyncio.TaskGroup() as tg,
    ):
        send_task = tg.create_task(_msg_send_task(ws))
        recv_task = tg.create_task(_msg_recv_task(ws))
        _msg_send_put(
            SteamProtobufMessagePacket(
                pbmsg.CMsgProtoBufHeader(steamid=0x01A00000_00000000),
                pbmsg.CMsgClientLogon(
                    protocol_version=65580,
                    client_package_version=1561159470,
                ),
            )
        )
        logon = await _msg_wait_for(EMsg.ClientLogonResponse)
        print(f"{logon = }")
        if not logon.header:
            raise RuntimeError("No header in response")
        logon_body = typing.cast(pbmsg.CMsgClientLogonResponse, logon.body)
        # an eresult of 1 is the only value treated as a successful logon
        if logon_body.eresult != 1:
            raise RuntimeError("Failed login")
        # note our login session to be used by subsequent tasks
        _logon_ctx.set(logon.header)

        requested_apps = [
            pbmsg.CMsgClientPICSProductInfoRequest.AppInfo(appid=appid, access_token=0)
            for appid in (
                232250,
                222860,
                232330,
                222840,
                232290,
                317670,
                17505,
                17575,
                1136190,
                295230,
                228780,
            )
        ]
        # currently not sent with the request below
        _requested_packages = [
            pbmsg.CMsgClientPICSProductInfoRequest.PackageInfo(packageid=17906, access_token=0)
        ]
        pics_job = _msg_send_put_job(
            SteamProtobufMessagePacket(
                None,
                pbmsg.CMsgClientPICSProductInfoRequest(
                    apps=requested_apps,
                    meta_data_only=False,
                    num_prev_failed=0,
                ),
            )
        )

        async with asyncio.TaskGroup() as tg_app:
            # The product info may be split over several response messages; keep reading
            # until one arrives without response_pending so that no app is dropped.
            pics_stream = _msg_recv_stream()
            try:
                async for msg in pics_stream:
                    if msg.body.msg_id != EMsg.ClientPICSProductInfoResponse:
                        continue
                    if msg.header.jobid_target != pics_job:
                        continue
                    pics_changes = typing.cast(pbmsg.CMsgClientPICSProductInfoResponse, msg.body)
                    for app in pics_changes.apps:
                        payload = vdf.loads(app.buffer.decode("utf8").rstrip("\x00"))
                        tg_app.create_task(_process_app(payload))
                    if not pics_changes.response_pending:
                        break
            finally:
                # unregister the subscription queue as soon as the response is complete
                await pics_stream.aclose()

        _msg_send_put(SteamProtobufMessagePacket(None, pbmsg.CMsgClientLogOff()))
        # we should be putting the queue into shutdown, but we need 3.13 for that...
        await asyncio.sleep(0)
        # both tasks loop forever; cancel them so the enclosing task group can exit
        recv_task.cancel("logging off")
        send_task.cancel("logging off")
# Run the client only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment