|
#!/usr/bin/env python3 |
|
|
|
from dataclasses import dataclass |
|
import os |
|
from pathlib import Path |
|
import re |
|
import subprocess |
|
import sys |
|
from typing import Literal, Optional |
|
|
|
from pydantic import BaseModel |
|
import pydantic |
|
|
|
|
|
class Plan(BaseModel):
    """Container for the list of commits the script works through."""

    # Written as a string annotation because ``Commit`` is declared later in
    # this module; pydantic resolves the name once the model is first used.
    commits: "list[Commit]"
|
|
|
|
|
class Commit(BaseModel):
    """A single commit: its hash, an integer timestamp and its message."""

    sha: str  # commit hash
    ts: int  # integer timestamp, rendered into the date variables in env()
    msg: str  # commit message text

    def env(self) -> dict[str, str]:
        """Return a copy of the process environment with this commit's dates set.

        ``os.environ`` itself is left untouched.  The timestamp is exported as
        ``GPG_FAKETIME`` and, suffixed with a fixed ``+0200`` offset, as both
        ``GIT_AUTHOR_DATE`` and ``GIT_COMMITTER_DATE``.
        """
        stamp = f"{self.ts} +0200"
        environment = dict(os.environ)
        environment.update(
            GPG_FAKETIME=str(self.ts),
            GIT_AUTHOR_DATE=stamp,
            GIT_COMMITTER_DATE=stamp,
        )
        return environment
|
|
|
|
|
def _rebase_in_progress() -> bool:
    """Return True while the interactive-rebase state directory exists.

    Like the rest of this script, this assumes the current working directory
    is the repository root, so the state lives in ``.git/rebase-merge``.
    """
    return Path(".git/rebase-merge").is_dir()


def _confirmed(prompt: str) -> bool:
    """Show *prompt* and return True only for an explicit "y" / "yes" answer."""
    return input(prompt).strip().lower() in ("y", "yes")


def main() -> None:
    """Walk an interactive rebase commit by commit, editing and re-committing.

    The plan of commits is loaded first.  If ``git status`` reports an
    interactive rebase already in progress, the script resumes from that state
    (amending the commit being edited, or committing once all conflicts are
    fixed); otherwise a fresh rebase is started.

    Each loop iteration then handles the outcome of the latest rebase step:

    * success: HEAD is matched to a planned commit by timestamp, the include
      edits are offered, and the commit is amended with the planned dates;
    * "could not apply <sha>": the planned commit is looked up by hash prefix,
      the user may check out "theirs" for the Rust sources and edit, and the
      result is committed;
    * an empty cherry-pick: the step is skipped.

    The function returns when the rebase has finished or when the user
    declines to continue.

    Raises:
        NotImplementedError: for unresolved conflicts at start-up, unexpected
            ``git status`` / rebase output, or a commit missing from the plan
            inside the loop.
        ValueError: when resuming and the current commit is not in the plan.
        subprocess.CalledProcessError: from any checked git invocation.
    """
    p = plan()

    status = git("status")
    if "interactive rebase in progress" in status.stdout:
        if "fix conflicts" in status.stdout:
            raise NotImplementedError("fix conflicts")

        elif "currently editing a commit" in status.stdout:
            ts = get_current_commit_ts()
            commit = next((c for c in p.commits if c.ts == ts), None)
            if not commit:
                raise ValueError(f"Could not find commit with timestamp {ts} in plan")
            do_amend(commit)
            result = git("rebase", "--continue", check=False)

        elif "all conflicts fixed" in status.stdout:
            commit = get_current_rebase_commit(p)
            if not commit:
                raise ValueError("Could not find current rebase commit in plan")
            do_commit(commit)
            result = git("rebase", "--continue", check=False)
        else:
            raise NotImplementedError(
                f"unexpected git status output:\n"
                f"stdout: {status.stdout.strip()}\n"
                f"stderr: {status.stderr.strip()}"
            )
    else:
        result = do_rebase()

    while True:
        commit = None
        ok = result.returncode == 0

        # A successful step with no rebase state left means the last commit
        # has been replayed.  Without this check the loop would amend HEAD a
        # second time and then fail on "git rebase --continue".
        if ok and not _rebase_in_progress():
            print("\nRebase finished")
            return

        msg = "Applied commit" if ok else "Failed to apply commit"

        if ok:
            ts = get_current_commit_ts()
            commit = next((c for c in p.commits if c.ts == ts), None)
            if not commit:
                raise NotImplementedError(
                    f"could not find commit with timestamp {ts} in plan"
                )

        elif m := re.search("could not apply ([0-9a-f]+)", result.stderr):
            short_sha = m.group(1)
            commit = next((c for c in p.commits if c.sha.startswith(short_sha)), None)
            if not commit:
                raise NotImplementedError(
                    f"could not find commit with SHA {short_sha} in plan"
                )

        elif (
            "The previous cherry-pick is now empty, possibly due to conflict resolution"
            in result.stderr
        ):
            result = git("rebase", "--skip", check=False)
            continue

        else:
            raise NotImplementedError(
                f"Unexpected error message format:\n"
                f"stdout: {result.stdout.strip()}\n"
                f"stderr: {result.stderr.strip()}"
            )

        print(f"\n\n{80 * '-'}")
        print(f"{msg} {commit.sha[:7]}: {commit.msg}\n")

        # Cleanly applied commits always go through the edit step; failed ones
        # only when the user opts into checking out "theirs".
        edit: bool = ok

        if not ok:
            resp = input("\nCheckout theirs and edit? [Y/n] ").strip().lower()
            if resp in ("y", "yes", ""):
                git("checkout", "--theirs", "**/*.rs")
                edit = True
            else:
                # Only an explicit yes continues, so the prompt shows "N" as
                # the default.
                if not _confirmed("\nContinue? [y/N] "):
                    print("Aborting")
                    return

        if edit and do_edit():
            if not _confirmed("\nContinue? [y/N] "):
                print("Aborting")
                return

        if ok:
            do_amend(commit)
        else:
            if not do_commit(commit):
                print("Nothing to commit, skipping...")

        result = git("rebase", "--continue", check=False)
|
|
|
|
|
def get_current_rebase_commit(plan: "Plan") -> "Optional[Commit]":
    """Return the planned commit named on the last line of the rebase "done" list.

    Reads ``.git/rebase-merge/done`` relative to the current directory and
    takes the second whitespace-separated field of its last line as the commit
    hash.

    Args:
        plan: object whose ``commits`` are searched by their ``sha``.

    Returns:
        The first planned commit whose hash starts with the recorded one, or
        ``None`` when the list is empty, the last line carries no hash, or no
        planned commit matches.

    Raises:
        FileNotFoundError: when the "done" file does not exist.
    """
    with open(".git/rebase-merge/done") as f:
        done = f.read().strip().splitlines()
    if not done:
        return None

    fields = done[-1].split()
    if len(fields) < 2:
        # A line with a single token carries no commit hash.
        return None

    sha = fields[1]
    # Prefix comparison accepts both full and abbreviated object names; the
    # explicit default keeps the documented "None when absent" contract
    # instead of leaking StopIteration to the caller.
    return next((c for c in plan.commits if c.sha.startswith(sha)), None)
|
|
|
|
|
def do_rebase() -> subprocess.CompletedProcess[str]:
    """Start ``git rebase --interactive --root`` with every ``pick`` turned into ``edit``.

    The todo list is rewritten non-interactively by pointing
    ``GIT_SEQUENCE_EDITOR`` at a ``sed`` command, so the rebase stops at each
    commit.

    Returns:
        The completed ``git rebase`` process.

    Raises:
        subprocess.CalledProcessError: when git exits with a non-zero status.
    """
    # git appends the todo-file path to this command and runs it via the shell.
    # "-i.bak" (suffix attached to the flag) is accepted by both GNU and BSD
    # sed, whereas "-i ''" only works with BSD sed.  Using an inline command
    # also avoids writing a helper script to a fixed path under /tmp.
    sequence_editor = "sed -i.bak 's/^pick /edit /'"

    return git(
        "rebase",
        "--interactive",
        "--root",
        env={**os.environ, "GIT_SEQUENCE_EDITOR": sequence_editor},
    )
|
|
|
|
|
def do_edit() -> Optional[Literal["sus", "edited", "aborted"]]:
    """Repoint ``include_str!`` text inputs at the ``input/<year>/<day>.txt`` layout.

    Rust files are searched with ``rg`` for ``include_str!`` calls whose path
    does not already start with ``../input``.  For every non-example ``.txt``
    include a new path is derived from the Rust file's location; after
    confirmation the source is rewritten, the old text file is deleted and
    removed from the index, the Rust file is staged, and ``cargo check`` runs.

    Returns:
        ``None`` when nothing needs rewriting and ``fd`` finds no non-example
        ``*.txt`` file; ``"sus"`` when nothing needs rewriting but such files
        exist; ``"aborted"`` when the user declines the edits; ``"edited"``
        after the edits were applied.

    Raises:
        subprocess.CalledProcessError: from ``fd`` or ``git add`` failing.
    """

    @dataclass
    class Edit:
        rs: str  # Rust source file containing the include_str! call
        abs_old_txt: str  # old text file path joined onto the Rust file's directory
        old_txt: str  # path literal currently written inside include_str!
        new_txt: str  # replacement path literal

    def find(rs: str, old_txt: str) -> Optional[Edit]:
        """
        Given a Rust source file and the included text file (relative to the
        Rust file), return an ``Edit`` describing the new path for the text
        file (relative to the Rust file), or ``None`` when no year/day can be
        derived from the Rust file's path.

        Examples (resulting ``new_txt``):
        - rs=src/day01.rs, txt="input/day01.txt" -> "../input/2020/01.txt"
        - rs=year2020/day01.rs, txt="input/day01.txt" -> "../input/2020/01.txt"
        - rs=src/year2020/day01.rs, txt="input/day01.txt" -> "../../input/2020/01.txt"
        """
        m = re.search(r"(year(?P<year>\d{4})/)?day?(?P<day>\d+)", rs)
        if not m:
            m = re.search(r"(?P<year>\d{4})/(?P<day>\d{2})", rs)
        if not m:
            return None

        day = m.group("day").zfill(2)  # Ensure day is two digits
        year = m.group("year") or "2020"  # Default to 2020 if no year is found

        # One "../" per directory level between the Rust file and the cwd.
        depth = len(Path(rs).parent.parts)
        new_txt = f"{'../' * depth}input/{year}/{day}.txt"

        abs_old_txt = Path(rs).parent.joinpath(old_txt).as_posix()

        return Edit(
            rs=rs,
            abs_old_txt=abs_old_txt,
            old_txt=old_txt,
            new_txt=new_txt,
        )

    # Find Rust files with include_str! directives
    hits = subprocess.run(
        [
            "rg",
            "--no-heading",
            "-Pn",
            r'include_str!\s*\(\s*"(?!(\.\./)+input)[^"]*"',
            "--glob",
            "*.rs",
        ],
        capture_output=True,
        text=True,
    ).stdout.splitlines()

    edits: list[Edit] = []
    for hit in hits:
        # Expected shape is "<file>:<line number>:<matched line>"; anything
        # else is reported and skipped rather than crashing the unpack below.
        fields = hit.split(":", maxsplit=2)
        if len(fields) != 3:
            print(f"warning: could not parse rg output line: {hit}")
            continue
        rs, _, line = fields

        m = re.search(r'include_str!\s*\(\s*"([^"]*)"', line)
        if not m:
            print(f"warning: could not parse include_str! in {rs}: {line}")
            continue
        old_txt = m.group(1)
        # Only .txt includes are rewritten; names containing "ex" are skipped.
        if not old_txt.endswith(".txt") or "ex" in Path(old_txt).name:
            continue
        if edit := find(rs, old_txt):
            edits.append(edit)
        else:
            print(f"warning: could not find a suitable edit for {rs}: {line}")

    if not edits:
        # Nothing to rewrite - but are there still text files lying around?
        files = [
            f
            for f in subprocess.run(
                ["fd", "-g", "*.txt"], check=True, capture_output=True, text=True
            ).stdout.splitlines()
            if "ex" not in Path(f).name
        ]

        return "sus" if files else None

    print("Edits to be made:")
    for edit in edits:
        print(
            f"  {edit.rs}: {edit.old_txt} -> {edit.new_txt}, remove {edit.abs_old_txt}"
        )

    # Only an explicit yes proceeds, so the prompt shows "N" as the default.
    result = input("\nMake these edits? [y/N] ")
    if result.strip().lower() not in ("y", "yes"):
        print("Aborting edits")
        return "aborted"

    # Perform the edits
    for edit in edits:
        with open(edit.rs, "r") as f:
            old_content = f.read()
        pattern = re.compile(
            rf'(include_str!\s*\(\s*"){re.escape(edit.old_txt)}("\s*\))'
        )
        # A callable replacement keeps backslashes in new_txt literal.
        new_content = pattern.sub(
            lambda m: f"{m.group(1)}{edit.new_txt}{m.group(2)}",
            old_content,
            count=1,
        )
        with open(edit.rs, "w") as f:
            f.write(new_content)

        print(f"updated {edit.rs}: {edit.old_txt} -> {edit.new_txt}")
        try:
            os.remove(edit.abs_old_txt)
        except FileNotFoundError:
            print(f"warning: {edit.abs_old_txt} not found")

        # Best effort: the file may not be tracked, so a failure is ignored.
        git("rm", edit.abs_old_txt, check=False)
        git("add", edit.rs)

    # Run cargo check; its exit status is deliberately not enforced.
    subprocess.run(
        ["cargo", "check", "--workspace", "--all-targets"],
        check=False,
    )
    return "edited"
|
|
|
|
|
def do_amend(commit: Commit) -> None:
    """Amend HEAD as a signed commit, keeping its message, under ``commit.env()``."""
    amend_args = ("commit", "-S", "--amend", "--no-edit")
    git(*amend_args, env=commit.env())
|
|
|
|
|
def do_commit(commit: Commit) -> bool:
    """Create a signed commit with the planned message and environment.

    Args:
        commit: supplies the message and the environment for ``git commit``.

    Returns:
        ``True`` when a commit was created, ``False`` when git reported
        "nothing to commit".

    Raises:
        subprocess.CalledProcessError: for any other non-zero exit status.
    """
    result = git("commit", "-S", "-m", commit.msg, env=commit.env(), check=False)
    # Compare against zero rather than "> 0": a process killed by a signal has
    # a negative return code and must not be reported as a successful commit.
    if result.returncode != 0:
        if "nothing to commit" in result.stdout:
            return False
        result.check_returncode()  # always raises here
    return True
|
|
|
|
|
def plan() -> Plan:
    """Load the commit plan from its JSON cache, or build it from ``git log``.

    The cache lives at ``/tmp/git-rebase-edit-plan.json``.  When it is missing
    or fails validation, the history of ``trunk`` is listed oldest-first, every
    entry is parsed into a ``Commit`` (hash, ``%at`` timestamp, ``%B``
    message), and the resulting plan is written back to the cache.

    Returns:
        The cached or freshly built plan.

    Raises:
        subprocess.CalledProcessError: when ``git log`` fails.
    """
    cache = Path("/tmp/git-rebase-edit-plan.json")

    try:
        return Plan.model_validate_json(cache.read_text())
    except (FileNotFoundError, pydantic.ValidationError):
        # No usable cache: fall through and rebuild it from the history.
        pass

    sep = "#" * 8
    result = git(
        "log",
        "--topo-order",
        "--no-show-signature",
        "--no-notes",
        "--reverse",
        f"--format={sep}\n%H\n%at\n%B",
        "trunk",
    )
    commits = []
    for entry in result.stdout.strip().split(sep + "\n"):
        if not entry.strip():
            continue
        # partition() never fails, so an entry whose message is empty (only
        # "sha\nts" remains after stripping) parses to msg == "" instead of
        # raising on a three-way unpack.
        sha, _, rest = entry.partition("\n")
        ts, _, msg = rest.partition("\n")
        commits.append(Commit(sha=sha.strip(), ts=int(ts.strip()), msg=msg.strip()))

    built = Plan(commits=commits)
    cache.write_text(built.model_dump_json(indent=2))

    return built
|
|
|
|
|
def get_current_commit_ts() -> int:
    """Return the ``%at`` timestamp of ``HEAD`` as an integer."""
    show_args = (
        "show",
        "-s",
        "--no-show-signature",
        "--no-notes",
        "--format=%at",
        "HEAD",
    )
    raw = git(*show_args).stdout
    return int(raw.strip())
|
|
|
|
|
def get_commit(sha: str) -> Commit:
    """Build a ``Commit`` for *sha* from its timestamp and message lookups."""
    return Commit(
        sha=sha,
        ts=get_commit_ts(sha),
        msg=get_commit_message(sha),
    )
|
|
|
|
|
def get_commit_message(sha: str) -> str:
    """Return the ``%B`` message of commit *sha*, stripped of outer whitespace."""
    log_args = ("log", "-1", "--no-show-signature", "--no-notes", "--format=%B", sha)
    raw = git(*log_args).stdout
    return raw.strip()
|
|
|
|
|
def get_commit_ts(sha: str) -> int:
    """Return the ``%at`` timestamp of commit *sha* as an integer."""
    log_args = ("log", "-1", "--no-show-signature", "--no-notes", "--format=%at", sha)
    raw = git(*log_args).stdout
    return int(raw.strip())
|
|
|
|
|
def git(*args, **kwargs) -> subprocess.CompletedProcess[str]:
    """Run ``git`` with *args* in text mode and return the completed process.

    Output is captured and a non-zero exit status raises
    ``subprocess.CalledProcessError`` unless the caller overrides
    ``capture_output`` / ``check``.  Any other keyword argument is passed
    through to ``subprocess.run``.
    """
    # Caller-supplied values win over the two defaults.
    options = {"capture_output": True, "check": True, **kwargs}
    command = ["git", *args]
    return subprocess.run(command, text=True, **options)
|
|
|
|
|
if __name__ == "__main__":
    try:
        main()
    except subprocess.CalledProcessError as e:
        # stdout/stderr are None when the failing command ran without output
        # capture, so guard before stripping.
        out = (e.stdout or "").strip()
        err = (e.stderr or "").strip()
        print(f"Error:{e}\nstdout:{out}\nstderr:{err}", file=sys.stderr)
        # sys.exit is always available; the bare exit() helper is injected by
        # the site module and is meant for interactive sessions.
        sys.exit(1)