#!/usr/bin/env python3
"""
A utility to detect changes in Kubernetes resources in Git repos using `dyff`.

This script runs a user-supplied command twice: once against the current
working directory (including staged and unstaged changes) and once against a
clean Git revision (default is `HEAD`). It compares the two YAML outputs
using `dyff between` and prints the result. Optionally, the intermediate
YAML files can be retained for inspection.

Usage example:

    ./git_dyff.py -r main -k -- helm template path/to/chart

The script's options must appear before a `--` separator. Everything after
`--` is treated as the command to run. The command should write its YAML
output to stdout.

Exit codes:
    0 – no differences detected by `dyff`
    1 – differences detected by `dyff`
    >1 – an error occurred during execution

This program requires `git`, `dyff`, and Python's `yaml` module (PyYAML).
"""
|
|
|
import argparse |
|
import os |
|
import shutil |
|
import subprocess |
|
import sys |
|
import tempfile |
|
from typing import List, Optional |
|
|
|
# PyYAML is the only third-party dependency. When it is missing, print an
# actionable hint before letting the original ImportError propagate.
try:
    import yaml  # type: ignore
except ImportError:  # pragma: no cover
    _missing_yaml_hint = (
        "Error: PyYAML is required to run this script. Install it with `pip install pyyaml`\n"
    )
    sys.stderr.write(_missing_yaml_hint)
    raise
|
|
|
|
|
def run_command(cmd: List[str], *, cwd: Optional[str] = None) -> subprocess.CompletedProcess:
    """Run a command and return its CompletedProcess, capturing stdout and stderr.

    The captured streams are decoded as UTF-8 rather than with the locale's
    preferred encoding. The captured YAML is later written to disk as UTF-8,
    so decoding with a different locale encoding (for example cp1252) would
    corrupt non-ASCII characters. Bytes that are not valid UTF-8 are replaced
    instead of raising, so a stray byte in a diagnostic message cannot abort
    the run.

    A non-zero exit status does not raise; callers inspect ``returncode``.

    Args:
        cmd: The command and its arguments to run.
        cwd: An optional working directory.

    Returns:
        A subprocess.CompletedProcess object whose ``stdout`` and ``stderr``
        attributes are ``str``.

    Raises:
        OSError: If the executable cannot be started (e.g. it is not installed).
    """
    return subprocess.run(
        cmd,
        cwd=cwd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
|
|
|
|
|
def validate_yaml(content: str) -> None:
    """Check that ``content`` parses as a stream of YAML documents.

    The parsed documents are discarded; only the ability to parse matters.

    Args:
        content: YAML content as a string.

    Raises:
        ValueError: If PyYAML reports a parse error anywhere in the stream.
    """
    try:
        # safe_load_all is lazy, so the stream has to be drained for every
        # document to actually be parsed.
        for _document in yaml.safe_load_all(content):
            pass
    except yaml.YAMLError as parse_error:
        raise ValueError(f"Invalid YAML output: {parse_error}") from parse_error
|
|
|
|
|
def get_git_ref() -> str:
    """Resolve ``HEAD`` with ``git rev-parse`` and return the result.

    Returns:
        The stripped stdout of ``git rev-parse HEAD``.

    Raises:
        RuntimeError: If ``git rev-parse`` exits with a non-zero status.
    """
    proc = run_command(["git", "rev-parse", "HEAD"])
    if proc.returncode == 0:
        return proc.stdout.strip()
    raise RuntimeError(f"Failed to determine current git commit: {proc.stderr.strip()}")
|
|
|
|
|
def get_git_branch() -> str:
    """Return the abbreviated name that ``HEAD`` currently points at.

    Returns:
        The stripped stdout of ``git rev-parse --abbrev-ref HEAD``: the branch
        name, or the literal string 'HEAD' when the checkout is detached.

    Raises:
        RuntimeError: If ``git rev-parse`` exits with a non-zero status.
    """
    proc = run_command(["git", "rev-parse", "--abbrev-ref", "HEAD"])
    if proc.returncode == 0:
        return proc.stdout.strip()
    raise RuntimeError(f"Failed to determine current git branch: {proc.stderr.strip()}")
|
|
|
|
|
def has_uncommitted_changes() -> bool:
    """Check if the repository has uncommitted (staged or unstaged) changes.

    Returns:
        True if ``git status --porcelain`` prints anything, False otherwise.

    Raises:
        RuntimeError: If ``git status`` itself fails. Treating a failed status
            query as "no changes" would let the caller skip stashing and then
            switch revisions on top of a dirty working tree.
    """
    status = run_command(["git", "status", "--porcelain"])
    if status.returncode != 0:
        raise RuntimeError(f"Failed to determine git status: {status.stderr.strip()}")
    # Any output indicates either staged or unstaged changes.
    return bool(status.stdout.strip())
|
|
|
|
|
def _current_stash_ref() -> str:
    """Return the object name `refs/stash` points at, or '' if there is no stash."""
    result = run_command(["git", "rev-parse", "--quiet", "--verify", "refs/stash"])
    if result.returncode != 0:
        return ""
    return result.stdout.strip()


def stash_changes() -> bool:
    """Stash all changes, including untracked files, if there are any.

    ``git stash push`` can exit successfully without saving anything even
    though ``git status`` reported changes (for example when the only changes
    are inside submodules). Reporting a stash in that case would make the
    caller later pop an unrelated, pre-existing stash entry. The top of the
    stash is therefore compared before and after the push.

    Returns:
        True if a new stash entry was created, False otherwise.

    Raises:
        RuntimeError: If ``git stash push`` fails.
    """
    if not has_uncommitted_changes():
        return False

    stash_before = _current_stash_ref()
    # The message helps identify the entry in the stash list.
    result = run_command(["git", "stash", "push", "-u", "-m", "git-dyff-temp"])
    if result.returncode != 0:
        raise RuntimeError(f"Failed to stash changes: {result.stderr.strip()}")
    return _current_stash_ref() != stash_before
|
|
|
|
|
def pop_stash() -> None:
    """Restore the most recent stash entry and remove it from the stash list.

    ``git stash pop`` is tried first. If it fails, the entry is applied with
    ``git stash apply`` and then dropped separately. The result of the final
    ``git stash drop`` is not checked.

    Raises:
        RuntimeError: If both the pop and the fallback apply fail. The message
            carries the stderr of the initial pop attempt.
    """
    popped = run_command(["git", "stash", "pop", "--quiet"])
    if popped.returncode == 0:
        return

    # Best-effort fallback: apply without dropping, so that a failure leaves
    # the stash entry available for manual resolution.
    applied = run_command(["git", "stash", "apply", "--quiet"])
    if applied.returncode != 0:
        pop_error = popped.stderr.strip()
        raise RuntimeError(f"Failed to restore stashed changes: {pop_error}")

    # The apply succeeded, so the entry is no longer needed.
    run_command(["git", "stash", "drop", "--quiet"])
|
|
|
|
|
def checkout_revision(revision: str) -> None:
    """Switch the working tree to ``revision`` using ``git checkout --quiet``.

    Args:
        revision: The revision to checkout (branch, tag, or commit).

    Raises:
        RuntimeError: If ``git checkout`` exits with a non-zero status.
    """
    checkout_argv = ["git", "checkout", revision, "--quiet"]
    outcome = run_command(checkout_argv)
    if outcome.returncode == 0:
        return
    detail = outcome.stderr.strip()
    raise RuntimeError(f"Failed to checkout revision '{revision}': {detail}")
|
|
|
|
|
def run_and_capture_yaml(command: List[str], output_path: str) -> None:
    """Run a command and write its stdout to a file after validating YAML.

    Nothing is written to ``output_path`` unless the command succeeded and its
    output parsed as YAML.

    Args:
        command: The command and its arguments to run.
        output_path: Path where the YAML output will be saved.

    Raises:
        RuntimeError: If the command cannot be started, exits with a non-zero
            status, or produces output that is not valid YAML.
    """
    try:
        result = run_command(command)
    except OSError as exc:
        # A missing or non-executable program raises OSError before any exit
        # code exists. Surface it through the documented RuntimeError so
        # callers report it like any other command failure.
        raise RuntimeError(
            f"Command {' '.join(command)} could not be started: {exc}"
        ) from exc

    if result.returncode != 0:
        raise RuntimeError(
            f"Command {' '.join(command)} failed with exit code {result.returncode}\n"
            f"stderr:\n{result.stderr.strip()}"
        )

    # Validate before writing so a broken render never reaches the diff step.
    try:
        validate_yaml(result.stdout)
    except ValueError as exc:
        raise RuntimeError(str(exc)) from exc

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(result.stdout)
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the parser for the script's own options and the trailing command."""
    parser = argparse.ArgumentParser(
        description=(
            "Compare the output of a command (e.g. `helm template`) between the current working "
            "tree and a specified git revision using dyff."
        ),
        epilog=(
            "Examples:\n"
            " git_dyff.py -- helm template ./chart\n"
            " git_dyff.py -r main -k -- helm template ./chart -f values.yaml\n"
            "\n"
            "All script options must appear before the `--` separator. Everything after `--` "
            "is treated as the command to run."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
    )
    parser.add_argument(
        "-r",
        "--revision",
        default="HEAD",
        help="Git revision to compare against (default: HEAD)",
    )
    parser.add_argument(
        "-k",
        "--keep-temp",
        action="store_true",
        help=(
            "Keep the temporary directory and print the paths of the generated YAML files. "
            "By default, the temporary directory is removed."
        ),
    )
    # No "--" option is registered: argparse handles the separator itself and
    # never matches it against a declared option, so such an entry is dead.
    parser.add_argument(
        "command",
        nargs=argparse.REMAINDER,
        help="The command to run for generating YAML (e.g. helm template ...)",
    )
    return parser


def _restore_git_state(
    *,
    switched_revision: bool,
    stash_created: bool,
    original_branch: str,
    original_commit: str,
) -> None:
    """Return to the original checkout and re-apply stashed changes, warning on failure."""
    if switched_revision:
        # 'HEAD' as the branch name means the checkout was detached, so the
        # only way back is the recorded commit itself.
        restore_target = original_commit if original_branch == "HEAD" else original_branch
        try:
            checkout_revision(restore_target)
        except (RuntimeError, OSError) as exc:
            sys.stderr.write(f"Warning: failed to switch back to original revision: {exc}\n")
    if stash_created:
        try:
            pop_stash()
        except (RuntimeError, OSError) as exc:
            sys.stderr.write(f"Warning: failed to restore stashed changes: {exc}\n")


def _capture_both_sides(
    command: List[str],
    revision: str,
    original_branch: str,
    original_commit: str,
    old_yaml_path: str,
    new_yaml_path: str,
) -> None:
    """Render the working tree to `new_yaml_path` and `revision` to `old_yaml_path`."""
    # Resolve the revision before touching the working tree so that a typo
    # fails fast instead of after a stash/unstash round trip. `^{commit}` peels
    # annotated tags, making the comparison with HEAD's commit meaningful.
    resolved = run_command(["git", "rev-parse", "--verify", f"{revision}^{{commit}}"])
    if resolved.returncode != 0:
        raise RuntimeError(
            f"Failed to resolve revision '{revision}': {resolved.stderr.strip()}"
        )
    target_commit = resolved.stdout.strip()

    # Working-tree side first, while local modifications are still in place.
    run_and_capture_yaml(command, new_yaml_path)

    stash_created = False
    switched_revision = False
    try:
        stash_created = stash_changes()
        # Skip the checkout when the target is the commit already checked out.
        if target_commit != original_commit:
            checkout_revision(revision)
            switched_revision = True
        run_and_capture_yaml(command, old_yaml_path)
    finally:
        _restore_git_state(
            switched_revision=switched_revision,
            stash_created=stash_created,
            original_branch=original_branch,
            original_commit=original_commit,
        )


def main(argv: Optional[List[str]] = None) -> int:
    """Entry point. Parses arguments and orchestrates the diff process.

    The command is run once against the working tree and once against the
    requested revision. The repository is restored to its original state
    before `dyff between` compares the two outputs.

    Args:
        argv: Optional list of arguments to parse (for testing).

    Returns:
        An integer exit code: 2 if preparing either side fails or a required
        program cannot be started; otherwise dyff's own exit status, which
        with `--set-exit-code` is 0 for no differences, 1 for differences
        and 255 for a dyff error.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    # Depending on the Python version, argparse may leave the `--` separator as
    # the first element of the remainder; drop it if present.
    command = args.command
    if command and command[0] == "--":
        command = command[1:]
    if not command:
        parser.error(
            "No command specified. Use `--` to separate script options from the command."
        )

    # Record the original git state before creating anything on disk, so an
    # early failure leaves no temporary directory behind.
    try:
        original_branch = get_git_branch()
        original_commit = get_git_ref()
    except (RuntimeError, OSError) as exc:
        sys.stderr.write(f"{exc}\n")
        return 2

    temp_dir = tempfile.mkdtemp(prefix="git_dyff_")
    old_yaml_path = os.path.join(temp_dir, "old.yaml")
    new_yaml_path = os.path.join(temp_dir, "new.yaml")

    try:
        try:
            _capture_both_sides(
                command,
                args.revision,
                original_branch,
                original_commit,
                old_yaml_path,
                new_yaml_path,
            )
            # stdout/stderr are deliberately not captured so dyff can detect a
            # TTY and colourise. Without --set-exit-code dyff exits 0 even
            # when differences are found.
            diff_result = subprocess.run(
                ["dyff", "between", "--set-exit-code", old_yaml_path, new_yaml_path]
            )
        except (RuntimeError, OSError) as exc:
            # OSError covers a missing git, dyff or user command.
            sys.stderr.write(f"Error: {exc}\n")
            return 2
        return diff_result.returncode
    finally:
        if args.keep_temp:
            # Reported on failure too; the kept files are most useful then.
            sys.stderr.write(
                f"\nTemporary YAML files kept in: {temp_dir}\n"
                f" old: {old_yaml_path}\n"
                f" new: {new_yaml_path}\n"
            )
        else:
            shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())