Last active
October 23, 2025 07:57
-
-
Save ndevenish/b70861dcc0924d6391321540fad2791c to your computer and use it in GitHub Desktop.
Jenkins: Find/download all plugins for a given Jenkins version
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "bs4", | |
| # "packaging", | |
| # "requests", | |
| # "rich", | |
| # "typer", | |
| # "tqdm", | |
| # ] | |
| # /// | |
| from collections.abc import Iterable | |
| import hashlib | |
| import sys | |
| from typing import Annotated | |
| import requests | |
| import zipfile | |
| from bs4 import BeautifulSoup | |
| from pathlib import Path | |
| import json | |
| from rich import print | |
| import typer | |
| import re | |
| import packaging.version as pversion | |
| # Plugins that we know are not present on the Jenkins plugins repo | |
| KNOWN_MISSING = [ | |
| "ace-editor", | |
| "handlebars", | |
| "momentjs", | |
| "windows-slaves", | |
| ] | |
def get_plugin_page(plugin_name: str) -> str:
    """
    Get the version compatibility page for a specific plugin.

    Caches to disk in the ./cache/ folder (relative to the current working
    directory), and reads that in preference to downloading again.

    Raises:
        requests.exceptions.HTTPError: if the server answers with an error
            status.
        requests.exceptions.Timeout: if the server does not respond in time.
    """
    cache = Path.cwd() / "cache" / f"{plugin_name}.html"
    if cache.is_file():
        return cache.read_text(encoding="utf-8")
    url = f"https://updates.jenkins.io/download/plugins/{plugin_name}/"
    print(f"Fetching {plugin_name} from {url}")
    # requests never times out by default; without this a stalled
    # connection would hang the whole run.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    cache.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding so the cache round-trips regardless of the locale.
    cache.write_text(response.text, encoding="utf-8")
    return response.text
def read_plugin(name: str) -> list[dict[str, str]]:
    """
    Read and parse a plugin compatibility page, returning the data.

    Returns one dict per listed release, in page order, with the keys
    "version", "url", "sha256" and "requires".

    Raises:
        RuntimeError: if the page has no ``<ul class="artifact-list">``.
    """

    def _is_sha256_block(tag) -> bool:
        # A <div> whose class list is exactly ["checksums"] and whose text
        # mentions SHA-256.
        if tag.name != "div":
            return False
        if tag.get("class") != ["checksums"]:
            return False
        return "SHA-256" in tag.get_text()

    page = BeautifulSoup(get_plugin_page(name), features="html.parser")
    releases = []
    artifact_list = page.find("ul", class_="artifact-list")
    if not artifact_list:
        raise RuntimeError(f"Could not parse versions out of plugin page for {name}")
    for entry in artifact_list.find_all("li"):
        link = entry.find("a", class_="version")
        if not link:
            continue
        label = link.text.strip()
        href = link["href"]
        # Skip placeholder 'permalink to latest'
        if "permalink" in label.lower():
            continue
        checksum = entry.find(_is_sha256_block).code.text
        core_text = entry.find("div", class_="core-dependency").get_text(strip=True)
        required_core = core_text.replace("Requires Jenkins", "").strip()
        releases.append(
            {
                "version": label,
                "url": href,
                "sha256": checksum,
                "requires": required_core,
            }
        )
    return releases
def get_plugin_lookup(plugins: Iterable[str]) -> dict[str, list[dict[str, str]]]:
    """
    Given a list of plugin names, fetch the version compatibility lists.

    Plugins are processed in sorted order. A plugin whose page request fails
    with an HTTP error is reported and left out of the result. The collected
    mapping is also written to ./plugins.json as indented JSON.
    """
    snapshot = Path.cwd() / "plugins.json"
    lookup = {}
    for plugin_name in sorted(plugins):
        try:
            releases = read_plugin(plugin_name)
        except requests.exceptions.HTTPError as err:
            print(f"Error: {err}")
        else:
            lookup[plugin_name] = releases
    serialised = json.dumps(lookup, indent=4)
    snapshot.write_text(serialised)
    return lookup
def parse_manifest(manifest_text: str) -> dict:
    """
    Parse a MANIFEST.MF file into a dictionary.

    Handles continuation lines (those starting with a space). Blank lines
    are ignored, as are lines without a ":" separator together with any
    continuation lines that follow them.

    Values of keys ending in "-Version" are converted with
    parse_maven_version; if that raises InvalidVersion the raw string is
    kept. The "Plugin-Dependencies" entry is always present in the result
    and holds the list produced by parse_plugin_dependencies.
    """

    def _finish(key: str, segments: list[str]):
        # Single place that turns collected segments into the stored value,
        # so the final entry is converted exactly like every other one.
        text = "".join(segments).strip()
        if key.endswith("-Version"):
            try:
                return parse_maven_version(text)
            except pversion.InvalidVersion:
                return text
        return text

    manifest_dict = {}
    current_key = None
    current_value = []
    for line in manifest_text.splitlines():
        if not line.strip():  # blank line separates sections (ignore)
            continue
        if line.startswith(" "):
            # continuation line: append to current value
            if current_key is not None:
                current_value.append(line[1:])
            continue
        # commit previous key/value
        if current_key is not None:
            manifest_dict[current_key] = _finish(current_key, current_value)
        # start new key/value
        key, separator, value = line.partition(":")
        if separator:
            current_key = key.strip()
            # Only strip the left side: a trailing space may be part of a
            # value that is continued on the next line.
            current_value = [value.lstrip()]
        else:
            current_key = None  # malformed line, ignore
    # commit last entry
    if current_key is not None:
        manifest_dict[current_key] = _finish(current_key, current_value)
    # Handle splitting dependencies
    manifest_dict["Plugin-Dependencies"] = parse_plugin_dependencies(
        manifest_dict.get("Plugin-Dependencies", "")
    )
    return manifest_dict
def parse_maven_version(version: str) -> pversion.Version:
    """
    Convert a plugin version string into a packaging Version.

    Every ".v" is rewritten to "+" and trailing underscores are removed
    before parsing (presumably so suffixes such as ".v<build>" are accepted
    as a local version segment -- confirm against real plugin versions).

    Raises:
        packaging.version.InvalidVersion: if the adjusted string still
            cannot be parsed.
    """
    adjusted = version.replace(".v", "+")
    adjusted = adjusted.rstrip("_")
    return pversion.parse(adjusted)
def parse_plugin_dependencies(dependencies_str: str):
    """
    Parse the MANIFEST.MF Plugin-Dependencies field into a list of dictionaries.

    Entries are separated by commas; each entry looks like
    ``name:version;flag:=value;...``. Every returned dict has the keys
    "name" and "version" (None when no version is given) plus one key per
    ``flag:=value`` pair.
    """
    parsed = []
    if not dependencies_str:
        return parsed
    for entry in re.split(r",\s*", dependencies_str):
        if not entry.strip():
            continue
        # First ";"-separated field is name[:version], the rest are flags
        head, *options = entry.split(";")
        name, _, version = head.partition(":")
        record = {
            "name": name.strip(),
            "version": version.strip() if version else None,
        }
        # flags like resolution:=optional
        for option in options:
            key, marker, value = option.partition(":=")
            if marker:
                record[key.strip()] = value.strip()
        parsed.append(record)
    return parsed
def parse_version(version: str) -> tuple[int, int, int]:
    """
    Given a jenkins version string, parse into integers.

    Missing patch version is assumed to be zero.

    Raises:
        ValueError: if a component is not an integer, or the string does
            not have exactly two or three dot-separated components.
    """
    ver = [int(x) for x in version.split(".")]
    if len(ver) == 2:
        return (ver[0], ver[1], 0)
    if len(ver) != 3:
        # Covers both too few ("2") and too many ("1.2.3.4") components.
        raise ValueError(
            f"Invalid jenkins version number {version!r}: expected X.Y or X.Y.Z"
        )
    return (ver[0], ver[1], ver[2])
def get_plugin_manifest(plugin_file: Path) -> dict:
    """
    Read META-INF/MANIFEST.MF out of a plugin archive and parse it.

    Returns the dictionary produced by parse_manifest.

    Raises:
        RuntimeError: if the archive has no META-INF/MANIFEST.MF member.
    """
    # Context manager so the archive handle is released even on error.
    with zipfile.ZipFile(plugin_file) as archive:
        if "META-INF/MANIFEST.MF" not in archive.namelist():
            raise RuntimeError("Plugin does not have MANIFEST.MF")
        manifest_text = archive.read("META-INF/MANIFEST.MF").decode("utf-8")
    return parse_manifest(manifest_text)
| def sha256(to_hash: Path | bytes) -> str: | |
| """Calculate the SHA256 hash of a file or byte array""" | |
| hash = hashlib.sha256() | |
| if isinstance(to_hash, Path): | |
| hash.update(to_hash.read_bytes()) | |
| else: | |
| hash.update(to_hash) | |
| return hash.hexdigest() | |
def download_plugin(plugin_name: str, version: str, target_path: Path) -> None:
    """
    Download one plugin release from updates.jenkins.io into target_path.

    The file is fetched from the release's ``<plugin_name>.hpi`` URL and
    saved as ``<plugin_name>.jpi``. The download is not checksum-verified.

    Exits the program if the destination file already exists.

    Raises:
        requests.exceptions.HTTPError: if the server answers with an error
            status.
        requests.exceptions.Timeout: if the server does not respond in time.
    """
    filename = f"{plugin_name}.jpi"
    dest = target_path / filename
    if dest.is_file():
        sys.exit("Error: Asked to download missing plugin, but file already exists")
    print(f"Fetching {plugin_name}")
    url = f"https://updates.jenkins.io/download/plugins/{plugin_name}/{version}/{plugin_name}.hpi"
    # requests never times out by default; without this a stalled
    # connection would hang the whole run.
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    dest.write_bytes(resp.content)
    print(f"Downloaded {plugin_name} {version}")
def main(
    plugins_dir: Annotated[
        Path, typer.Argument(help="Path to current plugins directory")
    ],
    download: Annotated[
        bool,
        typer.Option(help="Download missing plugins"),
    ] = False,
):
    """
    Report required plugin dependencies that are missing from a plugins
    directory, and optionally download them.
    """
    plugins = {}
    # Read the manifest of every plugin archive
    print("Reading current plugins...")
    for plugin in plugins_dir.iterdir():
        # Anything that is not a plugin archive (sub-directories, backups,
        # marker files) has no manifest to read and must be skipped.
        if plugin.suffix not in {".hpi", ".jpi"}:
            continue
        plugins[plugin.stem] = get_plugin_manifest(plugin)

    # Now, walk the whole plugin set again, collecting for each absent
    # non-optional dependency the highest version any plugin asks for.
    missing = {}
    for plugin, manifest in plugins.items():
        print(f"Checking {plugin}")
        for dep in manifest["Plugin-Dependencies"]:
            if dep["name"] in plugins or dep.get("resolution") == "optional":
                continue
            print(
                f" Plugin [b]{dep['name']}[/b] not found, but is listed required dependency"
            )
            existing_ver = missing.setdefault(dep["name"], dep["version"])
            if parse_maven_version(dep["version"]) > parse_maven_version(
                existing_ver
            ):
                missing[dep["name"]] = dep["version"]

    if not missing:
        print("No missing dependencies.")
        return
    print("Missing dependencies:")
    print(missing)
    if not download:
        print("\nPass [i]--download[/i] to automatically download these.")
        return
    for plugin, version in missing.items():
        download_plugin(plugin, version, plugins_dir)
| if __name__ == "__main__": | |
| typer.run(main) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "bs4", | |
| # "requests", | |
| # "rich", | |
| # "typer", | |
| # "tqdm", | |
| # ] | |
| # /// | |
| from collections.abc import Iterable | |
| import hashlib | |
| import shutil | |
| import sys | |
| from typing import Annotated | |
| import requests | |
| import zipfile | |
| from bs4 import BeautifulSoup | |
| from pathlib import Path | |
| import json | |
| from rich import print | |
| from rich.table import Table | |
| import typer | |
| import tqdm | |
| import zipfile | |
| # Plugins that we know are not present on the Jenkins plugins repo | |
| KNOWN_MISSING = [ | |
| "ace-editor", | |
| "handlebars", | |
| "momentjs", | |
| "windows-slaves", | |
| ] | |
def get_plugin_page(plugin_name: str) -> str:
    """
    Get the version compatibility page for a specific plugin.

    Caches to disk in the ./cache/ folder (relative to the current working
    directory), and reads that in preference to downloading again.

    Raises:
        requests.exceptions.HTTPError: if the server answers with an error
            status.
        requests.exceptions.Timeout: if the server does not respond in time.
    """
    cache = Path.cwd() / "cache" / f"{plugin_name}.html"
    if cache.is_file():
        return cache.read_text(encoding="utf-8")
    url = f"https://updates.jenkins.io/download/plugins/{plugin_name}/"
    print(f"Fetching {plugin_name} from {url}")
    # requests never times out by default; without this a stalled
    # connection would hang the whole run.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    cache.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding so the cache round-trips regardless of the locale.
    cache.write_text(response.text, encoding="utf-8")
    return response.text
def read_plugin(name: str) -> list[dict[str, str]]:
    """
    Read and parse a plugin compatibility page, returning the data.

    Returns one dict per listed release, in page order, with the keys
    "version", "url", "sha256" and "requires".

    Raises:
        RuntimeError: if the page has no ``<ul class="artifact-list">``.
    """

    def _is_sha256_block(tag) -> bool:
        # A <div> whose class list is exactly ["checksums"] and whose text
        # mentions SHA-256.
        if tag.name != "div":
            return False
        if tag.get("class") != ["checksums"]:
            return False
        return "SHA-256" in tag.get_text()

    page = BeautifulSoup(get_plugin_page(name), features="html.parser")
    releases = []
    artifact_list = page.find("ul", class_="artifact-list")
    if not artifact_list:
        raise RuntimeError(f"Could not parse versions out of plugin page for {name}")
    for entry in artifact_list.find_all("li"):
        link = entry.find("a", class_="version")
        if not link:
            continue
        label = link.text.strip()
        href = link["href"]
        # Skip placeholder 'permalink to latest'
        if "permalink" in label.lower():
            continue
        checksum = entry.find(_is_sha256_block).code.text
        core_text = entry.find("div", class_="core-dependency").get_text(strip=True)
        required_core = core_text.replace("Requires Jenkins", "").strip()
        releases.append(
            {
                "version": label,
                "url": href,
                "sha256": checksum,
                "requires": required_core,
            }
        )
    return releases
def get_plugin_lookup(plugins: Iterable[str]) -> dict[str, list[dict[str, str]]]:
    """
    Given a list of plugin names, fetch the version compatibility lists.

    Plugins are processed in sorted order. A plugin whose page request fails
    with an HTTP error is reported and left out of the result. The collected
    mapping is also written to ./plugins.json as indented JSON.
    """
    snapshot = Path.cwd() / "plugins.json"
    lookup = {}
    for plugin_name in sorted(plugins):
        try:
            releases = read_plugin(plugin_name)
        except requests.exceptions.HTTPError as err:
            print(f"Error: {err}")
        else:
            lookup[plugin_name] = releases
    serialised = json.dumps(lookup, indent=4)
    snapshot.write_text(serialised)
    return lookup
def parse_version(version: str) -> tuple[int, int, int]:
    """
    Given a jenkins version string, parse into integers.

    Missing patch version is assumed to be zero.

    Raises:
        ValueError: if a component is not an integer, or the string does
            not have exactly two or three dot-separated components.
    """
    ver = [int(x) for x in version.split(".")]
    if len(ver) == 2:
        return (ver[0], ver[1], 0)
    if len(ver) != 3:
        # Covers both too few ("2") and too many ("1.2.3.4") components.
        raise ValueError(
            f"Invalid jenkins version number {version!r}: expected X.Y or X.Y.Z"
        )
    return (ver[0], ver[1], ver[2])
def read_plugin_version(plugin_file: Path) -> str:
    """
    Given a Jenkins plugin file, get the plugin version contained within.

    The version is the text after "Plugin-Version:" on the first manifest
    line that starts with that key, with surrounding whitespace removed.

    Raises:
        RuntimeError: if the archive has no META-INF/MANIFEST.MF member, or
            the manifest has no Plugin-Version line.
    """
    # Context manager so the archive handle is released even on error.
    with zipfile.ZipFile(plugin_file) as archive:
        if "META-INF/MANIFEST.MF" not in archive.namelist():
            raise RuntimeError("Plugin does not have MANIFEST.MF")
        manifest_text = archive.read("META-INF/MANIFEST.MF").decode("utf-8")
    for line in manifest_text.splitlines():
        if line.startswith("Plugin-Version:"):
            return line.split(":", maxsplit=1)[1].strip()
    # Name the offending file instead of failing with a bare IndexError.
    raise RuntimeError(f"Plugin manifest in {plugin_file} has no Plugin-Version entry")
| def sha256(to_hash: Path | bytes) -> str: | |
| """Calculate the SHA256 hash of a file or byte array""" | |
| hash = hashlib.sha256() | |
| if isinstance(to_hash, Path): | |
| hash.update(to_hash.read_bytes()) | |
| else: | |
| hash.update(to_hash) | |
| return hash.hexdigest() | |
def main(
    jenkins_version: Annotated[
        str,
        typer.Argument(help="Target Jenkins version", metavar="TARGET_JENKINS_VERSION"),
    ],
    plugins_dir: Annotated[
        Path, typer.Argument(help="Path to current plugins directory")
    ],
    download: Annotated[
        bool,
        typer.Option(help="Download all plugins compatible with this target version"),
    ] = False,
    exist_ok: bool = False,
):
    """
    For every plugin in a plugins directory, find the first listed release
    compatible with the target Jenkins version, and optionally download them.
    """
    target = parse_version(jenkins_version)
    # Convert this <= to a strict <
    if jenkins_version.count(".") == 1:
        # If we only specified X.Y, then ask for next minor version
        target = (target[0], target[1] + 1, 0)
    else:
        target = (target[0], target[1], target[2] + 1)

    current_plugins = {}
    # Read the current version of all
    print("Reading current plugins...")
    for plugin in plugins_dir.iterdir():
        if plugin.stem in KNOWN_MISSING:
            print(
                f"[yellow]Skipping plugin known to be missing online: {plugin.stem}[/yellow]"
            )
            continue
        if plugin.suffix not in {".hpi", ".jpi"}:
            continue
        current_plugins[plugin.stem] = read_plugin_version(plugin)

    print("Deriving lookup tables...")
    data = get_plugin_lookup(current_plugins.keys())

    # For each plugin, find the first version that is compatible with this Jenkins
    table = Table(title=f"Plugin Versions vs Jenkins {jenkins_version}")
    table.add_column("Plugin")
    table.add_column("Current Version")
    table.add_column("Compatible Version")
    table.add_column("Latest Release")
    results = {}
    count_latest = 0
    count_compatible = 0
    for plugin, plugin_data in data.items():
        for spec in plugin_data:
            # < compare, because we mutated target to account for this
            if parse_version(spec["requires"]) >= target:
                continue
            results[plugin] = spec
            cur_ver = current_plugins.get(plugin, "-")
            if cur_ver == plugin_data[0]["version"]:
                style = "green"
                count_latest += 1
            elif cur_ver == spec["version"]:
                style = "yellow"
                count_compatible += 1
            else:
                style = "red"
            table.add_row(
                plugin,
                cur_ver,
                spec["version"],
                plugin_data[0]["version"],
                style=style,
            )
            break

    # Explicit check rather than assert: asserts vanish under `python -O`
    # and give no hint about which plugin is the problem.
    unresolved = sorted(data.keys() - results.keys())
    if unresolved:
        sys.exit(
            f"Fatal error: No release compatible with Jenkins {jenkins_version} "
            f"found for: {', '.join(unresolved)}"
        )
    # default=0 keeps an empty plugins directory from raising ValueError
    maxlen = max((len(x) for x in results), default=0)

    print(table)
    print(
        f"[green]{count_latest:4}[/green] plugins already fully up-to-date (latest release)."
    )
    print(
        f"[yellow]{count_compatible:4}[/yellow] plugins already the latest version compatible with {jenkins_version}."
    )
    print(
        f"[red]{len(results) - count_compatible - count_latest:4}[/red] plugins out-of-date."
    )

    if not download:
        print(
            f"\nPass [i]--download[/i] to automatically download all compatible versions to ./{jenkins_version}/"
        )
        return

    target_path = Path(jenkins_version)
    if target_path.exists() and not exist_ok:
        print(
            f"[red]Error: Cannot download as target path {target_path} already exists. Pass [i]--exist-ok[/i] to ignore this.[/red]",
        )
        # Report the failure through the exit status as well as the message.
        raise typer.Exit(code=1)

    print(f"Downloading all plugins to {target_path}")
    target_path.mkdir(exist_ok=True)
    for plugin, spec in (
        progress := tqdm.tqdm(sorted(results.items()), leave=False)
    ):
        filename = spec["url"].split("/")[-1]
        if filename != f"{plugin}.hpi":
            sys.exit(
                f"Fatal error: Unexpected download filename {filename!r} for plugin {plugin}"
            )
        dest = target_path / filename
        if dest.is_file():
            dest_hex = sha256(dest)
            if dest_hex == spec["sha256"]:
                progress.write(f"Cached {plugin} {spec['version']} is OK")
                continue
            # progress.write keeps the message from corrupting the bar
            progress.write(f"Warning: {dest_hex} != expected {spec['sha256']}")
        # Does our source plugins dir have this? Use that if sha256 matches
        if (exist_dest := (plugins_dir / filename)).is_file() and sha256(
            exist_dest
        ) == spec["sha256"]:
            shutil.copy2(exist_dest, dest)
            progress.write(f"Existing {plugin} {spec['version']} is OK")
            continue
        progress.desc = f"Fetching {plugin:{maxlen}}"
        progress.refresh()
        # requests never times out by default; without this a stalled
        # connection would hang the whole run.
        resp = requests.get(spec["url"], timeout=30)
        resp.raise_for_status()
        # Verify before writing so a bad download never lands on disk
        if sha256(resp.content) != spec["sha256"]:
            sys.exit(
                f"Fatal error: Download of {spec['url']} did not match expected hash {spec['sha256']}"
            )
        dest.write_bytes(resp.content)
        progress.write(f"Downloaded {plugin} {spec['version']}")
    print(f"Done. Written compatible to {target_path}")
| if __name__ == "__main__": | |
| typer.run(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment