Created January 14, 2026 13:16
#!/usr/bin/env python3
"""Extract QA metrics (frequency, SNR, homogeneity, ghosting) from a study directory."""
import argparse
import json
import re
from pathlib import Path
import xml.etree.ElementTree as ET
from datetime import datetime

DOCBOOK_NS = "http://docbook.org/ns/docbook"
NS = {"db": DOCBOOK_NS}


def parse_number(text):
    """Return the first numeric value found in a text cell, or None."""
    if text is None:
        return None
    match = re.search(r"[-+]?\d*\.?\d+", text)
    if not match:
        return None
    return float(match.group(0))


def truncate_number(value, decimals=4):
    """Truncate (not round) to the given number of decimals; keep integer results as ints."""
    if value is None:
        return None
    if isinstance(value, int):
        return value
    factor = 10 ** decimals
    truncated = int(value * factor) / factor
    if truncated.is_integer():
        return int(truncated)
    return truncated


def find_table_by_title(root, title):
    """Return the first DocBook table whose <title> matches exactly, or None."""
    for table in root.findall(".//db:table", NS):
        t = table.find("db:title", NS)
        if t is not None and (t.text or "").strip() == title:
            return table
    return None


def get_table_label_value(table, label):
    """Return the value entry that sits next to a label entry in a two-column table."""
    for row in table.findall(".//db:row", NS):
        entries = [
            (e.text or "").strip()
            for e in row.findall("db:entry", NS)
        ]
        if len(entries) >= 2 and entries[0] == label:
            return entries[1]
    return None


def get_column_values(table, header_name):
    """Return all body values of the column whose header matches (or starts with) header_name."""
    header_entries = [
        (e.text or "").strip()
        for e in table.findall(".//db:thead//db:entry", NS)
    ]
    col_idx = None
    for i, h in enumerate(header_entries):
        if h == header_name or h.startswith(header_name):
            col_idx = i
            break
    if col_idx is None:
        return []
    values = []
    for row in table.findall(".//db:tbody/db:row", NS):
        entries = [
            (e.text or "").strip()
            for e in row.findall("db:entry", NS)
        ]
        if len(entries) > col_idx:
            values.append(entries[col_idx])
    return values


def load_qasnr_reports(study_path):
    return sorted(study_path.rglob("QASnrReport.xml"))


def load_qaghost_reports(study_path):
    return sorted(study_path.rglob("QAGhostReport.xml"))


def extract_frequency(qasnr_paths):
    """Read the scanner frequency from the first SNR report that lists it."""
    for path in qasnr_paths:
        root = ET.parse(path).getroot()
        table = find_table_by_title(root, "System Information")
        if table is None:
            continue
        value = get_table_label_value(table, "Frequency")
        if value:
            return parse_number(value)
    return None


def extract_normalized_snr_max(qasnr_paths):
    """Return the maximum Normalized SNR across all SNR reports."""
    max_value = None
    for path in qasnr_paths:
        root = ET.parse(path).getroot()
        table = find_table_by_title(root, "Image specific SNR values")
        if table is None:
            continue
        values = get_column_values(table, "Normalized SNR")
        for v in values:
            num = parse_number(v)
            if num is None:
                continue
            if max_value is None or num > max_value:
                max_value = num
    if max_value is None:
        return None
    if max_value.is_integer():
        return int(max_value)
    return max_value


def extract_inhomogeneity_min(qasnr_paths):
    """Return the minimum slice homogeneity as a fraction (percent values are divided by 100)."""
    min_value = None
    min_fraction = None
    for path in qasnr_paths:
        root = ET.parse(path).getroot()
        table = find_table_by_title(root, "Slice specific Homogeneity values")
        if table is None:
            continue
        values = get_column_values(table, "Homogeneity")
        for v in values:
            num = parse_number(v)
            if num is None:
                continue
            if min_value is None or num < min_value:
                min_value = num
                min_fraction = num / 100.0 if "%" in v else num
    return min_fraction


def extract_maxghost(first_ghost_path):
    """Return the maximum ghost ratio from the first ghost report, as a fraction."""
    root = ET.parse(first_ghost_path).getroot()
    table = find_table_by_title(root, "Ghost Quantification")
    if table is None:
        return None
    values = get_column_values(table, "Max. Ghost ROI / Max. Signal")
    max_value = None
    max_fraction = None
    for v in values:
        num = parse_number(v)
        if num is None:
            continue
        if max_value is None or num > max_value:
            max_value = num
            max_fraction = num / 100.0 if "%" in v else num
    return max_fraction


def pick_first_ghost_report(ghost_paths, study_path):
    """Pick the ghost report from the lowest-numbered scan directory under the study."""
    candidates = []
    for p in ghost_paths:
        try:
            rel = p.relative_to(study_path)
        except ValueError:
            continue
        if not rel.parts:
            continue
        scan_dir = rel.parts[0]
        if scan_dir.isdigit():
            candidates.append((int(scan_dir), p))
    if not candidates:
        return None
    candidates.sort(key=lambda x: x[0])
    return candidates[0][1]


def main():
    parser = argparse.ArgumentParser(
        description="Extract QA metrics from a study directory."
    )
    parser.add_argument("study_path", help="Path to the study directory")
    args = parser.parse_args()
    study_path = Path(args.study_path)
    if not study_path.exists():
        raise SystemExit(f"Study path does not exist: {study_path}")

    # The study directory name is expected to start with YYYYMMDD_HHMMSS.
    study_name = study_path.name
    match = re.match(r"(\d{8})_(\d{6})", study_name)
    if not match:
        raise SystemExit("Study name does not start with YYYYMMDD_HHMMSS.")
    study_date, study_time = match.groups()
    study_dt = datetime.strptime(
        f"{study_date}{study_time}",
        "%Y%m%d%H%M%S",
    )
    try:
        from zoneinfo import ZoneInfo
        study_dt = study_dt.replace(tzinfo=ZoneInfo("America/New_York"))
        datetime_value = study_dt.isoformat(timespec="milliseconds")
    except Exception:
        datetime_value = study_dt.strftime("%Y-%m-%dT%H:%M:%S.000")

    qasnr_paths = load_qasnr_reports(study_path)
    if not qasnr_paths:
        raise SystemExit("No QASnrReport.xml files found.")
    ghost_paths = load_qaghost_reports(study_path)
    if not ghost_paths:
        raise SystemExit("No QAGhostReport.xml files found.")

    frequency = extract_frequency(qasnr_paths)
    normalized_snr = extract_normalized_snr_max(qasnr_paths)
    inhomogeneity = extract_inhomogeneity_min(qasnr_paths)
    first_ghost = pick_first_ghost_report(ghost_paths, study_path)
    if first_ghost is None:
        raise SystemExit("No numbered QAGhostReport.xml files found.")
    maxghost = extract_maxghost(first_ghost)

    if frequency is None:
        raise SystemExit("Frequency not found.")
    if normalized_snr is None:
        raise SystemExit("Normalized SNR not found.")
    if inhomogeneity is None:
        raise SystemExit("Inhomogeneity not found.")
    if maxghost is None:
        raise SystemExit("Max ghost value not found.")

    output = {
        "study": study_name,
        "datetime": datetime_value,
        "frequency": truncate_number(frequency),
        "normalized_snr": truncate_number(normalized_snr),
        "inhomogeneity": truncate_number(inhomogeneity),
        "maxghost": truncate_number(maxghost),
    }
    output_path = Path.cwd() / f"{study_path.name}-qa.json"
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(output, f, indent=2)
        f.write("\n")
    print(output_path)


if __name__ == "__main__":
    main()
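As a quick sanity check of the table helpers, the snippet below runs them against a minimal, made-up DocBook fragment. The element structure and namespace match what the script queries, but the sample values and the module name extract_qa.py are hypothetical; real QASnrReport.xml files will be larger.

import xml.etree.ElementTree as ET

from extract_qa import find_table_by_title, get_table_label_value, parse_number

# Hypothetical fragment shaped like the tables the script expects.
SAMPLE = """\
<article xmlns="http://docbook.org/ns/docbook">
  <table>
    <title>System Information</title>
    <tgroup cols="2">
      <tbody>
        <row><entry>Frequency</entry><entry>123.25 MHz</entry></row>
      </tbody>
    </tgroup>
  </table>
</article>
"""

root = ET.fromstring(SAMPLE)
table = find_table_by_title(root, "System Information")
print(get_table_label_value(table, "Frequency"))                # "123.25 MHz"
print(parse_number(get_table_label_value(table, "Frequency")))  # 123.25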