Skip to content

Instantly share code, notes, and snippets.

@dmd
Created January 14, 2026 13:29
Show Gist options
  • Select an option

  • Save dmd/3e9e2817896ef51eb2248a67a6a4e60c to your computer and use it in GitHub Desktop.

Select an option

Save dmd/3e9e2817896ef51eb2248a67a6a4e60c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import html
import json
import os
import re
import sys
import textwrap
import readline
from bisect import bisect_left, bisect_right
try:
from prompt_toolkit import prompt as pt_prompt
HAS_PROMPT_TOOLKIT = True
except Exception:
HAS_PROMPT_TOOLKIT = False
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
from zoneinfo import ZoneInfo
import shutil
from threading import Lock
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402
from calcium_events import build_vevent, build_vevents, parse_calcium_events
from tqdm import tqdm
from create_scandium_clinical import extract_scanned_initials, has_contrast_indicator
# Default Booked resource (scanner) to reconcile when none is specified.
DEFAULT_RESOURCE = "P1 Prisma"
# Maps each Booked resource name to its Calcium .Events file on disk.
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
    "P1 Prisma": "data/3T_Prisma_P1.Events",
    "P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
    "4T Varian": "data/4T_Varian_Inova.Events",
}
# Time zone used to localize both Calcium and Booked timestamps.
DEFAULT_TZ = "America/New_York"
# Booked custom-attribute ids (NOTE(review): assumed to match server config — confirm).
PI_ATTRIBUTE_ID = 1
SCANNED_INITIALS_ATTRIBUTE_ID = 2
# ANSI escape sequences for terminal styling.
BOLD = "\033[1m"
RESET = "\033[0m"
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")  # matches ANSI color/style codes
GREEN = "\033[92m"
ORANGE = "\033[93m"
BLUE = "\033[94m"
# Serializes access to the AI comparison cache across worker threads.
CACHE_LOCK = Lock()
# Loose email matcher used to pull addresses out of free text.
EMAIL_RE = re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.IGNORECASE)
# Captures a run of 2-3 capitals (scanned-person initials) not embedded in a longer run.
INITIALS_RE = re.compile(r"(?:Confirmed[- ]|[^A-Z])([A-Z]{2,3})(?:[^A-Z]|$)")
# Email domains treated as aliases of each other; used to try the swapped form.
EMAIL_DOMAIN_SWAPS = {
    "mgb.org": "mclean.harvard.edu",
    "mclean.harvard.edu": "mgb.org",
}
@dataclass(frozen=True)
class EventView:
    """Normalized view of one calendar occurrence from either system.

    Built from a Calcium .Events entry or a Booked (Scandium) reservation so
    the two sides can be displayed and compared uniformly.
    """

    start: datetime                 # occurrence start (tz-aware)
    end: datetime                   # occurrence end (tz-aware)
    title: str                      # display title / summary
    description: Optional[str]      # free-text description, if any
    details: List[Tuple[str, str]]  # ordered (label, value) rows for display
    recurrence: str                 # human-readable recurrence ("none" when not recurring)
    reference: Optional[str]        # Booked reservation reference (Scandium only)
    compare_fields: Dict[str, str]  # canonical fields used for exact comparison
    ai_payload: Dict[str, object]   # structured payload sent to the AI comparator
    calcium_event: Optional[Dict[str, str]]       # raw Calcium VEVENT (Calcium side only)
    scandium_detail: Optional[Dict[str, object]]  # raw Booked reservation (Scandium side only)
@dataclass(frozen=True)
class ParsedCalciumVEvent:
    """Calcium VEVENT with its timestamps and recurrence data resolved once."""

    event: Dict[str, str]       # raw VEVENT key/value mapping
    start_dt: datetime          # first occurrence start (tz-aware)
    end_dt: datetime            # first occurrence end (tz-aware)
    duration: timedelta         # end_dt - start_dt, reused when projecting occurrences
    rrule: Optional[str]        # raw RRULE string, if recurring
    rule: Optional[Dict]        # parsed recurrence rule (see parse_rrule)
    until_date: Optional[date]  # last date the series can produce, if bounded
    exdates: set[date]          # dates explicitly excluded from the recurrence
def parse_date(value: str) -> date:
    """Parse a YYYY-MM-DD string into a date."""
    parsed = datetime.strptime(value, "%Y-%m-%d")
    return parsed.date()
def parse_time_value(value: str) -> time:
    """Parse an HH:MM string into a time."""
    parsed = datetime.strptime(value, "%H:%M")
    return parsed.time()
def normalize_summary(summary: str) -> str:
    """Undo ICS escaping and HTML entities in a SUMMARY value, then strip."""
    text = summary.replace("\\n", "\n")
    # Unescape ICS punctuation; the escaped backslash goes last so the result
    # cannot re-introduce sequences the earlier replacements would consume.
    for escaped, plain in (("\\,", ","), ("\\;", ";"), ("\\\\", "\\")):
        text = text.replace(escaped, plain)
    return html.unescape(text).strip()
def extract_emails(text: str) -> List[str]:
    """Return every email-looking token found in *text* (empty list for falsy input)."""
    if not text:
        return []
    return EMAIL_RE.findall(text)
def contains_spi_blocker(text: str) -> bool:
    """True when *text* names an activity whose capitals must not be read as initials."""
    if not text:
        return False
    lowered = text.lower()
    blockers = ("acr qa", "hd cleanup", "hd clean up")
    return any(blocker in lowered for blocker in blockers)
def swap_email_domain(email: str) -> Optional[str]:
    """Return *email* with its domain swapped to the paired institution, or None."""
    if "@" not in email:
        return None
    local_part, _, domain = email.rpartition("@")
    replacement = EMAIL_DOMAIN_SWAPS.get(domain.lower())
    if not replacement:
        return None
    return f"{local_part}@{replacement}"
def extract_initials(text: str) -> List[str]:
    """Pull candidate scanned-person initials (runs of 2-3 capitals) out of free text."""
    if not text or contains_spi_blocker(text):
        return []
    # Drop the fbirn/cleanup phrase so its capitals are not mistaken for initials.
    scrubbed = re.sub(r"fbirn\s*&\s*hd\s*cleanup", "", text, flags=re.IGNORECASE)
    return [found.group(1) for found in INITIALS_RE.finditer(scrubbed)]
def extract_patient_source(text: str) -> Optional[str]:
    """Extract the leading patient-source token (e.g. "OP"/"IP") from a summary."""
    if not text:
        return None
    cleaned = text.strip()
    # Known prefixes may be followed by either a dash or a space.
    for known in ("OP", "IP"):
        if cleaned.startswith(known + "-") or cleaned.startswith(known + " "):
            return known
    if "-" not in cleaned:
        return None
    # Generic "lead-rest" form: accept an alphanumeric lead token.
    lead = cleaned.split("-", 1)[0].strip()
    if lead and lead.isalnum():
        return lead
    return None
def parse_clinical_prefix(summary: str) -> tuple[Optional[str], Optional[str], Optional[str], str]:
    """Split a clinical summary's "PAT - body - SPI" first line into parts.

    Returns (patient_source, body_part, initials, remaining_text).  When the
    first line does not match the pattern, the first three are None and the
    original summary is returned unchanged.
    """
    lines = summary.splitlines() if summary else []
    if not lines:
        return None, None, None, ""
    match = re.match(
        r"^\s*(?P<pat>[A-Za-z0-9]+)\s*-\s*(?P<body>.+?)\s*-\s*(?P<spi>[A-Za-z]{2,3})(?P<rest>.*)$",
        lines[0],
    )
    if match is None:
        return None, None, None, summary
    # Whatever trails the initials (minus a leading dash) becomes the first
    # line of the remaining text.
    leftover = match.group("rest").strip().lstrip("-").strip()
    remaining = [leftover] if leftover else []
    remaining.extend(lines[1:])
    return (
        match.group("pat").strip(),
        match.group("body").strip(),
        match.group("spi").strip(),
        "\n".join(remaining).strip(),
    )
def extract_body_part(text: str) -> Optional[str]:
    """Return the middle "body part" token of a dash-separated summary.

    For strings like "OP-Brain-AB" (or any "lead-body[-rest]" form) the second
    dash-separated field is the body part; returns None when there is no dash.
    The original special-cased "OP-"/"IP-" prefixes, but that branch computed
    exactly the same `split("-", 2)[1]` as the generic branch, so both paths
    collapse to one here (behavior unchanged).
    """
    if not text:
        return None
    cleaned = text.strip()
    if "-" not in cleaned:
        return None
    parts = cleaned.split("-", 2)
    if len(parts) >= 2:
        return parts[1].strip()
    return None
def normalize_compare_value(value: str) -> str:
    """Collapse a value to lowercase alphanumerics, dropping the word "and"."""
    lowered = (value or "").lower()
    without_and = re.sub(r"\band\b", "", lowered)
    return re.sub(r"[^a-z0-9]+", "", without_and)
def canonical_lines(
    details: Optional[List[Tuple[str, str]]],
    swap_body_label: bool = False,
) -> List[str]:
    """Flatten detail rows into comparable text lines.

    When *swap_body_label* is set, the "BOD" and "Body part" labels are
    exchanged so Calcium- and Scandium-styled rows canonicalize the same way.
    A missing/empty details list yields the sentinel ["missing"].
    """
    if not details:
        return ["missing"]
    swap = {"BOD": "Body part", "Body part": "BOD"}
    output: List[str] = []
    for label, value in details:
        shown = swap.get(label, label) if swap_body_label else label
        chunks = (value or "").splitlines() or [""]
        # First line carries the label; continuation lines are bare.
        output.append(f"{shown}: {chunks[0]}".strip())
        output.extend(chunk.strip() for chunk in chunks[1:])
    return output
def canonical_hash(
    cal: Optional[EventView],
    sca: Optional[EventView],
    swap_body_label: bool = False,
) -> str:
    """SHA-256 over the canonical text of a Calcium/Scandium event pair."""
    import hashlib

    cal_text = "\n".join(canonical_lines(cal.details if cal else None, swap_body_label))
    sca_text = "\n".join(canonical_lines(sca.details if sca else None, swap_body_label))
    digest_input = f"LEFT\n{cal_text}\nRIGHT\n{sca_text}"
    return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
def resolve_accepted_hash(
    cal: Optional[EventView],
    sca: Optional[EventView],
    accepted_hashes: set[str],
) -> Optional[str]:
    """Return whichever canonical hash (plain, then label-swapped) was accepted before."""
    for swapped in (False, True):
        candidate = canonical_hash(cal, sca, swap_body_label=swapped)
        if candidate in accepted_hashes:
            return candidate
    return None
def load_accepted_hashes(path: Path) -> set[str]:
    """Read previously accepted hashes, one per line; missing/unreadable file -> empty set."""
    if not path.exists():
        return set()
    try:
        content = path.read_text(encoding="utf-8")
    except OSError:
        return set()
    return {stripped for raw in content.splitlines() if (stripped := raw.strip())}
def store_accepted_hash(path: Path, hash_value: str) -> None:
    """Append *hash_value* as its own line to the accepted-hash file."""
    with path.open("a", encoding="utf-8") as out:
        out.write(f"{hash_value}\n")
def parse_exdate_dates(value: str) -> set[date]:
    """Parse a comma-separated ICS EXDATE list into a set of dates.

    Chunks shorter than 8 characters, or whose leading YYYYMMDD digits do not
    parse, are silently skipped.
    """
    if not value:
        return set()
    excluded: set[date] = set()
    for raw in value.split(","):
        candidate = raw.strip()
        if len(candidate) < 8:
            continue
        try:
            excluded.add(datetime.strptime(candidate[:8], "%Y%m%d").date())
        except ValueError:
            continue
    return excluded
def parse_ics_datetime(value: str, tz: ZoneInfo) -> Optional[datetime]:
    """Parse an ICS DTSTART/DTEND value into a tz-aware datetime.

    Returns None for empty or date-only (no "T") values.  UTC values
    (trailing "Z") are normalized through ics_datetime_to_iso and then
    converted into *tz*; naive local values are stamped with *tz* directly.
    Assumes well-formed timestamps — malformed input raises ValueError.
    """
    if not value:
        return None
    if "T" not in value:
        return None
    if value.endswith("Z"):
        # UTC timestamp: normalize to ISO with offset, then shift into tz.
        iso = ics_datetime_to_iso(value)
        dt = datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S%z")
        return dt.astimezone(tz)
    # Naive local timestamp in YYYYMMDDTHHMMSS form.
    dt = datetime.strptime(value, "%Y%m%dT%H%M%S")
    return dt.replace(tzinfo=tz)
def parse_api_datetime(value: str, tz: ZoneInfo) -> Optional[datetime]:
    """Parse a Booked API timestamp into a datetime in *tz*, or None on failure."""
    if not value:
        return None
    # Rewrite a trailing "Z" as an explicit UTC offset so %z can parse it.
    normalized = value[:-1] + "+0000" if value.endswith("Z") else value
    # Drop the colon inside "+HH:MM" style offsets for the same reason.
    if len(normalized) >= 6 and normalized[-3] == ":":
        normalized = normalized[:-3] + normalized[-2:]
    try:
        parsed = datetime.strptime(normalized, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    return parsed.astimezone(tz)
def parse_termination_date(value: Optional[str], tz: ZoneInfo) -> Optional[date]:
    """Parse a recurrence-termination timestamp into its date in *tz*.

    Accepts "Z"-suffixed UTC and "+HH:MM"/"+HHMM" offsets.  Returns None for
    empty or unparseable input, matching the sibling parsers: the original
    indexed cleaned[-3] without a length guard (IndexError on strings shorter
    than three characters) and let strptime's ValueError escape.
    """
    if not value:
        return None
    cleaned = value
    if cleaned.endswith("Z"):
        cleaned = cleaned[:-1] + "+0000"
    # Length guard: short garbage must not raise IndexError.
    if len(cleaned) >= 3 and cleaned[-3] == ":":
        cleaned = cleaned[:-3] + cleaned[-2:]
    try:
        dt = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    return dt.astimezone(tz).date()
def parse_booked_until_date(value: Optional[str]) -> Optional[date]:
    """Parse a Booked "repeat until" timestamp into its date (offset kept as given).

    Returns None for empty or unparseable values.  Adds the missing length
    guard before inspecting cleaned[-3]: the original raised IndexError on
    strings shorter than three characters.
    """
    if not value:
        return None
    cleaned = value
    if cleaned.endswith("Z"):
        cleaned = cleaned[:-1] + "+0000"
    if len(cleaned) >= 3 and cleaned[-3] == ":":
        cleaned = cleaned[:-3] + cleaned[-2:]
    try:
        dt = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    return dt.date()
def parse_ics_until(value: str) -> Optional[date]:
    """Parse the UNTIL part of an RRULE (leading YYYYMMDD digits) into a date."""
    cleaned = (value or "").strip()
    if len(cleaned) < 8:
        return None
    try:
        return datetime.strptime(cleaned[:8], "%Y%m%d").date()
    except ValueError:
        return None
def format_weekdays(weekdays: Iterable[int]) -> str:
    """Render Booked weekday numbers (0=Sun .. 6=Sat) as "Sun, Mon, ...".

    Out-of-range values are silently skipped.  (Idiom fix: the original
    built the list with a manual append loop; a generator expression inside
    join does the same filtering in one pass.)
    """
    names = ("Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat")
    return ", ".join(names[day] for day in weekdays if 0 <= day < len(names))
def format_ics_recurrence(rrule: Optional[str], start_dt: datetime) -> str:
    """Render an ICS RRULE as a short human-readable phrase.

    Returns "none" when there is no rule or its FREQ is unrecognized;
    otherwise e.g. "weekly every 1 on Mon, Wed until 2024-06-01".
    """
    if not rrule:
        return "none"
    # Split "KEY=VALUE;KEY=VALUE" pairs into a dict.
    parts = {}
    for piece in rrule.split(";"):
        if "=" in piece:
            key, value = piece.split("=", 1)
            parts[key] = value
    freq = parts.get("FREQ", "").upper()
    interval = parts.get("INTERVAL") or "1"
    byday = parts.get("BYDAY", "")
    until = parts.get("UNTIL", "")
    freq_map = {
        "DAILY": "daily",
        "WEEKLY": "weekly",
        "MONTHLY": "monthly",
        "YEARLY": "yearly",
    }
    label = freq_map.get(freq, "none")
    if label == "none":
        return "none"
    tokens = [f"{label} every {interval}"]
    if label == "weekly":
        # ICS day codes -> Booked weekday numbers (0=Sun).
        day_map = {"SU": 0, "MO": 1, "TU": 2, "WE": 3, "TH": 4, "FR": 5, "SA": 6}
        days = []
        if byday:
            for code in byday.split(","):
                code = code.strip().upper()
                if code in day_map:
                    days.append(day_map[code])
        if not days:
            # No usable BYDAY: fall back to the series start's weekday
            # (shifted because Python weekday() is 0=Mon, Booked is 0=Sun).
            days = [(start_dt.weekday() + 1) % 7]
        readable = format_weekdays(days)
        if readable:
            tokens.append(f"on {readable}")
    until_date = parse_ics_until(until)
    if until_date:
        tokens.append(f"until {until_date.isoformat()}")
    return " ".join(tokens)
def recurrence_occurs_on(
    target_date: date,
    start_date: date,
    rule: Dict,
) -> bool:
    """Return True when *rule* produces an occurrence on *target_date*.

    *rule* uses Booked conventions: "type" in {daily, weekly, monthly,
    yearly}, "interval" (default 1), and for weekly rules "weekdays" with
    0=Sunday .. 6=Saturday.

    Bug fix: the original weekly branch required (target - start).days to be
    an exact multiple of interval*7, which can only ever hold on the start
    date's own weekday — every other day listed in "weekdays" was
    unreachable.  Weekly rules now match any listed weekday whose 7-day
    block offset from the start is a multiple of the interval; behavior is
    unchanged for rules whose only listed weekday is the start's.
    """
    if target_date < start_date:
        return False
    interval = int(rule.get("interval", 1))
    rtype = rule.get("type")
    diff_days = (target_date - start_date).days
    if rtype == "daily":
        return diff_days % interval == 0
    if rtype == "weekly":
        weekdays = rule.get("weekdays") or []
        # Shift Python weekday() (0=Mon) into the Booked 0=Sun convention.
        booked_weekday = (target_date.weekday() + 1) % 7
        if booked_weekday not in weekdays:
            return False
        return (diff_days // 7) % interval == 0
    if rtype == "monthly":
        if target_date.day != start_date.day:
            return False
        months = (target_date.year - start_date.year) * 12 + (
            target_date.month - start_date.month
        )
        return months % interval == 0
    if rtype == "yearly":
        if (target_date.month, target_date.day) != (start_date.month, start_date.day):
            return False
        return (target_date.year - start_date.year) % interval == 0
    # Unknown or missing type: never occurs.
    return False
def calcium_occurrence_for_date(
    event: Dict[str, str],
    target_date: date,
    tz: ZoneInfo,
) -> Optional[Tuple[datetime, datetime]]:
    """Return the (start, end) of *event*'s occurrence on *target_date*, if any.

    Non-recurring events match only their literal DTSTART date.  Recurring
    events are projected onto *target_date* (same time-of-day, original
    duration) when the RRULE produces that date and it is neither excluded
    via EXDATE nor past the termination date.
    """
    start_raw = event.get("DTSTART", "")
    end_raw = event.get("DTEND", "")
    start_dt = parse_ics_datetime(start_raw, tz)
    end_dt = parse_ics_datetime(end_raw, tz)
    if not start_dt or not end_dt:
        return None
    duration = end_dt - start_dt
    rrule = event.get("RRULE")
    exdates = parse_exdate_dates(event.get("EXDATE", ""))
    if not rrule:
        # One-off event: matches only its own calendar date.
        if start_dt.date() != target_date:
            return None
        return start_dt, end_dt
    rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
    until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
    if until_date and target_date > until_date:
        return None
    if target_date in exdates:
        return None
    if not recurrence_occurs_on(target_date, start_dt.date(), rule):
        return None
    # Project the series onto target_date, keeping time-of-day and duration.
    new_start = datetime.combine(target_date, start_dt.timetz())
    new_end = new_start + duration
    return new_start, new_end
def build_event_view_from_calcium_occurrence(
    event: Dict[str, str],
    start_dt: datetime,
    end_dt: datetime,
) -> EventView:
    """Convert one Calcium VEVENT occurrence into a comparable EventView."""
    summary = normalize_summary(event.get("SUMMARY", "")) or "(no summary)"
    description = event.get("DESCRIPTION")
    categories = event.get("CATEGORIES")
    rrule = event.get("RRULE")
    recurrence_label = format_ics_recurrence(rrule, start_dt)
    # Display rows: TYP (category), SUM (summary), REC (recurrence) when present.
    details: List[Tuple[str, str]] = []
    if categories:
        details.append(("TYP", categories))
    details.append(("SUM", summary))
    if recurrence_label != "none":
        details.append(("REC", recurrence_label))
    compare_fields = {
        "TYP": categories or "",
        "SUM": summary or "",
        "REC": recurrence_label or "none",
    }
    # Heuristic extraction of structured fields from the free-text summary.
    patient_source = extract_patient_source(summary or "") or ""
    body_part = extract_body_part(summary or "") or ""
    spi_values = extract_initials(summary or "")
    if categories and categories.strip().lower() == "open":
        # "Open" slots carry no patient info; discard anything extracted above.
        patient_source = ""
        body_part = ""
        spi_values = []
    elif categories and "Clinical" in categories:
        # Clinical events encode "PAT - body - SPI" on the first summary line;
        # prefer those parsed values over the generic heuristics when present.
        pat_parsed, body_parsed, spi_parsed, _cleaned = parse_clinical_prefix(summary or "")
        if pat_parsed:
            patient_source = pat_parsed
        if body_parsed:
            body_part = body_parsed
        if spi_parsed:
            spi_values = [spi_parsed]
    ai_payload = {
        "categories": categories or "",
        "summary": summary or "",
        "description": description or "",
        "emails": extract_emails(summary or ""),
        "initials": spi_values,
        "patient_source": patient_source,
        "body_part": body_part,
        "recurrence": recurrence_label or "none",
    }
    return EventView(
        start=start_dt,
        end=end_dt,
        title=summary,
        description=description,
        details=details,
        recurrence=recurrence_label,
        reference=None,
        compare_fields=compare_fields,
        ai_payload=ai_payload,
        calcium_event=event,
        scandium_detail=None,
    )
def build_calcium_events_for_date(
    events_path: Path,
    preferences_path: Optional[Path],
    tz: ZoneInfo,
    target_date: date,
    start_time: Optional[time],
    end_time: Optional[time],
) -> List[EventView]:
    """Collect Calcium events occurring on *target_date* as EventViews.

    Includes categorized VEVENTs from build_vevents plus uncategorized raw
    entries whose text begins with "open" (synthesized with category
    "Open").  When both *start_time* and *end_time* are given, only
    occurrences matching that exact time window are kept.
    """
    vevents = build_vevents(
        events_path,
        category_filter=None,
        tz_name=str(tz.key),
        preferences_path=preferences_path,
        exclude_categories=None,
        debug=False,
        use_utc=True,
    )
    results: List[EventView] = []
    extra_open: List[Dict[str, str]] = []
    # Second pass over the raw events to pick up uncategorized "open" slots,
    # which the categorized build above does not emit.
    for calcium_event in parse_calcium_events(
        events_path,
        category_filter=None,
        exclude_categories=None,
        preferences_path=preferences_path,
        debug=False,
    ):
        if calcium_event.categories:
            continue
        summary_text = (calcium_event.text or "").lstrip()
        if summary_text[:4].lower() != "open":
            continue
        for vevent in build_vevent(
            calcium_event,
            tz,
            category_value="Open",
            use_utc=True,
            debug=False,
        ):
            if vevent:
                extra_open.append(vevent)

    def append_event(event: Dict[str, str]) -> None:
        # Project the event onto target_date and keep it when it also matches
        # the optional exact start/end time filter.
        occurrence = calcium_occurrence_for_date(event, target_date, tz)
        if not occurrence:
            return
        start_dt, end_dt = occurrence
        if start_time and end_time:
            if start_dt.timetz().replace(tzinfo=None) != start_time:
                return
            if end_dt.timetz().replace(tzinfo=None) != end_time:
                return
        results.append(build_event_view_from_calcium_occurrence(event, start_dt, end_dt))

    for item in vevents:
        append_event(item.event)
    for event in extra_open:
        append_event(event)
    return results
def build_calcium_events_by_date(
    events_path: Path,
    preferences_path: Optional[Path],
    tz: ZoneInfo,
    dates: Iterable[date],
    start_time: Optional[time],
    end_time: Optional[time],
) -> Dict[date, List[EventView]]:
    """Batch variant of build_calcium_events_for_date over many dates.

    Parses the Calcium file once, pre-resolves each VEVENT's timestamps and
    recurrence data, then projects every event onto each requested date.
    Returns a dict keyed by every input date (empty list when nothing
    matches that date).
    """
    dates_sorted = sorted(dates)
    results: Dict[date, List[EventView]] = {value: [] for value in dates_sorted}
    if not dates_sorted:
        return results
    calcium_events = parse_calcium_events(
        events_path,
        category_filter=None,
        exclude_categories=None,
        preferences_path=preferences_path,
        debug=False,
    )
    # Expand raw Calcium events into VEVENT dicts; uncategorized entries are
    # kept only when their text starts with "open" (category forced to "Open").
    vevents: List[Dict[str, str]] = []
    for calcium_event in calcium_events:
        category_value = calcium_event.categories[0] if calcium_event.categories else None
        if category_value:
            for vevent in build_vevent(
                calcium_event,
                tz,
                category_value=category_value,
                use_utc=True,
                debug=False,
            ):
                if vevent:
                    vevents.append(vevent)
        if calcium_event.categories:
            continue
        summary_text = (calcium_event.text or "").lstrip()
        if summary_text[:4].lower() != "open":
            continue
        for vevent in build_vevent(
            calcium_event,
            tz,
            category_value="Open",
            use_utc=True,
            debug=False,
        ):
            if vevent:
                vevents.append(vevent)
    # Pre-parse timestamps/recurrence once per VEVENT so the per-date loop
    # below stays cheap.
    parsed: List[ParsedCalciumVEvent] = []
    for event in vevents:
        start_dt = parse_ics_datetime(event.get("DTSTART", ""), tz)
        end_dt = parse_ics_datetime(event.get("DTEND", ""), tz)
        if not start_dt or not end_dt:
            continue
        rrule = event.get("RRULE")
        rule = parse_rrule(rrule, start_dt.replace(tzinfo=None)) if rrule else None
        until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz) if rule else None
        parsed.append(
            ParsedCalciumVEvent(
                event=event,
                start_dt=start_dt,
                end_dt=end_dt,
                duration=end_dt - start_dt,
                rrule=rrule,
                rule=rule,
                until_date=until_date,
                exdates=parse_exdate_dates(event.get("EXDATE", "")),
            )
        )
    min_date = dates_sorted[0]
    max_date = dates_sorted[-1]
    for item in parsed:
        start_date = item.start_dt.date()
        if not item.rrule:
            # One-off event: belongs only to its literal start date.
            if start_date not in results:
                continue
            if start_time and end_time:
                if item.start_dt.timetz().replace(tzinfo=None) != start_time:
                    continue
                if item.end_dt.timetz().replace(tzinfo=None) != end_time:
                    continue
            results[start_date].append(
                build_event_view_from_calcium_occurrence(item.event, item.start_dt, item.end_dt)
            )
            continue
        # Recurring event: clamp its active range to the requested dates and
        # test only the candidate dates inside that window (bisect keeps the
        # window lookup logarithmic).
        range_start = max(start_date, min_date)
        range_end = min(item.until_date, max_date) if item.until_date else max_date
        if range_end < range_start:
            continue
        start_idx = bisect_left(dates_sorted, range_start)
        end_idx = bisect_right(dates_sorted, range_end)
        for target_date in dates_sorted[start_idx:end_idx]:
            if target_date in item.exdates:
                continue
            if not item.rule or not recurrence_occurs_on(target_date, start_date, item.rule):
                continue
            # Project onto target_date, preserving time-of-day and duration.
            new_start = datetime.combine(target_date, item.start_dt.timetz())
            new_end = new_start + item.duration
            if start_time and end_time:
                if new_start.timetz().replace(tzinfo=None) != start_time:
                    continue
                if new_end.timetz().replace(tzinfo=None) != end_time:
                    continue
            results[target_date].append(
                build_event_view_from_calcium_occurrence(item.event, new_start, new_end)
            )
    return results
def format_recurrence(rule: Optional[Dict], start_dt: datetime) -> str:
    """Render a Booked recurrence-rule dict as a short phrase, or "none"."""
    if not rule:
        return "none"
    rule_type = rule.get("type") or "none"
    if rule_type == "none":
        return "none"
    parts = [rule_type]
    interval = rule.get("interval")
    if interval:
        parts.append(f"every {interval}")
    weekdays = rule.get("weekdays") or []
    if rule_type == "weekly":
        # Coerce weekday entries to ints, dropping anything unparseable.
        normalized_days: List[int] = []
        for day in weekdays:
            try:
                normalized_days.append(int(day))
            except (TypeError, ValueError):
                continue
        if not normalized_days:
            # Fall back to the start date's weekday (shifted into the
            # Booked 0=Sun convention).
            normalized_days = [(start_dt.weekday() + 1) % 7]
        readable = format_weekdays(normalized_days)
        if readable:
            parts.append(f"on {readable}")
    until = rule.get("repeatTerminationDate")
    if until:
        until_date = parse_booked_until_date(until)
        if until_date:
            parts.append(f"until {until_date.isoformat()}")
    return " ".join(parts)
def build_scandium_event_view_from_detail(
    detail: Dict[str, object],
    tz: ZoneInfo,
) -> Optional[EventView]:
    """Convert a Booked reservation payload into a comparable EventView.

    Returns None when the start/end timestamps are missing or unparseable.
    """
    start_raw = detail.get("startDate") or detail.get("startDateTime")
    end_raw = detail.get("endDate") or detail.get("endDateTime")
    start_dt = parse_api_datetime(start_raw, tz) if start_raw else None
    end_dt = parse_api_datetime(end_raw, tz) if end_raw else None
    if not start_dt or not end_dt:
        return None
    title = (detail.get("title") or "").strip()
    description = detail.get("description")
    reference = detail.get("referenceNumber") or None
    # Participants and invitees may arrive as dicts (name/email) or strings.
    participant_items = detail.get("participants", [])
    invitee_items = detail.get("invitees", [])
    participants: List[str] = []
    for entry in list(participant_items) + list(invitee_items):
        if isinstance(entry, dict):
            first = (entry.get("firstName") or "").strip()
            last = (entry.get("lastName") or "").strip()
            email = (entry.get("emailAddress") or "").strip()
            label = " ".join(piece for piece in [first, last] if piece).strip()
            if email:
                if label:
                    label = f"{label} <{email}>"
                else:
                    label = email
            if label:
                participants.append(label)
        elif isinstance(entry, str):
            label = entry.strip()
            if label:
                participants.append(label)
    participant_text = ", ".join(p.strip() for p in participants if p.strip())
    # Short display codes for known custom-attribute labels.
    label_map = {
        "patient source": "PAT",
        "scanned person initials": "SPI",
        "body part": "BOD",
        "contrast": "CON",
    }
    attributes: List[Tuple[str, str]] = []
    scan_type_value: Optional[str] = None
    spi_value: Optional[str] = None
    patient_source: Optional[str] = None
    body_part: Optional[str] = None
    for attr in detail.get("customAttributes", []):
        label = (attr.get("label") or "").strip()
        value = (attr.get("value") or "").strip()
        label_key = label.lower()
        if not label or not value:
            continue
        if label_key == "scan type / pi" and not scan_type_value:
            # Scan type is surfaced as the TYP row, not as a regular attribute.
            scan_type_value = value
            continue
        # First occurrence wins for SPI / patient source / body part.
        if label_key == "scanned person initials" and not spi_value:
            spi_value = value
        if label_key == "patient source" and not patient_source:
            patient_source = value
        if label_key == "body part" and not body_part:
            body_part = value
        attributes.append((label_map.get(label_key, label), value))
    recurrence = format_recurrence(detail.get("recurrenceRule"), start_dt)
    details: List[Tuple[str, str]] = []
    compare_fields: Dict[str, str] = {}
    if scan_type_value:
        details.append(("TYP", scan_type_value))
        compare_fields["TYP"] = scan_type_value
    # When title and description duplicate each other, keep only one of them:
    # for "Notice" events the description wins, otherwise the title wins.
    if scan_type_value and scan_type_value.strip().lower() == "notice":
        if description and title and description.strip() == title.strip():
            title = ""
    else:
        if description and title and description.strip() == title.strip():
            description = ""
    if description:
        details.append(("SUM", description))
        compare_fields["SUM"] = description
    if title:
        details.append(("TIT", title))
        if not compare_fields.get("SUM"):
            compare_fields["SUM"] = title
    if recurrence != "none":
        details.append(("REC", recurrence))
        compare_fields["REC"] = recurrence
    if participant_text:
        details.append(("PAR", participant_text))
    for label, value in attributes:
        details.append((label, value))
    ai_payload = {
        "scan_type_pi": scan_type_value or "",
        "description": description or "",
        "title": title,
        "recurrence": recurrence or "none",
        "participants": participants,
        "attributes": {label: value for label, value in attributes},
        "spi": spi_value or "",
        "patient_source": patient_source or "",
        "body_part": body_part or "",
    }
    return EventView(
        start=start_dt,
        end=end_dt,
        title=title,
        description=description,
        details=details,
        recurrence=recurrence,
        reference=reference,
        compare_fields=compare_fields,
        ai_payload=ai_payload,
        calcium_event=None,
        scandium_detail=detail,
    )
def build_scandium_events_for_date(
    api: BookedAPI,
    resource_name: str,
    tz: ZoneInfo,
    target_date: date,
    start_time: Optional[time],
    end_time: Optional[time],
    resource: Optional[Dict[str, object]] = None,
) -> List[EventView]:
    """Fetch Booked reservations for one resource and date as EventViews.

    Raises RuntimeError when *resource_name* cannot be resolved.  The full
    per-reservation detail is fetched only when the list payload is missing
    fields needed for comparison; detail-fetch failures fall back to the
    list payload (best effort).
    """
    if resource is None:
        resource = api.find_resource_by_name(resource_name)
    if not resource:
        raise RuntimeError(f"Resource not found: {resource_name}")
    start_date = target_date.strftime("%Y-%m-%d")
    end_date = (target_date + timedelta(days=1)).strftime("%Y-%m-%d")
    reservations = api.list_reservations(
        start_date=start_date,
        end_date=end_date,
        resource_id=resource["resourceId"],
    )
    results: List[EventView] = []
    for reservation in reservations:
        reference = reservation.get("referenceNumber", "")
        detail = reservation
        # Decide whether the list payload is complete enough, or whether the
        # per-reservation detail endpoint must be queried.
        needs_detail = False
        if reference:
            for field in ("customAttributes", "participants", "invitees", "recurrenceRule"):
                if field not in reservation:
                    needs_detail = True
                    break
            if not needs_detail:
                # Present-but-empty attributes or a null recurrence also force
                # a detail fetch.
                if not reservation.get("customAttributes") or reservation.get("recurrenceRule") is None:
                    needs_detail = True
        if needs_detail:
            try:
                detail = api.get_reservation(reference)
            except Exception:
                # Best effort: keep the (partial) list payload on failure.
                detail = reservation
        view = build_scandium_event_view_from_detail(detail, tz)
        if not view:
            continue
        # The query window spans two days; keep only target_date occurrences.
        if view.start.date() != target_date:
            continue
        if start_time and end_time:
            if view.start.timetz().replace(tzinfo=None) != start_time:
                continue
            if view.end.timetz().replace(tzinfo=None) != end_time:
                continue
        results.append(view)
    return results
def group_by_time(events: Iterable[EventView]) -> Dict[Tuple[datetime, datetime], List[EventView]]:
grouped: Dict[Tuple[datetime, datetime], List[EventView]] = {}
for event in events:
key = (event.start, event.end)
grouped.setdefault(key, []).append(event)
return grouped
def format_event_label(event: EventView, width: int) -> str:
    """Render an event as display text, wrapping detail values to *width*.

    Without structured details, falls back to the title (plus description).
    Detail values are bolded; wrapped continuation lines are indented to
    align under the value.
    """
    if not event.details:
        fallback = event.title.strip() if event.title else "(no title)"
        if event.description:
            return f"{fallback}\n{event.description.strip()}"
        return fallback
    rendered: List[str] = []
    for key, value in event.details:
        label = f"{key}: "
        usable = max(10, width - len(label))
        chunks = textwrap.wrap(value, width=usable) or [""]
        rendered.append(f"{label}{BOLD}{chunks[0]}{RESET}")
        pad = " " * len(label)
        rendered.extend(f"{pad}{BOLD}{chunk}{RESET}" for chunk in chunks[1:])
    return "\n".join(rendered)
def build_compare_key(calcium: EventView, scandium: EventView) -> str:
payload = {
"calcium": calcium.ai_payload,
"scandium": scandium.ai_payload,
}
return json.dumps(payload, sort_keys=True)
def build_ai_prompt(calcium: EventView, scandium: EventView) -> str:
    """Build the instruction+payload prompt asking the model to compare the pair.

    The instructions encode the matching heuristics (field containment,
    wording tolerance, PAT/body conflicts) and request a strict JSON reply.
    """
    return (
        "You are comparing a Calcium calendar event (unstructured) to a Scandium event "
        "(structured). Determine whether they refer to the same booking. The Scandium "
        "side has structured fields (PAT, Body part, SPI, Participants, Title) that may "
        "appear embedded in Calcium SUM. The first words of Calcium SUM often map to "
        "Scandium TIT. Scanned person initials might be in Calcium SUM and Scandium SPI. "
        "Body part wording can differ (e.g., 'L/Spine' vs 'L Spine'). "
        "Emails or names in Calcium SUM often correspond to Scandium Participants. "
        "If Scandium fields are contained in Calcium SUM, treat that as a match even if "
        "the strings are not identical. If both sides provide PAT or Body part and they "
        "conflict, mark as mismatch.\n\n"
        "Calcium event:\n"
        f"{json.dumps(calcium.ai_payload, indent=2)}\n\n"
        "Scandium event:\n"
        f"{json.dumps(scandium.ai_payload, indent=2)}\n\n"
        "Respond with JSON:\n"
        "{\n"
        ' "match": true|false,\n'
        ' "field_matches": {"TYP": "match|mismatch|unclear", "SUM": "match|mismatch|unclear", "REC": "match|mismatch|unclear"},\n'
        ' "notes": "short reason"\n'
        "}\n"
    )
def ai_compare(
    calcium: EventView,
    scandium: EventView,
    model: str,
    api_key: str,
    cache: Dict[str, Dict[str, object]],
) -> Dict[str, object]:
    """Ask the OpenAI Responses API whether the two events match.

    Results are memoized in *cache* (guarded by CACHE_LOCK) keyed on both
    events' ai_payload JSON.  Raises RuntimeError on a non-200 HTTP
    response; an unparseable model reply degrades to a non-match result.
    """
    key = build_compare_key(calcium, scandium)
    with CACHE_LOCK:
        if key in cache:
            return cache[key]
    import requests  # deferred so the dependency is only needed when AI is used
    prompt = build_ai_prompt(calcium, scandium)
    response = requests.post(
        "https://api.openai.com/v1/responses",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json={
            "model": model,
            "input": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": prompt,
                        }
                    ],
                }
            ],
        },
        timeout=30,
    )
    if response.status_code != 200:
        raise RuntimeError(f"OpenAI error {response.status_code}: {response.text}")
    payload = response.json()
    # Concatenate all output_text chunks from the response's message items.
    text = ""
    for item in payload.get("output", []):
        if item.get("type") == "message":
            for content in item.get("content", []):
                if content.get("type") == "output_text":
                    text += content.get("text", "")
    try:
        result = json.loads(text)
    except json.JSONDecodeError:
        result = {"match": False, "field_matches": {}, "notes": "AI response parse error"}
    with CACHE_LOCK:
        cache[key] = result
    return result
def compare_events(
    calcium: EventView,
    scandium: EventView,
    use_ai: bool,
    model: str,
    api_key: Optional[str],
    cache: Dict[str, Dict[str, object]],
) -> Dict[str, object]:
    """Decide whether a Calcium/Scandium event pair refers to the same booking.

    Strategy, in order: (1) hard guard on conflicting initials, (2) exact
    field comparison, (3) optional AI comparison with post-checks that veto
    an AI "match" when patient source or body part clearly conflict.  The
    result dict carries "match", per-field "field_matches", "notes", and
    "source" ("guard" | "exact" | "ai" | "ai-error").
    """
    calcium_spi = set(calcium.ai_payload.get("initials", []) or [])
    scandium_spi = (scandium.ai_payload.get("spi") or "").strip()
    if scandium_spi and calcium_spi and scandium_spi not in calcium_spi:
        return {
            "match": False,
            "field_matches": {"SUM": "mismatch"},
            "notes": "initials mismatch",
            "source": "guard",
        }
    calcium_typ = calcium.compare_fields.get("TYP", "")
    scandium_typ = scandium.compare_fields.get("TYP", "")
    # Normalize the several Calcium spellings of "PM / Service".
    if calcium_typ in {"PM / Service", "PM-Service", "PM Service", "PM/Service"} and scandium_typ == "Service":
        calcium_typ = "Service"
    exact = {
        "TYP": calcium_typ == scandium_typ,
        "SUM": calcium.compare_fields.get("SUM", "") == scandium.compare_fields.get("SUM", ""),
        "REC": calcium.compare_fields.get("REC", "") == scandium.compare_fields.get("REC", ""),
    }
    if all(exact.values()):
        return {
            "match": True,
            "field_matches": {key: "match" for key in exact},
            "notes": "exact match",
            "source": "exact",
        }
    if not use_ai or not api_key:
        mismatched = [key for key, ok in exact.items() if not ok]
        return {
            "match": False,
            "field_matches": {key: "mismatch" if key in mismatched else "match" for key in exact},
            "notes": "exact mismatch",
            "source": "exact",
        }
    try:
        result = ai_compare(calcium, scandium, model, api_key, cache)
        # Post-check: PAT/body mismatches override AI match when both sides provide values.
        calcium_pat = (calcium.ai_payload.get("patient_source") or "").strip()
        scandium_pat = (scandium.ai_payload.get("patient_source") or "").strip()
        calcium_body = (calcium.ai_payload.get("body_part") or "").strip()
        scandium_body = (scandium.ai_payload.get("body_part") or "").strip()
        if result.get("match"):
            if scandium_pat and calcium_pat:
                if normalize_compare_value(scandium_pat) != normalize_compare_value(calcium_pat):
                    result["match"] = False
                    field_matches = result.get("field_matches") or {}
                    field_matches["SUM"] = "mismatch"
                    result["field_matches"] = field_matches
                    result["notes"] = "patient source mismatch"
        if result.get("match") and scandium_body and calcium_body:
            norm_scandium_body = normalize_compare_value(scandium_body)
            norm_calcium_body = normalize_compare_value(calcium_body)
            if norm_scandium_body != norm_calcium_body:
                # Containment in either direction still counts as a match.
                if norm_scandium_body not in norm_calcium_body and norm_calcium_body not in norm_scandium_body:
                    result["match"] = False
                    field_matches = result.get("field_matches") or {}
                    field_matches["SUM"] = "mismatch"
                    result["field_matches"] = field_matches
                    result["notes"] = "body part mismatch"
        result["source"] = "ai"
        return result
    except Exception as exc:
        # Any AI failure degrades to the exact-comparison verdict.
        mismatched = [key for key, ok in exact.items() if not ok]
        return {
            "match": False,
            "field_matches": {key: "mismatch" if key in mismatched else "match" for key in exact},
            "notes": f"ai error: {exc}",
            "source": "ai-error",
        }
def input_with_default(label: str, current: str) -> str:
    """Prompt for a value with *current* pre-filled.

    Empty input keeps the current value; a lone "-" clears it.  Uses
    prompt_toolkit when available, otherwise emulates pre-filled input via a
    readline startup hook.
    """
    prompt = f"{label}: "
    if HAS_PROMPT_TOOLKIT:
        response = pt_prompt(prompt, default=current or "").strip()
    else:
        # Pre-fill the line by injecting the current value just before input().
        def hook() -> None:
            readline.insert_text(current)
            readline.redisplay()
        readline.set_startup_hook(hook)
        try:
            response = input(prompt).strip()
        finally:
            readline.set_startup_hook(None)
    if response == "":
        return current
    if response == "-":
        return ""
    return response
def input_sum_with_default(current: str, calcium_sum: str) -> str:
    """Prompt for the SUM (description) field with *current* pre-filled.

    Supports "-" (clear) and "c" (copy *calcium_sum*, when non-empty); empty
    input keeps the current value.  Mirrors input_with_default's
    prompt_toolkit/readline fallback behavior.
    """
    suffix = " (c=copy from calcium, -=clear)"
    label = f"SUM (Description){suffix}"
    prompt = f"{label}: "
    if HAS_PROMPT_TOOLKIT:
        response = pt_prompt(prompt, default=current or "").strip()
        if response == "":
            return current
    else:
        def hook() -> None:
            readline.insert_text(current)
            readline.redisplay()
        readline.set_startup_hook(hook)
        try:
            response = input(prompt).strip()
        finally:
            readline.set_startup_hook(None)
        if response == "":
            return current
    if response == "-":
        return ""
    if response.lower() == "c" and calcium_sum:
        return calcium_sum
    return response
def prompt_yes_no(prompt: str, default: bool = False) -> bool:
    """Ask a y/n question on stdin.

    Empty input returns *default*; "h" prints help and re-asks; anything
    else is truthy only for "y"/"yes".
    """
    suffix = " [Y/n/h]: " if default else " [y/N/h]: "
    while True:
        response = input(prompt + suffix).strip().lower()
        if response == "h":
            print("Options: y=yes, n=no.")
            continue
        if not response:
            return default
        return response in {"y", "yes"}
def prompt_yes_no_accept(
    prompt: str,
    default: bool = False,
    include_reload: bool = False,
    include_clear: bool = False,
    include_update_rec: bool = False,
) -> str:
    """Prompt with yes/no/accept plus optional extra actions.

    Returns one of "yes", "no", "accept", "reload", "clear", or
    "update_rec".  The extra actions are shown and honored only when the
    corresponding ``include_*`` flag is set; "h" prints a help line and
    re-prompts; an empty response resolves to ``default``.

    The option suffix and help text are assembled from the flags,
    replacing the previous eight-way duplicated if/elif chain while
    keeping identical wording and option order.
    """
    yn = "Y/n" if default else "y/N"
    letters = ["a"]
    help_parts = ["y=yes", "n=no", "a=accept"]
    if include_reload:
        letters.append("r")
        help_parts.append("r=reload")
    if include_clear:
        letters.append("c")
        help_parts.append("c=clear SUM")
    if include_update_rec:
        letters.append("u")
        help_parts.append("u=update REC")
    suffix = f" [{yn}/" + "/".join(letters + ["h"]) + "]: "
    help_text = "Options: " + ", ".join(help_parts) + "."
    while True:
        response = input(prompt + suffix).strip().lower()
        if response == "h":
            print(help_text)
            continue
        if response == "r" and include_reload:
            return "reload"
        if response == "c" and include_clear:
            return "clear"
        if response == "u" and include_update_rec:
            return "update_rec"
        if response == "a":
            return "accept"
        if not response:
            return "yes" if default else "no"
        return "yes" if response in {"y", "yes"} else "no"
def prompt_edit_scope(
    is_recurring: bool,
    default_all: bool = False,
) -> str:
    """Ask whether an edit applies to this occurrence or the whole series.

    Non-recurring reservations always return "this".  Otherwise returns
    "this" or "full", with the Enter default controlled by *default_all*.
    """
    if not is_recurring:
        return "this"
    question = (
        "Edit scope [t]his/[A]ll/[h]: "
        if default_all
        else "Edit scope [t]his/[a]ll/[h]: "
    )
    while True:
        answer = input(question).strip().lower()
        if answer == "h":
            print("Options: t=this instance, a=all instances.")
            continue
        if answer == "":
            return "full" if default_all else "this"
        return "full" if answer.startswith("a") else "this"
def update_reservation_raw(
    api: BookedAPI,
    reference: str,
    data: Dict[str, object],
    update_scope: str,
) -> Dict:
    """POST an update for reservation *reference* via the raw Booked endpoint.

    Goes through the private ``_request`` helper directly so the caller
    fully controls the JSON payload and the ``updateScope`` query parameter.
    """
    endpoint = f"Reservations/{reference}"
    query = {"updateScope": update_scope}
    return api._request("POST", endpoint, params=query, json=data)
def update_scandium_event(
    api: BookedAPI,
    sca: EventView,
    tz: ZoneInfo,
    fields: Optional[set[str]] = None,
    calcium: Optional[EventView] = None,
    prefill: Optional[Dict[str, str]] = None,
    prompt: bool = True,
    update_scope_override: Optional[str] = None,
    recurrence_rule_override: Optional[Dict[str, object]] = None,
) -> Optional[EventView]:
    """Edit a Scandium reservation's fields and POST the update.

    Current values come from the reservation detail (custom attributes,
    falling back to compare_fields / ai_payload), optionally overridden by
    ``prefill`` and/or interactive prompts (when ``prompt`` is True), then
    written back via ``update_reservation_raw``.  Returns a freshly
    rebuilt EventView for the updated reservation, or ``sca`` unchanged
    when it has no reference number.

    fields: logical fields to edit (defaults to TYP/PAT/BODY/SPI/SUM).
    calcium: paired Calcium event; its SUM is offered as a copy source.
    update_scope_override: skip the scope prompt and use this Booked
        updateScope value directly.
    recurrence_rule_override: sent verbatim as the recurrence rule,
        bypassing anything derived here.
    """
    detail = sca.scandium_detail or {}
    reference = sca.reference
    if not reference:
        # Without a reference number there is nothing to update server-side.
        return sca
    title_current = (detail.get("title") or "").strip()
    desc_current = (detail.get("description") or "").strip()
    typ_current = ""
    pat_current = ""
    body_current = ""
    spi_current = ""
    # Index the reservation's custom attributes by lower-cased label.
    attr_map: Dict[str, Tuple[int, str]] = {}
    for attr in detail.get("customAttributes", []):
        label = (attr.get("label") or "").strip()
        value = (attr.get("value") or "").strip()
        if label and value:
            attr_map[label.lower()] = (int(attr.get("id")), value)
    if "scan type / pi" in attr_map:
        typ_current = attr_map["scan type / pi"][1]
    if "patient source" in attr_map:
        pat_current = attr_map["patient source"][1]
    if "body part" in attr_map:
        body_current = attr_map["body part"][1]
    if "scanned person initials" in attr_map:
        spi_current = attr_map["scanned person initials"][1]
    # Fall back to compare fields / AI-derived values when the attribute is unset.
    if not typ_current:
        typ_current = sca.compare_fields.get("TYP", "")
    if not pat_current:
        pat_current = (sca.ai_payload.get("patient_source") or "")
    if not body_current:
        body_current = (sca.ai_payload.get("body_part") or "")
    if not spi_current:
        spi_current = (sca.ai_payload.get("spi") or "")
    fields = fields or {"TYP", "PAT", "BODY", "SPI", "SUM"}
    typ_input = typ_current
    sum_input = desc_current
    tit_input = title_current
    pat_input = pat_current
    body_input = body_current
    spi_input = spi_current
    par_input = ""
    rec_input = sca.recurrence or "none"
    if prefill and "TYP" in prefill:
        typ_current = prefill["TYP"]
        typ_input = prefill["TYP"]
    if "TYP" in fields and prompt:
        typ_input = input_with_default("TYP (Scan Type / PI)", typ_current)
    if prefill and "SPI" in prefill:
        spi_current = prefill["SPI"]
        spi_input = prefill["SPI"]
    if "SPI" in fields and prompt:
        spi_input = input_with_default("SPI (Scanned Person Initials)", spi_current)
    typ_is_clinical = (typ_input or "").strip().lower() == "clinical"
    typ_changed_from_clinical_to_open = (
        (typ_current or "").strip().lower() == "clinical"
        and (typ_input or "").strip().lower() == "open"
    )
    if prefill and "PAT" in prefill:
        pat_current = prefill["PAT"]
        pat_input = prefill["PAT"]
    if "PAT" in fields and typ_is_clinical and prompt:
        pat_input = input_with_default("PAT (Patient Source)", pat_current)
    else:
        # NOTE(review): this branch also fires when prompt=False or when
        # "PAT" is simply not in ``fields``, clearing Patient Source even on
        # prefill-only updates of other fields — confirm this is intended.
        pat_input = ""
    if prefill and "BODY" in prefill:
        body_current = prefill["BODY"]
        body_input = prefill["BODY"]
    if "BODY" in fields and typ_is_clinical and prompt:
        body_input = input_with_default("Body part", body_current)
    else:
        # NOTE(review): same clearing caveat as PAT above.
        body_input = ""
    if typ_changed_from_clinical_to_open:
        # Dropping Clinical: the clinical-only attributes no longer apply.
        body_input = ""
        spi_input = ""
    # Prompt SUM last.
    if prefill and "SUM" in prefill:
        sum_input = prefill["SUM"]
    if "SUM" in fields and prompt:
        calcium_sum = ""
        if calcium:
            calcium_sum = calcium.compare_fields.get("SUM", "")
        sum_input = input_sum_with_default(sum_input, calcium_sum)
    # Collect current participant e-mails from both participants and invitees.
    participants_current = []
    for p in list(detail.get("participants", [])) + list(detail.get("invitees", [])):
        if isinstance(p, dict):
            email = (p.get("emailAddress") or "").strip()
            if email:
                participants_current.append(email)
        elif isinstance(p, str) and p.strip():
            participants_current.append(p.strip())
    par_current = ", ".join(participants_current)
    par_input = par_current
    rec_current = sca.recurrence or "none"
    rec_input = rec_current
    if update_scope_override:
        update_scope = update_scope_override
    else:
        update_scope = prompt_edit_scope(bool(detail.get("isRecurring")))
    custom_attrs: List[Dict[str, object]] = []
    # Attribute IDs as configured in the Booked instance.
    label_to_id = {
        "Scan Type / PI": 1,
        "Patient Source": 11,
        "Body part": 10,
        "Scanned Person Initials": 2,
    }
    def apply_attr(label: str, value: str) -> None:
        # Append one attribute update; empty value clears the attribute.
        custom_attrs.append({"attributeId": label_to_id[label], "attributeValue": value})
    apply_attr("Scan Type / PI", typ_input or "")
    apply_attr("Patient Source", pat_input or "")
    apply_attr("Body part", body_input or "")
    apply_attr("Scanned Person Initials", spi_input or "")
    # Resolve participant e-mails to Booked user ids; unknown addresses are skipped.
    participant_ids: List[int] = []
    if par_input.strip():
        for raw in par_input.split(","):
            email = raw.strip()
            if not email:
                continue
            user = api.find_user_by_email(email)
            if user:
                participant_ids.append(user["id"])
            else:
                print(f"Participant not found: {email}")
    recurrence_rule = None
    # NOTE(review): rec_input is never modified after being set to rec_current,
    # so the elif branch below is currently unreachable; only the explicit
    # "none" case produces a rule here.
    if rec_input.strip().lower() == "none":
        recurrence_rule = {"type": "none"}
    elif rec_input.strip() and rec_input.strip() != rec_current:
        rrule = rec_input.strip()
        if rrule.upper().startswith("RRULE:"):
            rrule = rrule.split(":", 1)[1]
        recurrence_rule = parse_rrule(rrule, sca.start.replace(tzinfo=None))
    data = {
        "resourceId": int(detail.get("resourceId")),
        "userId": api.get_user_id(),
        "startDateTime": detail.get("startDate") or detail.get("startDateTime"),
        "endDateTime": detail.get("endDate") or detail.get("endDateTime"),
        "title": tit_input,
        "description": sum_input or "",
        "participants": participant_ids,
        "customAttributes": custom_attrs,
    }
    if recurrence_rule_override is not None:
        data["recurrenceRule"] = recurrence_rule_override
    elif recurrence_rule is not None:
        data["recurrenceRule"] = recurrence_rule
    update_reservation_raw(api, reference, data, update_scope)
    # Re-fetch so the caller sees the post-update state.
    new_detail = api.get_reservation(reference)
    return build_scandium_event_view_from_detail(new_detail, tz)
def build_event_payload_simple(
    api: BookedAPI,
    event: Dict[str, str],
    resource: Dict[str, object],
    tz: ZoneInfo,
) -> Dict[str, object]:
    """Translate a generic Calcium event dict into reservation kwargs.

    Resolves the PI attribute from CATEGORIES, pulls initials and
    participant e-mails from the summary (skipped for "open" events),
    derives a recurrence rule from RRULE when present, and de-duplicates
    the title against the description.  Raises ValueError when DTSTART or
    DTEND is missing.
    """
    dtstart = event.get("DTSTART", "")
    dtend = event.get("DTEND", "")
    if not (dtstart and dtend):
        raise ValueError("DTSTART and DTEND are required")
    start_iso = ics_datetime_to_iso(dtstart)
    end_iso = ics_datetime_to_iso(dtend)
    categories = event.get("CATEGORIES", "")
    category_key = categories.strip().lower()
    open_event = category_key == "open"
    notice_event = category_key == "notice"
    body = normalize_summary(event.get("SUMMARY", "").strip())
    title = body.splitlines()[0].strip() if body else ""
    # Resolve the PI attribute: full category string first, then its first
    # token, then the raw category (or "Unknown") as a last resort.
    pi_value = api.find_attribute_value(PI_ATTRIBUTE_ID, categories)
    if not pi_value and categories.strip():
        pi_value = api.find_attribute_value(PI_ATTRIBUTE_ID, categories.split()[0])
    if not pi_value:
        pi_value = categories.strip() or "Unknown"
    attrs: List[Dict[str, object]] = [
        {"attributeId": PI_ATTRIBUTE_ID, "attributeValue": pi_value}
    ]
    if not open_event:
        found_initials = extract_initials(body)
        if found_initials:
            attrs.append(
                {
                    "attributeId": SCANNED_INITIALS_ATTRIBUTE_ID,
                    "attributeValue": found_initials[0],
                }
            )
    # Map summary e-mails to Booked users, trying the swapped domain when
    # the literal address is unknown; de-duplicate by user id.
    participant_ids: List[int] = []
    if not open_event:
        seen: set[int] = set()
        for address in extract_emails(body):
            account = api.find_user_by_email(address)
            if not account:
                alternate = swap_email_domain(address)
                if alternate:
                    account = api.find_user_by_email(alternate)
            if account:
                uid = int(account["id"])
                if uid not in seen:
                    seen.add(uid)
                    participant_ids.append(uid)
    recurrence_rule = None
    if event.get("RRULE"):
        anchor = parse_ics_datetime(dtstart, tz)
        if anchor:
            recurrence_rule = parse_rrule(event.get("RRULE", ""), anchor.replace(tzinfo=None))
    # When the title merely repeats the description, keep only one of them:
    # notice events keep the description, everything else keeps the title.
    if title and body and title.strip() == body.strip():
        if notice_event:
            title = ""
        else:
            body = ""
    if open_event:
        title = ""
    return {
        "resource_id": resource["resourceId"],
        "start_datetime": start_iso,
        "end_datetime": end_iso,
        "title": title,
        "description": body or None,
        "participants": participant_ids or None,
        "custom_attributes": attrs,
        "recurrence_rule": recurrence_rule,
    }
def create_scandium_from_calcium(
    api: BookedAPI,
    resource: Dict[str, object],
    tz: ZoneInfo,
    calcium_event: Dict[str, str],
) -> Optional[EventView]:
    """Create a Scandium reservation mirroring a Calcium event.

    Three creation paths, selected from the Calcium CATEGORIES value:

    * "Clinical": parse the PAT/BODY/SPI prefix out of the summary and set
      the clinical custom attributes (plus contrast when indicated).
    * "Service": plain reservation with the Service PI attribute and the
      normalized summary as description.
    * anything else: generic payload from ``build_event_payload_simple``.

    Returns the created reservation as an EventView, or None (after
    printing the API response) when creation failed.

    Fix: removed the ``clinical_created`` local, which was assigned but
    never read; attribute-ID literals 1/2 now use the module constants.
    """
    categories = calcium_event.get("CATEGORIES", "") or ""
    if "Clinical" in categories:
        description_raw = calcium_event.get("SUMMARY", "")
        pat, body, spi, description = parse_clinical_prefix(description_raw)
        # An explicit blocker in the raw text overrides a parsed SPI.
        if spi and contains_spi_blocker(description_raw):
            spi = None
        if not spi:
            spi = extract_scanned_initials(description_raw)
        pi_value = api.find_attribute_value(PI_ATTRIBUTE_ID, "Clinical") or "Clinical"
        custom_attrs = [{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": pi_value}]
        if pat:
            custom_attrs.append({"attributeId": 11, "attributeValue": pat})
        if body:
            custom_attrs.append({"attributeId": 10, "attributeValue": body})
        if spi:
            custom_attrs.append(
                {"attributeId": SCANNED_INITIALS_ATTRIBUTE_ID, "attributeValue": spi}
            )
        if has_contrast_indicator(description):
            custom_attrs.append({"attributeId": 3, "attributeValue": "Yes"})
        recurrence_rule = None
        if calcium_event.get("RRULE"):
            start_dt = parse_ics_datetime(calcium_event.get("DTSTART", ""), tz)
            if start_dt:
                recurrence_rule = parse_rrule(
                    calcium_event.get("RRULE", ""), start_dt.replace(tzinfo=None)
                )
        result = api.create_reservation(
            resource_id=resource["resourceId"],
            start_datetime=ics_datetime_to_iso(calcium_event.get("DTSTART", "")),
            end_datetime=ics_datetime_to_iso(calcium_event.get("DTEND", "")),
            title="",
            description=description,
            custom_attributes=custom_attrs,
            recurrence_rule=recurrence_rule,
        )
    elif "Service" in categories:
        service_value = api.find_attribute_value(PI_ATTRIBUTE_ID, "Service") or "Service"
        start_iso = ics_datetime_to_iso(calcium_event.get("DTSTART", ""))
        end_iso = ics_datetime_to_iso(calcium_event.get("DTEND", ""))
        summary = calcium_event.get("SUMMARY", "")
        recurrence_rule = None
        if calcium_event.get("RRULE"):
            start_dt = parse_ics_datetime(calcium_event.get("DTSTART", ""), tz)
            if start_dt:
                recurrence_rule = parse_rrule(
                    calcium_event.get("RRULE", ""), start_dt.replace(tzinfo=None)
                )
        result = api.create_reservation(
            resource_id=resource["resourceId"],
            start_datetime=start_iso,
            end_datetime=end_iso,
            title="",
            description=normalize_summary(summary),
            custom_attributes=[{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": service_value}],
            recurrence_rule=recurrence_rule,
        )
    else:
        payload = build_event_payload_simple(api, calcium_event, resource, tz)
        result = api.create_reservation(
            resource_id=payload["resource_id"],
            start_datetime=payload["start_datetime"],
            end_datetime=payload["end_datetime"],
            title=payload["title"],
            description=payload.get("description"),
            participants=payload.get("participants"),
            custom_attributes=payload.get("custom_attributes"),
            recurrence_rule=payload.get("recurrence_rule"),
        )
    reference = result.get("referenceNumber")
    if not reference:
        print(f"Create failed: {result}")
        return None
    # Re-fetch the freshly created reservation so the view reflects
    # whatever the server filled in.
    new_detail = api.get_reservation(reference)
    return build_scandium_event_view_from_detail(new_detail, tz)
def build_pairs(
    calcium_events: List[EventView],
    scandium_events: List[EventView],
    date_value: date,
) -> List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]]:
    """Pair Calcium and Scandium events that share an exact time slot.

    Events are grouped by (start, end) via ``group_by_time``; within each
    slot a greedy two-pass match runs: first on the exact
    (normalized TYP, normalized SUM) key, then on the TYP key alone.
    Unmatched events on either side are emitted with ``None`` for the
    missing counterpart.  Returns (calcium, scandium, slot, date) tuples.
    """
    def normalize_match_text(text: str) -> str:
        # Collapse internal whitespace and lower-case for tolerant comparison.
        cleaned = " ".join(text.strip().split())
        return cleaned.lower()
    def normalize_open_summary(text: str) -> str:
        # Drop a leading "open" marker (with optional -/: separators).
        cleaned = normalize_match_text(text)
        return re.sub(r"^open\b\s*[-:]*\s*", "", cleaned, flags=re.IGNORECASE)
    def normalize_typ_for_pairing(raw: str) -> str:
        # Reduce a TYP value to a single comparable token.
        cleaned = raw.strip().lower()
        if not cleaned:
            return ""
        if "service" in cleaned and "pm" in cleaned:
            return "service"
        if "/" in cleaned:
            # Keep only the portion after the last slash (e.g. "type / pi").
            cleaned = cleaned.split("/")[-1].strip()
        if cleaned:
            cleaned = cleaned.split()[0]
        return cleaned
    def match_fields(event: EventView) -> Tuple[str, str]:
        # Build the (typ_key, normalized summary) pairing key for one event.
        typ = (event.compare_fields.get("TYP") or "").strip()
        summary = (event.compare_fields.get("SUM") or "").strip()
        if not typ and summary.lower().startswith("open"):
            typ = "Open"
        typ_key = normalize_typ_for_pairing(typ)
        if typ_key == "open":
            summary = normalize_open_summary(summary)
        else:
            summary = normalize_match_text(summary)
        return (typ_key, summary)
    calcium_grouped = group_by_time(calcium_events)
    scandium_grouped = group_by_time(scandium_events)
    all_keys = sorted(
        set(calcium_grouped) | set(scandium_grouped),
        key=lambda key: (key[0], key[1]),
    )
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ] = []
    for key in all_keys:
        calcium_list = calcium_grouped.get(key, [])
        scandium_list = scandium_grouped.get(key, [])
        if not calcium_list or not scandium_list:
            # Only one side has events in this slot: emit positional pairs
            # with None filling the missing side.
            max_count = max(len(calcium_list), len(scandium_list))
            for idx in range(max_count):
                cal = calcium_list[idx] if idx < len(calcium_list) else None
                sca = scandium_list[idx] if idx < len(scandium_list) else None
                pairs.append((cal, sca, key, date_value))
            continue
        cal_remaining = list(range(len(calcium_list)))
        sca_remaining = list(range(len(scandium_list)))
        matched: List[Tuple[int, int]] = []
        # Pass 1: greedy exact match on the (typ, summary) key.
        sca_by_exact: Dict[Tuple[str, str], List[int]] = {}
        for idx in sca_remaining:
            sca_key = match_fields(scandium_list[idx])
            sca_by_exact.setdefault(sca_key, []).append(idx)
        for cal_idx in cal_remaining[:]:
            cal_key = match_fields(calcium_list[cal_idx])
            candidates = sca_by_exact.get(cal_key, [])
            if candidates:
                sca_idx = candidates.pop(0)
                matched.append((cal_idx, sca_idx))
                cal_remaining.remove(cal_idx)
                sca_remaining.remove(sca_idx)
        # Pass 2: match the leftovers on TYP alone.
        sca_by_typ: Dict[str, List[int]] = {}
        for idx in sca_remaining:
            typ, _ = match_fields(scandium_list[idx])
            sca_by_typ.setdefault(typ, []).append(idx)
        for cal_idx in cal_remaining[:]:
            typ, _ = match_fields(calcium_list[cal_idx])
            candidates = sca_by_typ.get(typ, [])
            if candidates:
                sca_idx = candidates.pop(0)
                matched.append((cal_idx, sca_idx))
                cal_remaining.remove(cal_idx)
                sca_remaining.remove(sca_idx)
        for cal_idx, sca_idx in matched:
            pairs.append((calcium_list[cal_idx], scandium_list[sca_idx], key, date_value))
        for cal_idx in cal_remaining:
            pairs.append((calcium_list[cal_idx], None, key, date_value))
        for sca_idx in sca_remaining:
            pairs.append((None, scandium_list[sca_idx], key, date_value))
    return pairs
def precompute_ai_results(
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ],
    hashes: List[str],
    accepted_hashes: set[str],
    use_ai: bool,
    model: str,
    api_key: Optional[str],
) -> Dict[int, Dict[str, object]]:
    """Precompute compare results for pairs, keyed by pair index.

    This previously duplicated ``compute_compare_results_for_pairs``
    almost line for line; it now delegates to it and simply drops the
    ``None`` placeholders (unaccepted half-missing pairs), which is
    exactly the set of indices the old implementation skipped.

    NOTE: ``hashes`` is accepted for interface compatibility but is not
    used (the previous implementation did not use it either).
    """
    results_list = compute_compare_results_for_pairs(
        pairs,
        hashes,
        accepted_hashes,
        use_ai=use_ai,
        model=model,
        api_key=api_key,
    )
    return {idx: result for idx, result in enumerate(results_list) if result is not None}
def compute_compare_results_for_pairs(
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ],
    hashes: List[str],
    accepted_hashes: set[str],
    use_ai: bool,
    model: str,
    api_key: Optional[str],
    cache: Optional[Dict[str, Dict[str, object]]] = None,
) -> List[Optional[Dict[str, object]]]:
    """Compare every pair, optionally escalating ambiguous ones to AI.

    Accepted pairs short-circuit to a synthetic "accepted" result; pairs
    missing one side (and not accepted) stay None.  Remaining pairs get a
    cheap non-AI comparison first, and only unresolved ones are fanned out
    to the AI comparator on a thread pool.  Returns one result (or None)
    per input pair, in order.  ``hashes`` is unused but kept for interface
    compatibility.
    """
    if cache is None:
        cache = {}

    def accepted_result() -> Dict[str, object]:
        # Fresh dict each time so callers may mutate results independently.
        return {
            "match": True,
            "field_matches": {},
            "notes": "accepted",
            "source": "accepted",
        }

    outcomes: List[Optional[Dict[str, object]]] = [None] * len(pairs)
    pending: List[Tuple[int, EventView, EventView]] = []
    for position, (cal, sca, _slot, _day) in enumerate(pairs):
        if resolve_accepted_hash(cal, sca, accepted_hashes):
            outcomes[position] = accepted_result()
            continue
        if not cal or not sca:
            # Half-missing and not accepted: leave the slot as None.
            continue
        quick = compare_events(cal, sca, False, model, api_key, cache)
        needs_ai = (
            use_ai
            and api_key
            and quick.get("source") != "guard"
            and quick.get("match") is not True
        )
        if needs_ai:
            pending.append((position, cal, sca))
        else:
            outcomes[position] = quick
    if not (use_ai and api_key) or not pending:
        return outcomes
    workers = min(20, len(pending))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {
            pool.submit(compare_events, cal, sca, True, model, api_key, cache): position
            for (position, cal, sca) in pending
        }
        for done in tqdm(as_completed(futures), total=len(futures), desc="AI compare"):
            outcomes[futures[done]] = done.result()
    return outcomes
def side_by_side_lines(
    left: str,
    right: str,
    width: int,
    gutter: int,
) -> List[str]:
    """Render two multi-line strings as padded side-by-side rows.

    Each row is the left line padded to *width* visible characters (ANSI
    color escapes excluded from the count), *gutter* spaces, then the
    right line padded the same way.  The shorter side is padded with
    blank rows.
    """
    ansi = re.compile(r"\x1b\[[0-9;]*m")

    def padded(text: str) -> str:
        visible = len(ansi.sub("", text))
        return text if visible >= width else text + " " * (width - visible)

    rows_left = left.splitlines() or [""]
    rows_right = right.splitlines() or [""]
    spacer = " " * gutter
    rendered: List[str] = []
    for row in range(max(len(rows_left), len(rows_right))):
        left_cell = rows_left[row] if row < len(rows_left) else ""
        right_cell = rows_right[row] if row < len(rows_right) else ""
        rendered.append(f"{padded(left_cell)}{spacer}{padded(right_cell)}")
    return rendered
def display_pairs(
    dates_sorted: List[date],
    pairs_by_date: Dict[
        date,
        List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]],
    ],
    hashes_by_date: Dict[date, List[str]],
    compare_results_by_date: Dict[date, List[Optional[Dict[str, object]]]],
    calcium_by_date: Dict[date, List[EventView]],
    scandium_by_date: Dict[date, List[EventView]],
    accepted_hashes: set[str],
    memory_path: Path,
    tz: ZoneInfo,
    use_ai: bool,
    model: str,
    api_key: Optional[str],
    api: BookedAPI,
    resource: Dict[str, object],
    resource_name: str,
    typ_filter: Optional[str],
    start_time: Optional[time],
    end_time: Optional[time],
) -> None:
    """Interactive review loop over Calcium/Scandium event pairs.

    For each date and each (possibly half-missing) pair, prints a
    side-by-side comparison plus the compare/AI verdict, then prompts the
    operator to edit, accept (persisting the pair hash to ``memory_path``),
    create a missing Scandium event, delete an orphaned one, clear the
    SUM, or reload.  Mutates the per-date dicts and ``accepted_hashes`` in
    place as events change.
    """
    terminal_width = shutil.get_terminal_size((120, 20)).columns
    gutter = 4
    column_width = max(30, (terminal_width - gutter) // 2)
    if not pairs_by_date:
        print("No events found on either side.")
        return
    def recurrence_dates_for_event(
        calcium_event: Dict[str, str],
        dates: Iterable[date],
    ) -> List[date]:
        # Dates (among ``dates``) on which this Calcium event has an
        # occurrence matching the optional start/end-time filter.
        affected: List[date] = []
        for candidate in dates:
            occurrence = calcium_occurrence_for_date(calcium_event, candidate, tz)
            if not occurrence:
                continue
            if start_time and end_time:
                occ_start, occ_end = occurrence
                if occ_start.timetz().replace(tzinfo=None) != start_time:
                    continue
                if occ_end.timetz().replace(tzinfo=None) != end_time:
                    continue
            affected.append(candidate)
        return affected
    def refresh_scandium_for_dates(dates_to_refresh: Iterable[date]) -> None:
        # Re-fetch Scandium events and rebuild pairs/hashes/compare results
        # for the given dates (used after a recurring create touches later days).
        for date_value in dates_to_refresh:
            scandium_events = build_scandium_events_for_date(
                api=api,
                resource_name=resource_name,
                tz=tz,
                target_date=date_value,
                start_time=start_time,
                end_time=end_time,
                resource=resource,
            )
            if typ_filter:
                scandium_events = [
                    event
                    for event in scandium_events
                    if (event.compare_fields.get("TYP") or "").lower() == typ_filter.lower()
                ]
            scandium_by_date[date_value] = scandium_events
            pairs_by_date[date_value] = build_pairs(
                calcium_by_date.get(date_value, []),
                scandium_events,
                date_value,
            )
            hashes = [
                canonical_hash(cal, sca)
                for cal, sca, _key, _date in pairs_by_date[date_value]
            ]
            hashes_by_date[date_value] = hashes
            compare_results_by_date[date_value] = compute_compare_results_for_pairs(
                pairs_by_date[date_value],
                hashes,
                accepted_hashes,
                use_ai=use_ai,
                model=model,
                api_key=api_key,
            )
    for date_value in dates_sorted:
        pairs = pairs_by_date.get(date_value, [])
        hashes = hashes_by_date.get(date_value, [])
        compare_results = compare_results_by_date.get(date_value, [])
        if not pairs:
            continue
        for idx, (cal, sca, key, _date_value) in enumerate(pairs):
            start_dt, end_dt = key
            compare_result = compare_results[idx] if idx < len(compare_results) else None
            # Retry loop: repeats after edits/reloads until the operator
            # accepts, declines, or the pair is otherwise resolved.
            while True:
                current_hash = canonical_hash(cal, sca)
                if idx < len(hashes):
                    hashes[idx] = current_hash
                accepted_hash = resolve_accepted_hash(cal, sca, accepted_hashes)
                print("\n\n\n\n")
                start_label = start_dt.astimezone(tz).strftime("%Y-%m-%d %H:%M")
                end_label = end_dt.astimezone(tz).strftime("%H:%M")
                header = f"{start_label} -> {end_label}"
                print("=" * terminal_width)
                print(header.center(terminal_width))
                print("-" * terminal_width)
                sca_label = "Scandium"
                if sca and sca.reference:
                    sca_label = f"Scandium ({sca.reference})"
                    sca_link = f"https://scandium.mclean.harvard.edu/reservation/?rn={sca.reference}"
                    print(f"{''.ljust(column_width)}{' ' * gutter}{sca_link.ljust(column_width)}")
                print(f"{'Calcium'.ljust(column_width)}{' ' * gutter}{sca_label.ljust(column_width)}")
                print("-" * terminal_width)
                left_text = format_event_label(cal, column_width) if cal else "missing"
                right_text = format_event_label(sca, column_width) if sca else "missing"
                for line in side_by_side_lines(
                    left_text, right_text, width=column_width, gutter=gutter
                ):
                    print(line)
                # A previously accepted hash overrides the comparator verdict.
                if accepted_hash:
                    compare_result = {
                        "match": True,
                        "field_matches": {},
                        "notes": "accepted",
                        "source": "accepted",
                    }
                if compare_result:
                    match_label = "MATCH" if compare_result.get("match") else "DIFF"
                    field_matches = compare_result.get("field_matches", {})
                    mismatched = [
                        key for key, status in field_matches.items() if status == "mismatch"
                    ]
                    diff_line = "DIFF: none" if not mismatched else f"DIFF: {', '.join(mismatched)}"
                    source = compare_result.get("source", "exact")
                    notes = compare_result.get("notes", "")
                    note_suffix = ""
                    if source == "ai-error" and notes:
                        note_suffix = f" | {notes[:60]}"
                    is_match = compare_result.get("match") is True
                    is_clean = not mismatched
                    # Blue = accepted, green = clean match, orange = anything else.
                    if source == "accepted":
                        color = BLUE
                    else:
                        color = GREEN if is_match and is_clean else ORANGE
                    compare_text = f"CHK: {match_label} | {diff_line} | {source}{note_suffix}"
                    left_only = f"{color}{compare_text}{RESET}"
                    for line in side_by_side_lines(
                        left_only,
                        "",
                        width=column_width,
                        gutter=gutter,
                    ):
                        print(line)
                    if source == "accepted":
                        short_hash = accepted_hash[:8]
                        accepted_text = f"{BLUE}[ ACCEPTED - HASH {short_hash} ]{RESET}"
                        for line in side_by_side_lines(
                            accepted_text,
                            "",
                            width=column_width,
                            gutter=gutter,
                        ):
                            print(line)
                        # Accepted pairs need no further interaction.
                        break
                if compare_result and compare_result.get("match"):
                    # Matching pair: offer edit / accept / reload / clear-SUM.
                    action = prompt_yes_no_accept(
                        "Edit?",
                        default=False,
                        include_reload=True,
                        include_clear=True,
                    )
                    if action == "clear":
                        # Blank out the Scandium description for this occurrence only.
                        sca = update_scandium_event(
                            api,
                            sca,
                            tz,
                            fields={"SUM"},
                            calcium=cal,
                            prefill={"SUM": ""},
                            prompt=False,
                            update_scope_override="this",
                        )
                        if sca and cal:
                            compare_result = compare_events(cal, sca, False, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    if action == "reload":
                        # Re-fetch the Scandium side and re-compare.
                        if sca and sca.reference:
                            new_detail = api.get_reservation(sca.reference)
                            sca = build_scandium_event_view_from_detail(new_detail, tz)
                        if sca and cal:
                            compare_result = compare_events(cal, sca, use_ai, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    if action == "accept":
                        # Persist the pair hash so future runs auto-accept it.
                        current_hash = canonical_hash(cal, sca)
                        if idx < len(hashes):
                            hashes[idx] = current_hash
                        if current_hash not in accepted_hashes:
                            store_accepted_hash(memory_path, current_hash)
                            accepted_hashes.add(current_hash)
                        compare_result = {
                            "match": True,
                            "field_matches": {},
                            "notes": "accepted",
                            "source": "accepted",
                        }
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                        break
                    if action == "yes":
                        sca = update_scandium_event(api, sca, tz, calcium=cal)
                        if sca and cal:
                            compare_result = compare_events(cal, sca, False, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    break
                if cal is None and sca is not None:
                    # Calcium side missing: the Scandium reservation is
                    # orphaned; delete this occurrence.
                    api.delete_reservation(sca.reference, update_scope="this")
                    sca = None
                    compare_result = None
                    if idx < len(compare_results):
                        compare_results[idx] = None
                    break
                if sca is None and cal is not None:
                    action = prompt_yes_no_accept("Scandium missing. Create from Calcium?", default=True)
                    if action == "accept":
                        current_hash = canonical_hash(cal, sca)
                        if idx < len(hashes):
                            hashes[idx] = current_hash
                        if current_hash not in accepted_hashes:
                            store_accepted_hash(memory_path, current_hash)
                            accepted_hashes.add(current_hash)
                        compare_result = {
                            "match": True,
                            "field_matches": {},
                            "notes": "accepted",
                            "source": "accepted",
                        }
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                        break
                    if action == "yes":
                        sca = create_scandium_from_calcium(api, resource, tz, cal.calcium_event)
                        if sca and cal:
                            compare_result = compare_events(cal, sca, use_ai, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        # A recurring create may also materialize events on
                        # later dates; refresh those days' pairings.
                        if cal.calcium_event and cal.calcium_event.get("RRULE"):
                            future_dates = [d for d in dates_sorted if d > date_value]
                            affected_dates = recurrence_dates_for_event(cal.calcium_event, future_dates)
                            if affected_dates:
                                refresh_scandium_for_dates(affected_dates)
                        continue
                    break
                if cal is not None and sca is not None:
                    # Both sides present but not matching: offer field edits,
                    # with a recurrence-sync shortcut when REC mismatches.
                    field_matches = compare_result.get("field_matches", {}) if compare_result else {}
                    rec_mismatch = field_matches.get("REC") == "mismatch"
                    action = prompt_yes_no_accept(
                        "Edit Scandium fields?",
                        default=False,
                        include_update_rec=rec_mismatch,
                    )
                    if action == "accept":
                        current_hash = canonical_hash(cal, sca)
                        if idx < len(hashes):
                            hashes[idx] = current_hash
                        if current_hash not in accepted_hashes:
                            store_accepted_hash(memory_path, current_hash)
                            accepted_hashes.add(current_hash)
                        compare_result = {
                            "match": True,
                            "field_matches": {},
                            "notes": "accepted",
                            "source": "accepted",
                        }
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                        break
                    if action == "update_rec":
                        # Copy the Calcium recurrence rule (or clear it) onto Scandium.
                        cal_rrule = (cal.calcium_event or {}).get("RRULE", "") if cal else ""
                        if cal_rrule:
                            recurrence_override = parse_rrule(
                                cal_rrule,
                                cal.start.replace(tzinfo=None),
                            )
                        else:
                            recurrence_override = {"type": "none"}
                        is_recurring = bool(sca.scandium_detail.get("isRecurring") if sca.scandium_detail else False)
                        if is_recurring:
                            scope = prompt_edit_scope(is_recurring, default_all=True)
                        else:
                            scope = "full"
                        sca = update_scandium_event(
                            api,
                            sca,
                            tz,
                            calcium=cal,
                            recurrence_rule_override=recurrence_override,
                            prompt=False,
                            update_scope_override=scope,
                        )
                        if sca:
                            compare_result = compare_events(cal, sca, False, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    if action == "yes":
                        recurrence_override = None
                        update_scope_override = None
                        if cal and compare_result:
                            if rec_mismatch:
                                # Optionally fold a recurrence sync into the edit.
                                if prompt_yes_no("Update REC to match Calcium?", default=True):
                                    cal_rrule = (cal.calcium_event or {}).get("RRULE", "")
                                    if cal_rrule:
                                        recurrence_override = parse_rrule(
                                            cal_rrule,
                                            cal.start.replace(tzinfo=None),
                                        )
                                    else:
                                        recurrence_override = {"type": "none"}
                                    is_recurring = bool(sca.scandium_detail.get("isRecurring") if sca.scandium_detail else False)
                                    if is_recurring:
                                        update_scope_override = prompt_edit_scope(
                                            is_recurring,
                                            default_all=True,
                                        )
                                    else:
                                        update_scope_override = "full"
                        sca = update_scandium_event(
                            api,
                            sca,
                            tz,
                            calcium=cal,
                            recurrence_rule_override=recurrence_override,
                            update_scope_override=update_scope_override,
                        )
                        if sca:
                            compare_result = compare_events(cal, sca, False, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    break
            if compare_result and compare_result.get("match"):
                # NOTE(review): dead no-op branch (body is ``pass``); kept as-is.
                pass
def resolve_preferences_path(events_path: Path, override: Optional[Path]) -> Optional[Path]:
    """Pick the Calcium .Preferences file to use.

    An explicit *override* wins; otherwise the sibling of *events_path*
    with a ``.Preferences`` suffix is returned when it exists, else None.
    """
    if override:
        return override
    sibling = events_path.with_suffix(".Preferences")
    return sibling if sibling.exists() else None
def _filter_by_typ(events: List[EventView], typ: str) -> List[EventView]:
    """Return only events whose TYP compare field equals *typ* (case-insensitive)."""
    wanted = typ.lower()
    return [
        event
        for event in events
        if (event.compare_fields.get("TYP") or "").lower() == wanted
    ]


def main() -> int:
    """Compare Calcium and Scandium events for one or more dates.

    Parses and validates CLI arguments, syncs the Calcium data directory via
    rsync, builds per-date event views from both calendars, pairs and compares
    them (with an optional AI fallback), then hands everything to the
    interactive display loop.

    Returns:
        Process exit code (0 on success).

    Raises:
        SystemExit: On invalid argument combinations, a failed rsync, an
            unknown resource, or a missing default .Events mapping.
    """
    parser = argparse.ArgumentParser(
        description="Compare Calcium and Scandium events for a single date.",
    )
    parser.add_argument(
        "dates",
        nargs="*",
        type=parse_date,
        help="Date(s) to compare (YYYY-MM-DD).",
    )
    parser.add_argument(
        "--start-date",
        type=parse_date,
        default=None,
        help="Start date (YYYY-MM-DD) for inclusive range.",
    )
    parser.add_argument(
        "--end-date",
        type=parse_date,
        default=None,
        help="End date (YYYY-MM-DD) for inclusive range.",
    )
    parser.add_argument(
        "--calcium-events",
        default=None,
        help="Path to Calcium .Events file (defaults based on --resource).",
    )
    parser.add_argument(
        "--preferences",
        type=Path,
        default=None,
        help="Optional path to Calcium .Preferences file.",
    )
    parser.add_argument(
        "--resource",
        default=DEFAULT_RESOURCE,
        help="Scandium resource name.",
    )
    parser.add_argument(
        "--timezone",
        default=DEFAULT_TZ,
        help="Timezone for display and matching.",
    )
    parser.add_argument(
        "--typ",
        default=None,
        help="Filter by TYP (Calcium category/Scandium PI value).",
    )
    parser.add_argument(
        "--start-time",
        type=parse_time_value,
        default=None,
        help="Filter by exact start time (HH:MM). Requires --end-time.",
    )
    parser.add_argument(
        "--end-time",
        type=parse_time_value,
        default=None,
        help="Filter by exact end time (HH:MM). Requires --start-time.",
    )
    parser.add_argument(
        "--no-ai",
        action="store_true",
        help="Disable AI matching fallback.",
    )
    parser.add_argument(
        "--ai-model",
        default="gpt-5-nano",
        help="OpenAI model for AI matching.",
    )
    args = parser.parse_args()

    # Validate argument combinations up front so a bad invocation fails fast,
    # before the (slow, networked) rsync below.  `is None` rather than
    # truthiness keeps midnight (00:00) usable as a --start-time/--end-time.
    if (args.start_time is None) != (args.end_time is None):
        raise SystemExit("Both --start-time and --end-time are required together.")
    if (args.start_date is None) != (args.end_date is None):
        raise SystemExit("Both --start-date and --end-date are required together.")
    if args.start_date and args.end_date and args.dates:
        raise SystemExit("Use either explicit dates or --start-date/--end-date, not both.")
    if not args.dates and not (args.start_date and args.end_date):
        raise SystemExit("Provide at least one date or a --start-date/--end-date range.")

    if args.start_date and args.end_date:
        if args.end_date < args.start_date:
            raise SystemExit("--end-date must be on or after --start-date.")
        # Expand the inclusive range into explicit dates.
        dates: List[date] = []
        cursor = args.start_date
        while cursor <= args.end_date:
            dates.append(cursor)
            cursor += timedelta(days=1)
    else:
        dates = list(args.dates)

    tz = ZoneInfo(args.timezone)

    events_path_value = args.calcium_events
    if not events_path_value:
        events_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
        if not events_path_value:
            raise SystemExit(
                f"No default Calcium .Events for resource '{args.resource}'. "
                "Use --calcium-events to specify a file."
            )
    events_path = Path(events_path_value)

    # Pull the Calcium data files BEFORE resolving the preferences path:
    # resolve_preferences_path() probes the filesystem, so on a fresh checkout
    # the .Preferences sibling only exists after this sync completes.
    rsync_cmd = [
        "rsync",
        "-rltv",
        "ddrucker@calendar-actual:/var/www/cgi-bin/CalciumDir40/data/",
        "data/",
    ]
    try:
        subprocess.run(rsync_cmd, check=True)
    except subprocess.CalledProcessError as exc:
        raise SystemExit(f"rsync failed: {exc}") from exc

    preferences_path = resolve_preferences_path(events_path, args.preferences)

    api = BookedAPI()
    resource = api.find_resource_by_name(args.resource)
    if not resource:
        raise SystemExit(f"Resource not found: {args.resource}")

    dates_sorted = sorted(dates)

    # Calcium side: one parse of the .Events file covers every date at once.
    calcium_by_date = build_calcium_events_by_date(
        events_path=events_path,
        preferences_path=preferences_path,
        tz=tz,
        dates=dates_sorted,
        start_time=args.start_time,
        end_time=args.end_time,
    )
    if args.typ:
        for date_value in dates_sorted:
            calcium_by_date[date_value] = _filter_by_typ(
                calcium_by_date.get(date_value, []), args.typ
            )

    # Scandium side: one API round-trip per date.
    scandium_by_date: Dict[date, List[EventView]] = {}
    for date_value in tqdm(dates_sorted, desc="Reading Scandium"):
        scandium_events = build_scandium_events_for_date(
            api=api,
            resource_name=args.resource,
            tz=tz,
            target_date=date_value,
            start_time=args.start_time,
            end_time=args.end_time,
            resource=resource,
        )
        if args.typ:
            scandium_events = _filter_by_typ(scandium_events, args.typ)
        scandium_by_date[date_value] = scandium_events

    pairs_by_date: Dict[
        date,
        List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]],
    ] = {}
    for date_value in dates_sorted:
        pairs_by_date[date_value] = build_pairs(
            calcium_by_date.get(date_value, []),
            scandium_by_date.get(date_value, []),
            date_value,
        )

    api_key = os.environ.get("OPENAI_API_KEY")
    memory_path = Path("sync-memory.dat")
    accepted_hashes = load_accepted_hashes(memory_path)
    hashes_by_date: Dict[date, List[str]] = {}
    compare_results_by_date: Dict[date, List[Optional[Dict[str, object]]]] = {}
    cache: Dict[str, Dict[str, object]] = {}  # shared comparison memoization across dates
    for date_value in dates_sorted:
        pairs = pairs_by_date.get(date_value, [])
        hashes = [canonical_hash(cal, sca) for cal, sca, _key, _date in pairs]
        hashes_by_date[date_value] = hashes
        compare_results_by_date[date_value] = compute_compare_results_for_pairs(
            pairs,
            hashes=hashes,
            accepted_hashes=accepted_hashes,
            use_ai=not args.no_ai,
            model=args.ai_model,
            api_key=api_key,
            cache=cache,
        )

    display_pairs(
        dates_sorted,
        pairs_by_date,
        hashes_by_date,
        compare_results_by_date,
        calcium_by_date,
        scandium_by_date,
        accepted_hashes,
        memory_path,
        tz,
        use_ai=not args.no_ai,
        model=args.ai_model,
        api_key=api_key,
        api=api,
        resource=resource,
        resource_name=args.resource,
        typ_filter=args.typ,
        start_time=args.start_time,
        end_time=args.end_time,
    )
    return 0
# Script entry point: exit with whatever status main() reports.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment