Created
January 14, 2026 13:29
-
-
Save dmd/3e9e2817896ef51eb2248a67a6a4e60c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import html | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import textwrap | |
| import readline | |
| from bisect import bisect_left, bisect_right | |
| try: | |
| from prompt_toolkit import prompt as pt_prompt | |
| HAS_PROMPT_TOOLKIT = True | |
| except Exception: | |
| HAS_PROMPT_TOOLKIT = False | |
| import subprocess | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from dataclasses import dataclass | |
| from datetime import date, datetime, time, timedelta | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List, Optional, Tuple | |
| from zoneinfo import ZoneInfo | |
| import shutil | |
| from threading import Lock | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402 | |
| from calcium_events import build_vevent, build_vevents, parse_calcium_events | |
| from tqdm import tqdm | |
| from create_scandium_clinical import extract_scanned_initials, has_contrast_indicator | |
# Default Booked resource and the Calcium .Events file backing each scanner.
DEFAULT_RESOURCE = "P1 Prisma"
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
    "P1 Prisma": "data/3T_Prisma_P1.Events",
    "P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
    "4T Varian": "data/4T_Varian_Inova.Events",
}
# Timezone used to interpret naive timestamps.
DEFAULT_TZ = "America/New_York"
# Booked custom-attribute IDs (PI and scanned-person initials).
PI_ATTRIBUTE_ID = 1
SCANNED_INITIALS_ATTRIBUTE_ID = 2
# ANSI escape sequences for terminal output formatting.
BOLD = "\033[1m"
RESET = "\033[0m"
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")  # matches ANSI color/style codes
GREEN = "\033[92m"
ORANGE = "\033[93m"
BLUE = "\033[94m"
# Guards shared cache mutation across ThreadPoolExecutor workers.
CACHE_LOCK = Lock()
EMAIL_RE = re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.IGNORECASE)
# Captures 2-3 uppercase letters bounded by non-letters (person initials).
INITIALS_RE = re.compile(r"(?:Confirmed[- ]|[^A-Z])([A-Z]{2,3})(?:[^A-Z]|$)")
# Email domains treated as interchangeable when matching participants.
EMAIL_DOMAIN_SWAPS = {
    "mgb.org": "mclean.harvard.edu",
    "mclean.harvard.edu": "mgb.org",
}
@dataclass(frozen=True)
class EventView:
    """Normalized view of a single calendar occurrence from either system.

    Built from a Calcium VEVENT occurrence or a Scandium (Booked)
    reservation so both sides can be displayed and compared uniformly.
    """

    start: datetime  # occurrence start (tz-aware)
    end: datetime  # occurrence end (tz-aware)
    title: str
    description: Optional[str]
    details: List[Tuple[str, str]]  # (label, value) pairs for display
    recurrence: str  # human-readable recurrence label; "none" when non-recurring
    reference: Optional[str]  # Booked reference number (Scandium side only)
    compare_fields: Dict[str, str]  # TYP/SUM/REC values used for exact comparison
    ai_payload: Dict[str, object]  # structured payload sent to the AI comparer
    calcium_event: Optional[Dict[str, str]]  # raw VEVENT dict (Calcium side only)
    scandium_detail: Optional[Dict[str, object]]  # raw reservation (Scandium side only)
@dataclass(frozen=True)
class ParsedCalciumVEvent:
    """A Calcium VEVENT with datetimes/recurrence pre-parsed for batch expansion."""

    event: Dict[str, str]  # raw VEVENT fields
    start_dt: datetime  # parsed, tz-aware DTSTART
    end_dt: datetime  # parsed, tz-aware DTEND
    duration: timedelta  # end_dt - start_dt, reused for shifted occurrences
    rrule: Optional[str]  # raw RRULE string; None when non-recurring
    rule: Optional[Dict]  # parsed rule from parse_rrule; None when non-recurring
    until_date: Optional[date]  # recurrence termination date, if any
    exdates: set[date]  # dates excluded via EXDATE
def parse_date(value: str) -> date:
    """Parse a YYYY-MM-DD string into a date."""
    parsed = datetime.strptime(value, "%Y-%m-%d")
    return parsed.date()
def parse_time_value(value: str) -> time:
    """Parse an HH:MM string into a time."""
    parsed = datetime.strptime(value, "%H:%M")
    return parsed.time()
def normalize_summary(summary: str) -> str:
    """Undo ICS escaping and HTML entities in a SUMMARY value."""
    text = summary
    # Order matters: literal "\n" first, the escaped backslash last.
    for escaped, plain in (("\\n", "\n"), ("\\,", ","), ("\\;", ";"), ("\\\\", "\\")):
        text = text.replace(escaped, plain)
    return html.unescape(text).strip()
def extract_emails(text: str) -> List[str]:
    """Return every email address appearing in *text* (empty list for falsy input)."""
    haystack = text or ""
    return [found.group(0) for found in EMAIL_RE.finditer(haystack)]
def contains_spi_blocker(text: str) -> bool:
    """True when *text* mentions QA/cleanup work that carries no person initials."""
    if not text:
        return False
    lowered = text.lower()
    blockers = ("acr qa", "hd cleanup", "hd clean up")
    return any(marker in lowered for marker in blockers)
def swap_email_domain(email: str) -> Optional[str]:
    """Return *email* with its domain swapped to the paired institution, else None."""
    if "@" not in email:
        return None
    local, _, domain = email.rpartition("@")
    replacement = EMAIL_DOMAIN_SWAPS.get(domain.lower())
    return f"{local}@{replacement}" if replacement else None
def extract_initials(text: str) -> List[str]:
    """Return candidate scanned-person initials (2-3 caps) found in *text*.

    Returns [] for empty text or when the text is QA/cleanup noise; the
    "fbirn & hd cleanup" phrase is scrubbed before matching.
    """
    if not text or contains_spi_blocker(text):
        return []
    scrubbed = re.sub(r"fbirn\s*&\s*hd\s*cleanup", "", text, flags=re.IGNORECASE)
    return [found.group(1) for found in INITIALS_RE.finditer(scrubbed)]
def extract_patient_source(text: str) -> Optional[str]:
    """Pull the patient-source token (e.g. "OP"/"IP") from a summary line."""
    if not text:
        return None
    stripped = text.strip()
    # Explicit OP/IP prefixes, either dash- or space-delimited.
    if stripped.startswith(("OP-", "IP-", "OP ", "IP ")):
        return stripped[:2]
    # Otherwise accept any alphanumeric token before the first dash.
    if "-" in stripped:
        head = stripped.split("-", 1)[0].strip()
        if head and head.isalnum():
            return head
    return None
def parse_clinical_prefix(summary: str) -> tuple[Optional[str], Optional[str], Optional[str], str]:
    """Split a clinical first line "PAT - body - SPI[ - rest]" into parts.

    Returns (patient_source, body_part, initials, remaining_summary).  When
    the first line does not match the pattern, returns
    (None, None, None, summary) unchanged.
    """
    lines = summary.splitlines() if summary else []
    if not lines:
        return None, None, None, ""
    pattern = (
        r"^\s*(?P<pat>[A-Za-z0-9]+)\s*-\s*(?P<body>.+?)\s*-\s*"
        r"(?P<spi>[A-Za-z]{2,3})(?P<rest>.*)$"
    )
    match = re.match(pattern, lines[0])
    if match is None:
        return None, None, None, summary
    remainder = match.group("rest").strip().lstrip("-").strip()
    tail = ([remainder] if remainder else []) + lines[1:]
    return (
        match.group("pat").strip(),
        match.group("body").strip(),
        match.group("spi").strip(),
        "\n".join(tail).strip(),
    )
def extract_body_part(text: str) -> Optional[str]:
    """Return the body-part segment (second "-"-delimited token) of a summary.

    The original spelled out OP-/IP- prefixes separately, but both branches
    reduce to the same split-on-dash behavior, folded here into one path.
    """
    if not text:
        return None
    stripped = text.strip()
    pieces = stripped.split("-", 2)
    if len(pieces) >= 2:
        return pieces[1].strip()
    return None
def normalize_compare_value(value: str) -> str:
    """Lowercase, drop the word "and", and strip all non-alphanumerics."""
    lowered = (value or "").lower()
    without_and = re.sub(r"\band\b", "", lowered)
    return re.sub(r"[^a-z0-9]+", "", without_and)
def canonical_lines(
    details: Optional[List[Tuple[str, str]]],
    swap_body_label: bool = False,
) -> List[str]:
    """Flatten detail pairs into canonical text lines for hashing.

    With swap_body_label=True, "BOD" and "Body part" labels are exchanged so
    hashes from either labeling convention can be matched.
    """
    if not details:
        return ["missing"]
    swap = {"BOD": "Body part", "Body part": "BOD"}
    output: List[str] = []
    for label, value in details:
        shown = swap.get(label, label) if swap_body_label else label
        pieces = (value or "").splitlines() or [""]
        output.append(f"{shown}: {pieces[0]}".strip())
        output.extend(piece.strip() for piece in pieces[1:])
    return output
def canonical_hash(
    cal: Optional[EventView],
    sca: Optional[EventView],
    swap_body_label: bool = False,
) -> str:
    """SHA-256 over the canonical text of a Calcium/Scandium event pair."""
    import hashlib

    sides = []
    for view in (cal, sca):
        detail_pairs = view.details if view else None
        sides.append("\n".join(canonical_lines(detail_pairs, swap_body_label)))
    digest_input = f"LEFT\n{sides[0]}\nRIGHT\n{sides[1]}"
    return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
def resolve_accepted_hash(
    cal: Optional[EventView],
    sca: Optional[EventView],
    accepted_hashes: set[str],
) -> Optional[str]:
    """Return the stored hash for this pair, trying both label conventions."""
    for swap in (False, True):
        candidate = canonical_hash(cal, sca, swap_body_label=swap)
        if candidate in accepted_hashes:
            return candidate
    return None
def load_accepted_hashes(path: Path) -> set[str]:
    """Read previously accepted hashes, one per line; empty set when unreadable."""
    if not path.exists():
        return set()
    try:
        content = path.read_text(encoding="utf-8")
    except OSError:
        # Best effort: an unreadable file behaves like an empty one.
        return set()
    return {stripped for raw in content.splitlines() if (stripped := raw.strip())}
def store_accepted_hash(path: Path, hash_value: str) -> None:
    """Append *hash_value* as a new line to the accepted-hash file."""
    with path.open("a", encoding="utf-8") as out:
        out.write(f"{hash_value}\n")
def parse_exdate_dates(value: str) -> set[date]:
    """Parse an ICS EXDATE list ("YYYYMMDD...,YYYYMMDD...") into dates."""
    if not value:
        return set()
    found: set[date] = set()
    for token in value.split(","):
        token = token.strip()
        if len(token) < 8:
            continue
        try:
            found.add(datetime.strptime(token[:8], "%Y%m%d").date())
        except ValueError:
            # Skip malformed entries rather than failing the whole list.
            pass
    return found
def parse_ics_datetime(value: str, tz: ZoneInfo) -> Optional[datetime]:
    """Parse an ICS DTSTART/DTEND value into a tz-aware datetime.

    Date-only values (no "T") yield None.  UTC ("Z") values are converted
    into *tz*; naive values are assumed to already be wall-clock in *tz*.
    """
    if not value or "T" not in value:
        return None
    if value.endswith("Z"):
        # Project helper rewrites "YYYYMMDDTHHMMSSZ" into ISO form with offset.
        as_iso = ics_datetime_to_iso(value)
        utc_dt = datetime.strptime(as_iso, "%Y-%m-%dT%H:%M:%S%z")
        return utc_dt.astimezone(tz)
    local_dt = datetime.strptime(value, "%Y%m%dT%H%M%S")
    return local_dt.replace(tzinfo=tz)
def parse_api_datetime(value: str, tz: ZoneInfo) -> Optional[datetime]:
    """Parse a Booked API ISO timestamp into *tz*; None when unparseable.

    Normalizes a trailing "Z" to "+0000" and strips the colon from
    "+HH:MM"-style offsets so strptime's %z accepts them.
    """
    if not value:
        return None
    normalized = value
    if normalized.endswith("Z"):
        normalized = f"{normalized[:-1]}+0000"
    if len(normalized) >= 6 and normalized[-3] == ":":
        normalized = normalized[:-3] + normalized[-2:]
    try:
        parsed = datetime.strptime(normalized, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    return parsed.astimezone(tz)
def parse_termination_date(value: Optional[str], tz: ZoneInfo) -> Optional[date]:
    """Parse a repeat-termination timestamp into its date as seen in *tz*.

    Returns None for a falsy value; raises ValueError for a malformed
    timestamp (callers do not catch here, matching prior behavior).

    Fix: guard the offset-colon check with a length test, matching
    parse_api_datetime — previously a very short value raised IndexError
    from `cleaned[-3]` instead of a clean strptime ValueError.
    """
    if not value:
        return None
    cleaned = value
    if cleaned.endswith("Z"):
        cleaned = cleaned[:-1] + "+0000"
    if len(cleaned) >= 6 and cleaned[-3] == ":":
        # Strip the colon from "+HH:MM" offsets for strptime's %z.
        cleaned = cleaned[:-3] + cleaned[-2:]
    return datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz).date()
def parse_booked_until_date(value: Optional[str]) -> Optional[date]:
    """Parse a Booked "until" timestamp into its literal (non-converted) date.

    Returns None for falsy or unparseable input.

    Fix: guard the offset-colon check with a length test — previously a very
    short value raised IndexError from `cleaned[-3]`, which escaped the
    `except ValueError` handler intended to make this function total.
    """
    if not value:
        return None
    cleaned = value
    if cleaned.endswith("Z"):
        cleaned = cleaned[:-1] + "+0000"
    if len(cleaned) >= 6 and cleaned[-3] == ":":
        # Strip the colon from "+HH:MM" offsets for strptime's %z.
        cleaned = cleaned[:-3] + cleaned[-2:]
    try:
        parsed = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    return parsed.date()
def parse_ics_until(value: str) -> Optional[date]:
    """Parse the UNTIL portion of an RRULE ("YYYYMMDD...") into a date, else None."""
    if not value:
        return None
    trimmed = value.strip()
    if len(trimmed) < 8:
        return None
    try:
        return datetime.strptime(trimmed[:8], "%Y%m%d").date()
    except ValueError:
        return None
def format_weekdays(weekdays: Iterable[int]) -> str:
    """Render Booked weekday indexes (0=Sun .. 6=Sat) as "Mon, Tue, ..."."""
    names = ("Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat")
    return ", ".join(names[day] for day in weekdays if 0 <= day < len(names))
def format_ics_recurrence(rrule: Optional[str], start_dt: datetime) -> str:
    """Translate an ICS RRULE string into the shared human-readable label.

    Produces "<freq> every <interval>[ on <days>][ until <date>]" or "none"
    for missing/unrecognized frequencies.
    """
    if not rrule:
        return "none"
    fields = dict(
        piece.split("=", 1) for piece in rrule.split(";") if "=" in piece
    )
    freq_labels = {
        "DAILY": "daily",
        "WEEKLY": "weekly",
        "MONTHLY": "monthly",
        "YEARLY": "yearly",
    }
    label = freq_labels.get(fields.get("FREQ", "").upper(), "none")
    if label == "none":
        return "none"
    interval = fields.get("INTERVAL") or "1"
    tokens = [f"{label} every {interval}"]
    if label == "weekly":
        day_map = {"SU": 0, "MO": 1, "TU": 2, "WE": 3, "TH": 4, "FR": 5, "SA": 6}
        codes = (chunk.strip().upper() for chunk in fields.get("BYDAY", "").split(","))
        days = [day_map[code] for code in codes if code in day_map]
        if not days:
            # Default to the start date's weekday (Booked numbering: 0=Sunday).
            days = [(start_dt.weekday() + 1) % 7]
        readable = format_weekdays(days)
        if readable:
            tokens.append(f"on {readable}")
    until_date = parse_ics_until(fields.get("UNTIL", ""))
    if until_date:
        tokens.append(f"until {until_date.isoformat()}")
    return " ".join(tokens)
def recurrence_occurs_on(
    target_date: date,
    start_date: date,
    rule: Dict,
) -> bool:
    """Decide whether *rule*, anchored at *start_date*, fires on *target_date*."""
    if target_date < start_date:
        return False
    interval = int(rule.get("interval", 1))
    elapsed_days = (target_date - start_date).days
    kind = rule.get("type")
    if kind == "daily":
        return elapsed_days % interval == 0
    if kind == "weekly":
        # Booked weekday numbering: 0=Sunday .. 6=Saturday.
        if (target_date.weekday() + 1) % 7 not in (rule.get("weekdays") or []):
            return False
        return elapsed_days % (interval * 7) == 0
    if kind == "monthly":
        if target_date.day != start_date.day:
            return False
        month_gap = (target_date.year - start_date.year) * 12 + (
            target_date.month - start_date.month
        )
        return month_gap % interval == 0
    if kind == "yearly":
        same_day = (target_date.month, target_date.day) == (start_date.month, start_date.day)
        return same_day and (target_date.year - start_date.year) % interval == 0
    return False
def calcium_occurrence_for_date(
    event: Dict[str, str],
    target_date: date,
    tz: ZoneInfo,
) -> Optional[Tuple[datetime, datetime]]:
    """Return the (start, end) of *event*'s occurrence on *target_date*, if any.

    Non-recurring events match only their literal start date.  Recurring
    events are expanded via the parsed RRULE, honoring the termination
    date and EXDATE exclusions.
    """
    start_raw = event.get("DTSTART", "")
    end_raw = event.get("DTEND", "")
    start_dt = parse_ics_datetime(start_raw, tz)
    end_dt = parse_ics_datetime(end_raw, tz)
    if not start_dt or not end_dt:
        return None
    duration = end_dt - start_dt
    rrule = event.get("RRULE")
    exdates = parse_exdate_dates(event.get("EXDATE", ""))
    if not rrule:
        if start_dt.date() != target_date:
            return None
        return start_dt, end_dt
    # parse_rrule is given a naive start; the termination date is tz-converted.
    rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
    until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
    if until_date and target_date > until_date:
        return None
    if target_date in exdates:
        return None
    if not recurrence_occurs_on(target_date, start_dt.date(), rule):
        return None
    # Shift the anchor occurrence onto target_date, keeping wall-clock time.
    new_start = datetime.combine(target_date, start_dt.timetz())
    new_end = new_start + duration
    return new_start, new_end
def build_event_view_from_calcium_occurrence(
    event: Dict[str, str],
    start_dt: datetime,
    end_dt: datetime,
) -> EventView:
    """Convert one Calcium VEVENT occurrence into a normalized EventView.

    Derives the TYP/SUM/REC compare fields and the patient source, body
    part, and scanned-person initials used in the AI comparison payload.
    """
    summary = normalize_summary(event.get("SUMMARY", "")) or "(no summary)"
    description = event.get("DESCRIPTION")
    categories = event.get("CATEGORIES")
    rrule = event.get("RRULE")
    recurrence_label = format_ics_recurrence(rrule, start_dt)
    details: List[Tuple[str, str]] = []
    if categories:
        details.append(("TYP", categories))
    details.append(("SUM", summary))
    if recurrence_label != "none":
        details.append(("REC", recurrence_label))
    compare_fields = {
        "TYP": categories or "",
        "SUM": summary or "",
        "REC": recurrence_label or "none",
    }
    patient_source = extract_patient_source(summary or "") or ""
    body_part = extract_body_part(summary or "") or ""
    spi_values = extract_initials(summary or "")
    if categories and categories.strip().lower() == "open":
        # "Open" slots carry no patient metadata.
        patient_source = ""
        body_part = ""
        spi_values = []
    elif categories and "Clinical" in categories:
        # Clinical summaries encode "PAT - body - SPI"; parsed values win.
        pat_parsed, body_parsed, spi_parsed, _cleaned = parse_clinical_prefix(summary or "")
        if pat_parsed:
            patient_source = pat_parsed
        if body_parsed:
            body_part = body_parsed
        if spi_parsed:
            spi_values = [spi_parsed]
    ai_payload = {
        "categories": categories or "",
        "summary": summary or "",
        "description": description or "",
        "emails": extract_emails(summary or ""),
        "initials": spi_values,
        "patient_source": patient_source,
        "body_part": body_part,
        "recurrence": recurrence_label or "none",
    }
    return EventView(
        start=start_dt,
        end=end_dt,
        title=summary,
        description=description,
        details=details,
        recurrence=recurrence_label,
        reference=None,
        compare_fields=compare_fields,
        ai_payload=ai_payload,
        calcium_event=event,
        scandium_detail=None,
    )
def build_calcium_events_for_date(
    events_path: Path,
    preferences_path: Optional[Path],
    tz: ZoneInfo,
    target_date: date,
    start_time: Optional[time],
    end_time: Optional[time],
) -> List[EventView]:
    """Expand the Calcium .Events file into EventViews for one date.

    Categorized events come from build_vevents.  Uncategorized events whose
    text begins with "open" are additionally synthesized as "Open" slots.
    When both start_time and end_time are given, only occurrences matching
    that exact wall-clock window are kept.
    """
    vevents = build_vevents(
        events_path,
        category_filter=None,
        tz_name=str(tz.key),
        preferences_path=preferences_path,
        exclude_categories=None,
        debug=False,
        use_utc=True,
    )
    results: List[EventView] = []
    extra_open: List[Dict[str, str]] = []
    # Uncategorized "open..." entries are built separately with an explicit
    # "Open" category since build_vevents does not emit them.
    for calcium_event in parse_calcium_events(
        events_path,
        category_filter=None,
        exclude_categories=None,
        preferences_path=preferences_path,
        debug=False,
    ):
        if calcium_event.categories:
            continue
        summary_text = (calcium_event.text or "").lstrip()
        if summary_text[:4].lower() != "open":
            continue
        for vevent in build_vevent(
            calcium_event,
            tz,
            category_value="Open",
            use_utc=True,
            debug=False,
        ):
            if vevent:
                extra_open.append(vevent)

    def append_event(event: Dict[str, str]) -> None:
        # Keep only occurrences on target_date that pass the optional
        # exact start/end wall-clock filter.
        occurrence = calcium_occurrence_for_date(event, target_date, tz)
        if not occurrence:
            return
        start_dt, end_dt = occurrence
        if start_time and end_time:
            if start_dt.timetz().replace(tzinfo=None) != start_time:
                return
            if end_dt.timetz().replace(tzinfo=None) != end_time:
                return
        results.append(build_event_view_from_calcium_occurrence(event, start_dt, end_dt))

    for item in vevents:
        append_event(item.event)
    for event in extra_open:
        append_event(event)
    return results
def build_calcium_events_by_date(
    events_path: Path,
    preferences_path: Optional[Path],
    tz: ZoneInfo,
    dates: Iterable[date],
    start_time: Optional[time],
    end_time: Optional[time],
) -> Dict[date, List[EventView]]:
    """Expand the Calcium .Events file into EventViews for many dates at once.

    Batch variant of build_calcium_events_for_date: the file is parsed once,
    each VEVENT's RRULE is pre-parsed, and occurrences are expanded only over
    the requested (sorted) dates.  When both start_time and end_time are
    given, occurrences must match that exact wall-clock window.
    """
    dates_sorted = sorted(dates)
    results: Dict[date, List[EventView]] = {value: [] for value in dates_sorted}
    if not dates_sorted:
        return results
    calcium_events = parse_calcium_events(
        events_path,
        category_filter=None,
        exclude_categories=None,
        preferences_path=preferences_path,
        debug=False,
    )
    vevents: List[Dict[str, str]] = []
    for calcium_event in calcium_events:
        category_value = calcium_event.categories[0] if calcium_event.categories else None
        if category_value:
            for vevent in build_vevent(
                calcium_event,
                tz,
                category_value=category_value,
                use_utc=True,
                debug=False,
            ):
                if vevent:
                    vevents.append(vevent)
        if calcium_event.categories:
            continue
        # Uncategorized entries whose text starts with "open" become "Open" slots.
        summary_text = (calcium_event.text or "").lstrip()
        if summary_text[:4].lower() != "open":
            continue
        for vevent in build_vevent(
            calcium_event,
            tz,
            category_value="Open",
            use_utc=True,
            debug=False,
        ):
            if vevent:
                vevents.append(vevent)
    # Pre-parse datetimes, recurrence rule, and exclusions once per VEVENT.
    parsed: List[ParsedCalciumVEvent] = []
    for event in vevents:
        start_dt = parse_ics_datetime(event.get("DTSTART", ""), tz)
        end_dt = parse_ics_datetime(event.get("DTEND", ""), tz)
        if not start_dt or not end_dt:
            continue
        rrule = event.get("RRULE")
        rule = parse_rrule(rrule, start_dt.replace(tzinfo=None)) if rrule else None
        until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz) if rule else None
        parsed.append(
            ParsedCalciumVEvent(
                event=event,
                start_dt=start_dt,
                end_dt=end_dt,
                duration=end_dt - start_dt,
                rrule=rrule,
                rule=rule,
                until_date=until_date,
                exdates=parse_exdate_dates(event.get("EXDATE", "")),
            )
        )
    min_date = dates_sorted[0]
    max_date = dates_sorted[-1]
    for item in parsed:
        start_date = item.start_dt.date()
        if not item.rrule:
            # Non-recurring: include only when its literal date was requested.
            if start_date not in results:
                continue
            if start_time and end_time:
                if item.start_dt.timetz().replace(tzinfo=None) != start_time:
                    continue
                if item.end_dt.timetz().replace(tzinfo=None) != end_time:
                    continue
            results[start_date].append(
                build_event_view_from_calcium_occurrence(item.event, item.start_dt, item.end_dt)
            )
            continue
        # Recurring: clamp expansion to the overlap of [start, until] and the
        # requested date range, then walk only the requested dates inside it.
        range_start = max(start_date, min_date)
        range_end = min(item.until_date, max_date) if item.until_date else max_date
        if range_end < range_start:
            continue
        start_idx = bisect_left(dates_sorted, range_start)
        end_idx = bisect_right(dates_sorted, range_end)
        for target_date in dates_sorted[start_idx:end_idx]:
            if target_date in item.exdates:
                continue
            if not item.rule or not recurrence_occurs_on(target_date, start_date, item.rule):
                continue
            # Shift the anchor occurrence onto target_date, keeping wall-clock time.
            new_start = datetime.combine(target_date, item.start_dt.timetz())
            new_end = new_start + item.duration
            if start_time and end_time:
                if new_start.timetz().replace(tzinfo=None) != start_time:
                    continue
                if new_end.timetz().replace(tzinfo=None) != end_time:
                    continue
            results[target_date].append(
                build_event_view_from_calcium_occurrence(item.event, new_start, new_end)
            )
    return results
def format_recurrence(rule: Optional[Dict], start_dt: datetime) -> str:
    """Translate a Booked recurrence-rule dict into the shared label format.

    Produces "<type> every <interval>[ on <days>][ until <date>]" or "none".
    """
    if not rule:
        return "none"
    kind = rule.get("type") or "none"
    if kind == "none":
        return "none"
    pieces = [kind]
    interval = rule.get("interval")
    if interval:
        pieces.append(f"every {interval}")
    if kind == "weekly":
        day_numbers: List[int] = []
        for raw_day in rule.get("weekdays") or []:
            try:
                day_numbers.append(int(raw_day))
            except (TypeError, ValueError):
                pass
        if not day_numbers:
            # Fall back to the start date's weekday (Booked numbering: 0=Sunday).
            day_numbers = [(start_dt.weekday() + 1) % 7]
        readable = format_weekdays(day_numbers)
        if readable:
            pieces.append(f"on {readable}")
    until = rule.get("repeatTerminationDate")
    if until:
        until_date = parse_booked_until_date(until)
        if until_date:
            pieces.append(f"until {until_date.isoformat()}")
    return " ".join(pieces)
def build_scandium_event_view_from_detail(
    detail: Dict[str, object],
    tz: ZoneInfo,
) -> Optional[EventView]:
    """Convert a Booked reservation payload into a normalized EventView.

    Returns None when the start/end timestamps cannot be parsed.  Custom
    attributes are mapped onto short display labels (PAT, SPI, BOD, CON)
    and the "Scan Type / PI" attribute becomes the TYP compare field.
    """
    start_raw = detail.get("startDate") or detail.get("startDateTime")
    end_raw = detail.get("endDate") or detail.get("endDateTime")
    start_dt = parse_api_datetime(start_raw, tz) if start_raw else None
    end_dt = parse_api_datetime(end_raw, tz) if end_raw else None
    if not start_dt or not end_dt:
        return None
    title = (detail.get("title") or "").strip()
    description = detail.get("description")
    reference = detail.get("referenceNumber") or None
    participant_items = detail.get("participants", [])
    invitee_items = detail.get("invitees", [])
    participants: List[str] = []
    # Entries may be dicts (name/email fields) or plain strings.
    for entry in list(participant_items) + list(invitee_items):
        if isinstance(entry, dict):
            first = (entry.get("firstName") or "").strip()
            last = (entry.get("lastName") or "").strip()
            email = (entry.get("emailAddress") or "").strip()
            label = " ".join(piece for piece in [first, last] if piece).strip()
            if email:
                if label:
                    label = f"{label} <{email}>"
                else:
                    label = email
            if label:
                participants.append(label)
        elif isinstance(entry, str):
            label = entry.strip()
            if label:
                participants.append(label)
    participant_text = ", ".join(p.strip() for p in participants if p.strip())
    # Custom-attribute labels (lowercased) -> short display labels.
    label_map = {
        "patient source": "PAT",
        "scanned person initials": "SPI",
        "body part": "BOD",
        "contrast": "CON",
    }
    attributes: List[Tuple[str, str]] = []
    scan_type_value: Optional[str] = None
    spi_value: Optional[str] = None
    patient_source: Optional[str] = None
    body_part: Optional[str] = None
    for attr in detail.get("customAttributes", []):
        label = (attr.get("label") or "").strip()
        value = (attr.get("value") or "").strip()
        label_key = label.lower()
        if not label or not value:
            continue
        if label_key == "scan type / pi" and not scan_type_value:
            # Becomes TYP; excluded from the attribute list via continue.
            scan_type_value = value
            continue
        if label_key == "scanned person initials" and not spi_value:
            spi_value = value
        if label_key == "patient source" and not patient_source:
            patient_source = value
        if label_key == "body part" and not body_part:
            body_part = value
        attributes.append((label_map.get(label_key, label), value))
    recurrence = format_recurrence(detail.get("recurrenceRule"), start_dt)
    details: List[Tuple[str, str]] = []
    compare_fields: Dict[str, str] = {}
    if scan_type_value:
        details.append(("TYP", scan_type_value))
        compare_fields["TYP"] = scan_type_value
    # When title and description duplicate each other, keep only one:
    # "Notice" events keep the description, everything else keeps the title.
    if scan_type_value and scan_type_value.strip().lower() == "notice":
        if description and title and description.strip() == title.strip():
            title = ""
    else:
        if description and title and description.strip() == title.strip():
            description = ""
    if description:
        details.append(("SUM", description))
        compare_fields["SUM"] = description
    if title:
        details.append(("TIT", title))
        if not compare_fields.get("SUM"):
            # Title stands in for SUM when there is no description.
            compare_fields["SUM"] = title
    if recurrence != "none":
        # NOTE(review): REC is only recorded for recurring events here, while
        # the Calcium side always sets REC (possibly "none") — confirm the
        # exact-compare path treats a missing REC as intended.
        details.append(("REC", recurrence))
        compare_fields["REC"] = recurrence
    if participant_text:
        details.append(("PAR", participant_text))
    for label, value in attributes:
        details.append((label, value))
    ai_payload = {
        "scan_type_pi": scan_type_value or "",
        "description": description or "",
        "title": title,
        "recurrence": recurrence or "none",
        "participants": participants,
        "attributes": {label: value for label, value in attributes},
        "spi": spi_value or "",
        "patient_source": patient_source or "",
        "body_part": body_part or "",
    }
    return EventView(
        start=start_dt,
        end=end_dt,
        title=title,
        description=description,
        details=details,
        recurrence=recurrence,
        reference=reference,
        compare_fields=compare_fields,
        ai_payload=ai_payload,
        calcium_event=None,
        scandium_detail=detail,
    )
def build_scandium_events_for_date(
    api: BookedAPI,
    resource_name: str,
    tz: ZoneInfo,
    target_date: date,
    start_time: Optional[time],
    end_time: Optional[time],
    resource: Optional[Dict[str, object]] = None,
) -> List[EventView]:
    """Fetch one day's reservations for a resource and normalize them.

    The list endpoint may omit detail fields (custom attributes,
    participants, invitees, recurrence); such reservations are re-fetched
    individually.  Raises RuntimeError when the resource cannot be found.
    """
    if resource is None:
        resource = api.find_resource_by_name(resource_name)
    if not resource:
        raise RuntimeError(f"Resource not found: {resource_name}")
    start_date = target_date.strftime("%Y-%m-%d")
    end_date = (target_date + timedelta(days=1)).strftime("%Y-%m-%d")
    reservations = api.list_reservations(
        start_date=start_date,
        end_date=end_date,
        resource_id=resource["resourceId"],
    )
    results: List[EventView] = []
    for reservation in reservations:
        reference = reservation.get("referenceNumber", "")
        detail = reservation
        needs_detail = False
        if reference:
            # The list payload may omit detail fields entirely...
            for field in ("customAttributes", "participants", "invitees", "recurrenceRule"):
                if field not in reservation:
                    needs_detail = True
                    break
            # ...or include them empty/None, which also warrants a re-fetch.
            if not needs_detail:
                if not reservation.get("customAttributes") or reservation.get("recurrenceRule") is None:
                    needs_detail = True
            if needs_detail:
                try:
                    detail = api.get_reservation(reference)
                except Exception:
                    # Best effort: fall back to the (partial) list payload.
                    detail = reservation
        view = build_scandium_event_view_from_detail(detail, tz)
        if not view:
            continue
        if view.start.date() != target_date:
            continue
        if start_time and end_time:
            # Optional exact wall-clock window filter.
            if view.start.timetz().replace(tzinfo=None) != start_time:
                continue
            if view.end.timetz().replace(tzinfo=None) != end_time:
                continue
        results.append(view)
    return results
def group_by_time(events: Iterable[EventView]) -> Dict[Tuple[datetime, datetime], List[EventView]]:
    """Bucket events by their exact (start, end) pair, preserving input order."""
    buckets: Dict[Tuple[datetime, datetime], List[EventView]] = {}
    for item in events:
        buckets.setdefault((item.start, item.end), []).append(item)
    return buckets
def format_event_label(event: EventView, width: int) -> str:
    """Render an event as wrapped "LABEL: value" lines with bold values.

    Events without structured details fall back to title (plus description
    on a second line when present).
    """
    if not event.details:
        bare_title = event.title.strip() if event.title else "(no title)"
        if event.description:
            return f"{bare_title}\n{event.description.strip()}"
        return bare_title
    rendered: List[str] = []
    for key, value in event.details:
        prefix = f"{key}: "
        # Never wrap narrower than 10 columns, even for long labels.
        wrap_width = max(10, width - len(prefix))
        chunks = textwrap.wrap(value, width=wrap_width) or [""]
        rendered.append(f"{prefix}{BOLD}{chunks[0]}{RESET}")
        pad = " " * len(prefix)
        rendered.extend(f"{pad}{BOLD}{chunk}{RESET}" for chunk in chunks[1:])
    return "\n".join(rendered)
def build_compare_key(calcium: EventView, scandium: EventView) -> str:
    """Stable JSON cache key for an AI comparison of a Calcium/Scandium pair."""
    combined = {"calcium": calcium.ai_payload, "scandium": scandium.ai_payload}
    return json.dumps(combined, sort_keys=True)
def build_ai_prompt(calcium: EventView, scandium: EventView) -> str:
    """Build the instruction prompt sent to the AI event comparer.

    Embeds both events' ai_payload JSON and requests a strict JSON verdict
    with per-field match states.  The prompt text is part of the runtime
    contract with the model and is kept verbatim.
    """
    return (
        "You are comparing a Calcium calendar event (unstructured) to a Scandium event "
        "(structured). Determine whether they refer to the same booking. The Scandium "
        "side has structured fields (PAT, Body part, SPI, Participants, Title) that may "
        "appear embedded in Calcium SUM. The first words of Calcium SUM often map to "
        "Scandium TIT. Scanned person initials might be in Calcium SUM and Scandium SPI. "
        "Body part wording can differ (e.g., 'L/Spine' vs 'L Spine'). "
        "Emails or names in Calcium SUM often correspond to Scandium Participants. "
        "If Scandium fields are contained in Calcium SUM, treat that as a match even if "
        "the strings are not identical. If both sides provide PAT or Body part and they "
        "conflict, mark as mismatch.\n\n"
        "Calcium event:\n"
        f"{json.dumps(calcium.ai_payload, indent=2)}\n\n"
        "Scandium event:\n"
        f"{json.dumps(scandium.ai_payload, indent=2)}\n\n"
        "Respond with JSON:\n"
        "{\n"
        ' "match": true|false,\n'
        ' "field_matches": {"TYP": "match|mismatch|unclear", "SUM": "match|mismatch|unclear", "REC": "match|mismatch|unclear"},\n'
        ' "notes": "short reason"\n'
        "}\n"
    )
def ai_compare(
    calcium: EventView,
    scandium: EventView,
    model: str,
    api_key: str,
    cache: Dict[str, Dict[str, object]],
) -> Dict[str, object]:
    """Ask the OpenAI Responses API whether two events describe the same booking.

    Results are memoized in *cache* (guarded by CACHE_LOCK) keyed on the
    serialized payload pair.  Raises RuntimeError on a non-200 HTTP status.
    Unparseable model output is treated as a non-match, not an error.
    """
    key = build_compare_key(calcium, scandium)
    with CACHE_LOCK:
        if key in cache:
            return cache[key]
    # Imported lazily so the script works offline when AI compare is unused.
    import requests
    prompt = build_ai_prompt(calcium, scandium)
    response = requests.post(
        "https://api.openai.com/v1/responses",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json={
            "model": model,
            "input": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": prompt,
                        }
                    ],
                }
            ],
        },
        timeout=30,
    )
    if response.status_code != 200:
        raise RuntimeError(f"OpenAI error {response.status_code}: {response.text}")
    payload = response.json()
    text = ""
    # Concatenate every output_text chunk from the response's message items.
    for item in payload.get("output", []):
        if item.get("type") == "message":
            for content in item.get("content", []):
                if content.get("type") == "output_text":
                    text += content.get("text", "")
    try:
        result = json.loads(text)
    except json.JSONDecodeError:
        # Degrade gracefully: an unparseable verdict counts as a non-match.
        result = {"match": False, "field_matches": {}, "notes": "AI response parse error"}
    with CACHE_LOCK:
        cache[key] = result
    return result
def compare_events(
    calcium: EventView,
    scandium: EventView,
    use_ai: bool,
    model: str,
    api_key: Optional[str],
    cache: Dict[str, Dict[str, object]],
) -> Dict[str, object]:
    """Compare a Calcium/Scandium event pair, cheapest check first.

    Order of evaluation:
      1. SPI guard — a Scandium "scanned person initials" value that is absent
         from the Calcium initials is an immediate mismatch (source "guard").
      2. Exact field comparison on TYP/SUM/REC (with "PM/Service"-style TYP
         values treated as "Service"); all-equal short-circuits as a match.
      3. Otherwise, when AI is enabled and a key is available, defer to
         ai_compare(), then veto an AI match on conflicting patient-source or
         body-part values (substring containment counts as agreement for the
         body part).  AI failures degrade to the exact result (source
         "ai-error") instead of raising.
    """
    # --- 1. initials guard -------------------------------------------------
    cal_initials = set(calcium.ai_payload.get("initials", []) or [])
    sca_spi = (scandium.ai_payload.get("spi") or "").strip()
    if sca_spi and cal_initials and sca_spi not in cal_initials:
        return {
            "match": False,
            "field_matches": {"SUM": "mismatch"},
            "notes": "initials mismatch",
            "source": "guard",
        }
    # --- 2. exact comparison ----------------------------------------------
    cal_typ = calcium.compare_fields.get("TYP", "")
    sca_typ = scandium.compare_fields.get("TYP", "")
    # Calcium spells service blocks several ways; fold them onto "Service".
    if sca_typ == "Service" and cal_typ in {"PM / Service", "PM-Service", "PM Service", "PM/Service"}:
        cal_typ = "Service"
    exact = {
        "TYP": cal_typ == sca_typ,
        "SUM": calcium.compare_fields.get("SUM", "") == scandium.compare_fields.get("SUM", ""),
        "REC": calcium.compare_fields.get("REC", "") == scandium.compare_fields.get("REC", ""),
    }

    def field_status() -> Dict[str, str]:
        # Per-field "match"/"mismatch" labels derived from the exact check.
        return {name: "match" if ok else "mismatch" for name, ok in exact.items()}

    if all(exact.values()):
        return {
            "match": True,
            "field_matches": field_status(),
            "notes": "exact match",
            "source": "exact",
        }
    if not use_ai or not api_key:
        return {
            "match": False,
            "field_matches": field_status(),
            "notes": "exact mismatch",
            "source": "exact",
        }
    # --- 3. AI comparison with post-hoc vetoes -----------------------------
    try:
        verdict = ai_compare(calcium, scandium, model, api_key, cache)
        cal_pat = (calcium.ai_payload.get("patient_source") or "").strip()
        sca_pat = (scandium.ai_payload.get("patient_source") or "").strip()
        cal_body = (calcium.ai_payload.get("body_part") or "").strip()
        sca_body = (scandium.ai_payload.get("body_part") or "").strip()
        if verdict.get("match") and sca_pat and cal_pat:
            if normalize_compare_value(sca_pat) != normalize_compare_value(cal_pat):
                verdict["match"] = False
                flagged = verdict.get("field_matches") or {}
                flagged["SUM"] = "mismatch"
                verdict["field_matches"] = flagged
                verdict["notes"] = "patient source mismatch"
        if verdict.get("match") and sca_body and cal_body:
            norm_sca = normalize_compare_value(sca_body)
            norm_cal = normalize_compare_value(cal_body)
            # One value containing the other (e.g. "L Spine" vs "Spine") is OK.
            if norm_sca != norm_cal and norm_sca not in norm_cal and norm_cal not in norm_sca:
                verdict["match"] = False
                flagged = verdict.get("field_matches") or {}
                flagged["SUM"] = "mismatch"
                verdict["field_matches"] = flagged
                verdict["notes"] = "body part mismatch"
        verdict["source"] = "ai"
        return verdict
    except Exception as exc:
        return {
            "match": False,
            "field_matches": field_status(),
            "notes": f"ai error: {exc}",
            "source": "ai-error",
        }
def input_with_default(label: str, current: str) -> str:
    """Prompt for a value, pre-filling *current* as an editable default.

    Empty input keeps *current*; a single "-" clears the value.  Uses
    prompt_toolkit when available, otherwise a readline startup hook to
    pre-insert the current text.
    """
    text = f"{label}: "
    if HAS_PROMPT_TOOLKIT:
        answer = pt_prompt(text, default=current or "").strip()
    else:
        def preload() -> None:
            readline.insert_text(current)
            readline.redisplay()
        readline.set_startup_hook(preload)
        try:
            answer = input(text).strip()
        finally:
            readline.set_startup_hook(None)
    if not answer:
        return current
    return "" if answer == "-" else answer
def input_sum_with_default(current: str, calcium_sum: str) -> str:
    """Prompt for the SUM (description) field with editing shortcuts.

    Empty input keeps *current*, "-" clears the field, and "c" copies
    *calcium_sum* (when non-empty).  Any other input replaces the value.
    """
    text = "SUM (Description) (c=copy from calcium, -=clear): "
    if HAS_PROMPT_TOOLKIT:
        answer = pt_prompt(text, default=current or "").strip()
    else:
        def preload() -> None:
            readline.insert_text(current)
            readline.redisplay()
        readline.set_startup_hook(preload)
        try:
            answer = input(text).strip()
        finally:
            readline.set_startup_hook(None)
    if answer == "":
        return current
    if answer == "-":
        return ""
    if answer.lower() == "c" and calcium_sum:
        return calcium_sum
    return answer
def prompt_yes_no(prompt: str, default: bool = False) -> bool:
    """Ask a yes/no question; Enter returns *default*, "h" shows help."""
    tail = " [Y/n/h]: " if default else " [y/N/h]: "
    while True:
        answer = input(prompt + tail).strip().lower()
        if answer == "h":
            print("Options: y=yes, n=no.")
        elif answer == "":
            return default
        else:
            return answer in {"y", "yes"}
def prompt_yes_no_accept(
    prompt: str,
    default: bool = False,
    include_reload: bool = False,
    include_clear: bool = False,
    include_update_rec: bool = False,
) -> str:
    """Ask a yes/no question with extra single-letter actions.

    Always offers y/n/a(ccept)/h(elp); the include_* flags additionally enable
    r(eload), c(lear SUM), and u(pdate REC).  Enter returns "yes" when
    *default* is true, otherwise "no".

    Returns one of: "yes", "no", "accept", "reload", "clear", "update_rec".
    """
    # Build the option letters and help text once instead of enumerating all
    # eight flag combinations; output strings are identical to the old table.
    letters = ["Y", "n"] if default else ["y", "N"]
    letters.append("a")
    help_parts = ["y=yes", "n=no", "a=accept"]
    if include_reload:
        letters.append("r")
        help_parts.append("r=reload")
    if include_clear:
        letters.append("c")
        help_parts.append("c=clear SUM")
    if include_update_rec:
        letters.append("u")
        help_parts.append("u=update REC")
    letters.append("h")
    suffix = " [" + "/".join(letters) + "]: "
    help_text = "Options: " + ", ".join(help_parts) + "."
    while True:
        response = input(prompt + suffix).strip().lower()
        if response == "h":
            print(help_text)
            continue
        if response == "r" and include_reload:
            return "reload"
        if response == "c" and include_clear:
            return "clear"
        if response == "u" and include_update_rec:
            return "update_rec"
        if response == "a":
            return "accept"
        if not response:
            return "yes" if default else "no"
        return "yes" if response in {"y", "yes"} else "no"
def prompt_edit_scope(
    is_recurring: bool,
    default_all: bool = False,
) -> str:
    """Ask whether an edit applies to this instance or the whole series.

    Non-recurring reservations always return "this" without prompting.
    Returns "this" or "full" (the Booked updateScope value for "all").
    """
    if not is_recurring:
        return "this"
    question = (
        "Edit scope [t]his/[A]ll/[h]: " if default_all else "Edit scope [t]his/[a]ll/[h]: "
    )
    while True:
        answer = input(question).strip().lower()
        if answer == "h":
            print("Options: t=this instance, a=all instances.")
            continue
        if not answer:
            return "full" if default_all else "this"
        return "full" if answer.startswith("a") else "this"
def update_reservation_raw(
    api: BookedAPI,
    reference: str,
    data: Dict[str, object],
    update_scope: str,
) -> Dict:
    """POST a raw reservation update with an explicit updateScope.

    Sends *data* unmodified to the Booked ``Reservations/{reference}``
    endpoint, with ``updateScope`` as a query parameter, and returns the
    decoded response.

    NOTE(review): reaches into the private ``BookedAPI._request`` helper —
    presumably the public client exposes no scoped-update method; confirm.
    """
    endpoint = f"Reservations/{reference}"
    query = {"updateScope": update_scope}
    return api._request("POST", endpoint, params=query, json=data)
def update_scandium_event(
    api: BookedAPI,
    sca: EventView,
    tz: ZoneInfo,
    fields: Optional[set[str]] = None,
    calcium: Optional[EventView] = None,
    prefill: Optional[Dict[str, str]] = None,
    prompt: bool = True,
    update_scope_override: Optional[str] = None,
    recurrence_rule_override: Optional[Dict[str, object]] = None,
) -> Optional[EventView]:
    """Edit a Scandium reservation interactively (or from *prefill*) and push the update.

    Current values come from ``sca.scandium_detail`` (title, description,
    custom attributes, participants), falling back to ``sca.compare_fields`` /
    ``sca.ai_payload`` where the detail is blank.  When *prompt* is true, each
    field named in *fields* (default: TYP, PAT, BODY, SPI, SUM) is offered for
    editing; *prefill* values replace the starting defaults first.  PAT and
    BODY are only kept when TYP is "Clinical", and changing TYP from Clinical
    to Open clears BODY and SPI.  The update is sent via
    ``update_reservation_raw`` with the chosen scope, then the reservation is
    re-fetched and returned as a fresh EventView.  Returns *sca* unchanged
    when it has no reference number.
    """
    detail = sca.scandium_detail or {}
    reference = sca.reference
    if not reference:
        # Nothing to update without a Booked reference number.
        return sca
    title_current = (detail.get("title") or "").strip()
    desc_current = (detail.get("description") or "").strip()
    typ_current = ""
    pat_current = ""
    body_current = ""
    spi_current = ""
    # Map lowercased attribute label -> (attribute id, value); blank labels or
    # values are skipped.
    attr_map: Dict[str, Tuple[int, str]] = {}
    for attr in detail.get("customAttributes", []):
        label = (attr.get("label") or "").strip()
        value = (attr.get("value") or "").strip()
        if label and value:
            attr_map[label.lower()] = (int(attr.get("id")), value)
    if "scan type / pi" in attr_map:
        typ_current = attr_map["scan type / pi"][1]
    if "patient source" in attr_map:
        pat_current = attr_map["patient source"][1]
    if "body part" in attr_map:
        body_current = attr_map["body part"][1]
    if "scanned person initials" in attr_map:
        spi_current = attr_map["scanned person initials"][1]
    # Fall back to the comparison/AI views when the detail lacks a value.
    if not typ_current:
        typ_current = sca.compare_fields.get("TYP", "")
    if not pat_current:
        pat_current = (sca.ai_payload.get("patient_source") or "")
    if not body_current:
        body_current = (sca.ai_payload.get("body_part") or "")
    if not spi_current:
        spi_current = (sca.ai_payload.get("spi") or "")
    fields = fields or {"TYP", "PAT", "BODY", "SPI", "SUM"}
    typ_input = typ_current
    sum_input = desc_current
    tit_input = title_current
    pat_input = pat_current
    body_input = body_current
    spi_input = spi_current
    par_input = ""
    rec_input = sca.recurrence or "none"
    # Prefill replaces both the default shown in the prompt and the value used
    # when the field is not prompted.
    if prefill and "TYP" in prefill:
        typ_current = prefill["TYP"]
        typ_input = prefill["TYP"]
    if "TYP" in fields and prompt:
        typ_input = input_with_default("TYP (Scan Type / PI)", typ_current)
    if prefill and "SPI" in prefill:
        spi_current = prefill["SPI"]
        spi_input = prefill["SPI"]
    if "SPI" in fields and prompt:
        spi_input = input_with_default("SPI (Scanned Person Initials)", spi_current)
    typ_is_clinical = (typ_input or "").strip().lower() == "clinical"
    typ_changed_from_clinical_to_open = (
        (typ_current or "").strip().lower() == "clinical"
        and (typ_input or "").strip().lower() == "open"
    )
    # NOTE(review): when not prompting, or when TYP isn't Clinical, the else
    # branch clears PAT even if it was prefilled — confirm this is intended.
    if prefill and "PAT" in prefill:
        pat_current = prefill["PAT"]
        pat_input = prefill["PAT"]
    if "PAT" in fields and typ_is_clinical and prompt:
        pat_input = input_with_default("PAT (Patient Source)", pat_current)
    else:
        pat_input = ""
    # NOTE(review): same clearing behavior as PAT above applies to BODY.
    if prefill and "BODY" in prefill:
        body_current = prefill["BODY"]
        body_input = prefill["BODY"]
    if "BODY" in fields and typ_is_clinical and prompt:
        body_input = input_with_default("Body part", body_current)
    else:
        body_input = ""
    if typ_changed_from_clinical_to_open:
        # Clinical-only fields make no sense on an Open booking.
        body_input = ""
        spi_input = ""
    # Prompt SUM last.
    if prefill and "SUM" in prefill:
        sum_input = prefill["SUM"]
    if "SUM" in fields and prompt:
        calcium_sum = ""
        if calcium:
            calcium_sum = calcium.compare_fields.get("SUM", "")
        sum_input = input_sum_with_default(sum_input, calcium_sum)
    # Collect existing participant emails from both participants and invitees;
    # entries may be dicts (with emailAddress) or plain strings.
    participants_current = []
    for p in list(detail.get("participants", [])) + list(detail.get("invitees", [])):
        if isinstance(p, dict):
            email = (p.get("emailAddress") or "").strip()
            if email:
                participants_current.append(email)
        elif isinstance(p, str) and p.strip():
            participants_current.append(p.strip())
    par_current = ", ".join(participants_current)
    par_input = par_current
    rec_current = sca.recurrence or "none"
    rec_input = rec_current
    if update_scope_override:
        update_scope = update_scope_override
    else:
        update_scope = prompt_edit_scope(bool(detail.get("isRecurring")))
    custom_attrs: List[Dict[str, object]] = []
    # Attribute ids as configured in the Scandium/Booked instance.
    label_to_id = {
        "Scan Type / PI": 1,
        "Patient Source": 11,
        "Body part": 10,
        "Scanned Person Initials": 2,
    }
    def apply_attr(label: str, value: str) -> None:
        custom_attrs.append({"attributeId": label_to_id[label], "attributeValue": value})
    apply_attr("Scan Type / PI", typ_input or "")
    apply_attr("Patient Source", pat_input or "")
    apply_attr("Body part", body_input or "")
    apply_attr("Scanned Person Initials", spi_input or "")
    # Resolve participant emails back to Booked user ids; unknown addresses
    # are reported and dropped (best-effort).
    participant_ids: List[int] = []
    if par_input.strip():
        for raw in par_input.split(","):
            email = raw.strip()
            if not email:
                continue
            user = api.find_user_by_email(email)
            if user:
                participant_ids.append(user["id"])
            else:
                print(f"Participant not found: {email}")
    # NOTE(review): rec_input always equals rec_current here (it is never
    # prompted), so the elif branch below is dead code; only the explicit
    # "none" normalization can fire. Confirm whether REC editing was intended.
    recurrence_rule = None
    if rec_input.strip().lower() == "none":
        recurrence_rule = {"type": "none"}
    elif rec_input.strip() and rec_input.strip() != rec_current:
        rrule = rec_input.strip()
        if rrule.upper().startswith("RRULE:"):
            rrule = rrule.split(":", 1)[1]
        recurrence_rule = parse_rrule(rrule, sca.start.replace(tzinfo=None))
    data = {
        "resourceId": int(detail.get("resourceId")),
        "userId": api.get_user_id(),
        "startDateTime": detail.get("startDate") or detail.get("startDateTime"),
        "endDateTime": detail.get("endDate") or detail.get("endDateTime"),
        "title": tit_input,
        "description": sum_input or "",
        "participants": participant_ids,
        "customAttributes": custom_attrs,
    }
    if recurrence_rule_override is not None:
        data["recurrenceRule"] = recurrence_rule_override
    elif recurrence_rule is not None:
        data["recurrenceRule"] = recurrence_rule
    update_reservation_raw(api, reference, data, update_scope)
    # Re-fetch so the returned view reflects the server's post-update state.
    new_detail = api.get_reservation(reference)
    return build_scandium_event_view_from_detail(new_detail, tz)
def build_event_payload_simple(
    api: BookedAPI,
    event: Dict[str, str],
    resource: Dict[str, object],
    tz: ZoneInfo,
) -> Dict[str, object]:
    """Translate a parsed Calcium ICS event into reservation-creation kwargs.

    Derives title/description from the normalized SUMMARY, resolves the PI
    attribute from CATEGORIES, extracts scanned-person initials and
    participant emails (skipped for "Open" events), and converts an RRULE
    into a recurrence rule.  Returns a dict whose keys mirror the
    ``api.create_reservation`` parameters.

    Raises ValueError when DTSTART or DTEND is missing.
    """
    dtstart = event.get("DTSTART", "")
    dtend = event.get("DTEND", "")
    if not dtstart or not dtend:
        raise ValueError("DTSTART and DTEND are required")
    start_iso = ics_datetime_to_iso(dtstart)
    end_iso = ics_datetime_to_iso(dtend)
    summary_raw = event.get("SUMMARY", "").strip()
    summary = normalize_summary(summary_raw)
    categories = event.get("CATEGORIES", "")
    is_open_category = categories.strip().lower() == "open"
    summary_clean = summary
    # Title is the first line of the normalized summary.
    title = summary_clean.splitlines()[0].strip() if summary_clean else ""
    # Resolve the PI attribute from CATEGORIES; retry with just the first
    # word, then fall back to the raw category text (or "Unknown").
    matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, categories)
    if not matched_pi and categories.strip():
        matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, categories.split()[0])
    if not matched_pi:
        matched_pi = categories.strip() or "Unknown"
    initials: List[str] = []
    if not is_open_category:
        initials = extract_initials(summary)
    custom_attributes = [
        {"attributeId": PI_ATTRIBUTE_ID, "attributeValue": matched_pi}
    ]
    if initials:
        # Only the first set of initials is recorded as the scanned person.
        custom_attributes.append(
            {
                "attributeId": SCANNED_INITIALS_ATTRIBUTE_ID,
                "attributeValue": initials[0],
            }
        )
    # Resolve participant emails to user ids, trying a swapped domain when the
    # address is unknown; de-duplicate while preserving order.
    participant_ids: List[int] = []
    seen_user_ids: set[int] = set()
    if not is_open_category:
        for email in extract_emails(summary):
            user = api.find_user_by_email(email)
            if not user:
                swapped = swap_email_domain(email)
                if swapped:
                    user = api.find_user_by_email(swapped)
            if user:
                user_id = int(user["id"])
                if user_id not in seen_user_ids:
                    participant_ids.append(user_id)
                    seen_user_ids.add(user_id)
    recurrence_rule = None
    if event.get("RRULE"):
        start_dt = parse_ics_datetime(dtstart, tz)
        if start_dt:
            recurrence_rule = parse_rrule(event.get("RRULE", ""), start_dt.replace(tzinfo=None))
    is_notice_category = categories.strip().lower() == "notice"
    # Avoid duplicating a one-line summary in both title and description:
    # notices keep the description, everything else keeps the title.
    if title and summary_clean and title.strip() == summary_clean.strip():
        if is_notice_category:
            title = ""
        else:
            summary_clean = ""
    if is_open_category:
        title = ""
    return {
        "resource_id": resource["resourceId"],
        "start_datetime": start_iso,
        "end_datetime": end_iso,
        "title": title,
        "description": summary_clean or None,
        "participants": participant_ids if participant_ids else None,
        "custom_attributes": custom_attributes,
        "recurrence_rule": recurrence_rule,
    }
def _calcium_recurrence_rule(
    calcium_event: Dict[str, str],
    tz: ZoneInfo,
) -> Optional[Dict[str, object]]:
    """Parse the event's RRULE (if any) into a Booked recurrence rule, else None."""
    if not calcium_event.get("RRULE"):
        return None
    start_dt = parse_ics_datetime(calcium_event.get("DTSTART", ""), tz)
    if not start_dt:
        return None
    return parse_rrule(calcium_event.get("RRULE", ""), start_dt.replace(tzinfo=None))
def create_scandium_from_calcium(
    api: BookedAPI,
    resource: Dict[str, object],
    tz: ZoneInfo,
    calcium_event: Dict[str, str],
) -> Optional[EventView]:
    """Create a Scandium reservation mirroring a Calcium event.

    Chooses the creation strategy from CATEGORIES: "Clinical" events get the
    parsed clinical attributes (patient source, body part, initials,
    contrast), "Service" events get the Service PI attribute, and everything
    else goes through ``build_event_payload_simple``.  Returns the created
    reservation as an EventView, or None when creation fails (the API
    response is printed in that case).
    """
    # Changes vs. previous revision: removed the unused ``clinical_created``
    # local, factored the thrice-duplicated RRULE parsing into
    # ``_calcium_recurrence_rule``, and used the module attribute-id constants
    # (PI_ATTRIBUTE_ID == 1, SCANNED_INITIALS_ATTRIBUTE_ID == 2) in place of
    # the bare literals.
    categories = calcium_event.get("CATEGORIES", "") or ""
    if "Clinical" in categories:
        description_raw = calcium_event.get("SUMMARY", "")
        pat, body, spi, description = parse_clinical_prefix(description_raw)
        # A blocker phrase in the summary invalidates the prefix-parsed SPI.
        if spi and contains_spi_blocker(description_raw):
            spi = None
        if not spi:
            spi = extract_scanned_initials(description_raw)
        pi_value = api.find_attribute_value(PI_ATTRIBUTE_ID, "Clinical") or "Clinical"
        custom_attrs = [{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": pi_value}]
        if pat:
            custom_attrs.append({"attributeId": 11, "attributeValue": pat})
        if body:
            custom_attrs.append({"attributeId": 10, "attributeValue": body})
        if spi:
            custom_attrs.append({"attributeId": SCANNED_INITIALS_ATTRIBUTE_ID, "attributeValue": spi})
        if has_contrast_indicator(description):
            custom_attrs.append({"attributeId": 3, "attributeValue": "Yes"})
        result = api.create_reservation(
            resource_id=resource["resourceId"],
            start_datetime=ics_datetime_to_iso(calcium_event.get("DTSTART", "")),
            end_datetime=ics_datetime_to_iso(calcium_event.get("DTEND", "")),
            title="",
            description=description,
            custom_attributes=custom_attrs,
            recurrence_rule=_calcium_recurrence_rule(calcium_event, tz),
        )
    elif "Service" in categories:
        service_value = api.find_attribute_value(PI_ATTRIBUTE_ID, "Service") or "Service"
        result = api.create_reservation(
            resource_id=resource["resourceId"],
            start_datetime=ics_datetime_to_iso(calcium_event.get("DTSTART", "")),
            end_datetime=ics_datetime_to_iso(calcium_event.get("DTEND", "")),
            title="",
            description=normalize_summary(calcium_event.get("SUMMARY", "")),
            custom_attributes=[{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": service_value}],
            recurrence_rule=_calcium_recurrence_rule(calcium_event, tz),
        )
    else:
        payload = build_event_payload_simple(api, calcium_event, resource, tz)
        result = api.create_reservation(
            resource_id=payload["resource_id"],
            start_datetime=payload["start_datetime"],
            end_datetime=payload["end_datetime"],
            title=payload["title"],
            description=payload.get("description"),
            participants=payload.get("participants"),
            custom_attributes=payload.get("custom_attributes"),
            recurrence_rule=payload.get("recurrence_rule"),
        )
    reference = result.get("referenceNumber")
    if not reference:
        print(f"Create failed: {result}")
        return None
    # Re-fetch the reservation so the view reflects server-side state.
    new_detail = api.get_reservation(reference)
    return build_scandium_event_view_from_detail(new_detail, tz)
def build_pairs(
    calcium_events: List[EventView],
    scandium_events: List[EventView],
    date_value: date,
) -> List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]]:
    """Pair Calcium and Scandium events that occupy the same time slot.

    Events are grouped by their exact (start, end) key via ``group_by_time``.
    Within a slot: when one side is empty, events are paired positionally
    against None; otherwise a first greedy pass matches on the normalized
    (TYP, SUM) pair, and a second pass matches leftovers on TYP alone.
    Anything still unmatched is emitted with None as its partner.  Each
    returned tuple is (calcium_event, scandium_event, (start, end),
    date_value).
    """
    def normalize_match_text(text: str) -> str:
        # Collapse internal whitespace and lowercase for comparison.
        cleaned = " ".join(text.strip().split())
        return cleaned.lower()
    def normalize_open_summary(text: str) -> str:
        # Strip a leading "open" marker (with optional -/: separators).
        cleaned = normalize_match_text(text)
        return re.sub(r"^open\b\s*[-:]*\s*", "", cleaned, flags=re.IGNORECASE)
    def normalize_typ_for_pairing(raw: str) -> str:
        # Fold TYP variants onto a single key: "PM / Service"-style values
        # become "service"; otherwise take the first word after any "/".
        cleaned = raw.strip().lower()
        if not cleaned:
            return ""
        if "service" in cleaned and "pm" in cleaned:
            return "service"
        if "/" in cleaned:
            cleaned = cleaned.split("/")[-1].strip()
        if cleaned:
            cleaned = cleaned.split()[0]
        return cleaned
    def match_fields(event: EventView) -> Tuple[str, str]:
        # Build the (typ, summary) key used for greedy matching; a summary
        # starting with "open" implies an Open TYP when none is set.
        typ = (event.compare_fields.get("TYP") or "").strip()
        summary = (event.compare_fields.get("SUM") or "").strip()
        if not typ and summary.lower().startswith("open"):
            typ = "Open"
        typ_key = normalize_typ_for_pairing(typ)
        if typ_key == "open":
            summary = normalize_open_summary(summary)
        else:
            summary = normalize_match_text(summary)
        return (typ_key, summary)
    calcium_grouped = group_by_time(calcium_events)
    scandium_grouped = group_by_time(scandium_events)
    all_keys = sorted(
        set(calcium_grouped) | set(scandium_grouped),
        key=lambda key: (key[0], key[1]),
    )
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ] = []
    for key in all_keys:
        calcium_list = calcium_grouped.get(key, [])
        scandium_list = scandium_grouped.get(key, [])
        if not calcium_list or not scandium_list:
            # One side is empty: pair positionally against None.
            max_count = max(len(calcium_list), len(scandium_list))
            for idx in range(max_count):
                cal = calcium_list[idx] if idx < len(calcium_list) else None
                sca = scandium_list[idx] if idx < len(scandium_list) else None
                pairs.append((cal, sca, key, date_value))
            continue
        cal_remaining = list(range(len(calcium_list)))
        sca_remaining = list(range(len(scandium_list)))
        matched: List[Tuple[int, int]] = []
        # Pass 1: greedy exact match on the full (typ, summary) key.
        sca_by_exact: Dict[Tuple[str, str], List[int]] = {}
        for idx in sca_remaining:
            sca_key = match_fields(scandium_list[idx])
            sca_by_exact.setdefault(sca_key, []).append(idx)
        for cal_idx in cal_remaining[:]:
            cal_key = match_fields(calcium_list[cal_idx])
            candidates = sca_by_exact.get(cal_key, [])
            if candidates:
                sca_idx = candidates.pop(0)
                matched.append((cal_idx, sca_idx))
                cal_remaining.remove(cal_idx)
                sca_remaining.remove(sca_idx)
        # Pass 2: match leftovers on TYP alone.
        sca_by_typ: Dict[str, List[int]] = {}
        for idx in sca_remaining:
            typ, _ = match_fields(scandium_list[idx])
            sca_by_typ.setdefault(typ, []).append(idx)
        for cal_idx in cal_remaining[:]:
            typ, _ = match_fields(calcium_list[cal_idx])
            candidates = sca_by_typ.get(typ, [])
            if candidates:
                sca_idx = candidates.pop(0)
                matched.append((cal_idx, sca_idx))
                cal_remaining.remove(cal_idx)
                sca_remaining.remove(sca_idx)
        for cal_idx, sca_idx in matched:
            pairs.append((calcium_list[cal_idx], scandium_list[sca_idx], key, date_value))
        for cal_idx in cal_remaining:
            pairs.append((calcium_list[cal_idx], None, key, date_value))
        for sca_idx in sca_remaining:
            pairs.append((None, scandium_list[sca_idx], key, date_value))
    return pairs
def precompute_ai_results(
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ],
    hashes: List[str],
    accepted_hashes: set[str],
    use_ai: bool,
    model: str,
    api_key: Optional[str],
) -> Dict[int, Dict[str, object]]:
    """Compute comparison results for *pairs*, keyed by pair index.

    Previously duplicated the body of ``compute_compare_results_for_pairs``;
    now delegates to it (same accepted-hash short-circuit, same cheap exact
    pass, same threaded AI fan-out with a fresh per-call cache) and converts
    the positional list to a dict, omitting indices with no result (pairs
    missing one side that were never accepted).

    *hashes* is unused here (as in the original) but kept for interface
    compatibility with existing callers.
    """
    results_list = compute_compare_results_for_pairs(
        pairs,
        hashes,
        accepted_hashes,
        use_ai=use_ai,
        model=model,
        api_key=api_key,
    )
    return {idx: result for idx, result in enumerate(results_list) if result is not None}
def compute_compare_results_for_pairs(
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ],
    hashes: List[str],
    accepted_hashes: set[str],
    use_ai: bool,
    model: str,
    api_key: Optional[str],
    cache: Optional[Dict[str, Dict[str, object]]] = None,
) -> List[Optional[Dict[str, object]]]:
    """Return one comparison result per pair, positionally aligned with *pairs*.

    Previously-accepted pairs short-circuit to an "accepted" result; pairs
    missing one side (and not accepted) yield None.  Complete pairs get a
    cheap exact comparison first, and only ambiguous ones are fanned out to
    the AI comparison on a thread pool (progress shown via tqdm).  *hashes*
    is currently unused but kept for interface compatibility.
    """
    shared_cache: Dict[str, Dict[str, object]] = {} if cache is None else cache
    results: List[Optional[Dict[str, object]]] = [None] * len(pairs)
    pending: List[Tuple[int, EventView, EventView]] = []

    def accepted_marker() -> Dict[str, object]:
        return {
            "match": True,
            "field_matches": {},
            "notes": "accepted",
            "source": "accepted",
        }

    for position, (cal, sca, _slot, _day) in enumerate(pairs):
        if resolve_accepted_hash(cal, sca, accepted_hashes):
            results[position] = accepted_marker()
            continue
        if not cal or not sca:
            # One-sided pair with no acceptance on record: leave as None.
            continue
        quick = compare_events(cal, sca, False, model, api_key, shared_cache)
        if not use_ai or not api_key or quick.get("source") == "guard" or quick.get("match") is True:
            results[position] = quick
            continue
        pending.append((position, cal, sca))
    if not use_ai or not api_key or not pending:
        return results
    workers = min(20, len(pending))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {
            pool.submit(compare_events, cal, sca, True, model, api_key, shared_cache): position
            for (position, cal, sca) in pending
        }
        for done in tqdm(as_completed(futures), total=len(futures), desc="AI compare"):
            results[futures[done]] = done.result()
    return results
def side_by_side_lines(
    left: str,
    right: str,
    width: int,
    gutter: int,
) -> List[str]:
    """Merge two multi-line strings into two-column rows of padded text.

    Each column is padded to *width* visible characters (ANSI color codes are
    excluded from the width measurement via ANSI_RE) and separated by
    *gutter* spaces.  The shorter side is padded with empty cells; an empty
    input string still produces one row.
    """
    left_cells = left.splitlines() or [""]
    right_cells = right.splitlines() or [""]
    spacer = " " * gutter

    def fit(cell: str) -> str:
        # Pad by visible length so escape sequences don't skew the columns.
        shown = len(ANSI_RE.sub("", cell))
        return cell if shown >= width else cell + " " * (width - shown)

    merged: List[str] = []
    for row in range(max(len(left_cells), len(right_cells))):
        lcell = left_cells[row] if row < len(left_cells) else ""
        rcell = right_cells[row] if row < len(right_cells) else ""
        merged.append(fit(lcell) + spacer + fit(rcell))
    return merged
| def display_pairs( | |
| dates_sorted: List[date], | |
| pairs_by_date: Dict[ | |
| date, | |
| List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]], | |
| ], | |
| hashes_by_date: Dict[date, List[str]], | |
| compare_results_by_date: Dict[date, List[Optional[Dict[str, object]]]], | |
| calcium_by_date: Dict[date, List[EventView]], | |
| scandium_by_date: Dict[date, List[EventView]], | |
| accepted_hashes: set[str], | |
| memory_path: Path, | |
| tz: ZoneInfo, | |
| use_ai: bool, | |
| model: str, | |
| api_key: Optional[str], | |
| api: BookedAPI, | |
| resource: Dict[str, object], | |
| resource_name: str, | |
| typ_filter: Optional[str], | |
| start_time: Optional[time], | |
| end_time: Optional[time], | |
| ) -> None: | |
| terminal_width = shutil.get_terminal_size((120, 20)).columns | |
| gutter = 4 | |
| column_width = max(30, (terminal_width - gutter) // 2) | |
| if not pairs_by_date: | |
| print("No events found on either side.") | |
| return | |
| def recurrence_dates_for_event( | |
| calcium_event: Dict[str, str], | |
| dates: Iterable[date], | |
| ) -> List[date]: | |
| affected: List[date] = [] | |
| for candidate in dates: | |
| occurrence = calcium_occurrence_for_date(calcium_event, candidate, tz) | |
| if not occurrence: | |
| continue | |
| if start_time and end_time: | |
| occ_start, occ_end = occurrence | |
| if occ_start.timetz().replace(tzinfo=None) != start_time: | |
| continue | |
| if occ_end.timetz().replace(tzinfo=None) != end_time: | |
| continue | |
| affected.append(candidate) | |
| return affected | |
| def refresh_scandium_for_dates(dates_to_refresh: Iterable[date]) -> None: | |
| for date_value in dates_to_refresh: | |
| scandium_events = build_scandium_events_for_date( | |
| api=api, | |
| resource_name=resource_name, | |
| tz=tz, | |
| target_date=date_value, | |
| start_time=start_time, | |
| end_time=end_time, | |
| resource=resource, | |
| ) | |
| if typ_filter: | |
| scandium_events = [ | |
| event | |
| for event in scandium_events | |
| if (event.compare_fields.get("TYP") or "").lower() == typ_filter.lower() | |
| ] | |
| scandium_by_date[date_value] = scandium_events | |
| pairs_by_date[date_value] = build_pairs( | |
| calcium_by_date.get(date_value, []), | |
| scandium_events, | |
| date_value, | |
| ) | |
| hashes = [ | |
| canonical_hash(cal, sca) | |
| for cal, sca, _key, _date in pairs_by_date[date_value] | |
| ] | |
| hashes_by_date[date_value] = hashes | |
| compare_results_by_date[date_value] = compute_compare_results_for_pairs( | |
| pairs_by_date[date_value], | |
| hashes, | |
| accepted_hashes, | |
| use_ai=use_ai, | |
| model=model, | |
| api_key=api_key, | |
| ) | |
# Interactive review loop: for every date and every paired event, render a
# side-by-side Calcium/Scandium view and prompt the operator to reconcile.
for date_value in dates_sorted:
    pairs = pairs_by_date.get(date_value, [])
    hashes = hashes_by_date.get(date_value, [])
    compare_results = compare_results_by_date.get(date_value, [])
    if not pairs:
        continue
    for idx, (cal, sca, key, _date_value) in enumerate(pairs):
        start_dt, end_dt = key
        compare_result = compare_results[idx] if idx < len(compare_results) else None
        # Loop until this pair is resolved (break) or must be re-rendered
        # after an edit (continue).
        while True:
            current_hash = canonical_hash(cal, sca)
            if idx < len(hashes):
                hashes[idx] = current_hash
            accepted_hash = resolve_accepted_hash(cal, sca, accepted_hashes)
            print("\n\n\n\n")
            start_label = start_dt.astimezone(tz).strftime("%Y-%m-%d %H:%M")
            end_label = end_dt.astimezone(tz).strftime("%H:%M")
            header = f"{start_label} -> {end_label}"
            print("=" * terminal_width)
            print(header.center(terminal_width))
            print("-" * terminal_width)
            sca_label = "Scandium"
            if sca and sca.reference:
                sca_label = f"Scandium ({sca.reference})"
                sca_link = f"https://scandium.mclean.harvard.edu/reservation/?rn={sca.reference}"
                # Link goes in the right-hand column, above the headers.
                print(f"{''.ljust(column_width)}{' ' * gutter}{sca_link.ljust(column_width)}")
            print(f"{'Calcium'.ljust(column_width)}{' ' * gutter}{sca_label.ljust(column_width)}")
            print("-" * terminal_width)
            left_text = format_event_label(cal, column_width) if cal else "missing"
            right_text = format_event_label(sca, column_width) if sca else "missing"
            for line in side_by_side_lines(
                left_text, right_text, width=column_width, gutter=gutter
            ):
                print(line)
            # A previously-accepted hash short-circuits comparison entirely.
            if accepted_hash:
                compare_result = {
                    "match": True,
                    "field_matches": {},
                    "notes": "accepted",
                    "source": "accepted",
                }
            if compare_result:
                match_label = "MATCH" if compare_result.get("match") else "DIFF"
                field_matches = compare_result.get("field_matches", {})
                # NOTE(review): `key` here shadows the (start, end) pair key
                # from the enclosing for-loop tuple.
                mismatched = [
                    key for key, status in field_matches.items() if status == "mismatch"
                ]
                diff_line = "DIFF: none" if not mismatched else f"DIFF: {', '.join(mismatched)}"
                source = compare_result.get("source", "exact")
                notes = compare_result.get("notes", "")
                note_suffix = ""
                if source == "ai-error" and notes:
                    note_suffix = f" | {notes[:60]}"
                is_match = compare_result.get("match") is True
                is_clean = not mismatched
                # Blue = accepted, green = clean match, orange = anything else.
                if source == "accepted":
                    color = BLUE
                else:
                    color = GREEN if is_match and is_clean else ORANGE
                compare_text = f"CHK: {match_label} | {diff_line} | {source}{note_suffix}"
                left_only = f"{color}{compare_text}{RESET}"
                for line in side_by_side_lines(
                    left_only,
                    "",
                    width=column_width,
                    gutter=gutter,
                ):
                    print(line)
                if source == "accepted":
                    short_hash = accepted_hash[:8]
                    accepted_text = f"{BLUE}[ ACCEPTED - HASH {short_hash} ]{RESET}"
                    for line in side_by_side_lines(
                        accepted_text,
                        "",
                        width=column_width,
                        gutter=gutter,
                    ):
                        print(line)
                    # Accepted pairs need no further action.
                    break
            # Matched pair: optional edit / reload / clear actions.
            if compare_result and compare_result.get("match"):
                action = prompt_yes_no_accept(
                    "Edit?",
                    default=False,
                    include_reload=True,
                    include_clear=True,
                )
                if action == "clear":
                    # Blank the SUM field on this occurrence only, no prompts.
                    sca = update_scandium_event(
                        api,
                        sca,
                        tz,
                        fields={"SUM"},
                        calcium=cal,
                        prefill={"SUM": ""},
                        prompt=False,
                        update_scope_override="this",
                    )
                    if sca and cal:
                        compare_result = compare_events(cal, sca, False, model, api_key, {})
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                    continue
                if action == "reload":
                    # Re-fetch the reservation detail and re-compare.
                    if sca and sca.reference:
                        new_detail = api.get_reservation(sca.reference)
                        sca = build_scandium_event_view_from_detail(new_detail, tz)
                    if sca and cal:
                        compare_result = compare_events(cal, sca, use_ai, model, api_key, {})
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                    continue
                if action == "accept":
                    # Persist the pair's canonical hash so future runs skip it.
                    current_hash = canonical_hash(cal, sca)
                    if idx < len(hashes):
                        hashes[idx] = current_hash
                    if current_hash not in accepted_hashes:
                        store_accepted_hash(memory_path, current_hash)
                        accepted_hashes.add(current_hash)
                    compare_result = {
                        "match": True,
                        "field_matches": {},
                        "notes": "accepted",
                        "source": "accepted",
                    }
                    if idx < len(compare_results):
                        compare_results[idx] = compare_result
                    break
                if action == "yes":
                    sca = update_scandium_event(api, sca, tz, calcium=cal)
                    if sca and cal:
                        compare_result = compare_events(cal, sca, False, model, api_key, {})
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                    continue
                # Default ("no"): leave the matched pair as-is.
                break
            # Calcium side gone but Scandium exists: delete the orphan.
            if cal is None and sca is not None:
                api.delete_reservation(sca.reference, update_scope="this")
                sca = None
                compare_result = None
                if idx < len(compare_results):
                    compare_results[idx] = None
                break
            # Scandium missing: offer to create it from the Calcium event.
            if sca is None and cal is not None:
                action = prompt_yes_no_accept("Scandium missing. Create from Calcium?", default=True)
                if action == "accept":
                    # Accept the asymmetry (Calcium-only) permanently.
                    current_hash = canonical_hash(cal, sca)
                    if idx < len(hashes):
                        hashes[idx] = current_hash
                    if current_hash not in accepted_hashes:
                        store_accepted_hash(memory_path, current_hash)
                        accepted_hashes.add(current_hash)
                    compare_result = {
                        "match": True,
                        "field_matches": {},
                        "notes": "accepted",
                        "source": "accepted",
                    }
                    if idx < len(compare_results):
                        compare_results[idx] = compare_result
                    break
                if action == "yes":
                    sca = create_scandium_from_calcium(api, resource, tz, cal.calcium_event)
                    if sca and cal:
                        compare_result = compare_events(cal, sca, use_ai, model, api_key, {})
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                    # A recurring Calcium event may have just materialized
                    # future occurrences; refresh any later in-range dates.
                    if cal.calcium_event and cal.calcium_event.get("RRULE"):
                        future_dates = [d for d in dates_sorted if d > date_value]
                        affected_dates = recurrence_dates_for_event(cal.calcium_event, future_dates)
                        if affected_dates:
                            refresh_scandium_for_dates(affected_dates)
                    continue
                break
            # Both sides present but differing: offer field-level edits.
            if cal is not None and sca is not None:
                field_matches = compare_result.get("field_matches", {}) if compare_result else {}
                rec_mismatch = field_matches.get("REC") == "mismatch"
                action = prompt_yes_no_accept(
                    "Edit Scandium fields?",
                    default=False,
                    include_update_rec=rec_mismatch,
                )
                if action == "accept":
                    current_hash = canonical_hash(cal, sca)
                    if idx < len(hashes):
                        hashes[idx] = current_hash
                    if current_hash not in accepted_hashes:
                        store_accepted_hash(memory_path, current_hash)
                        accepted_hashes.add(current_hash)
                    compare_result = {
                        "match": True,
                        "field_matches": {},
                        "notes": "accepted",
                        "source": "accepted",
                    }
                    if idx < len(compare_results):
                        compare_results[idx] = compare_result
                    break
                if action == "update_rec":
                    # Copy the recurrence rule from Calcium onto Scandium.
                    cal_rrule = (cal.calcium_event or {}).get("RRULE", "") if cal else ""
                    if cal_rrule:
                        recurrence_override = parse_rrule(
                            cal_rrule,
                            cal.start.replace(tzinfo=None),
                        )
                    else:
                        recurrence_override = {"type": "none"}
                    is_recurring = bool(sca.scandium_detail.get("isRecurring") if sca.scandium_detail else False)
                    if is_recurring:
                        scope = prompt_edit_scope(is_recurring, default_all=True)
                    else:
                        scope = "full"
                    sca = update_scandium_event(
                        api,
                        sca,
                        tz,
                        calcium=cal,
                        recurrence_rule_override=recurrence_override,
                        prompt=False,
                        update_scope_override=scope,
                    )
                    if sca:
                        compare_result = compare_events(cal, sca, False, model, api_key, {})
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                    continue
                if action == "yes":
                    recurrence_override = None
                    update_scope_override = None
                    if cal and compare_result:
                        if rec_mismatch:
                            # Optionally fold the REC fix into the same edit.
                            if prompt_yes_no("Update REC to match Calcium?", default=True):
                                cal_rrule = (cal.calcium_event or {}).get("RRULE", "")
                                if cal_rrule:
                                    recurrence_override = parse_rrule(
                                        cal_rrule,
                                        cal.start.replace(tzinfo=None),
                                    )
                                else:
                                    recurrence_override = {"type": "none"}
                                is_recurring = bool(sca.scandium_detail.get("isRecurring") if sca.scandium_detail else False)
                                if is_recurring:
                                    update_scope_override = prompt_edit_scope(
                                        is_recurring,
                                        default_all=True,
                                    )
                                else:
                                    update_scope_override = "full"
                    sca = update_scandium_event(
                        api,
                        sca,
                        tz,
                        calcium=cal,
                        recurrence_rule_override=recurrence_override,
                        update_scope_override=update_scope_override,
                    )
                    if sca:
                        compare_result = compare_events(cal, sca, False, model, api_key, {})
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                    continue
                break
            # NOTE(review): dead code — every branch above ends in break or
            # continue, so control never reaches here.
            if compare_result and compare_result.get("match"):
                pass
def resolve_preferences_path(events_path: Path, override: Optional[Path]) -> Optional[Path]:
    """Pick the Calcium .Preferences file to use.

    An explicit *override* always wins. Otherwise look for a sibling of
    *events_path* with a ``.Preferences`` suffix and return it only if it
    exists on disk; return ``None`` when no preferences file applies.
    """
    if override:
        return override
    sibling = events_path.with_suffix(".Preferences")
    return sibling if sibling.exists() else None
def main() -> int:
    """Compare Calcium and Scandium events for one or more dates.

    Parses CLI arguments, validates them, rsyncs the Calcium data
    directory from the calendar host, builds per-date event pairs from
    both calendars, pre-computes comparison results, and hands off to the
    interactive review loop in display_pairs().

    Returns:
        Process exit status (0 on success).

    Raises:
        SystemExit: on invalid argument combinations, an unknown resource,
            or a failed rsync.
    """
    parser = argparse.ArgumentParser(
        description="Compare Calcium and Scandium events for a single date.",
    )
    parser.add_argument(
        "dates",
        nargs="*",
        type=parse_date,
        help="Date(s) to compare (YYYY-MM-DD).",
    )
    parser.add_argument(
        "--start-date",
        type=parse_date,
        default=None,
        help="Start date (YYYY-MM-DD) for inclusive range.",
    )
    parser.add_argument(
        "--end-date",
        type=parse_date,
        default=None,
        help="End date (YYYY-MM-DD) for inclusive range.",
    )
    parser.add_argument(
        "--calcium-events",
        default=None,
        help="Path to Calcium .Events file (defaults based on --resource).",
    )
    parser.add_argument(
        "--preferences",
        type=Path,
        default=None,
        help="Optional path to Calcium .Preferences file.",
    )
    parser.add_argument(
        "--resource",
        default=DEFAULT_RESOURCE,
        help="Scandium resource name.",
    )
    parser.add_argument(
        "--timezone",
        default=DEFAULT_TZ,
        help="Timezone for display and matching.",
    )
    parser.add_argument(
        "--typ",
        default=None,
        help="Filter by TYP (Calcium category/Scandium PI value).",
    )
    parser.add_argument(
        "--start-time",
        type=parse_time_value,
        default=None,
        help="Filter by exact start time (HH:MM). Requires --end-time.",
    )
    parser.add_argument(
        "--end-time",
        type=parse_time_value,
        default=None,
        help="Filter by exact end time (HH:MM). Requires --start-time.",
    )
    parser.add_argument(
        "--no-ai",
        action="store_true",
        help="Disable AI matching fallback.",
    )
    parser.add_argument(
        "--ai-model",
        default="gpt-5-nano",
        help="OpenAI model for AI matching.",
    )
    args = parser.parse_args()

    # Validate argument combinations up front, before any expensive work.
    # (Previously these checks ran AFTER the rsync, so invalid arguments
    # still triggered a full network sync.)  `is None` checks instead of
    # truthiness so values like time(0, 0) can never be misread as absent.
    if (args.start_time is None) != (args.end_time is None):
        raise SystemExit("Both --start-time and --end-time are required together.")
    if (args.start_date is None) != (args.end_date is None):
        raise SystemExit("Both --start-date and --end-date are required together.")
    if args.start_date and args.end_date and args.dates:
        raise SystemExit("Use either explicit dates or --start-date/--end-date, not both.")
    if not args.dates and not (args.start_date and args.end_date):
        raise SystemExit("Provide at least one date or a --start-date/--end-date range.")
    if args.start_date and args.end_date and args.end_date < args.start_date:
        raise SystemExit("--end-date must be on or after --start-date.")

    tz = ZoneInfo(args.timezone)
    events_path_value = args.calcium_events or DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
    if not events_path_value:
        raise SystemExit(
            f"No default Calcium .Events for resource '{args.resource}'. "
            "Use --calcium-events to specify a file."
        )
    events_path = Path(events_path_value)
    preferences_path = resolve_preferences_path(events_path, args.preferences)

    # Pull the latest Calcium data files from the calendar host.
    rsync_cmd = [
        "rsync",
        "-rltv",
        "ddrucker@calendar-actual:/var/www/cgi-bin/CalciumDir40/data/",
        "data/",
    ]
    try:
        subprocess.run(rsync_cmd, check=True)
    except subprocess.CalledProcessError as exc:
        raise SystemExit(f"rsync failed: {exc}") from exc

    # Expand the inclusive date range, or use the explicit date list.
    if args.start_date and args.end_date:
        span = (args.end_date - args.start_date).days
        dates: List[date] = [
            args.start_date + timedelta(days=offset) for offset in range(span + 1)
        ]
    else:
        dates = list(args.dates)

    api = BookedAPI()
    resource = api.find_resource_by_name(args.resource)
    if not resource:
        raise SystemExit(f"Resource not found: {args.resource}")
    dates_sorted = sorted(dates)

    # Calcium side: one bulk load for all dates.
    calcium_by_date: Dict[date, List[EventView]] = build_calcium_events_by_date(
        events_path=events_path,
        preferences_path=preferences_path,
        tz=tz,
        dates=dates_sorted,
        start_time=args.start_time,
        end_time=args.end_time,
    )
    if args.typ:
        typ_lower = args.typ.lower()
        for date_value in dates_sorted:
            calcium_by_date[date_value] = [
                event
                for event in calcium_by_date.get(date_value, [])
                if (event.compare_fields.get("TYP") or "").lower() == typ_lower
            ]

    # Scandium side: one API round-trip per date.
    scandium_by_date: Dict[date, List[EventView]] = {}
    for date_value in tqdm(dates_sorted, desc="Reading Scandium"):
        scandium_events = build_scandium_events_for_date(
            api=api,
            resource_name=args.resource,
            tz=tz,
            target_date=date_value,
            start_time=args.start_time,
            end_time=args.end_time,
            resource=resource,
        )
        if args.typ:
            scandium_events = [
                event
                for event in scandium_events
                if (event.compare_fields.get("TYP") or "").lower() == args.typ.lower()
            ]
        scandium_by_date[date_value] = scandium_events

    # Pair up the two sides per date.
    pairs_by_date: Dict[
        date,
        List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]],
    ] = {}
    for date_value in dates_sorted:
        pairs_by_date[date_value] = build_pairs(
            calcium_by_date.get(date_value, []),
            scandium_by_date.get(date_value, []),
            date_value,
        )

    # Pre-compute hashes and comparison results, sharing one AI cache.
    api_key = os.environ.get("OPENAI_API_KEY")
    memory_path = Path("sync-memory.dat")
    accepted_hashes = load_accepted_hashes(memory_path)
    hashes_by_date: Dict[date, List[str]] = {}
    compare_results_by_date: Dict[date, List[Optional[Dict[str, object]]]] = {}
    cache: Dict[str, Dict[str, object]] = {}
    for date_value in dates_sorted:
        pairs = pairs_by_date.get(date_value, [])
        hashes = [canonical_hash(cal, sca) for cal, sca, _key, _date in pairs]
        hashes_by_date[date_value] = hashes
        compare_results_by_date[date_value] = compute_compare_results_for_pairs(
            pairs,
            hashes=hashes,
            accepted_hashes=accepted_hashes,
            use_ai=not args.no_ai,
            model=args.ai_model,
            api_key=api_key,
            cache=cache,
        )

    display_pairs(
        dates_sorted,
        pairs_by_date,
        hashes_by_date,
        compare_results_by_date,
        calcium_by_date,
        scandium_by_date,
        accepted_hashes,
        memory_path,
        tz,
        use_ai=not args.no_ai,
        model=args.ai_model,
        api_key=api_key,
        api=api,
        resource=resource,
        resource_name=args.resource,
        typ_filter=args.typ,
        start_time=args.start_time,
        end_time=args.end_time,
    )
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment