Last active
October 29, 2025 22:58
-
-
Save thegamecracks/f02d59c1ba12a45c2a2518b48c48834f to your computer and use it in GitHub Desktop.
A zero-dependency, single-file workshop mod manager CLI for Arma 3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| """Manage workshop mods for an Arma 3 server. | |
| See ${prog} help for more information. | |
| """ | |
| # /// script | |
| # dependencies = [] | |
| # requires-python = ">=3.11" | |
| # /// | |
| # | |
| # MIT License | |
| # | |
| # Copyright (c) 2025 thegamecracks | |
| # | |
| # Permission is hereby granted, free of charge, to any person obtaining a copy | |
| # of this software and associated documentation files (the "Software"), to deal | |
| # in the Software without restriction, including without limitation the rights | |
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| # copies of the Software, and to permit persons to whom the Software is | |
| # furnished to do so, subject to the following conditions: | |
| # | |
| # The above copyright notice and this permission notice shall be included in all | |
| # copies or substantial portions of the Software. | |
| # | |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
| # SOFTWARE. | |
| from __future__ import annotations | |
| import argparse | |
| import datetime | |
| import json | |
| import logging | |
| import os | |
| import platform | |
| import re | |
| import shlex | |
| import shutil | |
| import subprocess | |
| import sys | |
| import textwrap | |
| import urllib.parse | |
| import urllib.request | |
| from abc import ABC, abstractmethod | |
| from contextlib import contextmanager, suppress | |
| from dataclasses import dataclass, field | |
| from http.client import HTTPResponse | |
| from pathlib import Path | |
| from string import Template | |
| from typing import ( | |
| Any, | |
| ClassVar, | |
| Collection, | |
| Iterable, | |
| Iterator, | |
| Mapping, | |
| Self, | |
| Type, | |
| TypeAlias, | |
| overload, | |
| ) | |
| HELP_DOCSTRINGS = { | |
| "summary": """ | |
| ${prog} is written to help manage workshop mods specifically for Arma 3. | |
| This can perform the following tasks, either in sequence or one at a time: | |
| 1. Fetch workshop mods and collections | |
| 2. Execute SteamCMD commands to install workshop mods | |
| 3. Symlink workshop mods and bikeys | |
| 4. Apply common fixes for Arma 3 mods | |
| Example usage: | |
| ${prog} install 450814997 3514182772 my_modpack.html | |
| ${prog} i --dry-run https://steamcommunity.com/sharedfiles/filedetails/?id=3489945148 | |
| ${prog} update --all | |
| ${prog} details | |
| ${prog} remove @cba_a3 3514182772 | |
| To see a help section, run ${prog} help <section>. | |
| All available sections: | |
| ${all_sections} | |
| Next section: '${prog} help auth' | |
| """, | |
| "auth": """ | |
| For seamless integration with SteamCMD, we recommend that you log into SteamCMD | |
| first to cache your credentials: | |
| $ ./steamcmd.sh | |
| Steam>login yoursteamuser | |
| Cached credentials not found. | |
| password: **** | |
| Proceeding with login using username/password. | |
| Logging in user 'yoursteamuser' [U:1:1234] to Steam Public...OK | |
| Waiting for client config...OK | |
| Waiting for user info...OK | |
| Steam>quit | |
| Unloading Steam API...OK | |
| Afterwards, you can use arguments or environment variables to specify | |
| the path to SteamCMD and your username login: | |
| # ~/.bashrc: | |
| export TGM_STEAMCMD=/path/to/steamcmd.sh | |
| export TGM_STEAM_USER=yoursteamuser | |
| # Or CLI arguments: | |
| ${prog} --steamcmd /path/to/steamcmd.sh --steamcmd-user yoursteamuser install ... | |
| INSTALL: 450814997 (CBA_A3) | |
| /path/to/steamcmd.sh +login yoursteamuser +workshop_download_item 107410 450814997 +quit | |
| If you don't want ${prog} to run SteamCMD at all, pass the -n/--dry-run flag: | |
| ${prog} install --dry-run ... | |
| Next section: '${prog} help dry' | |
| """, | |
| "dry": """ | |
| Most commands have side effects such as installing mods and symlinking files. | |
| To test commands without making any changes, you can use the -n/--dry-run flags: | |
| ${prog} install -n 450814997 | |
| ${prog} update -n --all | |
| ${prog} remove -n https://steamcommunity.com/sharedfiles/filedetails/?id=3489945148 | |
| ${prog} lowercase -n | |
| ${prog} link-mods -n | |
| ${prog} link-keys -n | |
| This prints most operations that the command will perform, but will avoid | |
| enacting filesystem changes. Some operations may not be printed until the | |
| actual run when side effects are known, such as symlinking newly installed mods. | |
| Next section: '${prog} help dir' | |
| """, | |
| "dir": """ | |
| ${prog} has three important directories that it uses: | |
| 1. The workshop directory (current: ${workshop_dir}) | |
| 2. The mod directory (current: ${mod_dir}) | |
| 3. The key directory (current: ${key_dir}) | |
| The workshop directory is where SteamCMD is expected to download mods to. | |
| Subcommands like 'fix-meta' and 'lowercase' directly affect mod files here. | |
| The mod directory is where mod symlinks are created. The 'link-mods' subcommand | |
| will generate symlinks to each mod in the workshop directory, like @cba_a3. | |
| The key directory is where key symlinks are created. The 'link-keys' subcommand | |
| will generate symlinks to any keys in the mod directory, like cba_a3.bikey. | |
| These directories can be customized using the following environment variables or options: | |
| TGM_WORKSHOP_DIR= OR ${prog} --workshop-dir ... | |
| TGM_MOD_DIR= OR ${prog} --mod-dir ... | |
| TGM_KEY_DIR= OR ${prog} --key-dir ... | |
| Next section: '${prog} help fixes' | |
| """, | |
| "fixes": """ | |
| The following fixes and utilities are defined: | |
| ${prog} fix-meta | |
| Add publishedid= to meta.cpp files to assist in automatic | |
| downloads when using the Arma 3 Launcher and verifySignatures=2. | |
| ${prog} link-keys | |
| Symlink .bikey files from the mod directory to the keys directory. | |
| ${prog} link-mods | |
| Symlink mods from the workshop directory to the mod directory. | |
| ${prog} lowercase | |
| Lowercase PBO files to help mods load on Linux servers. | |
| (obsolete since Arma 3 v2.22) | |
| Commands like install, update, and remove will automatically invoke these | |
| utilities after completion. To disable this, use the --no-fix flag: | |
| ${prog} install --no-fix 450814997 | |
| ${prog} update --all --no-fix | |
| ${prog} remove --no-fix @cba_a3 | |
| Next section: '${prog} help items' | |
| """, | |
| "items": """ | |
| For the install, update, and remove commands, the following formats | |
| can be used to provide workshop mods: | |
| 1. Bare IDs (450814997) | |
| 2. Workshop URLs (https://steamcommunity.com/sharedfiles/filedetails/?id=3489945148) | |
| 3. Mod symlinks (@cba_a3) | |
| 4. Text files (path/to/modpack.html) | |
| Multiple items can be specified at once in any format: | |
| ${prog} install 450814997 my_modpack.html | |
| ${prog} update my_modpack.html https://steamcommunity.com/sharedfiles/filedetails/?id=3489945148 | |
| ${prog} remove @cba_a3 @warriors_haven_framework | |
| Workshop collections are automatically expanded. | |
| Items that cannot be fetched from the Steam Web API, such as unlisted or deleted mods, | |
| will be ignored by default. To raise these as errors, use the --api-strict flag: | |
| ${prog} --api-strict install ... | |
| Due to limitations with the Steam Web API, dependencies listed on items | |
| cannot be automatically fetched. | |
| Next section: '${prog} help details' | |
| """, | |
| "details": """ | |
| To view a list of all installed mods, use the details subcommand: | |
| ${prog} details | |
| This will fetch every mod from the Steam Web API to show their real titles | |
| and update timestamps. To skip this and only show the URLs and file paths, | |
| use the --no-fetch flag: | |
| ${prog} details --no-fetch | |
| """, | |
| } | |
| @dataclass | |
| class Undefined: | |
| placeholder: str | |
| def __str__(self) -> str: | |
| return self.placeholder | |
| IS_WINDOWS = platform.system() == "Windows" | |
| KEY_DIR = os.getenv("TGM_KEY_DIR", "serverfiles/keys") | |
| MOD_DIR = os.getenv("TGM_MOD_DIR", "serverfiles/mods") | |
| STEAM_USER = os.getenv("TGM_STEAM_USER", Undefined("USER")) | |
| STEAMCMD = os.getenv("TGM_STEAMCMD") | |
| APP_ID = 107410 | |
| WORKSHOP_DIR = os.getenv( | |
| "TGM_WORKSHOP_DIR", | |
| ( | |
| f"C:/Program Files (x86)/Steam/steamapps/workshop/content/{APP_ID}" | |
| if IS_WINDOWS | |
| else f"~/.local/share/Steam/steamapps/workshop/content/{APP_ID}" | |
| ), | |
| ) | |
| # https://steamapi.xpaw.me/#ISteamRemoteStorage/GetCollectionDetails | |
| STEAMAPI_COLLECTION_URL = ( | |
| "https://api.steampowered.com/ISteamRemoteStorage/GetCollectionDetails/v1/" | |
| ) | |
| # https://steamapi.xpaw.me/#ISteamRemoteStorage/GetPublishedFileDetails | |
| STEAMAPI_FILEDETAILS_URL = ( | |
| "https://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1/" | |
| ) | |
| WORKSHOP_URL_PATTERN = re.compile( | |
| r"https://steamcommunity.com/sharedfiles/filedetails/\?id=(\d+)" | |
| ) | |
| WORKSHOP_URL_TEMPLATE = "https://steamcommunity.com/sharedfiles/filedetails/?id={}" | |
| SubParser: TypeAlias = "argparse._SubParsersAction[argparse.ArgumentParser]" | |
| log = logging.getLogger(__name__) | |
| def main() -> None: | |
| try: | |
| config = Config.parse_args() | |
| config.configure_logging() | |
| try: | |
| config.invoke() | |
| except CommandError as e: | |
| if config.verbose: | |
| log.exception(e) | |
| else: | |
| log.error("%s\n(To show full traceback, use -v/--verbose)", e) | |
| sys.exit(1) | |
| except EOFError: | |
| print() | |
| sys.exit(1) | |
| except KeyboardInterrupt: | |
| print() | |
| sys.exit(130) | |
| except Exception as e: | |
| log.exception(e) | |
| sys.exit(1) | |
| @dataclass(kw_only=True) | |
| class Config: | |
| args: argparse.Namespace | |
| command_cls: Type[Command] | |
| ignore_api_errors: bool | |
| key_dir: Path | |
| mod_dir: Path | |
| steamcmd: Path | None | |
| steamcmd_user: str | Undefined | |
| verbose: int | |
| workshop_dir: Path | |
| COMMANDS: ClassVar[list[Type[Command]]] = [] | |
| _commands_loaded: ClassVar[bool] = False | |
| @classmethod | |
| def parse_args(cls) -> Self: | |
| if not cls._commands_loaded: | |
| cls.COMMANDS.append(CheckUpdate) | |
| cls.COMMANDS.append(Details) | |
| cls.COMMANDS.append(FixMeta) | |
| cls.COMMANDS.append(Help) | |
| cls.COMMANDS.append(Install) | |
| cls.COMMANDS.append(LowercaseAddons) | |
| cls.COMMANDS.append(LinkKeys) | |
| cls.COMMANDS.append(LinkMods) | |
| cls.COMMANDS.append(Remove) | |
| cls.COMMANDS.append(Update) | |
| cls._commands_loaded = True | |
| parser = argparse.ArgumentParser( | |
| description=clean_doc(__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| ) | |
| parser.suggest_on_error = True # type: ignore # available in 3.14 | |
| parser.add_argument( | |
| "--api-strict", | |
| action="store_false", | |
| dest="ignore_api_errors", | |
| help="Fail if any mods cannot be resolved on the Steam Web API", | |
| ) | |
| parser.add_argument( | |
| "--key-dir", | |
| default=KEY_DIR, | |
| metavar="TGM_KEY_DIR", | |
| help="Path to directory where keys are linked (default: %(default)s)", | |
| type=lambda s: Path(s).expanduser().absolute(), | |
| ) | |
| parser.add_argument( | |
| "--mod-dir", | |
| default=MOD_DIR, | |
| metavar="TGM_MOD_DIR", | |
| help="Path to directory where mods are linked (default: %(default)s)", | |
| type=lambda s: Path(s).expanduser().absolute(), | |
| ) | |
| parser.add_argument( | |
| "--steamcmd", | |
| default=STEAMCMD, | |
| metavar="TGM_STEAMCMD", | |
| help="Full path to steamcmd script (default: %(default)s)", | |
| type=lambda s: Path(s).expanduser().absolute(), | |
| ) | |
| parser.add_argument( | |
| "--steamcmd-user", | |
| default=STEAM_USER, | |
| metavar="TGM_STEAM_USER", | |
| help="Username used for SteamCMD commands (default: %(default)s)", | |
| ) | |
| parser.add_argument( | |
| "-v", | |
| "--verbose", | |
| action="count", | |
| default=0, | |
| help="Increase logging verbosity", | |
| ) | |
| parser.add_argument( | |
| "--workshop-dir", | |
| default=WORKSHOP_DIR, | |
| metavar="TGM_WORKSHOP_DIR", | |
| help="Path to directory where mods are installed (default: %(default)s)", | |
| type=lambda s: Path(s).expanduser().absolute(), | |
| ) | |
| parser.set_defaults(command=None) | |
| subparsers = parser.add_subparsers() | |
| commands = sorted(cls.COMMANDS, key=lambda t: t.__name__.lower()) | |
| for command_cls in commands: | |
| command_cls.register(subparsers) | |
| args = parser.parse_args() | |
| if args.command is None: | |
| parser.print_help() | |
| sys.exit(1) | |
| return cls.from_args(args) | |
| @classmethod | |
| def from_args(cls, args: argparse.Namespace) -> Self: | |
| return cls( | |
| args=args, | |
| command_cls=args.command, | |
| ignore_api_errors=args.ignore_api_errors, | |
| key_dir=args.key_dir, | |
| mod_dir=args.mod_dir, | |
| steamcmd=args.steamcmd, | |
| steamcmd_user=args.steamcmd_user, | |
| verbose=args.verbose, | |
| workshop_dir=args.workshop_dir, | |
| ) | |
| def configure_logging(self) -> None: | |
| if self.verbose > 0: | |
| level = logging.DEBUG | |
| else: | |
| level = logging.INFO | |
| logging.basicConfig(format="%(levelname)s: %(message)s", level=level) | |
| def invoke(self) -> None: | |
| command = self.command_cls.from_config(self) | |
| command.invoke() | |
| @dataclass | |
| class Command(ABC): | |
| config: Config | |
| @classmethod | |
| @abstractmethod | |
| def register(cls, subparsers: SubParser) -> None: ... | |
| @classmethod | |
| @abstractmethod | |
| def from_config(cls, config: Config, /) -> Self: ... | |
| @abstractmethod | |
| def invoke(self) -> None: ... | |
| def expand_collection_file_details( | |
| self, | |
| items: Iterable[FileDetails], | |
| ) -> list[FileDetails]: | |
| return expand_collection_file_details( | |
| items, | |
| ignore_errors=self.config.ignore_api_errors, | |
| ) | |
| def find_mod_links(self) -> dict[Path, Path | None]: | |
| return find_mod_links( | |
| mod_dir=self.config.mod_dir, | |
| workshop_dir=self.config.workshop_dir, | |
| ) | |
| def get_published_file_details(self, items: Collection[int]) -> list[FileDetails]: | |
| return get_published_file_details( | |
| items, | |
| ignore_errors=self.config.ignore_api_errors, | |
| ) | |
| def item_modified_at(self, item_id: int) -> datetime.datetime: | |
| return item_modified_at(item_id, workshop_dir=self.config.workshop_dir) | |
| def list_installed_items(self) -> dict[int, Path]: | |
| return list_installed_items(workshop_dir=self.config.workshop_dir) | |
| @dataclass(kw_only=True) | |
| class CheckUpdate(Command): | |
| """Check workshop mods for updates based on their modification time. | |
| This command is deprecated and is equivalent to update --all --dry-run --no-fix. | |
| """ | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "check-update", | |
| aliases=["cu"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Check for workshop mod updates", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| return cls(config) | |
| def invoke(self) -> None: | |
| Update( | |
| self.config, | |
| all=True, | |
| dry_run=True, | |
| fix=False, | |
| force=False, | |
| item_ids=[], | |
| ).invoke() | |
| @dataclass(kw_only=True) | |
| class Details(Command): | |
| """Show details of current workshop mods.""" | |
| fetch: bool | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "details", | |
| aliases=["dt"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Show workshop mod details", | |
| ) | |
| parser.add_argument( | |
| "--fetch", | |
| action=argparse.BooleanOptionalAction, | |
| default=True, | |
| help="Fetch workshop items to show extra details", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| return cls(config, fetch=args.fetch) | |
| def invoke(self) -> None: | |
| installed = self.list_installed_items() | |
| installed = {item_id: mod.resolve() for item_id, mod in installed.items()} | |
| if not installed: | |
| return log.info("No workshop mods installed") | |
| mod_links = self.find_mod_links() | |
| details: dict[int, FileDetails] = {} | |
| if self.fetch: | |
| details_list = self.get_published_file_details(installed) | |
| details = {item.id: item for item in details_list} | |
| rows: list[tuple[int, str, Path, Path | None, FileDetails | None]] = [] | |
| for item_id, mod in installed.items(): | |
| link = mod_links[mod] | |
| item = details.get(item_id) | |
| title = ( | |
| item.title | |
| if item is not None | |
| else link.name if link is not None else str(item_id) | |
| ) | |
| rows.append((item_id, title, mod, link, item)) | |
| rows.sort(key=lambda t: (t[1].lower(), t[0])) | |
| for item_id, title, mod, link, item in rows: | |
| self._show_item_details( | |
| item_id=item_id, | |
| title=title, | |
| mod=mod, | |
| link=link, | |
| item=details.get(item_id), | |
| ) | |
| def _show_item_details( | |
| self, | |
| *, | |
| item_id: int, | |
| title: str, | |
| mod: Path, | |
| link: Path | None, | |
| item: FileDetails | None, | |
| ) -> None: | |
| url = WORKSHOP_URL_TEMPLATE.format(item_id) | |
| modified_at = self.item_modified_at(item_id) | |
| if item is not None: | |
| print(title) | |
| print(f" Workshop URL: {url}") | |
| print(f" Downloaded: {modified_at.isoformat(' ')}") | |
| print(f" First published: {item.created_at.isoformat(' ')}") | |
| print(f" Last updated: {item.updated_at.isoformat(' ')}") | |
| print(f" File size: {natural_bytes(item.size)}") | |
| print(f" Location: {mod}") | |
| print(f" Symlink: {link}") | |
| else: | |
| print(title) | |
| print(f" Workshop URL: {url}") | |
| print(f" Downloaded: {modified_at.isoformat(' ')}") | |
| print(f" Location: {mod}") | |
| print(f" Symlink: {link}") | |
| @dataclass(kw_only=True) | |
| class FixMeta(Command): | |
| """Validate and fix meta.cpp publishedid fields. | |
| This ensures that the Arma 3 Launcher can automatically install | |
| the server's mods when signature verification is enabled. | |
| """ | |
| dry_run: bool | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "fix-meta", | |
| aliases=["fm"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Fix meta.cpp files in workshop directory", | |
| ) | |
| parser.add_argument( | |
| "-n", | |
| "--dry-run", | |
| action="store_true", | |
| help="Don't modify any meta.cpp files", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| return cls(config, dry_run=args.dry_run) | |
| def invoke(self) -> None: | |
| for mod, link in self.find_mod_links().items(): | |
| item_id = int(mod.name) | |
| name = ( | |
| f"{item_id:<10} ({link.name})" if link is not None else f"{item_id:<10}" | |
| ) | |
| meta = mod / "meta.cpp" | |
| if not meta.exists(): | |
| log.warning("Missing meta.cpp in %s", name) | |
| continue | |
| content = meta.read_text("utf-8") | |
| self._fix_publishedid(meta, name, item_id, content) | |
| def _get_item_id(self, mod: Path) -> int | None: | |
| mod = mod.resolve() | |
| with suppress(ValueError): | |
| return int(mod.name) | |
| def _fix_publishedid( | |
| self, | |
| meta: Path, | |
| name: str, | |
| item_id: int, | |
| content: str, | |
| ) -> None: | |
| pattern = r"(publishedid\s*=\s*)(\d+)" | |
| m = re.search(pattern, content) | |
| if m is None: | |
| print(f"ADD: publishedid = {item_id:<10} <= {name}") | |
| content = content + f"publishedid = {item_id};\n" | |
| if not self.dry_run: | |
| meta.write_text(content, "utf-8") | |
| return | |
| publishedid = int(m[2]) | |
| if publishedid != item_id: | |
| print(f"FIX: {m[0]} <= {name}") | |
| content = re.sub(pattern, rf"\g<1>{item_id}", content, count=1) | |
| if not self.dry_run: | |
| meta.write_text(content, "utf-8") | |
| return | |
| @dataclass(kw_only=True) | |
| class Help(Command): | |
| """Show this program's documentation.""" | |
| section: str | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "help", | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Show this program's documentation", | |
| ) | |
| parser.add_argument( | |
| "section", | |
| choices=sorted(HELP_DOCSTRINGS), | |
| default="summary", | |
| help="The section to show help for", | |
| nargs="?", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| return cls(config, section=args.section) | |
| def invoke(self) -> None: | |
| message = HELP_DOCSTRINGS[self.section] | |
| message = clean_doc(message) | |
| message = Template(message).safe_substitute( | |
| all_sections=", ".join(sorted(HELP_DOCSTRINGS)), | |
| key_dir=self.config.key_dir, | |
| mod_dir=self.config.mod_dir, | |
| workshop_dir=self.config.workshop_dir, | |
| ) | |
| print(message) | |
| @dataclass(kw_only=True) | |
| class Install(Command): | |
| """Install workshop mods and collections. | |
| After installation, the lowercase, link-mods, fix-meta, and link-keys commands | |
| are automatically invoked unless --no-fix is used. | |
| """ | |
| dry_run: bool | |
| fix: bool | |
| item_ids: list[int] | |
| skip_installed: bool | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "install", | |
| aliases=["i"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Install workshop mods and collections", | |
| ) | |
| parser.add_argument( | |
| "-a", | |
| "--all", | |
| action="store_true", | |
| help="Deprecated alias of --force", | |
| ) | |
| parser.add_argument( | |
| "--fix", | |
| action=argparse.BooleanOptionalAction, | |
| default=True, | |
| help="Run fixes after install (default: %(default)s)", | |
| ) | |
| parser.add_argument( | |
| "-f", | |
| "--force", | |
| action="store_true", | |
| help="Try installing all mods even if already installed", | |
| ) | |
| parser.add_argument( | |
| "-n", | |
| "--dry-run", | |
| action="store_true", | |
| help="Don't install any mods", | |
| ) | |
| parser.add_argument( | |
| "mods", | |
| help="Workshop mods or collections to install", | |
| nargs="+", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| skip_installed = True | |
| if args.force: | |
| skip_installed = False | |
| elif args.all: | |
| log.warning("-a/--all is deprecated, use --force instead") | |
| skip_installed = False | |
| item_ids = parse_item_ids(args.mods, mod_dir=config.mod_dir) | |
| return cls( | |
| config, | |
| dry_run=args.dry_run, | |
| fix=args.fix, | |
| item_ids=item_ids, | |
| skip_installed=skip_installed, | |
| ) | |
| def invoke(self) -> None: | |
| items = self.get_published_file_details(self.item_ids) | |
| items = self.expand_collection_file_details(items) | |
| if self.skip_installed: | |
| installed = self.list_installed_items() | |
| items = [item for item in items if item.id not in installed] | |
| if not items: | |
| return log.info("No workshop mods need to be installed") | |
| command = SteamCommand.from_config(self.config) | |
| for item in items: | |
| print(f"INSTALL: {item.id:<10} ({item.title})") | |
| command.extend("+workshop_download_item", APP_ID, item.id) | |
| print(command) | |
| if not self.dry_run: | |
| command.call() | |
| if self.fix: | |
| LowercaseAddons(self.config, dry_run=self.dry_run).invoke() | |
| LinkMods( | |
| self.config, | |
| dry_run=self.dry_run, | |
| fetch=False, | |
| prompt=False, | |
| prune=False, | |
| ).invoke_with_items({item.id: item for item in items}) | |
| FixMeta(self.config, dry_run=self.dry_run).invoke() | |
| LinkKeys(self.config, dry_run=self.dry_run, prune=True).invoke() | |
| @dataclass(kw_only=True) | |
| class LowercaseAddons(Command): | |
| """Lowercase addons in workshop mods for Arma 3 Linux compatibility.""" | |
| dry_run: bool | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "lowercase", | |
| aliases=["lw"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Lowercase addons in workshop directory", | |
| ) | |
| parser.add_argument( | |
| "-n", | |
| "--dry-run", | |
| action="store_true", | |
| help="Don't rename any files", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| return cls(config, dry_run=args.dry_run) | |
| def invoke(self) -> None: | |
| if IS_WINDOWS: | |
| return log.info("Skipping addon lowercasing on Windows") | |
| for mod in self.list_installed_items().values(): | |
| addons = self._find_addons_directory(mod) | |
| if addons is None: | |
| log.warning("No addons directory found in %s", mod) | |
| continue | |
| self._iter_addons(addons) | |
| def _find_addons_directory(self, mod: Path) -> Path | None: | |
| workshop_dir = self.config.workshop_dir | |
| expected = mod / "addons" | |
| for path in mod.iterdir(): | |
| if not path.is_dir(): | |
| continue | |
| elif path.name.lower() != expected.name: | |
| continue | |
| elif path == expected: | |
| return expected | |
| print(f"RENAME: {expected.name:50} <= {path.relative_to(workshop_dir)}") | |
| if not self.dry_run: | |
| path.rename(expected) | |
| return expected | |
| return path | |
| def _iter_addons(self, addons: Path) -> None: | |
| types = ("*.pbo", "*.bisign") | |
| workshop_dir = self.config.workshop_dir | |
| files = (p for t in types for p in addons.glob(t)) | |
| files = ((old, old.with_name(old.name.lower())) for old in files) | |
| files = ((old, new) for old, new in files if old != new) | |
| for old, new in files: | |
| print(f"RENAME: {new.name:50} <= {old.relative_to(workshop_dir)}") | |
| if not self.dry_run: | |
| old.rename(new) | |
| @dataclass(kw_only=True) | |
| class LinkKeys(Command): | |
| """Link bikey files in the mod directory to the keys directory.""" | |
| dry_run: bool | |
| prune: bool | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "link-keys", | |
| aliases=["lk"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Symlink keys from mod directory", | |
| ) | |
| parser.add_argument( | |
| "-n", | |
| "--dry-run", | |
| action="store_true", | |
| help="Don't link any keys", | |
| ) | |
| parser.add_argument( | |
| "--prune", | |
| action=argparse.BooleanOptionalAction, | |
| default=True, | |
| help="Remove broken links if present (default: %(default)s)", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| return cls(config, dry_run=args.dry_run, prune=args.prune) | |
| def invoke(self) -> None: | |
| if self.prune: | |
| remove_broken_links(self.config.key_dir, dry_run=self.dry_run) | |
| keys = self._find_keys() | |
| for key in keys: | |
| self._link_key(key) | |
| def _find_keys(self) -> list[Path]: | |
| keys: dict[str, Path] = {} | |
| for mod in self.config.mod_dir.iterdir(): | |
| if not mod.is_dir(): | |
| continue | |
| self._find_keys_in_mod(keys, mod) | |
| self._check_unsigned_addons(mod) | |
| return sorted(keys.values()) | |
| def _find_keys_in_mod(self, keys: dict[str, Path], mod: Path) -> None: | |
| found = False | |
| for key in mod.rglob("*.bikey"): | |
| found = True | |
| if key.name not in keys: | |
| keys[key.name] = key | |
| else: | |
| self._check_conflict(key, keys[key.name]) | |
| if not found: | |
| log.warning("No keys found in mod: %s", mod.name) | |
| def _check_conflict(self, old: Path, new: Path) -> None: | |
| if old.read_bytes() == new.read_bytes(): | |
| return | |
| with suppress(ValueError): | |
| old = old.relative_to(self.config.mod_dir) | |
| with suppress(ValueError): | |
| new = new.relative_to(self.config.mod_dir) | |
| raise CommandError( | |
| f"Key contents differ between {old} and {new}, must be renamed" | |
| ) | |
| def _link_key(self, key: Path) -> None: | |
| link = self.config.key_dir / key.name | |
| if link.is_file() and not link.is_symlink(): | |
| log.warning("Replacing file with link: %s", link.name) | |
| if not self.dry_run: | |
| link.unlink() | |
| elif link.exists(): | |
| self._check_conflict(link, key) | |
| return | |
| print(f"LINK: {link.name:50} <= {key.relative_to(self.config.mod_dir)}") | |
| if not self.dry_run: | |
| assert key.is_absolute() | |
| link.symlink_to(key) | |
| def _check_unsigned_addons(self, mod: Path) -> None: | |
| addons = mod / "addons" | |
| if not addons.exists(): | |
| return log.warning( | |
| "Missing addons directory in mod: %s", | |
| mod.relative_to(self.config.mod_dir), | |
| ) | |
| pbos = list(addons.glob("*.pbo")) | |
| bisigns = list(addons.glob("*.bisign")) | |
| unsigned = [ | |
| p for p in pbos if not any(s.name.startswith(p.name) for s in bisigns) | |
| ] | |
| for pbo in unsigned: | |
| log.warning("Addon is unsigned: %s", pbo.relative_to(self.config.mod_dir)) | |
| @dataclass(kw_only=True) | |
| class LinkMods(Command): | |
| """Link each mod in the workshop directory to the server mods directory.""" | |
| dry_run: bool | |
| fetch: bool | |
| prompt: bool | |
| prune: bool | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "link-mods", | |
| aliases=["lm"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Symlink mods from workshop directory", | |
| ) | |
| parser.add_argument( | |
| "-n", | |
| "--dry-run", | |
| action="store_true", | |
| help="Don't create any links", | |
| ) | |
| parser.add_argument( | |
| "--fetch", | |
| action=argparse.BooleanOptionalAction, | |
| default=True, | |
| help="Fetch workshop items to suggest titles (default: %(default)s)", | |
| ) | |
| parser.add_argument( | |
| "--prompt", | |
| action=argparse.BooleanOptionalAction, | |
| default=False, | |
| help="Prompt to enter each link's name (default: %(default)s)", | |
| ) | |
| parser.add_argument( | |
| "--prune", | |
| action=argparse.BooleanOptionalAction, | |
| default=False, | |
| help="Remove broken links if present (default: %(default)s)", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| return cls( | |
| config, | |
| dry_run=args.dry_run, | |
| fetch=args.fetch, | |
| prompt=args.prompt, | |
| prune=args.prune, | |
| ) | |
| def invoke(self) -> None: | |
| if self.prune: | |
| remove_broken_links(self.config.mod_dir, dry_run=self.dry_run) | |
| missing = self._find_missing_links() | |
| if not missing: | |
| return log.info("No workshop mods need to be linked") | |
| items = {} | |
| if self.fetch: | |
| item_ids = [int(mod.name) for mod in missing] | |
| items = self.get_published_file_details(item_ids) | |
| items = {item.id: item for item in items} | |
| self._create_links(missing, items) | |
| def invoke_with_items(self, items: Mapping[int, FileDetails | None]) -> None: | |
| missing = self._find_missing_links() | |
| # missing = [mod for mod in missing if items.get(int(mod.name)) is not None] | |
| self._create_links(missing, items) | |
| def _create_links( | |
| self, | |
| missing: Collection[Path], | |
| items: Mapping[int, FileDetails | None], | |
| ) -> None: | |
| for mod in missing: | |
| id = int(mod.name) | |
| item = items.get(id) | |
| self._create_link(mod, item=item) | |
| def _find_missing_links(self) -> list[Path]: | |
| mod_links = self.find_mod_links() | |
| missing = [mod for mod, link in mod_links.items() if link is None] | |
| return missing | |
| def _create_link(self, mod: Path, item: FileDetails | None) -> None: | |
| item_id = int(mod.name) | |
| title = item.title if item is not None else mod.name | |
| link_name = self._normalize_name(title) | |
| if self.prompt: | |
| # TODO: add a way to skip mods? | |
| print(f"{item_id:<10} ({title})") | |
| overwrite = input(f" Link name ({link_name}): ") | |
| overwrite = self._normalize_name(overwrite) | |
| link_name = overwrite or link_name | |
| if not link_name: | |
| raise CommandError(f"No name available for mod: {mod}") | |
| link = self.config.mod_dir / link_name | |
| if link.exists(): | |
| raise CommandError(f"Link already exists: {link.name}") | |
| print(f"LINK: {link.name:50} <= {mod.relative_to(self.config.workshop_dir)}") | |
| if not self.dry_run: | |
| assert mod.is_absolute() | |
| link.symlink_to(mod, target_is_directory=True) | |
| @staticmethod | |
| def _normalize_name(name: str) -> str: | |
| name = re.sub(r"\s+", "_", name) | |
| name = re.sub(r"\W+", "", name) | |
| name = re.sub(r"_{2,}", "_", name) | |
| name = name.lower().strip("_ ") | |
| return f"@{name}" if name else "" | |
| @dataclass(kw_only=True) | |
| class Remove(Command): | |
| """Remove workshop mods and collections. | |
| After removal, the link-keys command is automatically invoked unless | |
| --no-fix is used. This ensures broken key symlinks are removed or symlinked | |
| from another mod. Mod symlinks are not affected. | |
| """ | |
| fetch: bool | |
| fix: bool | |
| dry_run: bool | |
| item_ids: list[int] | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "remove", | |
| aliases=["rm"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Remove workshop mods and collections", | |
| ) | |
| parser.add_argument( | |
| "--fetch", | |
| action=argparse.BooleanOptionalAction, | |
| default=True, | |
| help="Fetch workshop items to resolve titles and collections (default: %(default)s)", | |
| ) | |
| parser.add_argument( | |
| "--fix", | |
| action=argparse.BooleanOptionalAction, | |
| default=True, | |
| help="Run fixes after removal (default: %(default)s)", | |
| ) | |
| parser.add_argument( | |
| "-n", | |
| "--dry-run", | |
| action="store_true", | |
| help="Don't remove any mods", | |
| ) | |
| parser.add_argument( | |
| "mods", | |
| help="Workshop mods or collections to remove", | |
| nargs="+", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| item_ids = parse_item_ids(args.mods, mod_dir=config.mod_dir) | |
| return cls( | |
| config, | |
| dry_run=args.dry_run, | |
| fetch=args.fetch, | |
| fix=args.fix, | |
| item_ids=item_ids, | |
| ) | |
| def invoke(self) -> None: | |
| if self.fetch: | |
| details = self.get_published_file_details(self.item_ids) | |
| details = self.expand_collection_file_details(details) | |
| items = {item.id: item for item in details} | |
| else: | |
| items = {item: None for item in self.item_ids} | |
| installed: dict[int, Path] = {} | |
| for item_id, item in items.items(): | |
| mod = self.config.workshop_dir / str(item_id) | |
| if mod.is_dir(): | |
| installed[item_id] = mod | |
| if not installed: | |
| return log.info("No workshop mods need to be removed") | |
| for item_id, mod in installed.items(): | |
| item = items.get(item_id) | |
| if item is not None: | |
| print(f"REMOVE: {item_id:<10} ({item.title})") | |
| else: | |
| print(f"REMOVE: {item_id:<10}") | |
| if not self.dry_run: | |
| shutil.rmtree(mod) | |
| if self.fix: | |
| LinkKeys(self.config, dry_run=self.dry_run, prune=True).invoke() | |
| @dataclass(kw_only=True) | |
| class Update(Command): | |
| """Check and update workshop mods based on their modification time. | |
| After updating, the lowercase, fix-meta, and link-keys commands | |
| are automatically invoked unless --no-fix is used. | |
| """ | |
| all: bool | |
| dry_run: bool | |
| fix: bool | |
| force: bool | |
| item_ids: list[int] | |
| @classmethod | |
| def register(cls, subparsers: SubParser) -> None: | |
| parser = subparsers.add_parser( | |
| "update", | |
| aliases=["u"], | |
| description=clean_doc(cls.__doc__), | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| help="Update workshop mods and collections", | |
| ) | |
| parser.add_argument( | |
| "-a", | |
| "--all", | |
| action="store_true", | |
| help="Check all installed mods for updates", | |
| ) | |
| parser.add_argument( | |
| "--fix", | |
| action=argparse.BooleanOptionalAction, | |
| default=True, | |
| help="Run fixes after update (default: %(default)s)", | |
| ) | |
| parser.add_argument( | |
| "-f", | |
| "--force", | |
| action="store_true", | |
| help="Skip modification time checks and always update mods", | |
| ) | |
| parser.add_argument( | |
| "-n", | |
| "--dry-run", | |
| action="store_true", | |
| help="Don't update any mods", | |
| ) | |
| parser.add_argument( | |
| "mods", | |
| help="Workshop mods or collections to update", | |
| nargs="*", | |
| ) | |
| parser.set_defaults(command=cls) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| args = config.args | |
| item_ids = parse_item_ids(args.mods, mod_dir=config.mod_dir) | |
| return cls( | |
| config, | |
| all=args.all, | |
| dry_run=args.dry_run, | |
| fix=args.fix, | |
| force=args.force, | |
| item_ids=item_ids, | |
| ) | |
| def invoke(self) -> None: | |
| installed = self.list_installed_items() | |
| if not installed: | |
| return log.info("No workshop mods installed") | |
| if not self.all and not self.item_ids: | |
| return log.info( | |
| "Use -a/--all to update all mods, or specify one or more " | |
| "workshop mods to update" | |
| ) | |
| if self.item_ids and not self.all: | |
| details = self.get_published_file_details(self.item_ids) | |
| details = self.expand_collection_file_details(details) | |
| details = [item for item in details if item.id in installed] | |
| else: | |
| details = self.get_published_file_details(installed) | |
| outdated = [item for item in details if self.force or self._is_outdated(item)] | |
| if not outdated: | |
| return log.info("No workshop mods need to be updated") | |
| command = SteamCommand.from_config(self.config) | |
| for item in outdated: | |
| date = item.updated_at.date() | |
| print(f"UPDATE: {date} {item.id:<10} ({item.title})") | |
| command.extend("+workshop_download_item", APP_ID, str(item.id)) | |
| print(command) | |
| if not self.dry_run: | |
| command.call() | |
| if self.fix: | |
| LowercaseAddons(self.config, dry_run=self.dry_run).invoke() | |
| FixMeta(self.config, dry_run=self.dry_run).invoke() | |
| LinkKeys(self.config, dry_run=self.dry_run, prune=True).invoke() | |
| def _is_outdated(self, item: FileDetails) -> bool: | |
| modified = self.item_modified_at(item.id) | |
| return item.updated_at > modified | |
| @dataclass(kw_only=True) | |
| class CollectionDetails: | |
| id: int | |
| children: list[int] | |
| @classmethod | |
| def from_dict(cls, data: dict[str, Any]) -> Self: | |
| log.debug("Parsing collection details: %s", data) | |
| item_id = int(data["publishedfileid"]) | |
| if data["result"] == 9: | |
| raise DetailsError(f"Item ID not found: {item_id}") | |
| elif data["result"] != 1: | |
| r = data["result"] | |
| raise DetailsError(f"Item ID {item_id} Unexpected result code: {r}") | |
| return cls( | |
| id=item_id, | |
| children=[int(child["publishedfileid"]) for child in data["children"]], | |
| ) | |
| @dataclass(kw_only=True) | |
| class FileDetails: | |
| id: int | |
| title: str | |
| description: str = field(repr=False) | |
| created_at: datetime.datetime | |
| updated_at: datetime.datetime | |
| size: int | |
| tags: list[Tag] | |
| @classmethod | |
| def from_dict(cls, data: dict[str, Any]) -> Self: | |
| log.debug("Parsing file details: %s", data) | |
| item_id = int(data["publishedfileid"]) | |
| if data["result"] == 9: | |
| raise DetailsError(f"Item ID not found: {item_id}") | |
| elif data["result"] != 1: | |
| r = data["result"] | |
| raise DetailsError(f"Item ID {item_id} Unexpected result code: {r}") | |
| created_at = datetime.datetime.fromtimestamp(data["time_created"]).astimezone() | |
| updated_at = datetime.datetime.fromtimestamp(data["time_updated"]).astimezone() | |
| return cls( | |
| id=item_id, | |
| title=data["title"], | |
| description=data["description"], | |
| created_at=created_at, | |
| updated_at=updated_at, | |
| size=int(data["file_size"]), | |
| tags=[Tag.from_dict(tag) for tag in data["tags"]], | |
| ) | |
| class SteamCommand: | |
| """A SteamCMD command.""" | |
| def __init__( | |
| self, | |
| *args: int | str, | |
| command: Path | None, | |
| user: str | Undefined | None, | |
| ) -> None: | |
| self.command = command | |
| self.user = user | |
| self._args: list[str] = [str(x) for x in args] | |
| def __repr__(self) -> str: | |
| args = ", ".join(map(repr, self._args)) | |
| return f"{type(self).__name__}({args}, command={self.command!r}, user={self.user!r})" | |
| def __str__(self) -> str: | |
| return shlex.join(self.args) | |
| @property | |
| def args(self) -> list[str]: | |
| command = "steamcmd" | |
| if self.command is not None: | |
| command = str(self.command.absolute()) | |
| args = [command] | |
| if self.user is not None: | |
| args.extend(("+login", str(self.user))) | |
| args.extend(self._args) | |
| args.append("+quit") | |
| return args | |
| @property | |
| def cwd(self) -> Path | None: | |
| if self.command is not None: | |
| return self.command.absolute().parent | |
| def call(self) -> None: | |
| args = self.args | |
| if self.command is None and shutil.which(args[0]) is None: | |
| raise SteamNotFoundError() | |
| if isinstance(self.user, Undefined): | |
| raise UndefinedSteamUserError() | |
| subprocess.check_call(self.args, cwd=self.cwd) | |
| def extend(self, *args: int | str) -> None: | |
| self._args.extend(str(x) for x in args) | |
| @classmethod | |
| def from_config(cls, config: Config) -> Self: | |
| return cls(command=config.steamcmd, user=config.steamcmd_user) | |
| @dataclass(kw_only=True) | |
| class Tag: | |
| name: str | |
| @classmethod | |
| def from_dict(cls, data: dict[str, Any]) -> Self: | |
| return cls(name=data["tag"]) | |
| class CommandError(Exception): | |
| """Raised when an error occurs during command execution.""" | |
| class DetailsError(CommandError): | |
| """Raised when file details cannot be parsed from the Steam Web API.""" | |
| class HTTPError(CommandError): | |
| """Raised when an error occurs with an HTTP request.""" | |
| class SteamNotFoundError(CommandError): | |
| """Raised when attempting to invoke SteamCMD without a valid steamcmd path.""" | |
| def __init__(self) -> None: | |
| message = clean_doc( | |
| "SteamCMD could not be found in PATH.\n" | |
| "Please specify one of --steamcmd, TGM_STEAMCMD=, or --dry-run if applicable.\n" | |
| "For more information, see '${prog} help auth'." | |
| ) | |
| super().__init__(message) | |
| class UndefinedSteamUserError(CommandError): | |
| """Raised when attempting to invoke SteamCMD without a valid steamcmd path.""" | |
| def __init__(self) -> None: | |
| message = clean_doc( | |
| "No username login was given for SteamCMD.\n" | |
| "Please specify one of --steamcmd-user, TGM_STEAM_USER=, or --dry-run if applicable.\n" | |
| "For more information, see '${prog} help auth'." | |
| ) | |
| super().__init__(message) | |
| @overload | |
| def clean_doc(doc: str) -> str: ... | |
| @overload | |
| def clean_doc(doc: None) -> None: ... | |
| def clean_doc(doc: str | None) -> str | None: | |
| if doc is None: | |
| return None | |
| first, _, rest = doc.partition("\n") | |
| rest = textwrap.dedent(rest) | |
| doc = f"{first}\n{rest}".strip() | |
| return Template(doc).safe_substitute( | |
| prog=Path(sys.argv[0]).name, | |
| ) | |
| def expand_collection_file_details( | |
| items: Iterable[FileDetails], | |
| *, | |
| ignore_errors: bool, | |
| ) -> list[FileDetails]: | |
| collections: list[FileDetails] = [] | |
| ret: list[FileDetails] = [] | |
| for item in items: | |
| if any(tag.name == "Collection" for tag in item.tags): | |
| collections.append(item) | |
| else: | |
| ret.append(item) | |
| if collections: | |
| item_ids = [item.id for item in collections] | |
| expanded = get_collection_details(item_ids, ignore_errors=ignore_errors) | |
| expanded = set(id for item in expanded for id in item.children) | |
| expanded -= {item.id for item in ret} | |
| expanded = get_published_file_details(expanded, ignore_errors=ignore_errors) | |
| ret.extend(expanded) | |
| return ret | |
| def find_mod_links(*, mod_dir: Path, workshop_dir: Path) -> dict[Path, Path | None]: | |
| mod_links: dict[Path, Path | None] = {} | |
| for mod in list_installed_items(workshop_dir=workshop_dir).values(): | |
| mod_links[mod.resolve()] = None | |
| for link in mod_dir.iterdir(): | |
| resolved = link.resolve() | |
| if resolved not in mod_links: | |
| continue | |
| mod_links[resolved] = link | |
| return mod_links | |
| def get_collection_details( | |
| item_ids: Collection[int], | |
| *, | |
| ignore_errors: bool, | |
| ) -> list[CollectionDetails]: | |
| if not item_ids: | |
| return [] | |
| data: dict[str, object] = {"collectioncount": len(item_ids)} | |
| for i, item in enumerate(item_ids): | |
| data[f"publishedfileids[{i}]"] = str(item) | |
| with http_post(STEAMAPI_COLLECTION_URL, data=data) as response: | |
| raise_for_status(response) | |
| payload = response.read() | |
| payload = json.loads(payload) | |
| details = payload["response"]["collectiondetails"] | |
| items = [] | |
| for item in details: | |
| try: | |
| item = CollectionDetails.from_dict(item) | |
| except DetailsError as e: | |
| if not ignore_errors: | |
| raise | |
| log.warning(e) | |
| else: | |
| items.append(item) | |
| if not ignore_errors and len(items) != len(item_ids): | |
| raise DetailsError(f"Expected {len(item_ids)} items, but received {len(items)}") | |
| return items | |
| def get_published_file_details( | |
| item_ids: Collection[int], | |
| *, | |
| ignore_errors: bool, | |
| ) -> list[FileDetails]: | |
| if not item_ids: | |
| return [] | |
| data: dict[str, object] = {"itemcount": len(item_ids)} | |
| for i, item in enumerate(item_ids): | |
| data[f"publishedfileids[{i}]"] = str(item) | |
| with http_post(STEAMAPI_FILEDETAILS_URL, data=data) as response: | |
| raise_for_status(response) | |
| payload = response.read() | |
| payload = json.loads(payload) | |
| details = payload["response"]["publishedfiledetails"] | |
| items = [] | |
| for item in details: | |
| try: | |
| item = FileDetails.from_dict(item) | |
| except DetailsError as e: | |
| if not ignore_errors: | |
| raise | |
| log.warning(e) | |
| else: | |
| items.append(item) | |
| if not ignore_errors and len(items) != len(item_ids): | |
| raise DetailsError(f"Expected {len(item_ids)} items, but received {len(items)}") | |
| return items | |
| @contextmanager | |
| def http_get(url: str) -> Iterator[HTTPResponse]: | |
| headers = {} | |
| request = urllib.request.Request(url, headers=headers) | |
| log.debug("%s %s", request.method, request.full_url) | |
| with urllib.request.urlopen(request, timeout=5) as response: | |
| assert isinstance(response, HTTPResponse) | |
| yield response | |
| @contextmanager | |
| def http_post(url: str, data: dict[str, object]) -> Iterator[HTTPResponse]: | |
| request = urllib.request.Request( | |
| url, | |
| data=urllib.parse.urlencode(data).encode(), | |
| headers={}, | |
| method="POST", | |
| ) | |
| log.debug("%s %s", request.method, request.full_url) | |
| with urllib.request.urlopen(request, timeout=5) as response: | |
| assert isinstance(response, HTTPResponse) | |
| yield response | |
| def item_modified_at(item_id: int, *, workshop_dir: Path) -> datetime.datetime: | |
| mod = workshop_dir / str(item_id) | |
| timestamp = int(mod.stat().st_mtime) | |
| return datetime.datetime.fromtimestamp(timestamp).astimezone() | |
| def list_installed_items(*, workshop_dir: Path) -> dict[int, Path]: | |
| # GetPublishedFileDetails doesn't work on legacy UGC, so we need | |
| # some heuristics to filter them out. Empty directories might have | |
| # been formerly installed mods that the Steam client didn't clean up. | |
| return { | |
| int(mod.name): mod | |
| for mod in workshop_dir.iterdir() | |
| if any(mod.iterdir()) | |
| and not (mod / "composition.sqe").exists() | |
| and not any(mod.glob("*_legacy.bin")) | |
| } | |
| def natural_bytes(n: int) -> str: | |
| if n < 1e3: | |
| return f"{n}B" | |
| elif n < 1e6: | |
| return f"{n / 1e3:.3f}KB" | |
| elif n < 1e9: | |
| return f"{n / 1e6:.3f}MB" | |
| return f"{n / 1e9:.3f}GB" | |
| def parse_item_id(s: str, *, mod_dir: Path) -> int: | |
| try: | |
| return int(s) | |
| except ValueError: | |
| log.debug("Could not parse %r as int", s) | |
| m = WORKSHOP_URL_PATTERN.match(s) | |
| if m is not None: | |
| return int(m[1]) | |
| else: | |
| log.debug("Could not parse %r as workshop URL", s) | |
| link = mod_dir / s | |
| mod = link.resolve() | |
| try: | |
| return int(mod.name) | |
| except ValueError: | |
| log.debug("Could not parse %r as symlink", s) | |
| raise ValueError(f"Cannot parse item ID: {s}") | |
| def parse_item_ids(args: Iterable[str], *, mod_dir: Path) -> list[int]: | |
| item_ids: dict[int, None] = {} # Retain insertion order | |
| for x in args: | |
| with suppress(ValueError): | |
| item_id = parse_item_id(x, mod_dir=mod_dir) | |
| item_ids[item_id] = None | |
| continue | |
| try: | |
| ids = _parse_item_ids_from_path(Path(x)) | |
| except (FileNotFoundError, UnicodeDecodeError, ValueError): | |
| log.debug("Could not parse %r as modpack file", x, exc_info=True) | |
| else: | |
| log.debug("Parsed %r as modpack file with item IDs: %s", x, ids) | |
| item_ids |= dict.fromkeys(ids) | |
| continue | |
| raise CommandError(f"Could not parse item ID, URL, or modpack file: {x}") | |
| return list(item_ids) | |
| def _parse_item_ids_from_path(path: Path) -> list[int]: | |
| item_ids: list[int] = [] | |
| with path.open(encoding="utf-8") as f: | |
| for line in f: | |
| for m in WORKSHOP_URL_PATTERN.finditer(line): | |
| item_ids.append(int(m[1])) | |
| if not item_ids: | |
| raise ValueError(f"No workshop URLs found in {path}") | |
| return item_ids | |
| def raise_for_status(response: HTTPResponse): | |
| if not 200 <= response.status < 300: | |
| raise HTTPError(f"Unexpected HTTP status code: {response.status}") | |
| def remove_broken_links(dir: Path, *, dry_run: bool) -> None: | |
| paths = dir.iterdir() | |
| links = [link for link in paths if link.is_symlink()] | |
| broken = [link for link in links if not link.exists()] | |
| for link in broken: | |
| log.warning("Removing broken link: %s", link.name) | |
| if not dry_run: | |
| link.unlink() | |
| if __name__ == "__main__": | |
| main() |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Screenshots as of 2025-10-29 (CLI colouring in Python 3.14+ only):