Last active
July 31, 2025 20:52
-
-
Save thegamecracks/8b80a742fbe7d5ab1f6d3300b71fd2f3 to your computer and use it in GitHub Desktop.
Yet another script to backup Minecraft worlds
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| """Backup a Minecraft world into an archive. | |
| CAUTION: This script has not been thoroughly tested and you may encounter bugs. | |
| By default, this script will perform the following tasks: | |
| 1. Download mcrcon from https://github.com/Tiiffi/mcrcon, if needed | |
| 2. Send an RCON command to disable automatic saving and save the world | |
| 3. Create an archive of the world/ directory named "world-%Y-%m-%d.zip" | |
| 4. Send an RCON command to re-enable automatic saving | |
| Step 1 can be skipped with the --no-download flag or by skipping all steps | |
| that require RCON which, as of now, can be done by passing the --no-save flag. | |
| Steps 2 and 4 require the MCRCON_PASS environment variable to be defined | |
| and will raise an error if it is not set. These two steps can be skipped | |
| by passing the --no-save flag. The host and port can also be changed with | |
| MCRCON_HOST and MCRCON_PORT respectively. If the server is offline, mcrcon | |
| should time out after about four seconds without an error, allowing this | |
| script to continue the backup. | |
| Step 3 creates an archive according to the -f/--file and -w/--world arguments, | |
| where -w/--world defines which world directory to backup, and -f/--file defines | |
| the destination archive to save to. | |
| Step 3 can also perform a rolling backup by passing the -b/--max-backups argument | |
| which takes a number indicating how many files to leave behind. For example, | |
| passing --max-backups 5 will keep the latest four archives, delete the remaining | |
| archives, and then write the new, fifth archive. | |
| With rolling backups, archives are deleted by matching files that have the world | |
| prefix set with -w/--world and an archive extension that the script can produce, | |
| that being .tar, .tar.bz2, .tar.gz, .tar.xz, and .zip. | |
| Matching archives are then sorted by modification time and filename. | |
| -f/--file supports absolute/relative filepaths, datetime placeholders, and a | |
| {world} placeholder which is substituted with the world name. This script will | |
| raise an error if the world directory does not exist beforehand. | |
| The archive format is automatically chosen from the provided filename which can | |
| be one of .tar, .tar.bz2, .tar.gz, .tar.xz, or .zip. Unknown file extensions will | |
| raise an error. | |
| """ | |
| # /// script | |
| # dependencies = [] | |
| # requires-python = ">=3.11" | |
| # /// | |
| import argparse | |
| import datetime | |
| import os | |
| import platform | |
| import shlex | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import urllib.parse | |
| import urllib.request | |
| from contextlib import contextmanager, nullcontext | |
| from pathlib import Path, PurePosixPath | |
| from typing import Callable, Iterator, Self | |
# True when running on Windows; used to pick the mcrcon download URL and
# the name of the mcrcon binary.
IS_WINDOWS = platform.system() == "Windows"

# Maps each archive filename suffix this script accepts to the format name
# that is passed to shutil.make_archive().
SUPPORTED_ARCHIVE_FORMATS = {
    ".tar": "tar",
    ".tar.bz2": "bztar",
    ".tar.gz": "gztar",
    ".tar.xz": "xztar",
    ".zip": "zip",
}
def main() -> None:
    """Parse command-line options and run one backup.

    The steps run in this order: locate (or download) mcrcon when saving over
    RCON is enabled, collect the existing backups, resolve the destination
    archive name, verify the world directory exists, and finally remove old
    backups and write the new archive while automatic saving is paused.

    :raises ValueError: If the world directory does not exist.

    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    parser.add_argument(
        "--download",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Automatically download mcrcon when missing from PATH or CWD (default: %(default)s)",
    )
    parser.add_argument(
        "-n",
        "--dry-run",
        action="store_true",
        help="Print actions this script would take without actually running them",
    )
    parser.add_argument(
        "-f",
        "--file",
        dest="file_format",
        default="{world}-%Y-%m-%d.zip",
        help="The backup path to save to (default: %(default)s)",
    )
    parser.add_argument(
        "-b",
        "--max-backups",
        type=int,
        default=0,
        help="The total number of backups to keep, or 0 to keep all backups (default: %(default)s)",
    )
    parser.add_argument(
        "--save",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Trigger world save over RCON just before backing up (default: %(default)s)",
    )
    parser.add_argument(
        "-w",
        "--world",
        type=Path,
        default="world",
        help="The world directory to backup (default: %(default)s)",
    )

    options = parser.parse_args()
    world: Path = options.world
    file_format: str = options.file_format
    dry_run: bool = options.dry_run

    # The RCON client is only needed (and only looked up) when saving is enabled.
    mcrcon: MinecraftRCON | None = (
        MinecraftRCON.from_environment(
            allow_offline=True,
            download=options.download,
            dry_run=dry_run,
        )
        if options.save
        else None
    )

    # Existing backups are listed before the new archive name is resolved.
    backups = find_sorted_backups(file_format, world)
    start_archive = create_archiver(file_format, world, dry_run=dry_run)

    if not world.is_dir():
        raise ValueError(f"Could not find world directory: {world}")

    save_context = nullcontext() if mcrcon is None else start_save_context(mcrcon)
    with save_context:
        remove_old_backups(backups, options.max_backups, dry_run=dry_run)
        start_archive()
class MinecraftRCON:
    """A simple interface for the mcrcon binary.

    Instances are callable: each positional argument is passed to mcrcon as
    one command-line argument and the combined stdout/stderr is returned.

    :param bin_path: The path to the mcrcon binary.
    :param allow_offline:
        Suppress errors when the mcrcon binary cannot connect to the
        Minecraft server.
    :param dry_run:
        Only perform pre-run checks and skip invocation of the mcrcon binary.

    """

    MCRCON_DOWNLOAD_URL = (
        "https://github.com/Tiiffi/mcrcon/releases/download/v0.7.2/mcrcon-0.7.2-windows-x86-64.zip"
        if IS_WINDOWS
        else "https://github.com/Tiiffi/mcrcon/releases/download/v0.7.2/mcrcon-0.7.2-linux-x86-64-static.zip"
    )
    # Where a downloaded binary is stored; resolved against the working
    # directory at class-definition time.
    _BINARY_PATH = Path("mcrcon.exe" if IS_WINDOWS else "mcrcon").resolve()

    def __init__(
        self,
        bin_path: Path,
        *,
        allow_offline: bool = False,
        dry_run: bool = False,
    ) -> None:
        self.bin_path = bin_path
        self.allow_offline = allow_offline
        self.dry_run = dry_run
        # Flipped to False after the first failed connection so that later
        # calls do not run mcrcon again.
        self._is_online = True

    def __call__(self, *args: str) -> str:
        """Run mcrcon with the given arguments and return its output.

        :param args: Arguments passed to the mcrcon binary, one per command.
        :returns:
            The combined stdout/stderr of mcrcon. An empty string is returned
            in dry-run mode, or when a previous call already failed to connect
            and ``allow_offline`` is set.
        :raises RuntimeError:
            If MCRCON_PASS is not set, or if a previous call failed to connect
            and ``allow_offline`` is not set.
        :raises subprocess.CalledProcessError:
            If mcrcon exits with an error that is not a tolerated connection
            failure.

        """
        if os.getenv("MCRCON_PASS") is None:
            raise RuntimeError("MCRCON_PASS environment variable must be defined")
        elif not self._is_online:
            if self.allow_offline:
                return ""
            else:
                raise RuntimeError("Could not connect to RCON server")

        print("RCON:", shlex.join(args))
        if self.dry_run:
            return ""

        try:
            return subprocess.check_output(
                [self.bin_path, *args],
                stderr=subprocess.STDOUT,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            if "Connection failed." in e.output:
                self._is_online = False
                if self.allow_offline:
                    print("RCON connection failed, skipping RCON commands")
                    return e.output
            raise

    @classmethod
    def from_environment(
        cls,
        *,
        download: bool,
        dry_run: bool = False,
        **kwargs,
    ) -> Self:
        """Create an instance using an mcrcon binary found on this machine.

        :param download: Download mcrcon if it cannot be found.
        :param dry_run: Passed to the lookup and to the constructor.
        :param kwargs: Extra keyword arguments forwarded to the constructor.
        :raises RuntimeError:
            If the binary cannot be found and downloading is disabled.

        """
        bin_path = cls._find_mcrcon_binary(download, dry_run=dry_run)
        return cls(bin_path, dry_run=dry_run, **kwargs)

    @classmethod
    def _find_mcrcon_binary(
        cls,
        download: bool,
        *,
        dry_run: bool = False,
    ) -> Path:
        """Return the path to mcrcon, checking PATH, then the stored
        binary path, then downloading it if allowed.

        In dry-run mode nothing is downloaded; the path the binary would be
        downloaded to is returned instead.

        """
        path = shutil.which("mcrcon")
        if path is not None:
            print(f"Found mcrcon at {path}")
            return Path(path)

        if cls._BINARY_PATH.is_file():
            print(f"Found mcrcon at {cls._BINARY_PATH}")
            return cls._BINARY_PATH

        if not download:
            raise RuntimeError("Could not find mcrcon binary in PATH or CWD")
        elif dry_run:
            print(f"Would download mcrcon to {cls._BINARY_PATH}")
            return cls._BINARY_PATH

        return cls._download_mcrcon_binary()

    @classmethod
    def _download_mcrcon_binary(cls) -> Path:
        """Download the mcrcon release archive and install the binary.

        :returns: The resolved path of the installed binary.
        :raises RuntimeError:
            If the downloaded archive does not contain the expected binary.

        """
        url = cls.MCRCON_DOWNLOAD_URL
        archive_filename = PurePosixPath(urllib.parse.urlparse(url).path).name

        with tempfile.TemporaryDirectory() as dest_dir:
            dest_dir = Path(dest_dir)
            dest_archive = dest_dir / archive_filename

            print(f"Downloading mcrcon from {url}...")
            with urllib.request.urlopen(url) as request, dest_archive.open("wb") as f:
                while data := request.read(2**20):
                    f.write(data)

            print(f"Extracting {cls._BINARY_PATH.name} to {cls._BINARY_PATH.parent}")
            shutil.unpack_archive(dest_archive, extract_dir=dest_dir)

            extracted = next(dest_dir.rglob(cls._BINARY_PATH.name), None)
            if extracted is None:
                raise RuntimeError(
                    f"Could not find {cls._BINARY_PATH.name} in {archive_filename}"
                )

            # The temporary directory may live on a different filesystem than
            # the destination, where a plain rename fails; shutil.move falls
            # back to copy-and-delete in that case.
            shutil.move(extracted, cls._BINARY_PATH)

        path = cls._BINARY_PATH.resolve()
        # The mode must be an octal literal: decimal 755 is 0o1363.
        path.chmod(0o755)
        return path
def find_sorted_backups(file_format: str, world: Path) -> list[Path]:
    """Find all backups matching the given world name and ending with
    a compatible archive extension.

    The directory searched is the parent of ``file_format`` exactly as
    given; placeholders in the directory part are not expanded here.

    Backups are sorted by modification time and filename.

    :param file_format: The destination path template of the archive.
    :param world: The world directory; only its name is used as the prefix.
    :returns:
        The matching regular files, oldest first. An empty list is returned
        when the destination directory does not exist yet.

    """
    dest_dir = Path(file_format).parent.resolve()
    if not dest_dir.is_dir():
        # Nothing has been written there yet, so there is nothing to rotate.
        return []

    extensions = tuple(SUPPORTED_ARCHIVE_FORMATS)
    # Filter before sorting so stat() is only called on matching regular
    # files, not on every entry (for example dangling symlinks).
    backups = [
        path
        for path in dest_dir.iterdir()
        if path.is_file()
        and path.name.startswith(world.name)
        and path.name.endswith(extensions)
    ]
    backups.sort(key=lambda p: (p.stat().st_mtime, p.name))
    return backups
@contextmanager
def start_save_context(mcrcon: MinecraftRCON) -> Iterator[MinecraftRCON]:
    """Pause automatic saving over RCON for the duration of the block.

    On entry, the server is told a backup is starting, automatic saving is
    turned off and a save is requested. On exit, automatic saving is turned
    back on and a completion or failure message is sent depending on whether
    the block raised.

    :param mcrcon: The RCON client used to send the commands.

    """
    mcrcon("say Running automated backup...", "save-off", "save-all")

    try:
        yield mcrcon
    except BaseException:
        # Re-enable saving even on KeyboardInterrupt/SystemExit, then let
        # the original exception continue.
        mcrcon("say Automated backup failed.", "save-on")
        raise

    # Only reached when the block finished without raising.
    mcrcon("say Automated backup completed!", "save-on")
def remove_old_backups(
    backups: list[Path],
    max_backups: int,
    *,
    dry_run: bool = False,
) -> None:
    """Delete the leading entries of ``backups`` to make room for a new one.

    :param backups:
        The existing backup files; entries are removed from the front of
        the list.
    :param max_backups:
        The number of backups to keep.
        A new backup is expected to be written after this call, so at most
        ``max_backups - 1`` of the given files are left in place.
        Values below 1 disable removal entirely.
    :param dry_run:
        Only print which files would be removed without actually removing them.

    """
    if max_backups < 1:
        return

    # One slot is reserved for the archive that is about to be created.
    keep = max_backups - 1
    surplus = len(backups) - keep
    if surplus <= 0:
        return

    for path in backups[:surplus]:
        if not dry_run:
            print(f"Removing {path}...")
            path.unlink()
            continue
        print(f"Would remove {path}")
def create_archiver(
    file_format: str,
    world: Path,
    *,
    dry_run: bool = False,
) -> Callable[[], Path]:
    """Build a callback that writes the archive for the given world.

    The destination is fully resolved here, once: datetime placeholders are
    expanded using the current UTC time, ``{world}`` is replaced with the
    world name, the path is enumerated to avoid collisions, and the archive
    format is derived from the filename suffix. Calling the returned
    callback later does not re-evaluate any of this.

    :param file_format: The destination path template.
    :param world: The world directory to archive.
    :param dry_run: Make the callback only print the destination.
    :returns: A callback that creates the archive and returns its path.
    :raises ValueError: If the filename has no supported archive suffix.

    """
    now = datetime.datetime.now(datetime.timezone.utc)
    expanded = now.strftime(file_format).format(world=world.name)
    dest = maybe_enumerate_path(Path(expanded))

    # Pick the first supported suffix that the destination filename ends with.
    found = next(
        (
            (extension, fmt)
            for extension, fmt in SUPPORTED_ARCHIVE_FORMATS.items()
            if dest.name.endswith(extension)
        ),
        None,
    )
    if found is None:
        suffix = "".join(dest.suffixes)
        raise ValueError(f"Unknown or unsupported archive format: {suffix}")

    extension, archive_format = found
    # shutil.make_archive() appends the extension itself, so strip it here.
    base_dest = dest.with_name(dest.name.removesuffix(extension))

    def create_archive() -> Path:
        if dry_run:
            print(f"Would write archive to {dest}")
            return dest

        print(f"Writing archive to {dest}...")
        try:
            path = shutil.make_archive(
                str(base_dest),
                archive_format,
                root_dir=world.parent,
                base_dir=world.name,
            )
        except BaseException:
            # Do not leave a partially written archive behind.
            dest.unlink(missing_ok=True)
            raise
        return Path(path)

    return create_archive
def maybe_enumerate_path(path: Path) -> Path:
    """Return the given path, potentially enumerated to avoid file collisions.

    If ``path`` already exists, a counter is inserted directly before the
    file extension (``world.zip`` -> ``world.1.zip``) and incremented until
    an unused name is found. A ``.tar.<compression>`` extension is kept
    together (``world.tar.gz`` -> ``world.1.tar.gz``).

    Dots elsewhere in the filename, such as in the world name or in a
    ``%Y.%m.%d`` date, are left untouched so the counter never lands in the
    middle of the name.

    :param path: The preferred destination path.
    :returns: ``path`` itself if it is unused, otherwise an enumerated sibling.

    """
    suffixes = path.suffixes
    if len(suffixes) >= 2 and suffixes[-2] == ".tar":
        # Compound extension such as .tar.gz, .tar.bz2 or .tar.xz.
        extension = "".join(suffixes[-2:])
    else:
        extension = path.suffix
    stem = path.name.removesuffix(extension)

    candidate = path
    n = 1
    while candidate.exists():
        candidate = path.with_name(f"{stem}.{n}{extension}")
        n += 1
    return candidate
# Run the backup only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment