Skip to content

Instantly share code, notes, and snippets.

@dmd
Created January 14, 2026 13:30
Show Gist options
  • Select an option

  • Save dmd/5fe58cbbe966c041a0ea608a9747f89a to your computer and use it in GitHub Desktop.

Select an option

Save dmd/5fe58cbbe966c041a0ea608a9747f89a to your computer and use it in GitHub Desktop.
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = ["requests"]
# ///
"""
Booked Scheduler API client library.
Common functions for interacting with the Booked Scheduler API at scandium.mclean.harvard.edu.
Usage:
from booked_api import BookedAPI
api = BookedAPI()
user = api.get_current_user()
resources = api.list_resources()
"""
import json
import os
import requests
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
class BookedAPI:
"""Client for Booked Scheduler API."""
def __init__(self, server: str = None, api_id: str = None, api_key: str = None):
"""Initialize the API client.
Args:
server: Server URL (default: from env or scandium.mclean.harvard.edu)
api_id: API ID (default: from SCANDIUM_API_KEY env var)
api_key: API Key (default: from SCANDIUM_API_KEY env var)
"""
self.server = server or os.environ.get(
"BOOKED_SERVER", "https://scandium.mclean.harvard.edu"
)
if api_id and api_key:
self._api_id = api_id
self._api_key = api_key
else:
creds = os.environ.get("SCANDIUM_API_KEY", "")
if ":" in creds:
self._api_id, self._api_key = creds.split(":", 1)
else:
raise ValueError(
"SCANDIUM_API_KEY environment variable must be set (format: api_id:api_key)"
)
self.headers = {
"X-Booked-ApiId": self._api_id,
"X-Booked-ApiKey": self._api_key,
}
self._session = requests.Session()
self._user_id: Optional[int] = None
def _request(
self,
method: str,
endpoint: str,
params: Dict = None,
json: Dict = None,
expect_json: bool = True,
) -> Dict | str:
"""Make an API request."""
url = f"{self.server}/Services/{endpoint.lstrip('/')}"
r = self._session.request(
method,
url,
headers=self.headers,
params=params,
json=json,
)
r.raise_for_status()
if not expect_json:
return r.text
try:
return r.json()
except requests.exceptions.JSONDecodeError as exc:
snippet = r.text[:500].strip()
raise RuntimeError(
f"Non-JSON response from {endpoint}: {snippet or '<empty>'}"
) from exc
# ==================== Account Methods ====================
def get_current_user(self) -> Dict:
"""Get current authenticated user info."""
attempts = [
("Accounts/", None),
("Accounts/Current", None),
("Users/Current", None),
]
last_result = None
for endpoint, params in attempts:
try:
result = self._request("GET", endpoint, params=params, expect_json=True)
last_result = result
if isinstance(result, dict):
if result.get("message") and not (
result.get("userId")
or result.get("id")
or result.get("user")
):
continue
return result
except (requests.HTTPError, RuntimeError):
pass
try:
raw = self._request("GET", endpoint, params=params, expect_json=False)
except requests.HTTPError:
continue
last_result = raw
if isinstance(raw, str) and raw.strip():
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
continue
last_result = parsed
if isinstance(parsed, dict):
if parsed.get("message") and not (
parsed.get("userId")
or parsed.get("id")
or parsed.get("user")
):
continue
return parsed
raise RuntimeError(f"Unexpected current user response: {last_result!r}")
def get_user_id(self) -> int:
"""Get the current user's ID (cached)."""
if self._user_id is None:
env_user_id = os.environ.get("SCANDIUM_USER_ID", "").strip()
if env_user_id:
self._user_id = int(env_user_id)
else:
env_user_email = os.environ.get("SCANDIUM_USER_EMAIL", "").strip()
if not env_user_email:
env_user_email = "[email protected]"
if env_user_email:
user = self.find_user_by_email(env_user_email)
if user and user.get("id"):
self._user_id = int(user["id"])
if self._user_id is None:
user = self.get_current_user()
self._user_id = (
user.get("userId")
or user.get("id")
or user.get("user", {}).get("userId")
or user.get("user", {}).get("id")
)
if self._user_id is None:
raise RuntimeError(
"Unable to resolve user id from API. Set SCANDIUM_USER_ID "
"or SCANDIUM_USER_EMAIL."
)
return self._user_id
# ==================== Resource Methods ====================
def list_resources(self) -> List[Dict]:
"""List all resources the user can access."""
return self._request("GET", "Resources/").get("resources", [])
def get_resource(self, resource_id: int) -> Dict:
"""Get a single resource by ID."""
return self._request("GET", f"Resources/{resource_id}")
def find_resource_by_name(self, name: str) -> Optional[Dict]:
"""Find a resource by name (partial match)."""
resources = self.list_resources()
for r in resources:
if name.lower() in r.get("name", "").lower():
return r
return None
# ==================== User Methods ====================
def list_users(self, **filters) -> List[Dict]:
"""List users with optional filters (email, firstName, lastName, etc.)."""
return self._request("GET", "Users/", params=filters).get("users", [])
def find_user_by_email(self, email: str) -> Optional[Dict]:
"""Find a user by exact email match."""
users = self.list_users(email=email)
for u in users:
if u.get("emailAddress", "").lower() == email.lower():
return u
return None
def get_user(self, user_id: int) -> Dict:
"""Get a single user by ID."""
return self._request("GET", f"Users/{user_id}")
# ==================== Schedule Methods ====================
def list_schedules(self) -> List[Dict]:
"""List all schedules."""
return self._request("GET", "Schedules/").get("schedules", [])
def get_schedule(self, schedule_id: int) -> Dict:
"""Get a single schedule by ID."""
return self._request("GET", f"Schedules/{schedule_id}")
def get_schedule_slots(
self,
schedule_id: int,
start_date: str,
end_date: str,
resource_id: int = None,
) -> Dict:
"""Get availability slots for a schedule."""
params = {"startDateTime": start_date, "endDateTime": end_date}
if resource_id:
params["resourceId"] = resource_id
return self._request("GET", f"Schedules/{schedule_id}/Slots", params=params)
# ==================== Reservation Methods ====================
def list_reservations(
self,
start_date: str = None,
end_date: str = None,
resource_id: int = None,
user_id: int = None,
) -> List[Dict]:
"""List reservations with optional filters."""
params = {}
if start_date:
params["startDateTime"] = start_date
if end_date:
params["endDateTime"] = end_date
if resource_id:
params["resourceId"] = resource_id
if user_id:
params["userId"] = user_id
return self._request("GET", "Reservations/", params=params).get(
"reservations", []
)
def get_reservation(self, reference_number: str) -> Dict:
"""Get a single reservation by reference number."""
return self._request("GET", f"Reservations/{reference_number}")
def create_reservation(
self,
resource_id: int,
start_datetime: str,
end_datetime: str,
title: str,
description: str = None,
user_id: int = None,
participants: List[int] = None,
invitees: List[int] = None,
custom_attributes: List[Dict] = None,
recurrence_rule: Dict = None,
) -> Dict:
"""Create a new reservation.
Args:
resource_id: ID of the resource to book
start_datetime: Start time in ISO8601 format with TZ (e.g., "2026-01-15T14:00:00+0000")
end_datetime: End time in ISO8601 format with TZ
title: Reservation title
description: Optional description
user_id: Owner user ID (defaults to current user)
participants: List of user IDs to add as participants
invitees: List of user IDs to invite
custom_attributes: List of {"attributeId": id, "attributeValue": value}
recurrence_rule: Recurrence configuration (see skill.md for format)
Returns:
API response with referenceNumber on success, errors on failure
Note:
Due to an API quirk, participants may not be added on initial creation.
This method automatically performs a follow-up update to ensure
participants are added.
"""
resolved_user_id = user_id or self.get_user_id()
data = {
"resourceId": resource_id,
"userId": resolved_user_id,
"startDateTime": start_datetime,
"endDateTime": end_datetime,
"title": title,
}
if description:
data["description"] = description
if participants:
data["participants"] = participants
if invitees:
data["invitees"] = invitees
if custom_attributes:
data["customAttributes"] = custom_attributes
if recurrence_rule:
data["recurrenceRule"] = recurrence_rule
result = self._request("POST", "Reservations/", json=data)
# Handle participants quirk: they often don't get added on creation
# Do a follow-up update to ensure they're added
if participants and result.get("referenceNumber"):
ref_num = result["referenceNumber"]
created = self.get_reservation(ref_num)
# Check if participants were actually added
created_participant_ids = [
int(p.get("userId")) for p in created.get("participants", [])
]
if set(participants) != set(created_participant_ids):
# Need to update to add participants
# Use startDate/endDate from the created reservation (API returns these keys)
self.update_reservation(
reference_number=ref_num,
resource_id=resource_id,
start_datetime=created.get("startDate"),
end_datetime=created.get("endDate"),
title=title,
update_scope="full",
user_id=resolved_user_id,
description=description,
participants=participants,
invitees=invitees,
custom_attributes=custom_attributes,
)
return result
def update_reservation(
self,
reference_number: str,
resource_id: int,
start_datetime: str,
end_datetime: str,
title: str,
update_scope: str = "full",
**kwargs,
) -> Dict:
"""Update an existing reservation.
Args:
reference_number: Reservation reference number
update_scope: "this", "full", or "future"
**kwargs: Same as create_reservation
"""
data = {
"resourceId": resource_id,
"userId": kwargs.get("user_id") or self.get_user_id(),
"startDateTime": start_datetime,
"endDateTime": end_datetime,
"title": title,
}
for key in [
"description",
"participants",
"invitees",
"custom_attributes",
"recurrence_rule",
]:
if key in kwargs and kwargs[key]:
# Convert snake_case to camelCase for API
api_key = key.replace("_", " ").title().replace(" ", "")
api_key = api_key[0].lower() + api_key[1:]
data[api_key] = kwargs[key]
url = f"{self.server}/Services/Reservations/{reference_number}"
r = requests.post(
url, headers=self.headers, params={"updateScope": update_scope}, json=data
)
r.raise_for_status()
return r.json()
def delete_reservation(
self, reference_number: str, update_scope: str = "this"
) -> Dict:
"""Delete a reservation.
Args:
reference_number: Reservation reference number
update_scope: "this", "full", or "future" for recurring
"""
url = f"{self.server}/Services/Reservations/{reference_number}"
r = requests.delete(
url, headers=self.headers, params={"updateScope": update_scope}
)
r.raise_for_status()
return r.json() if r.text else {}
# ==================== Attribute Methods ====================
def get_reservation_attributes(self) -> List[Dict]:
"""Get custom attributes for reservations (category 1)."""
return self._request("GET", "Attributes/Category/1").get("attributes", [])
def find_attribute_value(
self, attribute_id: int, search_value: str
) -> Optional[str]:
"""Find the closest matching value for a select-list attribute.
Args:
attribute_id: The attribute ID
search_value: Value to search for (case-insensitive partial match)
Returns:
The matching value from possibleValues, or None
"""
attrs = self.get_reservation_attributes()
for attr in attrs:
if attr.get("id") == attribute_id:
possible = attr.get("possibleValues", [])
# Try exact match first
for v in possible:
if v.lower() == search_value.lower():
return v
# Try partial match
for v in possible:
if search_value.lower() in v.lower():
return v
return None
# ==================== Utility Functions ====================
def ics_datetime_to_iso(ics_dt: str) -> str:
"""Convert ICS datetime to ISO8601 with UTC offset.
Args:
ics_dt: ICS format datetime (e.g., "20260112T130000Z")
Returns:
ISO8601 format with timezone (e.g., "2026-01-12T13:00:00+0000")
"""
# Remove the Z suffix if present
if ics_dt.endswith("Z"):
ics_dt = ics_dt[:-1]
tz_offset = "+0000"
else:
tz_offset = "+0000" # Assume UTC if no suffix
dt = datetime.strptime(ics_dt, "%Y%m%dT%H%M%S")
return dt.strftime(f"%Y-%m-%dT%H:%M:%S{tz_offset}")
def parse_rrule(rrule: str, start_dt: datetime) -> Dict:
"""Parse an ICS RRULE into Booked recurrenceRule format.
Args:
rrule: ICS RRULE string (e.g., "FREQ=WEEKLY;INTERVAL=4;UNTIL=20260711")
start_dt: Start datetime (used to determine weekday)
Returns:
Booked recurrenceRule dictionary
"""
parts = {}
for part in rrule.split(";"):
if "=" in part:
key, value = part.split("=", 1)
parts[key] = value
freq_map = {
"DAILY": "daily",
"WEEKLY": "weekly",
"MONTHLY": "monthly",
"YEARLY": "yearly",
}
result = {
"type": freq_map.get(parts.get("FREQ", ""), "none"),
"interval": int(parts.get("INTERVAL", 1)),
}
# For weekly, include the weekday from start date
# Python: 0=Monday, Booked: 0=Sunday, so add 1 and mod 7
if result["type"] == "weekly":
python_weekday = start_dt.weekday() # 0=Monday in Python
booked_weekday = (python_weekday + 1) % 7 # Convert to 0=Sunday
result["weekdays"] = [booked_weekday]
# Parse UNTIL date
if "UNTIL" in parts:
until = parts["UNTIL"]
if len(until) == 8: # YYYYMMDD format
until_dt = datetime.strptime(until, "%Y%m%d")
else:
until_dt = datetime.strptime(until[:15], "%Y%m%dT%H%M%S")
result["repeatTerminationDate"] = until_dt.strftime("%Y-%m-%dT00:00:00+0000")
return result
if __name__ == "__main__":
# Example usage
api = BookedAPI()
user = api.get_current_user()
print(f"Logged in as: {user['firstName']} {user['lastName']}")
resources = api.list_resources()
print(f"\nAvailable resources: {len(resources)}")
for r in resources[:5]:
print(f" - {r['resourceId']}: {r['name']}")
# Get this week's reservations
today = datetime.now().strftime("%Y-%m-%d")
next_week = (datetime.now() + timedelta(days=7)).strftime("%Y-%m-%d")
reservations = api.list_reservations(today, next_week)
print(f"\nReservations this week: {len(reservations)}")
for res in reservations[:5]:
print(f" - {res['referenceNumber']}: {res['title']}")
#!/usr/bin/env python3
import re
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from pathlib import Path
from typing import Dict, Iterable, List, Optional
from zoneinfo import ZoneInfo
UNIT_SEP = "\x1c"
CATEGORY_SEP = "\x1d"
TIME_PERIOD_SEP = "\x1e"
OPEN_FUTURE_DATE = date(2037, 1, 1)
FIELD_MAP = {
"a": "text",
"b": "link",
"c": "popup",
"d": "export",
"e": "start_time",
"f": "end_time",
"g": "id",
"h": "owner",
"i": "draw_border",
"j": "bg_color",
"k": "fg_color",
"l": "mail_to",
"m": "mail_cc",
"n": "mail_bcc",
"o": "mail_text",
"p": "reminder_times",
"q": "reminder_to",
"r": "category",
"s": "is_tentative",
"t": "subscriptions",
"u": "time_period",
"A": "repeat_start",
"B": "repeat_end",
"C": "period",
"D": "frequency",
"E": "month_week",
"F": "month_month",
"G": "exclusions",
"H": "skip_weekends",
}
DAY_NAMES = [None, "MO", "TU", "WE", "TH", "FR", "SA", "SU"]
@dataclass(frozen=True)
class CalciumEvent:
text: str
popup: Optional[str]
link: Optional[str]
categories: List[str]
start_time: Optional[int]
end_time: Optional[int]
time_period: Optional[str]
date: Optional[date]
repeat_start: Optional[date]
repeat_end: Optional[date]
period: Optional[str | List[int]]
frequency: Optional[int]
month_week: Optional[List[int]]
month_month: Optional[int]
exclusions: List[date]
skip_weekends: bool
@property
def is_repeating(self) -> bool:
return self.repeat_start is not None
@dataclass(frozen=True)
class VEventData:
event: Dict[str, str]
lines: List[str]
def _parse_date(value: str) -> Optional[date]:
value = value.strip()
if not value:
return None
try:
year, month, day = value.split("/")
return date(int(year), int(month), int(day))
except ValueError:
return None
def _parse_minutes(value: str) -> Optional[int]:
value = value.strip()
if not value:
return None
try:
return int(value)
except ValueError:
return None
def _parse_int_list(value: str) -> Optional[List[int]]:
if not value:
return None
try:
return [int(piece) for piece in value.split() if piece.strip()]
except ValueError:
return None
def _unescape(value: str) -> str:
return value.replace("\\n", "\n").replace("\r", "").strip()
def _load_time_periods(preferences_path: Path) -> Dict[str, List[str]]:
periods: Dict[str, List[str]] = {}
if not preferences_path.exists():
return periods
try:
lines = preferences_path.read_text(encoding="utf-8").splitlines()
except OSError:
return periods
for raw in lines:
parts = raw.split(UNIT_SEP)
if not parts:
continue
if parts[0] != "TimePeriods":
continue
values = parts[1:]
for idx in range(0, len(values) - 1, 2):
period_id = values[idx]
period_value = values[idx + 1]
if not period_id:
continue
fields = (period_value or "").split(TIME_PERIOD_SEP)
periods[period_id] = fields
return periods
def _resolve_time_period(
time_period: Optional[str],
time_periods: Dict[str, List[str]],
) -> tuple[Optional[int], Optional[int]]:
if not time_period:
return None, None
fields = time_periods.get(time_period)
if not fields or len(fields) < 3:
return None, None
try:
start = int(fields[1])
end = int(fields[2])
except ValueError:
return None, None
return start, end
def _parse_categories(value: str) -> List[str]:
if not value:
return []
return [cat for cat in value.split(CATEGORY_SEP) if cat]
def _parse_exclusions(value: str) -> List[date]:
if not value:
return []
dates: List[date] = []
for raw in value.split():
parsed = _parse_date(raw)
if parsed:
dates.append(parsed)
return dates
def parse_calcium_events(
events_path: Path,
category_filter: Optional[str],
exclude_categories: Optional[Iterable[str]],
preferences_path: Optional[Path],
debug: bool,
) -> List[CalciumEvent]:
time_periods = _load_time_periods(preferences_path) if preferences_path else {}
events: List[CalciumEvent] = []
try:
raw_text = events_path.read_text(encoding="utf-8", errors="replace")
except OSError as exc:
raise RuntimeError(f"Failed to read {events_path}: {exc}") from exc
lines = raw_text.split("\n")
exclude_set = {value for value in (exclude_categories or []) if value}
for raw in lines:
if not raw:
continue
parts = raw.split(UNIT_SEP)
if not parts:
continue
key = parts[0]
if key == "LastID":
continue
raw_fields = {}
values = parts[1:]
for idx in range(0, len(values) - 1, 2):
field = values[idx]
value = _unescape(values[idx + 1])
if not field:
continue
raw_fields[field] = value
text = raw_fields.get("a", "").strip()
popup = raw_fields.get("c") or None
link = raw_fields.get("b") or None
categories = _parse_categories(raw_fields.get("r", ""))
if category_filter and category_filter not in categories:
continue
if exclude_set and any(category in exclude_set for category in categories):
continue
start_time = _parse_minutes(raw_fields.get("e", ""))
end_time = _parse_minutes(raw_fields.get("f", ""))
time_period = raw_fields.get("u") or None
if (start_time is None or end_time is None) and time_period:
resolved_start, resolved_end = _resolve_time_period(time_period, time_periods)
start_time = start_time if start_time is not None else resolved_start
end_time = end_time if end_time is not None else resolved_end
period_raw = raw_fields.get("C")
period = None
if period_raw:
if re.fullmatch(r"[\d\s]+", period_raw):
period = _parse_int_list(period_raw)
else:
period = period_raw
month_week = _parse_int_list(raw_fields.get("E", ""))
month_month = _parse_minutes(raw_fields.get("F", ""))
frequency = _parse_minutes(raw_fields.get("D", ""))
exclusions = _parse_exclusions(raw_fields.get("G", ""))
skip_weekends = raw_fields.get("H", "").strip() == "1"
event_date = _parse_date(key) if key != "Repeat" else None
repeat_start = _parse_date(raw_fields.get("A", ""))
repeat_end = _parse_date(raw_fields.get("B", ""))
events.append(
CalciumEvent(
text=text,
popup=popup,
link=link,
categories=categories,
start_time=start_time,
end_time=end_time,
time_period=time_period,
date=event_date,
repeat_start=repeat_start,
repeat_end=repeat_end,
period=period,
frequency=frequency,
month_week=month_week,
month_month=month_month,
exclusions=exclusions,
skip_weekends=skip_weekends,
)
)
if debug:
print(f"Parsed {len(events)} events from {events_path}")
return events
def _format_local_datetime(dt: datetime, use_utc: bool) -> str:
if use_utc:
utc = dt.astimezone(ZoneInfo("UTC"))
return utc.strftime("%Y%m%dT%H%M%SZ")
return dt.strftime("%Y%m%dT%H%M%S")
def _minutes_to_time(minutes: int) -> time:
return time(minutes // 60, minutes % 60)
def _build_rrule(event: CalciumEvent) -> Optional[str]:
if not event.is_repeating:
return None
start_date = event.repeat_start
if not start_date:
return None
freq = None
interval = event.frequency or 1
by_day = None
by_month_day = None
by_set_pos = None
period = event.period
if period:
if isinstance(period, list):
freq = "WEEKLY"
by_day = "BYDAY=" + ",".join(DAY_NAMES[day] for day in period if 0 < day < 8)
else:
freq = {
"day": "DAILY",
"dayBanner": "DAILY",
"week": "WEEKLY",
"month": "MONTHLY",
"year": "YEARLY",
}.get(period)
if freq == "MONTHLY":
by_month_day = f"BYMONTHDAY={start_date.day}"
if event.month_week:
freq = "MONTHLY"
interval = event.month_month or 1
dow = start_date.isoweekday()
by_day = "BYDAY=" + ",".join(DAY_NAMES[dow] for _ in event.month_week)
for week_num in event.month_week:
if week_num == 5:
by_set_pos = "BYSETPOS=-1"
elif week_num == 6:
by_set_pos = "BYSETPOS=5"
else:
by_set_pos = f"BYSETPOS={week_num}"
if not freq:
return None
parts = [f"FREQ={freq}", f"INTERVAL={interval}"]
if by_month_day:
parts.append(by_month_day)
if by_day:
parts.append(by_day)
if event.repeat_end and event.repeat_end < OPEN_FUTURE_DATE:
parts.append(f"UNTIL={event.repeat_end.strftime('%Y%m%d')}")
if by_set_pos:
parts.append(by_set_pos)
return ";".join(parts)
def _format_rrule_with_until(rrule: str, until_date: date) -> str:
parts: Dict[str, str] = {}
for part in rrule.split(";"):
if "=" in part:
key, value = part.split("=", 1)
parts[key] = value
parts["UNTIL"] = until_date.strftime("%Y%m%d")
ordered = ["FREQ", "INTERVAL", "BYDAY", "UNTIL"]
items = []
for key in ordered:
if key in parts:
items.append(f"{key}={parts[key]}")
for key, value in parts.items():
if key not in ordered:
items.append(f"{key}={value}")
return ";".join(items)
def _build_exdates(event: CalciumEvent) -> Optional[str]:
if not event.exclusions:
return None
return ",".join(exclusion.strftime("%Y%m%d") for exclusion in event.exclusions)
def _parse_rrule_simple(rrule: str, start_date: date) -> Dict[str, Optional[object]]:
parts: Dict[str, str] = {}
for part in rrule.split(";"):
if "=" in part:
key, value = part.split("=", 1)
parts[key] = value
freq_map = {
"DAILY": "daily",
"WEEKLY": "weekly",
"MONTHLY": "monthly",
"YEARLY": "yearly",
}
rule = {
"type": freq_map.get(parts.get("FREQ", ""), "none"),
"interval": int(parts.get("INTERVAL", 1)),
"weekdays": None,
"until": None,
}
if rule["type"] == "weekly":
booked_weekday = (start_date.weekday() + 1) % 7
rule["weekdays"] = [booked_weekday]
until_value = parts.get("UNTIL")
if until_value:
try:
if len(until_value) >= 8:
rule["until"] = datetime.strptime(until_value[:8], "%Y%m%d").date()
except ValueError:
rule["until"] = None
return rule
def _recurrence_occurs_on(entry_date: date, start_date: date, rule: Dict[str, Optional[object]]) -> bool:
rtype = rule.get("type")
if not rtype or rtype == "none":
return False
if entry_date < start_date:
return False
until = rule.get("until")
if until and entry_date > until:
return False
interval = int(rule.get("interval") or 1)
if rtype == "daily":
diff_days = (entry_date - start_date).days
return diff_days % interval == 0
if rtype == "weekly":
weekdays = rule.get("weekdays") or []
booked_weekday = (entry_date.weekday() + 1) % 7
if booked_weekday not in weekdays:
return False
diff_days = (entry_date - start_date).days
return diff_days % (interval * 7) == 0
return False
def _build_description(event: CalciumEvent) -> Optional[str]:
return event.popup or event.link
def _build_dtstart_dtend(
event: CalciumEvent,
tz: ZoneInfo,
use_utc: bool,
) -> Optional[tuple[str, str]]:
start_date = event.repeat_start or event.date
if not start_date:
return None
if event.start_time is None:
if use_utc:
return None
start_value = start_date.strftime("%Y%m%d")
end_value = (start_date + timedelta(days=1)).strftime("%Y%m%d")
return start_value, end_value
start_dt = datetime.combine(start_date, _minutes_to_time(event.start_time), tzinfo=tz)
if event.end_time is None:
end_dt = start_dt
else:
end_date = start_date
if event.end_time < event.start_time:
end_date = start_date + timedelta(days=1)
end_dt = datetime.combine(end_date, _minutes_to_time(event.end_time), tzinfo=tz)
return _format_local_datetime(start_dt, use_utc), _format_local_datetime(end_dt, use_utc)
def _resolve_category_value(event: CalciumEvent, category_filter: Optional[str]) -> Optional[str]:
if category_filter:
return category_filter
return event.categories[0] if event.categories else None
def _base_vevent_fields(event: CalciumEvent, category_value: str) -> Dict[str, str]:
vevent = {
"SUMMARY": event.text,
"CATEGORIES": category_value,
}
description = _build_description(event)
if description:
vevent["DESCRIPTION"] = description
return vevent
def _event_duration(event: CalciumEvent, tz: ZoneInfo) -> Optional[tuple[time, timedelta]]:
start_date = event.repeat_start or event.date
if not start_date or event.start_time is None:
return None
start_time = _minutes_to_time(event.start_time)
start_dt = datetime.combine(start_date, start_time, tzinfo=tz)
if event.end_time is None:
end_dt = start_dt
else:
end_date = start_date
if event.end_time < event.start_time:
end_date = start_date + timedelta(days=1)
end_dt = datetime.combine(end_date, _minutes_to_time(event.end_time), tzinfo=tz)
return start_time, end_dt - start_dt
def _build_vevent_for_start(
event: CalciumEvent,
tz: ZoneInfo,
category_value: str,
use_utc: bool,
start_date: date,
rrule: Optional[str],
until_date: Optional[date],
include_exdates: bool,
) -> Optional[Dict[str, str]]:
duration_info = _event_duration(event, tz)
if not duration_info:
return None
start_time, duration = duration_info
start_dt = datetime.combine(start_date, start_time, tzinfo=tz)
end_dt = start_dt + duration
vevent = _base_vevent_fields(event, category_value)
vevent["DTSTART"] = _format_local_datetime(start_dt, use_utc)
vevent["DTEND"] = _format_local_datetime(end_dt, use_utc)
if rrule:
if until_date:
if until_date < start_date:
return None
if until_date != start_date:
vevent["RRULE"] = _format_rrule_with_until(rrule, until_date)
else:
vevent["RRULE"] = rrule
if include_exdates:
exdates = _build_exdates(event)
if exdates:
vevent["EXDATE"] = exdates
return vevent
def _split_recurring_event(
event: CalciumEvent,
tz: ZoneInfo,
category_value: str,
use_utc: bool,
debug: bool,
) -> List[Dict[str, str]]:
start_date = event.repeat_start or event.date
if not start_date:
return []
rrule = _build_rrule(event)
if not rrule:
vevent = _build_vevent_for_start(
event,
tz,
category_value,
use_utc,
start_date,
rrule=None,
until_date=None,
include_exdates=False,
)
return [vevent] if vevent else []
rule = _parse_rrule_simple(rrule, start_date)
if rule.get("type") not in {"daily", "weekly"}:
if debug:
print(f"Leaving EXDATE untouched for non-daily/weekly recurrence: {event.text}")
vevent = _build_vevent_for_start(
event,
tz,
category_value,
use_utc,
start_date,
rrule=rrule,
until_date=event.repeat_end if event.repeat_end and event.repeat_end < OPEN_FUTURE_DATE else None,
include_exdates=True,
)
return [vevent] if vevent else []
interval = int(rule.get("interval") or 1)
step_days = interval if rule.get("type") == "daily" else interval * 7
exclusions = sorted({exdate for exdate in event.exclusions})
current_start = start_date
last_allowed = event.repeat_end if event.repeat_end and event.repeat_end < OPEN_FUTURE_DATE else None
segments: List[tuple[date, Optional[date]]] = []
for exdate in exclusions:
if last_allowed and exdate > last_allowed:
continue
if exdate < current_start:
continue
if not _recurrence_occurs_on(exdate, start_date, rule):
continue
prev_date = exdate - timedelta(days=step_days)
if prev_date >= current_start:
segment_end = prev_date
if last_allowed and segment_end > last_allowed:
segment_end = last_allowed
if segment_end >= current_start:
segments.append((current_start, segment_end))
next_date = exdate + timedelta(days=step_days)
if last_allowed and next_date > last_allowed:
current_start = None
break
current_start = next_date
if current_start:
if not last_allowed or current_start <= last_allowed:
segments.append((current_start, last_allowed))
vevents: List[Dict[str, str]] = []
for segment_start, segment_until in segments:
vevent = _build_vevent_for_start(
event,
tz,
category_value,
use_utc,
segment_start,
rrule=rrule,
until_date=segment_until,
include_exdates=False,
)
if vevent:
vevents.append(vevent)
return vevents
def build_vevent(
event: CalciumEvent,
tz: ZoneInfo,
category_value: str,
use_utc: bool,
debug: bool,
) -> List[Dict[str, str]]:
start_date = event.repeat_start or event.date
if not start_date:
return []
if event.exclusions and event.is_repeating:
return _split_recurring_event(event, tz, category_value, use_utc, debug)
vevent = _build_vevent_for_start(
event,
tz,
category_value,
use_utc,
start_date,
rrule=_build_rrule(event),
until_date=event.repeat_end if event.repeat_end and event.repeat_end < OPEN_FUTURE_DATE else None,
include_exdates=False,
)
return [vevent] if vevent else []
def build_vevents(
events_path: Path,
category_filter: Optional[str],
tz_name: str,
preferences_path: Optional[Path] = None,
exclude_categories: Optional[Iterable[str]] = None,
debug: bool = False,
use_utc: bool = True,
) -> List[VEventData]:
tz = ZoneInfo(tz_name)
events = parse_calcium_events(
events_path,
category_filter,
exclude_categories,
preferences_path,
debug,
)
vevents: List[VEventData] = []
for event in events:
category_value = _resolve_category_value(event, category_filter)
if not category_value:
if debug:
print(f"Skipping event with no category: {event.text}")
continue
for vevent in build_vevent(event, tz, category_value, use_utc=use_utc, debug=debug):
if not vevent:
if debug:
print(f"Skipping event with missing datetime: {event.text}")
continue
lines = _build_vevent_lines(vevent)
vevents.append(VEventData(event=vevent, lines=lines))
return vevents
def _build_vevent_lines(event: Dict[str, str]) -> List[str]:
lines = ["BEGIN:VEVENT"]
for key in (
"DTSTART",
"DTEND",
"DTSTAMP",
"UID",
"SUMMARY",
"DESCRIPTION",
"CATEGORIES",
"RRULE",
"EXDATE",
):
value = event.get(key)
if value:
lines.append(f"{key}:{value}")
lines.append("END:VEVENT")
return lines
#!/usr/bin/env python3
import argparse
import re
import sys
from datetime import datetime, time, timedelta
from pathlib import Path
from typing import Optional
from zoneinfo import ZoneInfo
try:
from prompt_toolkit import prompt as pt_prompt
HAS_PROMPT_TOOLKIT = True
except Exception:
HAS_PROMPT_TOOLKIT = False
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402
native_root = Path(__file__).resolve().parent
sys.path.insert(0, str(native_root))
from calcium_events import build_vevents # noqa: E402
DEFAULT_RESOURCE = "P1 Prisma"
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
"P1 Prisma": "data/3T_Prisma_P1.Events",
"P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
"4T Varian": "data/4T_Varian_Inova.Events",
}
DEFAULT_TZ = "America/New_York"
PI_ATTRIBUTE_ID = 1
SCANNED_INITIALS_ATTRIBUTE_ID = 2
PATIENT_SOURCE_ATTRIBUTE_ID = 11
BODY_PART_ATTRIBUTE_ID = 10
CONTRAST_ATTRIBUTE_ID = 3
def local_datetime_from_ics(value: str, tz: ZoneInfo) -> datetime:
if not value:
return datetime.min.replace(tzinfo=tz)
if "T" not in value:
date_value = datetime.strptime(value[:8], "%Y%m%d").date()
return datetime.combine(date_value, time(0, 0), tzinfo=tz)
iso = ics_datetime_to_iso(value)
dt = datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S%z")
return dt.astimezone(tz)
def format_ics_like(template: str, dt: datetime) -> str:
if "T" not in template:
return dt.strftime("%Y%m%d")
if template.endswith("Z"):
utc = dt.astimezone(ZoneInfo("UTC"))
return utc.strftime("%Y%m%dT%H%M%SZ")
return dt.strftime("%Y%m%dT%H%M%S")
def parse_termination_date(value: str | None, tz: ZoneInfo) -> Optional[datetime.date]:
if not value:
return None
if value.endswith("Z"):
value = value[:-1] + "+0000"
if value[-3] == ":":
value = value[:-3] + value[-2:]
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz).date()
def parse_exdate_dates(value: str) -> set[datetime.date]:
if not value:
return set()
dates: set[datetime.date] = set()
for part in value.split(","):
chunk = part.strip()
if len(chunk) >= 8:
try:
dates.add(datetime.strptime(chunk[:8], "%Y%m%d").date())
except ValueError:
continue
return dates
def update_rrule_until(rrule: str, until_date: datetime.date) -> str:
if not rrule:
return rrule
parts = []
updated = False
for piece in rrule.split(";"):
if piece.startswith("UNTIL="):
parts.append(f"UNTIL={until_date.strftime('%Y%m%d')}")
updated = True
else:
parts.append(piece)
if not updated:
parts.append(f"UNTIL={until_date.strftime('%Y%m%d')}")
return ";".join(parts)
def recurrence_occurs_on(
target_date: datetime.date,
start_date: datetime.date,
rule: dict,
) -> bool:
if target_date < start_date:
return False
interval = int(rule.get("interval", 1))
rtype = rule.get("type")
if rtype == "daily":
diff_days = (target_date - start_date).days
return diff_days % interval == 0
if rtype == "weekly":
weekdays = rule.get("weekdays") or []
booked_weekday = (target_date.weekday() + 1) % 7
if booked_weekday not in weekdays:
return False
diff_days = (target_date - start_date).days
return diff_days % (interval * 7) == 0
if rtype == "monthly":
if target_date.day != start_date.day:
return False
months = (target_date.year - start_date.year) * 12 + (
target_date.month - start_date.month
)
return months % interval == 0
if rtype == "yearly":
if (target_date.month, target_date.day) != (start_date.month, start_date.day):
return False
return (target_date.year - start_date.year) % interval == 0
return False
def expand_exdates(
events: list[dict],
tz: ZoneInfo,
debug: bool,
) -> list[dict]:
expanded: list[dict] = []
for event in events:
rrule = event.get("RRULE") or ""
exdates = parse_exdate_dates(event.get("EXDATE", ""))
if not rrule or not exdates:
event.pop("EXDATE", None)
expanded.append(event)
continue
start_raw = event.get("DTSTART", "")
end_raw = event.get("DTEND", "")
start_dt = local_datetime_from_ics(start_raw, tz)
end_dt = local_datetime_from_ics(end_raw, tz)
rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
current_start = start_dt
duration = end_dt - start_dt
for exdate in sorted(exdates):
if until_date and exdate > until_date:
continue
if not recurrence_occurs_on(exdate, start_dt.date(), rule):
continue
segment_until = exdate - timedelta(days=1)
if segment_until >= current_start.date():
segment = dict(event)
segment["DTSTART"] = format_ics_like(start_raw, current_start)
segment["DTEND"] = format_ics_like(end_raw, current_start + duration)
segment["RRULE"] = update_rrule_until(rrule, segment_until)
segment.pop("EXDATE", None)
expanded.append(segment)
next_dt = next_occurrence(
start_dt,
rule,
datetime.combine(exdate + timedelta(days=1), time.min, tzinfo=tz),
)
if not next_dt:
current_start = None
break
current_start = next_dt
if current_start and (not until_date or current_start.date() <= until_date):
segment = dict(event)
segment["DTSTART"] = format_ics_like(start_raw, current_start)
segment["DTEND"] = format_ics_like(end_raw, current_start + duration)
if until_date:
segment["RRULE"] = update_rrule_until(rrule, until_date)
segment.pop("EXDATE", None)
expanded.append(segment)
elif debug:
print(f"Dropped event after EXDATE expansion: {event.get('SUMMARY', '').strip()}")
return expanded
def add_months(date_value: datetime.date, months: int) -> datetime.date:
year = date_value.year + (date_value.month - 1 + months) // 12
month = (date_value.month - 1 + months) % 12 + 1
day = date_value.day
while True:
try:
return datetime(year, month, day).date()
except ValueError:
day -= 1
def next_occurrence(start_dt: datetime, rule: dict, now_dt: datetime) -> Optional[datetime]:
interval = int(rule.get("interval", 1))
occurrence = start_dt
if rule.get("type") == "daily":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
steps = delta_days // interval
occurrence = occurrence + timedelta(days=steps * interval)
if occurrence < now_dt:
occurrence += timedelta(days=interval)
elif rule.get("type") == "weekly":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
weeks = delta_days // 7
steps = (weeks // interval) * interval
occurrence = occurrence + timedelta(days=steps * 7)
if occurrence < now_dt:
occurrence += timedelta(days=interval * 7)
elif rule.get("type") == "monthly":
if now_dt > occurrence:
months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
steps = (months // interval) * interval
occurrence_date = add_months(occurrence.date(), steps)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
if occurrence < now_dt:
occurrence_date = add_months(occurrence.date(), interval)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
elif rule.get("type") == "yearly":
if now_dt > occurrence:
years = now_dt.year - occurrence.year
steps = (years // interval) * interval
try:
occurrence = occurrence.replace(year=occurrence.year + steps)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
if occurrence < now_dt:
try:
occurrence = occurrence.replace(year=occurrence.year + interval)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
else:
return None
return occurrence
def adjust_event_to_today(event: dict, tz: ZoneInfo, today_date: datetime.date) -> Optional[dict]:
start_raw = event.get("DTSTART", "")
end_raw = event.get("DTEND", "")
if not start_raw or not end_raw:
return None
start_dt = local_datetime_from_ics(start_raw, tz)
end_dt = local_datetime_from_ics(end_raw, tz)
rrule = event.get("RRULE")
if not rrule:
if start_dt.date() < today_date:
return None
return event
rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
if until_date and today_date > until_date:
return None
if start_dt.date() >= today_date:
return event
today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
occurrence = next_occurrence(start_dt, rule, today_dt)
if not occurrence:
return None
if until_date and occurrence.date() > until_date:
return None
duration = end_dt - start_dt
new_start = occurrence
new_end = new_start + duration
updated = dict(event)
updated["DTSTART"] = format_ics_like(start_raw, new_start)
updated["DTEND"] = format_ics_like(end_raw, new_end)
return updated
def extract_scanned_initials(description: str) -> Optional[str]:
first_line = description.splitlines()[0].strip() if description else ""
if not first_line:
return None
if first_line in {"Clinical - IP", "Clinical IP"}:
return None
parts = first_line.split()
if not parts:
return None
candidate = parts[-1]
if len(candidate) in (2, 3) and candidate.isalpha() and candidate.isupper():
return candidate
if "-" in candidate:
tail = candidate.rsplit("-", 1)[-1]
if len(tail) in (2, 3) and tail.isalpha() and tail.isupper():
return tail
return None
def parse_clinical_prefix(description: str) -> tuple[Optional[str], Optional[str], Optional[str], str]:
lines = description.splitlines() if description else []
if not lines:
return None, None, None, ""
first = lines[0]
match = re.match(
r"^\s*(?P<pat>[A-Za-z0-9]+)\s*-\s*(?P<body>.+?)\s*-\s*(?P<spi>[A-Za-z]{2,3})(?P<rest>.*)$",
first,
)
if not match:
return None, None, None, description
pat = match.group("pat").strip()
body = match.group("body").strip()
spi = match.group("spi").strip()
rest = match.group("rest").strip().lstrip("-").strip()
new_lines: list[str] = []
if rest:
new_lines.append(rest)
new_lines.extend(lines[1:])
return pat, body, spi, "\n".join(new_lines).strip()
def input_with_default(label: str, current: str) -> str:
prompt = f"{label}: "
if HAS_PROMPT_TOOLKIT:
response = pt_prompt(prompt, default=current or "").strip()
else:
prompt = f"{label} [{current}]: " if current else f"{label}: "
response = input(prompt).strip()
if response == "":
return current
if response == "-":
return ""
return response
def contains_spi_blocker(text: str) -> bool:
if not text:
return False
lowered = text.lower()
if "acr qa" in lowered:
return True
if "hd cleanup" in lowered or "hd clean up" in lowered:
return True
return False
def has_contrast_indicator(text: str) -> bool:
if not text:
return False
lowered = text.lower()
if "no contrast" in lowered:
return False
if "non contr" in lowered:
return False
if "contrast" in lowered:
return True
patterns = [
r"w\s*/\s*o\s*&\s*w",
r"w\s*&\s*w\s*/\s*o",
r"w\s*/\s*o\s*\+\s*w",
r"w\s*\+\s*w\s*/\s*o",
r"w\s*/\s*wo",
r"wo\s*/\s*w",
r"w\s*-\s*wo",
r"wo\s*-\s*w",
]
for pattern in patterns:
if re.search(pattern, lowered):
return True
return False
def main() -> int:
parser = argparse.ArgumentParser(
description="Import Clinical events from a Calcium .Events file"
)
parser.add_argument(
"input",
nargs="?",
help="Input Calcium .Events file (defaults based on --resource)",
)
parser.add_argument(
"--preferences",
help="Optional Calcium .Preferences file to resolve time periods",
)
parser.add_argument(
"--resource",
default=DEFAULT_RESOURCE,
help="Resource name (default: P1 Prisma)",
)
parser.add_argument(
"--timezone",
default=DEFAULT_TZ,
help="Timezone for Calcium events (default: America/New_York)",
)
parser.add_argument(
"--yes",
action="store_true",
help="Create all events without prompting",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be created without creating reservations",
)
parser.add_argument(
"--list",
action="store_true",
help="List events without creating reservations",
)
args = parser.parse_args()
input_path_value = args.input
if not input_path_value:
input_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
if not input_path_value:
raise SystemExit(
f"No default Calcium .Events for resource '{args.resource}'. "
"Provide an input file."
)
args.input = input_path_value
events_path = Path(args.input)
prefs_path = Path(args.preferences) if args.preferences else None
tz = ZoneInfo(args.timezone)
today_date = datetime.now(tz).date()
vevents = build_vevents(
events_path,
category_filter="Clinical",
tz_name=args.timezone,
preferences_path=prefs_path,
use_utc=True,
)
events = expand_exdates([item.event for item in vevents], tz, args.debug)
adjusted_events = []
for event in events:
adjusted = adjust_event_to_today(event, tz, today_date)
if not adjusted:
continue
adjusted_events.append(adjusted)
events = adjusted_events
if not events:
print("No matching clinical events found.")
return 0
if args.list or args.dry_run:
for idx, event in enumerate(events, start=1):
start_local = local_datetime_from_ics(event.get("DTSTART", ""), tz)
end_local = local_datetime_from_ics(event.get("DTEND", ""), tz)
summary = event.get("SUMMARY", "").strip()
print(
f"{idx}. {start_local.strftime('%Y-%m-%d %H:%M')} -> "
f"{end_local.strftime('%H:%M')} | {summary}"
)
return 0
api = BookedAPI()
resource = api.find_resource_by_name(args.resource)
if not resource:
print(f"Resource '{args.resource}' not found.")
return 1
pi_value = api.find_attribute_value(PI_ATTRIBUTE_ID, "Clinical") or "Clinical"
if not args.yes:
response = input("Create these clinical events? [Y]es / [n]o: ").strip().lower()
if response in {"n", "no"}:
print("Aborted.")
return 0
for idx, event in enumerate(events, start=1):
description = event.get("SUMMARY", "")
pat, body, spi, description_clean = parse_clinical_prefix(description)
if spi and contains_spi_blocker(description):
spi = None
if not spi:
spi = extract_scanned_initials(description)
if not args.yes:
print(f"\nEvent {idx}:")
pat = input_with_default("PAT (Patient Source)", pat or "")
body = input_with_default("Body part", body or "")
spi = input_with_default("SPI (Scanned Person Initials)", spi or "")
description_clean = input_with_default("SUM (Description)", description_clean or "")
custom_attributes = [
{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": pi_value}
]
if pat:
custom_attributes.append(
{
"attributeId": PATIENT_SOURCE_ATTRIBUTE_ID,
"attributeValue": pat,
}
)
if body:
custom_attributes.append(
{
"attributeId": BODY_PART_ATTRIBUTE_ID,
"attributeValue": body,
}
)
if spi:
custom_attributes.append(
{
"attributeId": SCANNED_INITIALS_ATTRIBUTE_ID,
"attributeValue": spi,
}
)
if has_contrast_indicator(description):
custom_attributes.append(
{
"attributeId": CONTRAST_ATTRIBUTE_ID,
"attributeValue": "Yes",
}
)
result = api.create_reservation(
resource_id=resource["resourceId"],
start_datetime=ics_datetime_to_iso(event.get("DTSTART", "")),
end_datetime=ics_datetime_to_iso(event.get("DTEND", "")),
title="",
description=description_clean,
custom_attributes=custom_attributes if custom_attributes else None,
recurrence_rule=parse_rrule(
event.get("RRULE", ""),
local_datetime_from_ics(event.get("DTSTART", ""), tz).replace(tzinfo=None),
)
if event.get("RRULE")
else None,
)
reference = result.get("referenceNumber")
if not reference:
errors = result.get("errors", result)
print(f"Create failed for item {idx}: {errors}")
else:
print(f"Created {reference}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
import argparse
import json
import os
import sys
import urllib.request
from dataclasses import replace
from datetime import datetime, time, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402
native_root = Path(__file__).resolve().parent
sys.path.insert(0, str(native_root))
from calcium_events import CalciumEvent, build_vevent, parse_calcium_events # noqa: E402
DEFAULT_RESOURCE = "Coverage"
DEFAULT_INPUT = "data/Technologist.Events"
DEFAULT_TZ = "America/New_York"
COVERAGE_ATTRIBUTE_ID = 7
COVERAGE_CATEGORIES = {
"MD Coverage": "MD",
"NP Coverage": "NP",
}
DEFAULT_START_MINUTES = 6 * 60
DEFAULT_END_MINUTES = 6 * 60 + 5
OPENAI_MODEL = "gpt-5.2"
def local_datetime_from_ics(value: str, tz: ZoneInfo) -> datetime:
if not value:
return datetime.min.replace(tzinfo=tz)
if "T" not in value:
date_value = datetime.strptime(value[:8], "%Y%m%d").date()
return datetime.combine(date_value, time(0, 0), tzinfo=tz)
iso = ics_datetime_to_iso(value)
dt = datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S%z")
return dt.astimezone(tz)
def format_ics_like(template: str, dt: datetime) -> str:
if "T" not in template:
return dt.strftime("%Y%m%d")
if template.endswith("Z"):
utc = dt.astimezone(ZoneInfo("UTC"))
return utc.strftime("%Y%m%dT%H%M%SZ")
return dt.strftime("%Y%m%dT%H%M%S")
def parse_termination_date(value: str | None, tz: ZoneInfo) -> Optional[datetime.date]:
if not value:
return None
if value.endswith("Z"):
value = value[:-1] + "+0000"
if value[-3] == ":":
value = value[:-3] + value[-2:]
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz).date()
def add_months(date_value: datetime.date, months: int) -> datetime.date:
year = date_value.year + (date_value.month - 1 + months) // 12
month = (date_value.month - 1 + months) % 12 + 1
day = date_value.day
while True:
try:
return datetime(year, month, day).date()
except ValueError:
day -= 1
def next_occurrence(start_dt: datetime, rule: dict, now_dt: datetime) -> Optional[datetime]:
interval = int(rule.get("interval", 1))
occurrence = start_dt
if rule.get("type") == "daily":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
steps = delta_days // interval
occurrence = occurrence + timedelta(days=steps * interval)
if occurrence < now_dt:
occurrence += timedelta(days=interval)
elif rule.get("type") == "weekly":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
weeks = delta_days // 7
steps = (weeks // interval) * interval
occurrence = occurrence + timedelta(days=steps * 7)
if occurrence < now_dt:
occurrence += timedelta(days=interval * 7)
elif rule.get("type") == "monthly":
if now_dt > occurrence:
months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
steps = (months // interval) * interval
occurrence_date = add_months(occurrence.date(), steps)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
if occurrence < now_dt:
occurrence_date = add_months(occurrence.date(), interval)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
elif rule.get("type") == "yearly":
if now_dt > occurrence:
years = now_dt.year - occurrence.year
steps = (years // interval) * interval
try:
occurrence = occurrence.replace(year=occurrence.year + steps)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
if occurrence < now_dt:
try:
occurrence = occurrence.replace(year=occurrence.year + interval)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
else:
return None
return occurrence
def adjust_event_to_today(event: dict, tz: ZoneInfo, today_date: datetime.date) -> Optional[dict]:
start_raw = event.get("DTSTART", "")
end_raw = event.get("DTEND", "")
if not start_raw or not end_raw:
return None
start_dt = local_datetime_from_ics(start_raw, tz)
end_dt = local_datetime_from_ics(end_raw, tz)
rrule = event.get("RRULE")
if not rrule:
if start_dt.date() < today_date:
return None
return event
rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
if until_date and today_date > until_date:
return None
if start_dt.date() >= today_date:
return event
today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
occurrence = next_occurrence(start_dt, rule, today_dt)
if not occurrence:
return None
if until_date and occurrence.date() > until_date:
return None
duration = end_dt - start_dt
new_start = occurrence
new_end = new_start + duration
updated = dict(event)
updated["DTSTART"] = format_ics_like(start_raw, new_start)
updated["DTEND"] = format_ics_like(end_raw, new_end)
return updated
def coverage_type_for_event(event: CalciumEvent) -> Optional[str]:
for category, coverage_type in COVERAGE_CATEGORIES.items():
if category in event.categories:
return coverage_type
return None
def normalize_coverage_event(event: CalciumEvent) -> CalciumEvent:
return replace(
event,
start_time=DEFAULT_START_MINUTES,
end_time=DEFAULT_END_MINUTES,
)
def build_items(
events: List[CalciumEvent],
tz: ZoneInfo,
today_date: datetime.date,
debug: bool,
) -> List[Tuple[dict, str]]:
items: List[Tuple[dict, str]] = []
for event in events:
coverage_type = coverage_type_for_event(event)
if not coverage_type:
continue
normalized = normalize_coverage_event(event)
category_value = "MD Coverage" if coverage_type == "MD" else "NP Coverage"
vevents = build_vevent(normalized, tz, category_value, use_utc=True, debug=debug)
for vevent in vevents:
adjusted = adjust_event_to_today(vevent, tz, today_date)
if not adjusted:
continue
items.append((adjusted, coverage_type))
return items
def format_list_entry(index: int, event: dict, coverage_type: str, tz: ZoneInfo) -> str:
start_local = local_datetime_from_ics(event.get("DTSTART", ""), tz)
end_local = local_datetime_from_ics(event.get("DTEND", ""), tz)
summary = event.get("SUMMARY", "").strip()
return (
f"{index}. {start_local.strftime('%Y-%m-%d %H:%M')} -> "
f"{end_local.strftime('%H:%M')} | {coverage_type} | {summary}"
)
def parse_time_value(value: Optional[str]) -> Optional[time]:
if not value:
return None
raw = value.strip()
if not raw:
return None
for fmt in ("%H:%M", "%H%M"):
try:
return datetime.strptime(raw, fmt).time()
except ValueError:
continue
return None
def normalize_name(value: str) -> str:
return " ".join(value.replace(",", " ").split()).strip().lower()
def build_user_maps(users: List[Dict]) -> tuple[Dict[str, int], Dict[str, int]]:
email_map: Dict[str, int] = {}
name_map: Dict[str, int] = {}
for user in users:
user_id = user.get("id")
if user_id is None:
continue
email = (user.get("emailAddress") or "").strip().lower()
if email:
email_map[email] = user_id
first = (user.get("firstName") or "").strip()
last = (user.get("lastName") or "").strip()
if first or last:
full = normalize_name(f"{first} {last}")
if full:
name_map[full] = user_id
if first and last:
alt = normalize_name(f"{last}, {first}")
if alt:
name_map[alt] = user_id
return email_map, name_map
def resolve_participants(
participants: List[str],
email_map: Dict[str, int],
name_map: Dict[str, int],
) -> Tuple[List[int], List[str]]:
resolved: List[int] = []
unresolved: List[str] = []
seen_ids = set()
for token in participants:
item = token.strip()
if not item:
continue
key = item.lower()
user_id = None
if "@" in key:
user_id = email_map.get(key)
if user_id is None:
user_id = name_map.get(normalize_name(item))
if user_id is None:
unresolved.append(item)
continue
if user_id in seen_ids:
continue
seen_ids.add(user_id)
resolved.append(user_id)
return resolved, unresolved
def call_openai(description: str) -> Dict:
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("OPENAI_API_KEY is not set in the environment.")
system_msg = (
"Extract coverage details from the description. Return JSON only with keys: "
"start_time (HH:MM 24h), end_time (HH:MM 24h), participants (list of names/emails), "
"description_time_removed (string), description_participants_removed (string), "
"description_time_and_participants_removed (string). "
"Remove only what is clearly a time range for time_removed, only names/emails for "
"participants_removed, and both for time_and_participants_removed. Preserve all other notes. "
"If no time is present, set start_time/end_time to null and description_time_removed should "
"match the original. If nothing remains, return an empty string."
)
user_msg = f"Description:\\n{description}"
payload = {
"model": OPENAI_MODEL,
"temperature": 0,
"response_format": {"type": "json_object"},
"messages": [
{"role": "system", "content": system_msg},
{"role": "user", "content": user_msg},
],
}
data = json.dumps(payload).encode("utf-8")
request = urllib.request.Request(
"https://api.openai.com/v1/chat/completions",
data=data,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
},
)
with urllib.request.urlopen(request, timeout=60) as response:
raw = response.read().decode("utf-8")
parsed = json.loads(raw)
choices = parsed.get("choices") or []
if not choices:
return {}
text = choices[0].get("message", {}).get("content", "")
return json.loads(text) if text else {}
def update_event_times(event: dict, tz: ZoneInfo, start: time, end: time) -> dict:
start_raw = event.get("DTSTART", "")
end_raw = event.get("DTEND", "")
start_dt = local_datetime_from_ics(start_raw, tz)
date_value = start_dt.date()
start_dt = datetime.combine(date_value, start, tzinfo=tz)
end_dt = datetime.combine(date_value, end, tzinfo=tz)
if end_dt <= start_dt:
end_dt = end_dt + timedelta(days=1)
updated = dict(event)
updated["DTSTART"] = format_ics_like(start_raw, start_dt)
updated["DTEND"] = format_ics_like(end_raw, end_dt)
return updated
def format_time_12h(value: time) -> str:
formatted = value.strftime("%I:%M %p")
return formatted.lstrip("0")
def main() -> int:
parser = argparse.ArgumentParser(
description="Import MD/NP Coverage events from a Calcium .Events file"
)
parser.add_argument(
"--timezone",
default=DEFAULT_TZ,
help="Timezone for Calcium events (default: America/New_York)",
)
parser.add_argument(
"--yes",
action="store_true",
help="Create all reservations without prompting",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be created without creating reservations",
)
parser.add_argument(
"--list",
action="store_true",
help="List events without creating reservations",
)
parser.add_argument(
"--debug",
action="store_true",
help="Log progress details to stderr",
)
args = parser.parse_args()
events_path = Path(DEFAULT_INPUT)
prefs_path = None
tz = ZoneInfo(args.timezone)
today_date = datetime.now(tz).date()
events = parse_calcium_events(
events_path,
category_filter=None,
exclude_categories=None,
preferences_path=prefs_path,
debug=args.debug,
)
items = build_items(events, tz, today_date, args.debug)
if not items:
print("No matching coverage events found.")
return 0
if args.list or args.dry_run:
for idx, (event, coverage_type) in enumerate(items, start=1):
print(format_list_entry(idx, event, coverage_type, tz))
return 0
api = BookedAPI()
resource = api.find_resource_by_name(DEFAULT_RESOURCE)
if not resource:
print(f"Resource '{DEFAULT_RESOURCE}' not found.")
return 1
users = api.list_users()
email_map, name_map = build_user_maps(users)
rejected_ids: List[str] = []
for idx, (event, coverage_type) in enumerate(items, start=1):
description = event.get("SUMMARY", "")
ai_data = {}
ai_error = None
try:
ai_data = call_openai(description)
except Exception as exc:
ai_error = str(exc)
ai_start = parse_time_value(ai_data.get("start_time"))
ai_end = parse_time_value(ai_data.get("end_time"))
ai_participants = ai_data.get("participants") or []
ai_desc_time = ai_data.get("description_time_removed")
ai_desc_participants = ai_data.get("description_participants_removed")
ai_desc_full = ai_data.get("description_time_and_participants_removed")
if not isinstance(ai_participants, list):
ai_participants = []
if ai_desc_time is not None:
ai_desc_time = str(ai_desc_time).strip()
if ai_desc_participants is not None:
ai_desc_participants = str(ai_desc_participants).strip()
if ai_desc_full is not None:
ai_desc_full = str(ai_desc_full).strip()
if ai_start and ai_end:
proposed_event = update_event_times(event, tz, ai_start, ai_end)
else:
proposed_event = event
resolved_ids, unresolved = resolve_participants(
[str(p) for p in ai_participants], email_map, name_map
)
ai_description = None
if ai_start and ai_end:
if resolved_ids:
ai_description = ai_desc_full or ai_desc_time or description
else:
ai_description = ai_desc_time or description
else:
if resolved_ids:
ai_description = ai_desc_participants or description
else:
ai_description = description
proposed_start = local_datetime_from_ics(proposed_event.get("DTSTART", ""), tz)
proposed_rrule = proposed_event.get("RRULE")
print("\n" + "=" * 72)
print(f"Event {idx} of {len(items)}")
print("-" * 72)
print(f"\033[1mCoverage Type:\033[0m {coverage_type}")
print(f"\033[1mDate:\033[0m {proposed_start.strftime('%Y-%m-%d')}")
if proposed_rrule:
print(f"\033[1mRecurrence:\033[0m {proposed_rrule}")
else:
print("\033[1mRecurrence:\033[0m (none)")
if ai_error:
print(f"\033[1mAI error:\033[0m {ai_error}")
if ai_start and ai_end:
print(
f"\033[1mAI times:\033[0m {format_time_12h(ai_start)} -> {format_time_12h(ai_end)}"
)
else:
print(
"\033[1mAI times:\033[0m (none found; will use 06:00 -> 06:05 if rejected)"
)
if ai_participants:
print(
f"\033[1mAI participants:\033[0m {', '.join(str(p) for p in ai_participants)}"
)
else:
print("\033[1mAI participants:\033[0m (none found)")
if resolved_ids:
print(
f"\033[1mResolved participants:\033[0m {', '.join(str(pid) for pid in resolved_ids)}"
)
if unresolved:
print(
f"\033[1mUnresolved participants:\033[0m {', '.join(unresolved)}"
)
print("\033[1mOriginal description:\033[0m")
print(description)
print("\033[1mAI description:\033[0m")
if ai_description is None:
print("(none)")
else:
print(ai_description)
response = "y" if args.yes else input("Accept AI selection? [Y]es / [n]o / [q]uit: ").strip().lower()
if response in {"q", "quit"}:
print("Stopping.")
break
use_ai = response not in {"n", "no"}
if use_ai:
final_event = proposed_event
participants = resolved_ids
final_description = ai_description if ai_description is not None else description
else:
final_event = update_event_times(
event,
tz,
time(DEFAULT_START_MINUTES // 60, DEFAULT_START_MINUTES % 60),
time(DEFAULT_END_MINUTES // 60, DEFAULT_END_MINUTES % 60),
)
participants = []
final_description = description
start_iso = ics_datetime_to_iso(final_event.get("DTSTART", ""))
end_iso = ics_datetime_to_iso(final_event.get("DTEND", ""))
result = api.create_reservation(
resource_id=resource["resourceId"],
start_datetime=start_iso,
end_datetime=end_iso,
title="",
description=final_description,
participants=participants if participants else None,
custom_attributes=[
{"attributeId": COVERAGE_ATTRIBUTE_ID, "attributeValue": coverage_type}
],
recurrence_rule=parse_rrule(
final_event.get("RRULE", ""),
local_datetime_from_ics(final_event.get("DTSTART", ""), tz).replace(tzinfo=None),
)
if final_event.get("RRULE")
else None,
)
reference = result.get("referenceNumber")
if not reference:
errors = result.get("errors", result)
print(f"Create failed for item {idx}: {errors}")
continue
print(f"Created {reference}")
if not use_ai:
rejected_ids.append(reference)
if rejected_ids:
print("\nRejected AI selections (created with defaults):")
print(", ".join(rejected_ids))
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
import argparse
import sys
from datetime import datetime, time, timedelta
from pathlib import Path
from typing import List, Optional, Tuple
from zoneinfo import ZoneInfo
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from create_scandium_event import ( # noqa: E402
PI_ATTRIBUTE_ID,
SCANNED_INITIALS_ATTRIBUTE_ID,
build_description,
find_resource,
parse_start_datetime,
parse_summary,
)
from create_scandium_event_multiple import ( # noqa: E402
dedupe_events,
format_event_summary,
log_debug,
)
from native.calcium_events import build_vevents
DEFAULT_RESOURCE = "P1 Prisma"
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
"P1 Prisma": "data/3T_Prisma_P1.Events",
"P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
"4T Varian": "data/4T_Varian_Inova.Events",
}
DEFAULT_TZ = "America/New_York"
EMAIL_DOMAIN_SWAPS = {
"mgb.org": "mclean.harvard.edu",
"mclean.harvard.edu": "mgb.org",
}
def swap_email_domain(email: str) -> Optional[str]:
if "@" not in email:
return None
local, domain = email.rsplit("@", 1)
swapped = EMAIL_DOMAIN_SWAPS.get(domain.lower())
if not swapped:
return None
return f"{local}@{swapped}"
def local_datetime_from_ics(value: str, tz: ZoneInfo) -> datetime:
if not value:
return datetime.min.replace(tzinfo=tz)
if "T" not in value:
date_value = datetime.strptime(value[:8], "%Y%m%d").date()
return datetime.combine(date_value, time(0, 0), tzinfo=tz)
iso = ics_datetime_to_iso(value)
dt = datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S%z")
return dt.astimezone(tz)
def format_ics_like(template: str, dt: datetime) -> str:
if "T" not in template:
return dt.strftime("%Y%m%d")
if template.endswith("Z"):
utc = dt.astimezone(ZoneInfo("UTC"))
return utc.strftime("%Y%m%dT%H%M%SZ")
return dt.strftime("%Y%m%dT%H%M%S")
def parse_termination_date(value: str | None, tz: ZoneInfo) -> Optional[datetime.date]:
if not value:
return None
if value.endswith("Z"):
value = value[:-1] + "+0000"
if value[-3] == ":":
value = value[:-3] + value[-2:]
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz).date()
def add_months(date_value: datetime.date, months: int) -> datetime.date:
year = date_value.year + (date_value.month - 1 + months) // 12
month = (date_value.month - 1 + months) % 12 + 1
day = date_value.day
while True:
try:
return datetime(year, month, day).date()
except ValueError:
day -= 1
def next_occurrence(start_dt: datetime, rule: dict, now_dt: datetime) -> Optional[datetime]:
interval = int(rule.get("interval", 1))
occurrence = start_dt
if rule.get("type") == "daily":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
steps = delta_days // interval
occurrence = occurrence + timedelta(days=steps * interval)
if occurrence < now_dt:
occurrence += timedelta(days=interval)
elif rule.get("type") == "weekly":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
weeks = delta_days // 7
steps = (weeks // interval) * interval
occurrence = occurrence + timedelta(days=steps * 7)
if occurrence < now_dt:
occurrence += timedelta(days=interval * 7)
elif rule.get("type") == "monthly":
if now_dt > occurrence:
months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
steps = (months // interval) * interval
occurrence_date = add_months(occurrence.date(), steps)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
if occurrence < now_dt:
occurrence_date = add_months(occurrence.date(), interval)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
elif rule.get("type") == "yearly":
if now_dt > occurrence:
years = now_dt.year - occurrence.year
steps = (years // interval) * interval
try:
occurrence = occurrence.replace(year=occurrence.year + steps)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
if occurrence < now_dt:
try:
occurrence = occurrence.replace(year=occurrence.year + interval)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
else:
return None
return occurrence
def adjust_event_to_today(event: dict, tz: ZoneInfo, today_date: datetime.date) -> Optional[dict]:
start_raw = event.get("DTSTART", "")
end_raw = event.get("DTEND", "")
if not start_raw or not end_raw:
return None
start_dt = local_datetime_from_ics(start_raw, tz)
end_dt = local_datetime_from_ics(end_raw, tz)
rrule = event.get("RRULE")
if not rrule:
if start_dt.date() < today_date:
return None
return event
rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
if until_date and today_date > until_date:
return None
if start_dt.date() >= today_date:
return event
today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
occurrence = next_occurrence(start_dt, rule, today_dt)
if not occurrence:
return None
if until_date and occurrence.date() > until_date:
return None
duration = end_dt - start_dt
new_start = occurrence
new_end = new_start + duration
updated = dict(event)
updated["DTSTART"] = format_ics_like(start_raw, new_start)
updated["DTEND"] = format_ics_like(end_raw, new_end)
return updated
def build_reservation_payload(
api: BookedAPI, event: dict, resource: dict
) -> Tuple[dict, dict]:
dtstart = event.get("DTSTART", "")
dtend = event.get("DTEND", "")
if not dtstart or not dtend:
raise ValueError("DTSTART and DTEND are required")
start_iso = ics_datetime_to_iso(dtstart)
end_iso = ics_datetime_to_iso(dtend)
start_dt = parse_start_datetime(dtstart)
summary = event.get("SUMMARY", "Untitled")
title, participant_info, confirmed_initials = parse_summary(summary)
categories = event.get("CATEGORIES", "")
if not categories:
raise ValueError("CATEGORIES is required to map Principal Investigator")
matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, categories)
if not matched_pi and categories.strip():
first_word = categories.strip().split()[0]
matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, first_word)
if not matched_pi:
raise ValueError(
f"CATEGORIES '{categories}' did not match a Principal Investigator value"
)
custom_attributes = [
{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": matched_pi}
]
if confirmed_initials:
custom_attributes.append(
{
"attributeId": SCANNED_INITIALS_ATTRIBUTE_ID,
"attributeValue": confirmed_initials,
}
)
participant_ids: List[int] = []
seen_user_ids: set[int] = set()
matched_emails: List[str] = []
unmatched_emails: List[str] = []
for _, email in participant_info:
user = api.find_user_by_email(email)
if not user:
swapped = swap_email_domain(email)
if swapped:
user = api.find_user_by_email(swapped)
if user:
user_id = int(user["id"])
if user_id not in seen_user_ids:
participant_ids.append(user_id)
seen_user_ids.add(user_id)
matched_emails.append(email)
else:
unmatched_emails.append(email)
description = build_description(event.get("DESCRIPTION", ""), unmatched_emails)
description = description if description else None
recurrence_rule = None
rrule = event.get("RRULE", "")
if rrule:
recurrence_rule = parse_rrule(rrule, start_dt)
payload = {
"resource_id": resource["resourceId"],
"start_datetime": start_iso,
"end_datetime": end_iso,
"title": title,
"description": description,
"participants": participant_ids if participant_ids else None,
"custom_attributes": custom_attributes,
"recurrence_rule": recurrence_rule,
}
preview = {
"resource_name": resource["name"],
"resource_id": resource["resourceId"],
"title": title,
"start_iso": start_iso,
"end_iso": end_iso,
"matched_pi": matched_pi,
"confirmed_initials": confirmed_initials,
"matched_emails": matched_emails,
"unmatched_emails": unmatched_emails,
"recurrence_rule": recurrence_rule,
}
return payload, preview
def print_dry_run(preview: dict) -> None:
print("Dry run: reservation not created")
print(f"Resource: {preview['resource_name']} (ID {preview['resource_id']})")
print(f"Title: {preview['title']}")
print(f"Start: {preview['start_iso']}")
print(f"End: {preview['end_iso']}")
print(f"Principal Investigator: {preview['matched_pi']}")
confirmed = preview.get("confirmed_initials")
if confirmed:
print(f"Scanned Person Initials: {confirmed}")
matched = preview.get("matched_emails") or []
if matched:
print(f"Participants: {', '.join(matched)}")
unmatched = preview.get("unmatched_emails") or []
if unmatched:
print(f"Unreg: {', '.join(unmatched)}")
recurrence = preview.get("recurrence_rule")
if recurrence:
print(f"Recurrence: {recurrence}")
def main() -> int:
parser = argparse.ArgumentParser(
description=(
"Import Calcium .Events data with dedupe, preview each with --dry-run, "
"and optionally create it in Scandium."
)
)
parser.add_argument(
"input",
nargs="?",
help="Input Calcium .Events file (defaults based on --resource)",
)
parser.add_argument(
"--category",
help="Calcium category to import (exact match)",
)
parser.add_argument(
"--exclude-category",
action="append",
help=(
"Calcium category to exclude (repeatable). If set without --category, "
"imports all categories except these."
),
)
parser.add_argument(
"--preferences",
help="Optional Calcium .Preferences file to resolve time periods",
)
parser.add_argument(
"--resource",
default=DEFAULT_RESOURCE,
help="Resource name (default: P1 Prisma)",
)
parser.add_argument(
"--timezone",
default=DEFAULT_TZ,
help="Timezone for Calcium events (default: America/New_York)",
)
parser.add_argument(
"--yes",
action="store_true",
help="Create all events without prompting",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show deduped events without creating reservations",
)
parser.add_argument(
"--list",
action="store_true",
help="List deduped events without creating reservations",
)
parser.add_argument(
"--debug",
action="store_true",
help="Log progress details to stderr",
)
args = parser.parse_args()
input_path_value = args.input
if not input_path_value:
input_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
if not input_path_value:
raise SystemExit(
f"No default Calcium .Events for resource '{args.resource}'. "
"Provide an input file."
)
args.input = input_path_value
if args.category and args.exclude_category:
parser.error("Use either --category or --exclude-category, not both.")
if not args.category and not args.exclude_category:
parser.error("Provide --category or at least one --exclude-category.")
events_path = Path(args.input)
prefs_path = Path(args.preferences) if args.preferences else None
exclude_categories: List[str] = []
if args.exclude_category:
for value in args.exclude_category:
for piece in value.split(","):
item = piece.strip()
if item:
exclude_categories.append(item)
log_debug(args.debug, f"Reading Calcium events: {events_path}")
vevents = build_vevents(
events_path,
category_filter=args.category,
tz_name=args.timezone,
preferences_path=prefs_path,
exclude_categories=exclude_categories,
debug=args.debug,
use_utc=True,
)
if not vevents:
print("No matching events found.")
return 1
tz = ZoneInfo(args.timezone)
today_date = datetime.now(tz).date()
events = []
for item in vevents:
adjusted = adjust_event_to_today(item.event, tz, today_date)
if not adjusted:
continue
events.append({"event": adjusted, "lines": item.lines})
log_debug(args.debug, f"Parsed {len(events)} events")
log_debug(args.debug, "Starting dedupe (API-resolved)")
api = BookedAPI()
kept, removed = dedupe_events(events, api, args.resource, args.debug)
if removed:
print(f"Removed {len(removed)} duplicate events:")
for item in removed:
print(f"- {format_event_summary(item['event'])}")
log_debug(args.debug, f"Deduped events: kept={len(kept)}, removed={len(removed)}")
if args.dry_run:
resource = find_resource(api, args.resource)
for idx, item in enumerate(kept, start=1):
print("\n" + "=" * 72)
print(f"Event {idx} of {len(kept)}")
print("-" * 72)
try:
_, preview = build_reservation_payload(api, item["event"], resource)
except ValueError as exc:
print(f"Skipping event: {exc}")
continue
print_dry_run(preview)
log_debug(args.debug, "Dry run enabled; exiting before creation")
return 0
if args.list:
for item in kept:
print(f"- {format_event_summary(item['event'])}")
return 0
resource = find_resource(api, args.resource)
for idx, item in enumerate(kept, start=1):
event = item["event"]
print("\n" + "=" * 72)
print(f"Event {idx} of {len(kept)}")
print("-" * 72)
try:
payload, preview = build_reservation_payload(api, event, resource)
except ValueError as exc:
print(f"Skipping event: {exc}")
continue
print_dry_run(preview)
if not args.yes:
response = input("Create this event? [Y]es / [n]o / [q]uit: ").strip().lower()
if response in {"q", "quit"}:
print("Stopping.")
return 0
if response in {"n", "no"}:
print("Skipped.")
continue
log_debug(args.debug, f"Creating event {idx}")
result = api.create_reservation(
resource_id=payload["resource_id"],
start_datetime=payload["start_datetime"],
end_datetime=payload["end_datetime"],
title=payload["title"],
description=payload["description"],
participants=payload["participants"],
custom_attributes=payload["custom_attributes"],
recurrence_rule=payload["recurrence_rule"],
)
reference = result.get("referenceNumber")
if not reference:
errors = result.get("errors", result)
print(f"Create failed: {errors}")
else:
print(f"Created ({reference}).")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
import argparse
import html
import sys
from datetime import datetime, time, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from zoneinfo import ZoneInfo
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from create_scandium_event import parse_start_datetime # noqa: E402
from create_scandium_event_multiple import expand_exdates, get_resource_timezone, parse_exdate_dates # noqa: E402
from native.calcium_events import build_vevents
DEFAULT_RESOURCE = "P1 Prisma"
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
"P1 Prisma": "data/3T_Prisma_P1.Events",
"P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
"4T Varian": "data/4T_Varian_Inova.Events",
}
PI_ATTRIBUTE_ID = 1
def normalize_summary(summary: str) -> str:
normalized = summary.replace("\\n", "\n")
normalized = normalized.replace("\\,", ",").replace("\\;", ";").replace("\\\\", "\\")
normalized = html.unescape(normalized)
return normalized.strip()
def local_datetime_from_ics(value: str, tz: ZoneInfo) -> datetime:
if not value:
return datetime.min.replace(tzinfo=tz)
if "T" not in value:
date_value = datetime.strptime(value[:8], "%Y%m%d").date()
return datetime.combine(date_value, datetime.min.time(), tzinfo=tz)
iso = ics_datetime_to_iso(value)
dt = datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S%z")
return dt.astimezone(tz)
def local_iso_datetime(value: str, tz: ZoneInfo) -> str:
if not value:
return ""
local_dt = local_datetime_from_ics(value, tz)
return local_dt.strftime("%Y-%m-%dT%H:%M:%S%z")
def format_ics_like(template: str, dt: datetime) -> str:
if "T" not in template:
return dt.strftime("%Y%m%d")
if template.endswith("Z"):
utc = dt.astimezone(ZoneInfo("UTC"))
return utc.strftime("%Y%m%dT%H%M%SZ")
return dt.strftime("%Y%m%dT%H%M%S")
def truncate_end(start_iso: str, end_iso: str, max_end_iso: Optional[str]) -> str:
if not max_end_iso:
return end_iso
start_dt = datetime.strptime(start_iso, "%Y-%m-%dT%H:%M:%S%z")
end_dt = datetime.strptime(end_iso, "%Y-%m-%dT%H:%M:%S%z")
max_end_dt = datetime.strptime(max_end_iso, "%Y-%m-%dT%H:%M:%S%z")
if end_dt <= max_end_dt:
return end_iso
if max_end_dt <= start_dt:
return end_iso
return max_end_iso
def format_dry_run_entry(index: int, event: Dict[str, str], payload: Dict[str, str]) -> str:
rrule = event.get("RRULE")
recurring = "yes" if rrule else "no"
pieces = [
f"{index}. {payload['startDateTime']} -> {payload['endDateTime']}",
f"recurring={recurring}",
]
if rrule:
pieces.append(f"RRULE={rrule}")
return " | ".join(pieces)
def resolve_service_value(api: BookedAPI, override_value: Optional[str]) -> str:
if override_value:
return override_value
return api.find_attribute_value(PI_ATTRIBUTE_ID, "Service") or "Service"
def add_months(date_value: datetime.date, months: int) -> datetime.date:
year = date_value.year + (date_value.month - 1 + months) // 12
month = (date_value.month - 1 + months) % 12 + 1
day = date_value.day
while True:
try:
return datetime(year, month, day).date()
except ValueError:
day -= 1
def next_occurrence(
start_dt: datetime,
rule: Dict,
now_dt: datetime,
excluded_dates: set[datetime.date],
) -> Optional[datetime]:
interval = int(rule.get("interval", 1))
occurrence = start_dt
if rule.get("type") == "daily":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
steps = delta_days // interval
occurrence = occurrence + timedelta(days=steps * interval)
if occurrence < now_dt:
occurrence += timedelta(days=interval)
elif rule.get("type") == "weekly":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
weeks = delta_days // 7
steps = (weeks // interval) * interval
occurrence = occurrence + timedelta(days=steps * 7)
if occurrence < now_dt:
occurrence += timedelta(days=interval * 7)
elif rule.get("type") == "monthly":
if now_dt > occurrence:
months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
steps = (months // interval) * interval
occurrence_date = add_months(occurrence.date(), steps)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
if occurrence < now_dt:
occurrence_date = add_months(occurrence.date(), interval)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
elif rule.get("type") == "yearly":
if now_dt > occurrence:
years = now_dt.year - occurrence.year
steps = (years // interval) * interval
try:
occurrence = occurrence.replace(year=occurrence.year + steps)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
if occurrence < now_dt:
try:
occurrence = occurrence.replace(year=occurrence.year + interval)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
else:
return None
attempts = 0
while occurrence.date() in excluded_dates and attempts < 500:
if rule.get("type") == "daily":
occurrence += timedelta(days=interval)
elif rule.get("type") == "weekly":
occurrence += timedelta(days=interval * 7)
elif rule.get("type") == "monthly":
occurrence_date = add_months(occurrence.date(), interval)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
elif rule.get("type") == "yearly":
try:
occurrence = occurrence.replace(year=occurrence.year + interval)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
attempts += 1
return occurrence
def adjust_event_to_today(
event: Dict[str, str],
tz: ZoneInfo,
today_date: datetime.date,
) -> Optional[Dict[str, str]]:
start_raw = event.get("DTSTART", "")
end_raw = event.get("DTEND", "")
if not start_raw or not end_raw:
return None
start_dt = local_datetime_from_ics(start_raw, tz)
end_dt = local_datetime_from_ics(end_raw, tz)
rrule = event.get("RRULE")
exdates = parse_exdate_dates(event.get("EXDATE", ""), tz)
if not rrule:
if start_dt.date() < today_date:
return None
return event
rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
termination = rule.get("repeatTerminationDate")
until_date = None
if termination:
until_date = datetime.strptime(
termination, "%Y-%m-%dT%H:%M:%S%z"
).astimezone(tz).date()
if until_date and today_date > until_date:
return None
if start_dt.date() >= today_date:
return event
today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
occurrence = next_occurrence(start_dt, rule, today_dt, exdates)
if not occurrence:
return None
if until_date and occurrence.date() > until_date:
return None
duration = end_dt - start_dt
new_start = occurrence
new_end = new_start + duration
updated = dict(event)
updated["DTSTART"] = format_ics_like(start_raw, new_start)
updated["DTEND"] = format_ics_like(end_raw, new_end)
return updated
def event_has_future_occurrence(event: Dict[str, str], now_dt: datetime) -> bool:
tz = now_dt.tzinfo or ZoneInfo("America/New_York")
start_raw = event.get("DTSTART", "")
if not start_raw:
return False
start_dt = local_datetime_from_ics(start_raw, tz)
rrule = event.get("RRULE")
exdates = parse_exdate_dates(event.get("EXDATE", ""), tz)
if not rrule:
return start_dt >= now_dt
rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
termination = rule.get("repeatTerminationDate")
until_date = None
if termination:
until_date = datetime.strptime(
termination, "%Y-%m-%dT%H:%M:%S%z"
).astimezone(tz).date()
occurrence = next_occurrence(start_dt, rule, now_dt, exdates)
if not occurrence:
return False
if until_date and occurrence.date() > until_date:
return False
return True
def build_payload(
event: Dict[str, str],
resource_id: int,
user_id: int,
service_value: str,
tz: ZoneInfo,
max_end_iso: Optional[str] = None,
) -> Dict:
start_iso = local_iso_datetime(event.get("DTSTART", ""), tz)
end_iso = local_iso_datetime(event.get("DTEND", ""), tz)
end_iso = truncate_end(start_iso, end_iso, max_end_iso)
summary = event.get("SUMMARY", "")
payload = {
"resourceId": resource_id,
"userId": user_id,
"startDateTime": start_iso,
"endDateTime": end_iso,
"title": "",
"description": normalize_summary(summary),
"customAttributes": [
{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": service_value}
],
}
rrule = event.get("RRULE")
if rrule:
start_raw = event.get("DTSTART", "")
if start_raw:
local_start = local_datetime_from_ics(start_raw, tz).replace(tzinfo=None)
else:
local_start = parse_start_datetime(start_raw)
payload["recurrenceRule"] = parse_rrule(rrule, local_start)
if payload["recurrenceRule"]["type"] == "monthly":
payload["recurrenceRule"].setdefault("monthlyType", "dayOfMonth")
payload["recurrenceRule"].setdefault("dayOfMonth", local_start.day)
return payload
def get_last_reservable_end(
api: BookedAPI,
schedule_id: int,
resource_id: int,
date_value: datetime.date,
cache: dict,
) -> Optional[str]:
key = (schedule_id, resource_id, date_value)
if key in cache:
return cache[key]
date_str = date_value.strftime("%Y-%m-%d")
slots = api.get_schedule_slots(schedule_id, date_str, date_str, resource_id=resource_id)
max_end = None
for date_entry in slots.get("dates", []):
for resource in date_entry.get("resources", []):
for slot in resource.get("slots", []):
if slot.get("isReservable"):
end_time = slot.get("endDateTime")
if end_time and (max_end is None or end_time > max_end):
max_end = end_time
cache[key] = max_end
return max_end
def delete_service_reservations(
api: BookedAPI,
start_date: str,
end_date: str,
target_pi_value: str,
) -> None:
resources = api.list_resources()
for resource in resources:
resource_id = resource.get("resourceId")
if not resource_id:
continue
reservations = api.list_reservations(
resource_id=resource_id,
start_date=start_date,
end_date=end_date,
)
for res in reservations:
attrs = res.get("customAttributes") or []
pi_value = None
for attr in attrs:
if attr.get("id") == PI_ATTRIBUTE_ID:
pi_value = attr.get("value")
break
if pi_value != target_pi_value:
continue
ref = res.get("referenceNumber")
if not ref:
continue
scope = "full" if res.get("isRecurring") else "this"
api.delete_reservation(ref, update_scope=scope)
print(
f"Deleted {ref} | {res.get('resourceName')} | {res.get('startDate')} -> {res.get('endDate')} | scope={scope}"
)
def main() -> int:
parser = argparse.ArgumentParser(
description="Import Calcium .Events into a resource as Service reservations."
)
parser.add_argument(
"input",
nargs="?",
help="Input Calcium .Events file (defaults based on --resource)",
)
parser.add_argument(
"--category",
required=True,
help="Calcium category to import (exact match)",
)
parser.add_argument(
"--resource",
default=DEFAULT_RESOURCE,
help="Resource name (default: P1 Prisma)",
)
parser.add_argument(
"--preferences",
help="Optional Calcium .Preferences file to resolve time periods",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be created without creating reservations",
)
parser.add_argument(
"--debug",
action="store_true",
help="Log progress details to stderr",
)
parser.add_argument(
"--delete-existing",
action="store_true",
help="Delete existing PI reservations in the pmservice date range",
)
parser.add_argument(
"--list",
action="store_true",
help="List events without creating reservations",
)
parser.add_argument(
"--pi",
dest="pi_value",
help="Override PI attribute value (default: Service)",
)
args = parser.parse_args()
if not args.input:
input_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
if not input_path_value:
print(
f"No default Calcium .Events for resource '{args.resource}'. "
"Provide an input file.",
file=sys.stderr,
)
return 1
args.input = input_path_value
api = BookedAPI()
resource = api.find_resource_by_name(args.resource)
if not resource:
print(f"Resource '{args.resource}' not found.", file=sys.stderr)
return 1
resource_id = int(resource["resourceId"])
schedule_id = int(resource["scheduleId"])
user_id = api.get_user_id()
tz = get_resource_timezone(api, args.resource, {}, args.debug)
events_path = Path(args.input)
prefs_path = Path(args.preferences) if args.preferences else None
vevents = build_vevents(
events_path,
category_filter=args.category,
tz_name=tz.key,
preferences_path=prefs_path,
debug=args.debug,
use_utc=True,
)
events = [{"event": item.event, "lines": item.lines} for item in vevents]
expanded = expand_exdates(events, tz, args.debug)
today_date = datetime.now(tz).date()
adjusted = []
for item in expanded:
updated = adjust_event_to_today(item["event"], tz, today_date)
if not updated:
continue
adjusted.append({"event": updated, "lines": item["lines"]})
expanded = adjusted
service_value = resolve_service_value(api, args.pi_value)
if args.delete_existing and expanded and not args.dry_run and not args.list:
start_dates = [local_datetime_from_ics(item["event"]["DTSTART"], tz).date() for item in expanded]
end_dates = [local_datetime_from_ics(item["event"]["DTEND"], tz).date() for item in expanded]
min_date = min(start_dates)
max_date = max(end_dates) + timedelta(days=1)
delete_service_reservations(
api,
start_date=min_date.strftime("%Y-%m-%d"),
end_date=max_date.strftime("%Y-%m-%d"),
target_pi_value=service_value,
)
slot_cache: dict = {}
for idx, item in enumerate(expanded, start=1):
start_local = local_datetime_from_ics(item["event"]["DTSTART"], tz)
max_end = get_last_reservable_end(
api, schedule_id, resource_id, start_local.date(), slot_cache
)
payload = build_payload(
item["event"],
resource_id,
user_id,
service_value,
tz,
max_end_iso=max_end,
)
if args.list:
print(format_dry_run_entry(idx, item["event"], payload))
print(payload["description"])
continue
if args.dry_run:
print(format_dry_run_entry(idx, item["event"], payload))
print(payload["description"])
continue
result = api.create_reservation(
resource_id=payload["resourceId"],
start_datetime=payload["startDateTime"],
end_datetime=payload["endDateTime"],
title=payload["title"],
description=payload["description"],
user_id=payload["userId"],
custom_attributes=payload["customAttributes"],
recurrence_rule=payload.get("recurrenceRule"),
)
if result.get("referenceNumber"):
print(f"Created {idx}: {result['referenceNumber']}")
else:
print(f"Error creating {idx}: {result.get('errors')}", file=sys.stderr)
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
import argparse
import csv
import re
import sys
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from pathlib import Path
from typing import List, Optional
from zoneinfo import ZoneInfo
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI, parse_rrule # noqa: E402
from calcium_events import _build_rrule, build_vevents, parse_calcium_events
DEFAULT_RESOURCE_NAME = "Staff"
INITIALS_FILE = "initials.csv"
LOCATION_ATTRIBUTE_ID = 6
DEFAULT_TZ = "America/New_York"
DEFAULT_EVENTS_PATH = Path(__file__).resolve().parent / "data/3T_Prisma_P1.Events"
DEFAULT_OWNER_EMAIL = "[email protected]"
STAFF_CATEGORIES = ("Staff Hours", "Front Desk", "Tech_Aide")
DEFAULT_REPEAT_TERMINATION = "2026-07-30T00:00:00+0000"
TIME_RANGE_RE = re.compile(
r"^(?P<start>\d{1,2}(?::\d{2})?)(?P<start_suffix>[ap])?\s*-?\s*(?P<end>\d{1,2}(?::\d{2})?)(?P<end_suffix>[ap])?$"
)
SEGMENT_RE = re.compile(
r"(?P<range>\d{1,2}(?::\d{2})?[ap]?\s*-?\s*\d{1,2}(?::\d{2})?[ap]?)\s*\((?P<loc>[^)]+)\)"
)
RANGE_RE = re.compile(
r"\d{1,2}(?::\d{2})?[ap]?\s*-?\s*\d{1,2}(?::\d{2})?[ap]?"
)
DEFAULT_LOCATION_BY_CATEGORY = {
"Front Desk": "FD",
"Tech_Aide": "TA",
}
@dataclass(frozen=True)
class StaffEntry:
initials: str
date: date
start: time
end: time
location: str
def load_initials_map(path: str) -> dict[str, str]:
mapping: dict[str, str] = {}
with open(path, "r", encoding="utf-8") as handle:
reader = csv.reader(handle)
next(reader, None)
for row in reader:
if len(row) < 3:
continue
initials = row[0].strip()
email = row[2].strip()
if initials:
mapping[initials] = email
return mapping
def local_datetime_to_iso(day: date, t: time, tz_name: str = DEFAULT_TZ) -> str:
tz = ZoneInfo(tz_name)
dt = datetime.combine(day, t, tzinfo=tz)
return dt.astimezone(ZoneInfo("UTC")).strftime("%Y-%m-%dT%H:%M:%S%z")
def _parse_hhmm(token: str) -> time:
if ":" in token:
hour_str, minute_str = token.split(":", 1)
hour = int(hour_str)
minute = int(minute_str)
else:
hour = int(token)
minute = 0
return time(hour, minute)
def _apply_meridiem(base: time, suffix: str) -> time:
hour = base.hour
minute = base.minute
if suffix == "p" and hour < 12:
hour += 12
if suffix == "a" and hour == 12:
hour = 0
return time(hour, minute)
def parse_time_range(value: str) -> tuple[time, time]:
normalized = " ".join(value.strip().lower().split())
match = TIME_RANGE_RE.match(normalized)
if not match:
raise ValueError(f"Invalid time range: {value}")
start_token = match.group("start")
end_token = match.group("end")
start_suffix = match.group("start_suffix")
end_suffix = match.group("end_suffix")
if not end_suffix:
end_suffix = "p"
start_raw = _parse_hhmm(start_token)
end_raw = _parse_hhmm(end_token)
end_time = _apply_meridiem(end_raw, end_suffix)
if start_suffix:
start_time = _apply_meridiem(start_raw, start_suffix)
else:
assumed = _apply_meridiem(start_raw, end_suffix)
if assumed >= end_time:
flipped = "a" if end_suffix == "p" else "p"
assumed = _apply_meridiem(start_raw, flipped)
start_time = assumed
return start_time, end_time
def split_summary_lines(summary: str) -> list[str]:
return [line.strip() for line in summary.replace("\\n", "\n").split("\n") if line.strip()]
def parse_staff_line(
line: str, default_location: Optional[str] = None
) -> Optional[tuple[str, list[tuple[time, time, str]]]]:
stripped = line.strip()
if not stripped:
raise ValueError("Empty summary line")
parts = stripped.split(maxsplit=1)
initials = parts[0]
remainder = parts[1] if len(parts) > 1 else ""
if remainder.strip().lower().startswith("off"):
return None
segments: list[tuple[time, time, str]] = []
for match in SEGMENT_RE.finditer(remainder):
time_range = match.group("range")
location = match.group("loc").strip()
try:
start, end = parse_time_range(time_range)
except ValueError:
continue
segments.append((start, end, location))
if not segments and default_location:
for match in RANGE_RE.finditer(remainder):
try:
start, end = parse_time_range(match.group(0))
except ValueError:
continue
segments.append((start, end, default_location))
if not segments:
return None
return initials, segments
def build_staff_entries(
summary: str, day: date, default_location: Optional[str] = None
) -> list[StaffEntry]:
entries: list[StaffEntry] = []
for line in split_summary_lines(summary):
parsed = parse_staff_line(line, default_location=default_location)
if not parsed:
print(f"Warning: skipped summary line (no segments or off): {line}", file=sys.stderr)
continue
initials, segments = parsed
for start, end, location in segments:
entries.append(
StaffEntry(
initials=initials,
date=day,
start=start,
end=end,
location=location,
)
)
return entries
def create_reservations(
api: BookedAPI,
payloads: list[dict],
initials_map: dict[str, str],
owner_id: Optional[int],
) -> list[str]:
references: list[str] = []
for payload in payloads:
if payload.get("customAttributes"):
for attr in payload["customAttributes"]:
if attr.get("attributeId") == LOCATION_ATTRIBUTE_ID and attr.get("attributeValue"):
resolved = api.find_attribute_value(
LOCATION_ATTRIBUTE_ID, attr["attributeValue"]
)
if resolved:
attr["attributeValue"] = resolved
resource_id = payload.get("resource_id")
if resource_id is None:
raise ValueError("Payload missing resource_id")
participant_ids = None
initials = payload.get("initials", "")
if initials and initials != "?":
email = initials_map.get(initials)
if email:
user = api.find_user_by_email(email)
if user:
participant_ids = [user["id"]]
result = api.create_reservation(
resource_id=resource_id,
start_datetime=payload["start"],
end_datetime=payload["end"],
title="",
user_id=owner_id,
participants=participant_ids,
custom_attributes=payload["customAttributes"],
recurrence_rule=_apply_default_termination(payload.get("recurrence")),
)
reference = result.get("referenceNumber")
if not reference:
detail = {
"initials": payload.get("initials"),
"start": payload.get("start"),
"end": payload.get("end"),
"location": payload.get("location"),
"recurrence": payload.get("recurrence"),
}
raise RuntimeError(f"API error: {result} for {detail}")
references.append(reference)
return references
def _apply_default_termination(recurrence: Optional[dict]) -> Optional[dict]:
if not recurrence:
return recurrence
if recurrence.get("type") in {None, "none"}:
return recurrence
if recurrence.get("repeatTerminationDate"):
return recurrence
updated = dict(recurrence)
updated["repeatTerminationDate"] = DEFAULT_REPEAT_TERMINATION
return updated
def parse_start_datetime(dtstart: str) -> datetime:
trimmed = dtstart.rstrip("Z")
if len(trimmed) == 8:
return datetime.strptime(trimmed, "%Y%m%d")
return datetime.strptime(trimmed, "%Y%m%dT%H%M%S")
def parse_termination_date(value: str | None, tz: Optional[ZoneInfo] = None) -> date | None:
if not value:
return None
if value.endswith("Z"):
value = value[:-1] + "+0000"
if value[-3] == ":":
value = value[:-3] + value[-2:]
parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")
if tz:
return parsed.astimezone(tz).date()
return parsed.date()
def recurrence_signature(rule: dict) -> tuple:
if not rule:
return (None,)
return (
rule.get("type"),
int(rule.get("interval", 1)),
tuple(rule.get("weekdays") or []),
parse_termination_date(rule.get("repeatTerminationDate")),
)
def recurrence_occurs_on(entry_date: date, start_date: date, rule: dict) -> bool:
rtype = rule.get("type")
if not rtype or rtype == "none":
return False
if entry_date < start_date:
return False
termination = parse_termination_date(rule.get("repeatTerminationDate"))
if termination and entry_date > termination:
return False
interval = int(rule.get("interval", 1))
if rtype == "daily":
diff_days = (entry_date - start_date).days
return diff_days % interval == 0
if rtype == "weekly":
weekdays = rule.get("weekdays") or []
if not weekdays:
return False
booked_weekday = (entry_date.weekday() + 1) % 7
if booked_weekday not in weekdays:
return False
start_weekday = (start_date.weekday() + 1) % 7
delta_days = (booked_weekday - start_weekday) % 7
first_occurrence = start_date + timedelta(days=delta_days)
if entry_date < first_occurrence:
return False
diff_days = (entry_date - first_occurrence).days
if diff_days % 7 != 0:
return False
weeks = diff_days // 7
return weeks % interval == 0
return False
def recurrence_end_date(rule: dict) -> date:
termination = parse_termination_date(rule.get("repeatTerminationDate"))
if termination:
return termination
return parse_termination_date(DEFAULT_REPEAT_TERMINATION)
def format_termination_date(day: date) -> str:
return day.strftime("%Y-%m-%dT00:00:00+0000")
def next_occurrence_after(
start_date: date, rule: dict, after_date: date, tz: ZoneInfo
) -> Optional[date]:
start_dt = datetime.combine(start_date, time.min, tzinfo=tz)
now_dt = datetime.combine(after_date + timedelta(days=1), time.min, tzinfo=tz)
occurrence = next_occurrence(start_dt, rule, now_dt)
return occurrence.date() if occurrence else None
def add_months(date_value: date, months: int) -> date:
year = date_value.year + (date_value.month - 1 + months) // 12
month = (date_value.month - 1 + months) % 12 + 1
day = date_value.day
while True:
try:
return date(year, month, day)
except ValueError:
day -= 1
def next_occurrence(start_dt: datetime, rule: dict, now_dt: datetime) -> Optional[datetime]:
interval = int(rule.get("interval", 1))
occurrence = start_dt
if rule.get("type") == "daily":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
steps = delta_days // interval
occurrence = occurrence + timedelta(days=steps * interval)
if occurrence < now_dt:
occurrence += timedelta(days=interval)
elif rule.get("type") == "weekly":
if now_dt > occurrence:
delta_days = (now_dt.date() - occurrence.date()).days
weeks = delta_days // 7
steps = (weeks // interval) * interval
occurrence = occurrence + timedelta(days=steps * 7)
if occurrence < now_dt:
occurrence += timedelta(days=interval * 7)
elif rule.get("type") == "monthly":
if now_dt > occurrence:
months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
steps = (months // interval) * interval
occurrence_date = add_months(occurrence.date(), steps)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
if occurrence < now_dt:
occurrence_date = add_months(occurrence.date(), interval)
occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
elif rule.get("type") == "yearly":
if now_dt > occurrence:
years = now_dt.year - occurrence.year
steps = (years // interval) * interval
try:
occurrence = occurrence.replace(year=occurrence.year + steps)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
if occurrence < now_dt:
try:
occurrence = occurrence.replace(year=occurrence.year + interval)
except ValueError:
occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
else:
return None
return occurrence
def adjust_entries_to_today(entries: List[dict], tz: ZoneInfo) -> List[dict]:
today_date = datetime.now(tz).date()
today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
adjusted: List[dict] = []
for entry in entries:
recurrence_rule = entry.get("recurrence_rule")
if not recurrence_rule:
if entry["date"] < today_date:
continue
adjusted.append(entry)
continue
start_dt = datetime.combine(entry["date"], entry["start_time"], tzinfo=tz)
until_date = parse_termination_date(
recurrence_rule.get("repeatTerminationDate"), tz
)
if until_date and today_date > until_date:
continue
if entry["date"] < today_date:
occurrence = next_occurrence(start_dt, recurrence_rule, today_dt)
if not occurrence:
continue
if until_date and occurrence.date() > until_date:
continue
entry = dict(entry)
entry["date"] = occurrence.date()
entry["recurrence_start"] = occurrence.date()
adjusted.append(entry)
return adjusted
def entry_key(entry: dict) -> tuple:
return (
entry.get("initials"),
entry.get("location"),
entry.get("start_time"),
entry.get("end_time"),
)
def split_recurrence_on_overrides(
entry: dict,
override_dates: List[date],
tz: ZoneInfo,
) -> List[dict]:
if not override_dates:
return [entry]
rule = entry.get("recurrence_rule")
if not rule:
return [entry]
start_date = entry.get("recurrence_start") or entry.get("date")
if not start_date:
return [entry]
end_date = recurrence_end_date(rule)
valid_overrides = [
day
for day in override_dates
if day >= start_date and day <= end_date and recurrence_occurs_on(day, start_date, rule)
]
if not valid_overrides:
return [entry]
segments: List[dict] = []
segment_start = start_date
for override in sorted(valid_overrides):
segment_end = override - timedelta(days=1)
if segment_end >= segment_start:
segment = dict(entry)
segment["date"] = segment_start
segment["recurrence_start"] = segment_start
segment_rule = dict(rule)
segment_rule["repeatTerminationDate"] = format_termination_date(segment_end)
segment["recurrence_rule"] = segment_rule
segments.append(segment)
next_start = next_occurrence_after(segment_start, rule, override, tz)
if not next_start:
segment_start = None
break
segment_start = next_start
if segment_start and segment_start <= end_date:
segment = dict(entry)
segment["date"] = segment_start
segment["recurrence_start"] = segment_start
segment_rule = dict(rule)
segment_rule["repeatTerminationDate"] = format_termination_date(end_date)
segment["recurrence_rule"] = segment_rule
segments.append(segment)
return segments
def dedupe_entries(entries: List[dict], tz: ZoneInfo) -> tuple[List[dict], int, List[dict]]:
recurring_entries = [e for e in entries if e.get("recurrence_rule")]
single_entries = [e for e in entries if not e.get("recurrence_rule")]
override_dates_by_category: dict[str, set[date]] = {}
for entry in single_entries:
entry_date = entry.get("date")
if not entry_date:
continue
category = entry.get("category") or ""
override_dates_by_category.setdefault(category, set()).add(entry_date)
split_recurring: List[dict] = []
for entry in recurring_entries:
category = entry.get("category") or ""
override_dates = sorted(override_dates_by_category.get(category, set()))
split_recurring.extend(split_recurrence_on_overrides(entry, override_dates, tz))
recurring_seen = set()
deduped_recurring = []
removed = 0
removed_entries: List[dict] = []
for entry in split_recurring:
signature = (entry_key(entry), recurrence_signature(entry["recurrence_rule"]), entry["recurrence_start"])
if signature in recurring_seen:
removed += 1
removed_entries.append(entry)
continue
recurring_seen.add(signature)
deduped_recurring.append(entry)
return deduped_recurring + single_entries, removed, removed_entries
def build_entries_for_event(event: dict) -> List[dict]:
summary = event.get("SUMMARY", "")
dtstart = event.get("DTSTART", "")
if not summary.strip() or not dtstart:
raise ValueError("SUMMARY and DTSTART are required")
start_dt = parse_start_datetime(dtstart)
day = start_dt.date()
category = event.get("CATEGORIES", "")
default_location = DEFAULT_LOCATION_BY_CATEGORY.get(category)
staff_entries = build_staff_entries(summary, day, default_location=default_location)
recurrence_rule = None
recurrence_start = None
rrule = event.get("RRULE", "")
if rrule:
recurrence_rule = parse_rrule(rrule, start_dt)
recurrence_start = start_dt.date()
entries = []
for entry in staff_entries:
entries.append(
{
"initials": entry.initials,
"date": entry.date,
"start_time": entry.start,
"end_time": entry.end,
"location": entry.location,
"recurrence_rule": recurrence_rule,
"recurrence_start": recurrence_start,
"category": category,
"summary": summary,
}
)
return entries
def build_entries_for_calcium_event(event: "CalciumEvent") -> List[dict]:
summary = event.text or ""
start_date = event.repeat_start or event.date
if not summary.strip() or not start_date:
raise ValueError("SUMMARY and date are required")
category = event.categories[0] if event.categories else ""
default_location = DEFAULT_LOCATION_BY_CATEGORY.get(category)
staff_entries = build_staff_entries(summary, start_date, default_location=default_location)
recurrence_rule = None
recurrence_start = None
rrule = _build_rrule(event)
if rrule:
recurrence_rule = parse_rrule(rrule, datetime.combine(start_date, time.min))
recurrence_start = start_date
entries = []
for entry in staff_entries:
entries.append(
{
"initials": entry.initials,
"date": entry.date,
"start_time": entry.start,
"end_time": entry.end,
"location": entry.location,
"recurrence_rule": recurrence_rule,
"recurrence_start": recurrence_start,
"category": category,
"summary": summary,
}
)
return entries
def should_process_vevent(event: dict, today: date) -> bool:
dtstart = event.get("DTSTART", "")
if not dtstart:
return False
start_dt = parse_start_datetime(dtstart)
if start_dt.date() >= today:
return True
rrule = event.get("RRULE", "")
if not rrule:
return False
recurrence = parse_rrule(rrule, start_dt)
termination = parse_termination_date(recurrence.get("repeatTerminationDate"))
return termination is None or termination >= today
def should_process_calcium_event(event: "CalciumEvent", today: date) -> bool:
if event.is_repeating:
start = event.repeat_start
end = event.repeat_end
if start and start > today:
return True
if end and end < today:
return False
return True
if event.date and event.date >= today:
return True
return False
def build_payload_from_entry(entry: dict) -> dict:
return {
"start": local_datetime_to_iso(entry["date"], entry["start_time"]),
"end": local_datetime_to_iso(entry["date"], entry["end_time"]),
"location": entry["location"],
"customAttributes": [
{"attributeId": LOCATION_ATTRIBUTE_ID, "attributeValue": entry["location"]}
],
"initials": entry["initials"],
"recurrence": entry.get("recurrence_rule"),
}
def summarize_plan(entries: List[dict], removed: int, initials_map: dict) -> None:
print("Plan summary")
print(f"Total entries: {len(entries) + removed}")
print(f"Removed as duplicates: {removed}")
print(f"To create: {len(entries)}")
for entry in entries:
email = initials_map.get(entry.get("initials", ""))
recurrence = entry.get("recurrence_rule")
label = recurrence.get("type") if recurrence else "single"
start = datetime.combine(entry["date"], entry["start_time"]).strftime("%Y-%m-%d %H:%M")
end = datetime.combine(entry["date"], entry["end_time"]).strftime("%Y-%m-%d %H:%M")
line = f"- {entry.get('initials')} {start} -> {end} ({entry.get('location')})"
if email:
line = f"{line} {email}"
line = f"{line} [{label}]"
print(line)
def summarize_removed(removed_entries: List[dict], initials_map: dict) -> None:
if not removed_entries:
return
print("\nRemoved duplicates")
for entry in removed_entries:
email = initials_map.get(entry.get("initials", ""))
recurrence = entry.get("recurrence_rule")
label = recurrence.get("type") if recurrence else "single"
start = datetime.combine(entry["date"], entry["start_time"]).strftime("%Y-%m-%d %H:%M")
end = datetime.combine(entry["date"], entry["end_time"]).strftime("%Y-%m-%d %H:%M")
line = f"- {entry.get('initials')} {start} -> {end} ({entry.get('location')})"
if email:
line = f"{line} {email}"
line = f"{line} [{label}]"
print(line)
def main() -> int:
parser = argparse.ArgumentParser(
description=(
"Create Staff reservations from Calcium .Events file with dedupe "
f"(source: {DEFAULT_EVENTS_PATH})"
),
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--yes",
action="store_true",
help="Create all reservations without prompting",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show plan without creating reservations",
)
parser.add_argument(
"--list",
action="store_true",
help="List entries without creating reservations",
)
parser.add_argument(
"--only-date",
help="Create entries only for a specific date (YYYY-MM-DD)",
)
args = parser.parse_args()
events_path = DEFAULT_EVENTS_PATH
tz = ZoneInfo(DEFAULT_TZ)
today = datetime.now(tz).date()
vevents = []
raw_events = []
for category in STAFF_CATEGORIES:
vevents.extend(
build_vevents(
events_path,
category_filter=category,
tz_name=DEFAULT_TZ,
preferences_path=None,
use_utc=False,
)
)
raw_events.extend(
parse_calcium_events(
events_path,
category_filter=category,
exclude_categories=None,
preferences_path=None,
debug=False,
)
)
if not vevents and not raw_events:
print("No matching events found.")
return 1
entries: List[dict] = []
for item in vevents:
if not should_process_vevent(item.event, today):
continue
entries.extend(build_entries_for_event(item.event))
for event in raw_events:
if not should_process_calcium_event(event, today):
continue
if event.start_time is not None:
continue
entries.extend(build_entries_for_calcium_event(event))
if args.only_date:
try:
target_date = datetime.strptime(args.only_date, "%Y-%m-%d").date()
except ValueError:
print("Invalid --only-date value (expected YYYY-MM-DD).", file=sys.stderr)
return 1
singles_by_category: dict[str, List[dict]] = {}
recurring_by_category: dict[str, List[dict]] = {}
for entry in entries:
category = entry.get("category") or ""
if entry.get("recurrence_rule"):
if recurrence_occurs_on(
target_date, entry["recurrence_start"], entry["recurrence_rule"]
):
recurring_by_category.setdefault(category, []).append(entry)
else:
if entry["date"] == target_date:
singles_by_category.setdefault(category, []).append(entry)
entries = []
categories = set(singles_by_category) | set(recurring_by_category)
for category in sorted(categories):
singles = singles_by_category.get(category, [])
if singles:
entries.extend(singles)
continue
for entry in recurring_by_category.get(category, []):
single = dict(entry)
single["date"] = target_date
single["recurrence_rule"] = None
single["recurrence_start"] = None
entries.append(single)
else:
entries = adjust_entries_to_today(entries, tz)
if not entries:
print("No future staff entries found.")
return 0
deduped, removed, removed_entries = dedupe_entries(entries, tz)
initials_map = load_initials_map(INITIALS_FILE)
summarize_plan(deduped, removed, initials_map)
summarize_removed(removed_entries, initials_map)
if args.dry_run or args.list:
return 0
if not args.yes:
response = input("Create these entries? [Y]es / [n]o: ").strip().lower()
if response in {"n", "no"}:
print("Aborted.")
return 0
api = BookedAPI()
owner_id = None
if DEFAULT_OWNER_EMAIL:
owner = api.find_user_by_email(DEFAULT_OWNER_EMAIL)
if owner:
owner_id = int(owner["id"])
else:
print(f"Owner '{DEFAULT_OWNER_EMAIL}' not found.", file=sys.stderr)
return 1
resource = api.find_resource_by_name(DEFAULT_RESOURCE_NAME)
if not resource:
print(f"Resource '{DEFAULT_RESOURCE_NAME}' not found.", file=sys.stderr)
return 1
payloads = []
for entry in deduped:
payload = build_payload_from_entry(entry)
payload["resource_id"] = resource["resourceId"]
payloads.append(payload)
refs = create_reservations(api, payloads, initials_map, owner_id)
print(f"Created {len(refs)} reservations.")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
import argparse
import sys
from datetime import datetime, timedelta
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI # noqa: E402
PI_ATTRIBUTE_ID = 1
DEFAULT_RESOURCE = "P1 Prisma"
def parse_date(value: str) -> str:
try:
datetime.strptime(value, "%Y-%m-%d")
except ValueError as exc:
raise argparse.ArgumentTypeError(f"Invalid date: {value}") from exc
return value
def has_pi(reservation: dict, pi_value: str) -> bool:
for attr in reservation.get("customAttributes") or []:
if attr.get("id") == PI_ATTRIBUTE_ID and attr.get("value") == pi_value:
return True
return False
def main() -> int:
parser = argparse.ArgumentParser(
description="Delete reservations for a specific PI value on a resource."
)
parser.add_argument(
"--pi",
required=True,
help="Principal Investigator attribute value to delete",
)
parser.add_argument(
"--resource",
default=DEFAULT_RESOURCE,
help="Resource name (default: P1 Prisma)",
)
parser.add_argument(
"--start-date",
type=parse_date,
default="2000-01-01",
help="Start date (YYYY-MM-DD, default: 2000-01-01)",
)
parser.add_argument(
"--end-date",
type=parse_date,
default="2100-01-01",
help="End date (YYYY-MM-DD, default: 2100-01-01)",
)
parser.add_argument(
"--yes",
action="store_true",
help="Delete without confirmation",
)
args = parser.parse_args()
api = BookedAPI()
resource = api.find_resource_by_name(args.resource)
if not resource:
print(f"Resource '{args.resource}' not found.")
return 1
reservations = api.list_reservations(
resource_id=resource["resourceId"],
start_date=args.start_date,
end_date=args.end_date,
)
matches = [r for r in reservations if has_pi(r, args.pi)]
print(f"Found {len(matches)} reservations with PI '{args.pi}' on {resource['name']}.")
if not matches:
return 0
if not args.yes:
response = input("Delete these reservations? [Y]es / [n]o: ").strip().lower()
if response in {"n", "no"}:
print("Aborted.")
return 0
for res in matches:
ref = res.get("referenceNumber")
if not ref:
continue
result = api.delete_reservation(ref)
if result and isinstance(result, dict) and result.get("errors"):
print(f"{ref}: delete failed ({result['errors']})")
else:
print(f"{ref}: deleted")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
import argparse
import sys
from pathlib import Path
from typing import List
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI # noqa: E402
ALLOWED_RESOURCES = ("Staff", "Coverage", "P1 Prisma", "P2 Prisma Fit", "4T Varian")
DEFAULT_START_DATE = "2000-01-01"
DEFAULT_END_DATE = "2100-01-01"
def normalize_choice(response: str) -> str | None:
if response in {"", "n", "no"}:
return "no"
if response in {"y", "yes"}:
return "yes"
return None
def unique_refs(reservations: List[dict]) -> List[str]:
refs = []
seen = set()
for res in reservations:
ref = res.get("referenceNumber")
if not ref or ref in seen:
continue
seen.add(ref)
refs.append(ref)
return refs
def find_resource_by_exact_name(api: BookedAPI, name: str) -> dict:
resources = api.list_resources()
for res in resources:
if res.get("name", "").lower() == name.lower():
return res
raise ValueError(f"Resource '{name}' not found")
def main() -> int:
parser = argparse.ArgumentParser(
description="Delete all reservations for a specific Scandium resource."
)
parser.add_argument(
"--resource",
required=True,
choices=ALLOWED_RESOURCES,
help="Resource name to purge",
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip confirmation prompt",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="List reservations without deleting",
)
args = parser.parse_args()
api = BookedAPI()
resource = find_resource_by_exact_name(api, args.resource)
reservations = api.list_reservations(
DEFAULT_START_DATE,
DEFAULT_END_DATE,
resource_id=resource["resourceId"],
)
refs = unique_refs(reservations)
print(f"Resource: {resource['name']} (ID {resource['resourceId']})")
print(f"Found {len(reservations)} reservations, {len(refs)} unique references")
if args.dry_run:
for res in reservations:
ref = res.get("referenceNumber")
if not ref:
continue
title = res.get("title")
start = res.get("startDate")
end = res.get("endDate")
pi_value = None
for attr in res.get("customAttributes", []) or []:
attr_id = attr.get("attributeId") if "attributeId" in attr else attr.get("id")
if attr_id == 1:
pi_value = attr.get("attributeValue") or attr.get("value")
break
print(f"{ref} | title={title!r} | PI={pi_value} | {start} -> {end}")
return 0
if not args.yes:
response = input("Delete all reservations? [y/N]: ").strip().lower()
choice = normalize_choice(response)
if choice != "yes":
print("Cancelled.")
return 0
for idx, ref in enumerate(refs, start=1):
api.delete_reservation(ref, update_scope="full")
print(f"Deleted {idx}/{len(refs)}: {ref}")
print("Done")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from dataclasses import dataclass
from datetime import date, datetime, time
from pathlib import Path
from typing import Iterable
from dateutil.rrule import rrulestr
from icalendar import Event as ICalEvent
@dataclass
class Event:
raw_lines: list[str]
ical: ICalEvent | None
def parse_event(lines: list[str]) -> Event:
block = "\n".join(lines) + "\n"
try:
ical = ICalEvent.from_ical(block)
except Exception:
ical = None
return Event(lines, ical)
def iter_ical_datetimes(prop) -> Iterable[date | datetime]:
if not prop:
return []
props = prop if isinstance(prop, list) else [prop]
values: list[date | datetime] = []
for item in props:
if hasattr(item, "dts"):
values.extend([d.dt for d in item.dts])
elif hasattr(item, "dt"):
values.append(item.dt)
return values
def split_exdates(prop) -> tuple[set[date], set[datetime]]:
exdates_date: set[date] = set()
exdates_dt: set[datetime] = set()
for value in iter_ical_datetimes(prop):
if isinstance(value, datetime):
exdates_dt.add(value)
exdates_date.add(value.date())
elif isinstance(value, date):
exdates_date.add(value)
return exdates_date, exdates_dt
def split_rdates(prop) -> list[date | datetime]:
return list(iter_ical_datetimes(prop))
def as_date(value: date | datetime) -> date:
return value.date() if isinstance(value, datetime) else value
def event_has_remaining_occurrence(event: Event, today: date) -> bool:
if not event.ical:
return True
dtstart_prop = event.ical.get("DTSTART")
if not dtstart_prop or not hasattr(dtstart_prop, "dt"):
return True
dtstart = dtstart_prop.dt
exdates_date, exdates_dt = split_exdates(event.ical.get("EXDATE"))
rdates = split_rdates(event.ical.get("RDATE"))
rrule_prop = event.ical.get("RRULE")
def is_excluded(occ: date | datetime) -> bool:
if isinstance(occ, datetime):
if occ in exdates_dt:
return True
return occ.date() in exdates_date
return occ in exdates_date
def is_future(occ: date | datetime) -> bool:
return as_date(occ) >= today
for occ in rdates:
if is_future(occ) and not is_excluded(occ):
return True
if not rrule_prop:
dtstart_date = as_date(dtstart)
if dtstart_date >= today:
return not is_excluded(dtstart)
return False
if not hasattr(rrule_prop, "to_ical"):
return True
try:
rrule_str = rrule_prop.to_ical().decode("utf-8")
dtstart_dt = dtstart if isinstance(dtstart, datetime) else datetime.combine(dtstart, time.min)
rule = rrulestr(rrule_str, dtstart=dtstart_dt)
except Exception:
return True
search_start = datetime.combine(today, time.min)
occ = rule.after(search_start, inc=True)
while occ is not None:
if is_future(occ) and not is_excluded(occ):
return True
occ = rule.after(occ, inc=False)
return False
def main() -> None:
parser = argparse.ArgumentParser(
description="Filter VEVENT blocks with no remaining future occurrences."
)
parser.add_argument("input", help="Input VCALENDAR (.ics) file")
parser.add_argument("output", help="Output VCALENDAR (.ics) file")
args = parser.parse_args()
src = Path(args.input)
dst = Path(args.output)
today = date.today()
lines = src.read_text().splitlines()
out_lines: list[str] = []
i = 0
while i < len(lines):
line = lines[i]
if line == "BEGIN:VEVENT":
block = [line]
i += 1
while i < len(lines):
block.append(lines[i])
if lines[i] == "END:VEVENT":
break
i += 1
event = parse_event(block)
if event_has_remaining_occurrence(event, today):
out_lines.extend(block)
else:
out_lines.append(line)
i += 1
dst.write_text("\n".join(out_lines) + "\n")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
import argparse
import html
import json
import os
import re
import sys
import textwrap
import readline
from bisect import bisect_left, bisect_right
try:
from prompt_toolkit import prompt as pt_prompt
HAS_PROMPT_TOOLKIT = True
except Exception:
HAS_PROMPT_TOOLKIT = False
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
from zoneinfo import ZoneInfo
import shutil
from threading import Lock
sys.path.insert(0, str(Path(__file__).resolve().parent))
from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402
from calcium_events import build_vevent, build_vevents, parse_calcium_events
from tqdm import tqdm
from create_scandium_clinical import extract_scanned_initials, has_contrast_indicator
DEFAULT_RESOURCE = "P1 Prisma"
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
"P1 Prisma": "data/3T_Prisma_P1.Events",
"P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
"4T Varian": "data/4T_Varian_Inova.Events",
}
DEFAULT_TZ = "America/New_York"
PI_ATTRIBUTE_ID = 1
SCANNED_INITIALS_ATTRIBUTE_ID = 2
BOLD = "\033[1m"
RESET = "\033[0m"
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
GREEN = "\033[92m"
ORANGE = "\033[93m"
BLUE = "\033[94m"
CACHE_LOCK = Lock()
EMAIL_RE = re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.IGNORECASE)
INITIALS_RE = re.compile(r"(?:Confirmed[- ]|[^A-Z])([A-Z]{2,3})(?:[^A-Z]|$)")
EMAIL_DOMAIN_SWAPS = {
"mgb.org": "mclean.harvard.edu",
"mclean.harvard.edu": "mgb.org",
}
@dataclass(frozen=True)
class EventView:
start: datetime
end: datetime
title: str
description: Optional[str]
details: List[Tuple[str, str]]
recurrence: str
reference: Optional[str]
compare_fields: Dict[str, str]
ai_payload: Dict[str, object]
calcium_event: Optional[Dict[str, str]]
scandium_detail: Optional[Dict[str, object]]
@dataclass(frozen=True)
class ParsedCalciumVEvent:
event: Dict[str, str]
start_dt: datetime
end_dt: datetime
duration: timedelta
rrule: Optional[str]
rule: Optional[Dict]
until_date: Optional[date]
exdates: set[date]
def parse_date(value: str) -> date:
return datetime.strptime(value, "%Y-%m-%d").date()
def parse_time_value(value: str) -> time:
return datetime.strptime(value, "%H:%M").time()
def normalize_summary(summary: str) -> str:
normalized = summary.replace("\\n", "\n")
normalized = normalized.replace("\\,", ",").replace("\\;", ";").replace("\\\\", "\\")
normalized = html.unescape(normalized)
return normalized.strip()
def extract_emails(text: str) -> List[str]:
return EMAIL_RE.findall(text or "")
def contains_spi_blocker(text: str) -> bool:
if not text:
return False
lowered = text.lower()
if "acr qa" in lowered:
return True
if "hd cleanup" in lowered or "hd clean up" in lowered:
return True
return False
def swap_email_domain(email: str) -> Optional[str]:
if "@" not in email:
return None
local, domain = email.rsplit("@", 1)
swapped = EMAIL_DOMAIN_SWAPS.get(domain.lower())
if not swapped:
return None
return f"{local}@{swapped}"
def extract_initials(text: str) -> List[str]:
if not text:
return []
if contains_spi_blocker(text):
return []
cleaned = re.sub(r"fbirn\s*&\s*hd\s*cleanup", "", text, flags=re.IGNORECASE)
return [match.group(1) for match in INITIALS_RE.finditer(cleaned)]
def extract_patient_source(text: str) -> Optional[str]:
if not text:
return None
cleaned = text.strip()
for prefix in ("OP-", "IP-"):
if cleaned.startswith(prefix):
return prefix[:-1]
for prefix in ("OP ", "IP "):
if cleaned.startswith(prefix):
return prefix.strip()
if "-" in cleaned:
lead = cleaned.split("-", 1)[0].strip()
if lead and lead.isalnum():
return lead
return None
def parse_clinical_prefix(summary: str) -> tuple[Optional[str], Optional[str], Optional[str], str]:
lines = summary.splitlines() if summary else []
if not lines:
return None, None, None, ""
first = lines[0]
match = re.match(
r"^\s*(?P<pat>[A-Za-z0-9]+)\s*-\s*(?P<body>.+?)\s*-\s*(?P<spi>[A-Za-z]{2,3})(?P<rest>.*)$",
first,
)
if not match:
return None, None, None, summary
pat = match.group("pat").strip()
body = match.group("body").strip()
spi = match.group("spi").strip()
rest = match.group("rest").strip().lstrip("-").strip()
new_lines: list[str] = []
if rest:
new_lines.append(rest)
new_lines.extend(lines[1:])
return pat, body, spi, "\n".join(new_lines).strip()
def extract_body_part(text: str) -> Optional[str]:
if not text:
return None
cleaned = text.strip()
for prefix in ("OP-", "IP-"):
if cleaned.startswith(prefix):
parts = cleaned.split("-", 2)
if len(parts) >= 2:
return parts[1].strip()
if "-" in cleaned:
parts = cleaned.split("-", 2)
if len(parts) >= 2:
return parts[1].strip()
return None
def normalize_compare_value(value: str) -> str:
cleaned = (value or "").lower()
cleaned = re.sub(r"\band\b", "", cleaned)
return re.sub(r"[^a-z0-9]+", "", cleaned)
def canonical_lines(
details: Optional[List[Tuple[str, str]]],
swap_body_label: bool = False,
) -> List[str]:
if not details:
return ["missing"]
lines: List[str] = []
for label, value in details:
clean_label = label
if swap_body_label:
if clean_label == "BOD":
clean_label = "Body part"
elif clean_label == "Body part":
clean_label = "BOD"
parts = (value or "").splitlines() or [""]
for idx, part in enumerate(parts):
if idx == 0:
lines.append(f"{clean_label}: {part}".strip())
else:
lines.append(part.strip())
return lines
def canonical_hash(
cal: Optional[EventView],
sca: Optional[EventView],
swap_body_label: bool = False,
) -> str:
left = "\n".join(canonical_lines(cal.details if cal else None, swap_body_label))
right = "\n".join(canonical_lines(sca.details if sca else None, swap_body_label))
payload = f"LEFT\n{left}\nRIGHT\n{right}"
import hashlib
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def resolve_accepted_hash(
cal: Optional[EventView],
sca: Optional[EventView],
accepted_hashes: set[str],
) -> Optional[str]:
primary = canonical_hash(cal, sca, swap_body_label=False)
if primary in accepted_hashes:
return primary
alternate = canonical_hash(cal, sca, swap_body_label=True)
if alternate in accepted_hashes:
return alternate
return None
def load_accepted_hashes(path: Path) -> set[str]:
if not path.exists():
return set()
try:
return {line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()}
except OSError:
return set()
def store_accepted_hash(path: Path, hash_value: str) -> None:
with path.open("a", encoding="utf-8") as handle:
handle.write(hash_value + "\n")
def parse_exdate_dates(value: str) -> set[date]:
if not value:
return set()
dates: set[date] = set()
for part in value.split(","):
chunk = part.strip()
if len(chunk) >= 8:
try:
dates.add(datetime.strptime(chunk[:8], "%Y%m%d").date())
except ValueError:
continue
return dates
def parse_ics_datetime(value: str, tz: ZoneInfo) -> Optional[datetime]:
if not value:
return None
if "T" not in value:
return None
if value.endswith("Z"):
iso = ics_datetime_to_iso(value)
dt = datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S%z")
return dt.astimezone(tz)
dt = datetime.strptime(value, "%Y%m%dT%H%M%S")
return dt.replace(tzinfo=tz)
def parse_api_datetime(value: str, tz: ZoneInfo) -> Optional[datetime]:
if not value:
return None
cleaned = value
if cleaned.endswith("Z"):
cleaned = cleaned[:-1] + "+0000"
if len(cleaned) >= 6 and cleaned[-3] == ":":
cleaned = cleaned[:-3] + cleaned[-2:]
try:
dt = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z")
except ValueError:
return None
return dt.astimezone(tz)
def parse_termination_date(value: Optional[str], tz: ZoneInfo) -> Optional[date]:
if not value:
return None
cleaned = value
if cleaned.endswith("Z"):
cleaned = cleaned[:-1] + "+0000"
if cleaned[-3] == ":":
cleaned = cleaned[:-3] + cleaned[-2:]
return datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz).date()
def parse_booked_until_date(value: Optional[str]) -> Optional[date]:
if not value:
return None
cleaned = value
if cleaned.endswith("Z"):
cleaned = cleaned[:-1] + "+0000"
if cleaned[-3] == ":":
cleaned = cleaned[:-3] + cleaned[-2:]
try:
dt = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z")
except ValueError:
return None
return dt.date()
def parse_ics_until(value: str) -> Optional[date]:
if not value:
return None
cleaned = value.strip()
if len(cleaned) >= 8:
try:
return datetime.strptime(cleaned[:8], "%Y%m%d").date()
except ValueError:
return None
return None
def format_weekdays(weekdays: Iterable[int]) -> str:
names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]
output = []
for day in weekdays:
if 0 <= day < len(names):
output.append(names[day])
return ", ".join(output)
def format_ics_recurrence(rrule: Optional[str], start_dt: datetime) -> str:
if not rrule:
return "none"
parts = {}
for piece in rrule.split(";"):
if "=" in piece:
key, value = piece.split("=", 1)
parts[key] = value
freq = parts.get("FREQ", "").upper()
interval = parts.get("INTERVAL") or "1"
byday = parts.get("BYDAY", "")
until = parts.get("UNTIL", "")
freq_map = {
"DAILY": "daily",
"WEEKLY": "weekly",
"MONTHLY": "monthly",
"YEARLY": "yearly",
}
label = freq_map.get(freq, "none")
if label == "none":
return "none"
tokens = [f"{label} every {interval}"]
if label == "weekly":
day_map = {"SU": 0, "MO": 1, "TU": 2, "WE": 3, "TH": 4, "FR": 5, "SA": 6}
days = []
if byday:
for code in byday.split(","):
code = code.strip().upper()
if code in day_map:
days.append(day_map[code])
if not days:
days = [(start_dt.weekday() + 1) % 7]
readable = format_weekdays(days)
if readable:
tokens.append(f"on {readable}")
until_date = parse_ics_until(until)
if until_date:
tokens.append(f"until {until_date.isoformat()}")
return " ".join(tokens)
def recurrence_occurs_on(
target_date: date,
start_date: date,
rule: Dict,
) -> bool:
if target_date < start_date:
return False
interval = int(rule.get("interval", 1))
rtype = rule.get("type")
if rtype == "daily":
diff_days = (target_date - start_date).days
return diff_days % interval == 0
if rtype == "weekly":
weekdays = rule.get("weekdays") or []
booked_weekday = (target_date.weekday() + 1) % 7
if booked_weekday not in weekdays:
return False
diff_days = (target_date - start_date).days
return diff_days % (interval * 7) == 0
if rtype == "monthly":
if target_date.day != start_date.day:
return False
months = (target_date.year - start_date.year) * 12 + (
target_date.month - start_date.month
)
return months % interval == 0
if rtype == "yearly":
if (target_date.month, target_date.day) != (start_date.month, start_date.day):
return False
return (target_date.year - start_date.year) % interval == 0
return False
def calcium_occurrence_for_date(
event: Dict[str, str],
target_date: date,
tz: ZoneInfo,
) -> Optional[Tuple[datetime, datetime]]:
start_raw = event.get("DTSTART", "")
end_raw = event.get("DTEND", "")
start_dt = parse_ics_datetime(start_raw, tz)
end_dt = parse_ics_datetime(end_raw, tz)
if not start_dt or not end_dt:
return None
duration = end_dt - start_dt
rrule = event.get("RRULE")
exdates = parse_exdate_dates(event.get("EXDATE", ""))
if not rrule:
if start_dt.date() != target_date:
return None
return start_dt, end_dt
rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
if until_date and target_date > until_date:
return None
if target_date in exdates:
return None
if not recurrence_occurs_on(target_date, start_dt.date(), rule):
return None
new_start = datetime.combine(target_date, start_dt.timetz())
new_end = new_start + duration
return new_start, new_end
def build_event_view_from_calcium_occurrence(
event: Dict[str, str],
start_dt: datetime,
end_dt: datetime,
) -> EventView:
summary = normalize_summary(event.get("SUMMARY", "")) or "(no summary)"
description = event.get("DESCRIPTION")
categories = event.get("CATEGORIES")
rrule = event.get("RRULE")
recurrence_label = format_ics_recurrence(rrule, start_dt)
details: List[Tuple[str, str]] = []
if categories:
details.append(("TYP", categories))
details.append(("SUM", summary))
if recurrence_label != "none":
details.append(("REC", recurrence_label))
compare_fields = {
"TYP": categories or "",
"SUM": summary or "",
"REC": recurrence_label or "none",
}
patient_source = extract_patient_source(summary or "") or ""
body_part = extract_body_part(summary or "") or ""
spi_values = extract_initials(summary or "")
if categories and categories.strip().lower() == "open":
patient_source = ""
body_part = ""
spi_values = []
elif categories and "Clinical" in categories:
pat_parsed, body_parsed, spi_parsed, _cleaned = parse_clinical_prefix(summary or "")
if pat_parsed:
patient_source = pat_parsed
if body_parsed:
body_part = body_parsed
if spi_parsed:
spi_values = [spi_parsed]
ai_payload = {
"categories": categories or "",
"summary": summary or "",
"description": description or "",
"emails": extract_emails(summary or ""),
"initials": spi_values,
"patient_source": patient_source,
"body_part": body_part,
"recurrence": recurrence_label or "none",
}
return EventView(
start=start_dt,
end=end_dt,
title=summary,
description=description,
details=details,
recurrence=recurrence_label,
reference=None,
compare_fields=compare_fields,
ai_payload=ai_payload,
calcium_event=event,
scandium_detail=None,
)
def build_calcium_events_for_date(
events_path: Path,
preferences_path: Optional[Path],
tz: ZoneInfo,
target_date: date,
start_time: Optional[time],
end_time: Optional[time],
) -> List[EventView]:
vevents = build_vevents(
events_path,
category_filter=None,
tz_name=str(tz.key),
preferences_path=preferences_path,
exclude_categories=None,
debug=False,
use_utc=True,
)
results: List[EventView] = []
extra_open: List[Dict[str, str]] = []
for calcium_event in parse_calcium_events(
events_path,
category_filter=None,
exclude_categories=None,
preferences_path=preferences_path,
debug=False,
):
if calcium_event.categories:
continue
summary_text = (calcium_event.text or "").lstrip()
if summary_text[:4].lower() != "open":
continue
for vevent in build_vevent(
calcium_event,
tz,
category_value="Open",
use_utc=True,
debug=False,
):
if vevent:
extra_open.append(vevent)
def append_event(event: Dict[str, str]) -> None:
occurrence = calcium_occurrence_for_date(event, target_date, tz)
if not occurrence:
return
start_dt, end_dt = occurrence
if start_time and end_time:
if start_dt.timetz().replace(tzinfo=None) != start_time:
return
if end_dt.timetz().replace(tzinfo=None) != end_time:
return
results.append(build_event_view_from_calcium_occurrence(event, start_dt, end_dt))
for item in vevents:
append_event(item.event)
for event in extra_open:
append_event(event)
return results
def build_calcium_events_by_date(
events_path: Path,
preferences_path: Optional[Path],
tz: ZoneInfo,
dates: Iterable[date],
start_time: Optional[time],
end_time: Optional[time],
) -> Dict[date, List[EventView]]:
dates_sorted = sorted(dates)
results: Dict[date, List[EventView]] = {value: [] for value in dates_sorted}
if not dates_sorted:
return results
calcium_events = parse_calcium_events(
events_path,
category_filter=None,
exclude_categories=None,
preferences_path=preferences_path,
debug=False,
)
vevents: List[Dict[str, str]] = []
for calcium_event in calcium_events:
category_value = calcium_event.categories[0] if calcium_event.categories else None
if category_value:
for vevent in build_vevent(
calcium_event,
tz,
category_value=category_value,
use_utc=True,
debug=False,
):
if vevent:
vevents.append(vevent)
if calcium_event.categories:
continue
summary_text = (calcium_event.text or "").lstrip()
if summary_text[:4].lower() != "open":
continue
for vevent in build_vevent(
calcium_event,
tz,
category_value="Open",
use_utc=True,
debug=False,
):
if vevent:
vevents.append(vevent)
parsed: List[ParsedCalciumVEvent] = []
for event in vevents:
start_dt = parse_ics_datetime(event.get("DTSTART", ""), tz)
end_dt = parse_ics_datetime(event.get("DTEND", ""), tz)
if not start_dt or not end_dt:
continue
rrule = event.get("RRULE")
rule = parse_rrule(rrule, start_dt.replace(tzinfo=None)) if rrule else None
until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz) if rule else None
parsed.append(
ParsedCalciumVEvent(
event=event,
start_dt=start_dt,
end_dt=end_dt,
duration=end_dt - start_dt,
rrule=rrule,
rule=rule,
until_date=until_date,
exdates=parse_exdate_dates(event.get("EXDATE", "")),
)
)
min_date = dates_sorted[0]
max_date = dates_sorted[-1]
for item in parsed:
start_date = item.start_dt.date()
if not item.rrule:
if start_date not in results:
continue
if start_time and end_time:
if item.start_dt.timetz().replace(tzinfo=None) != start_time:
continue
if item.end_dt.timetz().replace(tzinfo=None) != end_time:
continue
results[start_date].append(
build_event_view_from_calcium_occurrence(item.event, item.start_dt, item.end_dt)
)
continue
range_start = max(start_date, min_date)
range_end = min(item.until_date, max_date) if item.until_date else max_date
if range_end < range_start:
continue
start_idx = bisect_left(dates_sorted, range_start)
end_idx = bisect_right(dates_sorted, range_end)
for target_date in dates_sorted[start_idx:end_idx]:
if target_date in item.exdates:
continue
if not item.rule or not recurrence_occurs_on(target_date, start_date, item.rule):
continue
new_start = datetime.combine(target_date, item.start_dt.timetz())
new_end = new_start + item.duration
if start_time and end_time:
if new_start.timetz().replace(tzinfo=None) != start_time:
continue
if new_end.timetz().replace(tzinfo=None) != end_time:
continue
results[target_date].append(
build_event_view_from_calcium_occurrence(item.event, new_start, new_end)
)
return results
def format_recurrence(rule: Optional[Dict], start_dt: datetime) -> str:
if not rule:
return "none"
rule_type = rule.get("type") or "none"
if rule_type == "none":
return "none"
parts = [rule_type]
interval = rule.get("interval")
if interval:
parts.append(f"every {interval}")
weekdays = rule.get("weekdays") or []
if rule_type == "weekly":
normalized_days: List[int] = []
for day in weekdays:
try:
normalized_days.append(int(day))
except (TypeError, ValueError):
continue
if not normalized_days:
normalized_days = [(start_dt.weekday() + 1) % 7]
readable = format_weekdays(normalized_days)
if readable:
parts.append(f"on {readable}")
until = rule.get("repeatTerminationDate")
if until:
until_date = parse_booked_until_date(until)
if until_date:
parts.append(f"until {until_date.isoformat()}")
return " ".join(parts)
def build_scandium_event_view_from_detail(
detail: Dict[str, object],
tz: ZoneInfo,
) -> Optional[EventView]:
start_raw = detail.get("startDate") or detail.get("startDateTime")
end_raw = detail.get("endDate") or detail.get("endDateTime")
start_dt = parse_api_datetime(start_raw, tz) if start_raw else None
end_dt = parse_api_datetime(end_raw, tz) if end_raw else None
if not start_dt or not end_dt:
return None
title = (detail.get("title") or "").strip()
description = detail.get("description")
reference = detail.get("referenceNumber") or None
participant_items = detail.get("participants", [])
invitee_items = detail.get("invitees", [])
participants: List[str] = []
for entry in list(participant_items) + list(invitee_items):
if isinstance(entry, dict):
first = (entry.get("firstName") or "").strip()
last = (entry.get("lastName") or "").strip()
email = (entry.get("emailAddress") or "").strip()
label = " ".join(piece for piece in [first, last] if piece).strip()
if email:
if label:
label = f"{label} <{email}>"
else:
label = email
if label:
participants.append(label)
elif isinstance(entry, str):
label = entry.strip()
if label:
participants.append(label)
participant_text = ", ".join(p.strip() for p in participants if p.strip())
label_map = {
"patient source": "PAT",
"scanned person initials": "SPI",
"body part": "BOD",
"contrast": "CON",
}
attributes: List[Tuple[str, str]] = []
scan_type_value: Optional[str] = None
spi_value: Optional[str] = None
patient_source: Optional[str] = None
body_part: Optional[str] = None
for attr in detail.get("customAttributes", []):
label = (attr.get("label") or "").strip()
value = (attr.get("value") or "").strip()
label_key = label.lower()
if not label or not value:
continue
if label_key == "scan type / pi" and not scan_type_value:
scan_type_value = value
continue
if label_key == "scanned person initials" and not spi_value:
spi_value = value
if label_key == "patient source" and not patient_source:
patient_source = value
if label_key == "body part" and not body_part:
body_part = value
attributes.append((label_map.get(label_key, label), value))
recurrence = format_recurrence(detail.get("recurrenceRule"), start_dt)
details: List[Tuple[str, str]] = []
compare_fields: Dict[str, str] = {}
if scan_type_value:
details.append(("TYP", scan_type_value))
compare_fields["TYP"] = scan_type_value
if scan_type_value and scan_type_value.strip().lower() == "notice":
if description and title and description.strip() == title.strip():
title = ""
else:
if description and title and description.strip() == title.strip():
description = ""
if description:
details.append(("SUM", description))
compare_fields["SUM"] = description
if title:
details.append(("TIT", title))
if not compare_fields.get("SUM"):
compare_fields["SUM"] = title
if recurrence != "none":
details.append(("REC", recurrence))
compare_fields["REC"] = recurrence
if participant_text:
details.append(("PAR", participant_text))
for label, value in attributes:
details.append((label, value))
ai_payload = {
"scan_type_pi": scan_type_value or "",
"description": description or "",
"title": title,
"recurrence": recurrence or "none",
"participants": participants,
"attributes": {label: value for label, value in attributes},
"spi": spi_value or "",
"patient_source": patient_source or "",
"body_part": body_part or "",
}
return EventView(
start=start_dt,
end=end_dt,
title=title,
description=description,
details=details,
recurrence=recurrence,
reference=reference,
compare_fields=compare_fields,
ai_payload=ai_payload,
calcium_event=None,
scandium_detail=detail,
)
def build_scandium_events_for_date(
api: BookedAPI,
resource_name: str,
tz: ZoneInfo,
target_date: date,
start_time: Optional[time],
end_time: Optional[time],
resource: Optional[Dict[str, object]] = None,
) -> List[EventView]:
if resource is None:
resource = api.find_resource_by_name(resource_name)
if not resource:
raise RuntimeError(f"Resource not found: {resource_name}")
start_date = target_date.strftime("%Y-%m-%d")
end_date = (target_date + timedelta(days=1)).strftime("%Y-%m-%d")
reservations = api.list_reservations(
start_date=start_date,
end_date=end_date,
resource_id=resource["resourceId"],
)
results: List[EventView] = []
for reservation in reservations:
reference = reservation.get("referenceNumber", "")
detail = reservation
needs_detail = False
if reference:
for field in ("customAttributes", "participants", "invitees", "recurrenceRule"):
if field not in reservation:
needs_detail = True
break
if not needs_detail:
if not reservation.get("customAttributes") or reservation.get("recurrenceRule") is None:
needs_detail = True
if needs_detail:
try:
detail = api.get_reservation(reference)
except Exception:
detail = reservation
view = build_scandium_event_view_from_detail(detail, tz)
if not view:
continue
if view.start.date() != target_date:
continue
if start_time and end_time:
if view.start.timetz().replace(tzinfo=None) != start_time:
continue
if view.end.timetz().replace(tzinfo=None) != end_time:
continue
results.append(view)
return results
def group_by_time(events: Iterable[EventView]) -> Dict[Tuple[datetime, datetime], List[EventView]]:
grouped: Dict[Tuple[datetime, datetime], List[EventView]] = {}
for event in events:
key = (event.start, event.end)
grouped.setdefault(key, []).append(event)
return grouped
def format_event_label(event: EventView, width: int) -> str:
if not event.details:
title = event.title.strip() if event.title else "(no title)"
if event.description:
return f"{title}\n{event.description.strip()}"
return title
lines: List[str] = []
for key, value in event.details:
label = f"{key}: "
available = max(10, width - len(label))
wrapped = textwrap.wrap(value, width=available) or [""]
for idx, chunk in enumerate(wrapped):
prefix = label if idx == 0 else " " * len(label)
lines.append(f"{prefix}{BOLD}{chunk}{RESET}")
return "\n".join(lines)
def build_compare_key(calcium: EventView, scandium: EventView) -> str:
payload = {
"calcium": calcium.ai_payload,
"scandium": scandium.ai_payload,
}
return json.dumps(payload, sort_keys=True)
def build_ai_prompt(calcium: EventView, scandium: EventView) -> str:
return (
"You are comparing a Calcium calendar event (unstructured) to a Scandium event "
"(structured). Determine whether they refer to the same booking. The Scandium "
"side has structured fields (PAT, Body part, SPI, Participants, Title) that may "
"appear embedded in Calcium SUM. The first words of Calcium SUM often map to "
"Scandium TIT. Scanned person initials might be in Calcium SUM and Scandium SPI. "
"Body part wording can differ (e.g., 'L/Spine' vs 'L Spine'). "
"Emails or names in Calcium SUM often correspond to Scandium Participants. "
"If Scandium fields are contained in Calcium SUM, treat that as a match even if "
"the strings are not identical. If both sides provide PAT or Body part and they "
"conflict, mark as mismatch.\n\n"
"Calcium event:\n"
f"{json.dumps(calcium.ai_payload, indent=2)}\n\n"
"Scandium event:\n"
f"{json.dumps(scandium.ai_payload, indent=2)}\n\n"
"Respond with JSON:\n"
"{\n"
' "match": true|false,\n'
' "field_matches": {"TYP": "match|mismatch|unclear", "SUM": "match|mismatch|unclear", "REC": "match|mismatch|unclear"},\n'
' "notes": "short reason"\n'
"}\n"
)
def ai_compare(
calcium: EventView,
scandium: EventView,
model: str,
api_key: str,
cache: Dict[str, Dict[str, object]],
) -> Dict[str, object]:
key = build_compare_key(calcium, scandium)
with CACHE_LOCK:
if key in cache:
return cache[key]
import requests
prompt = build_ai_prompt(calcium, scandium)
response = requests.post(
"https://api.openai.com/v1/responses",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"model": model,
"input": [
{
"role": "user",
"content": [
{
"type": "input_text",
"text": prompt,
}
],
}
],
},
timeout=30,
)
if response.status_code != 200:
raise RuntimeError(f"OpenAI error {response.status_code}: {response.text}")
payload = response.json()
text = ""
for item in payload.get("output", []):
if item.get("type") == "message":
for content in item.get("content", []):
if content.get("type") == "output_text":
text += content.get("text", "")
try:
result = json.loads(text)
except json.JSONDecodeError:
result = {"match": False, "field_matches": {}, "notes": "AI response parse error"}
with CACHE_LOCK:
cache[key] = result
return result
def compare_events(
calcium: EventView,
scandium: EventView,
use_ai: bool,
model: str,
api_key: Optional[str],
cache: Dict[str, Dict[str, object]],
) -> Dict[str, object]:
calcium_spi = set(calcium.ai_payload.get("initials", []) or [])
scandium_spi = (scandium.ai_payload.get("spi") or "").strip()
if scandium_spi and calcium_spi and scandium_spi not in calcium_spi:
return {
"match": False,
"field_matches": {"SUM": "mismatch"},
"notes": "initials mismatch",
"source": "guard",
}
calcium_typ = calcium.compare_fields.get("TYP", "")
scandium_typ = scandium.compare_fields.get("TYP", "")
if calcium_typ in {"PM / Service", "PM-Service", "PM Service", "PM/Service"} and scandium_typ == "Service":
calcium_typ = "Service"
exact = {
"TYP": calcium_typ == scandium_typ,
"SUM": calcium.compare_fields.get("SUM", "") == scandium.compare_fields.get("SUM", ""),
"REC": calcium.compare_fields.get("REC", "") == scandium.compare_fields.get("REC", ""),
}
if all(exact.values()):
return {
"match": True,
"field_matches": {key: "match" for key in exact},
"notes": "exact match",
"source": "exact",
}
if not use_ai or not api_key:
mismatched = [key for key, ok in exact.items() if not ok]
return {
"match": False,
"field_matches": {key: "mismatch" if key in mismatched else "match" for key in exact},
"notes": "exact mismatch",
"source": "exact",
}
try:
result = ai_compare(calcium, scandium, model, api_key, cache)
# Post-check: PAT/body mismatches override AI match when both sides provide values.
calcium_pat = (calcium.ai_payload.get("patient_source") or "").strip()
scandium_pat = (scandium.ai_payload.get("patient_source") or "").strip()
calcium_body = (calcium.ai_payload.get("body_part") or "").strip()
scandium_body = (scandium.ai_payload.get("body_part") or "").strip()
if result.get("match"):
if scandium_pat and calcium_pat:
if normalize_compare_value(scandium_pat) != normalize_compare_value(calcium_pat):
result["match"] = False
field_matches = result.get("field_matches") or {}
field_matches["SUM"] = "mismatch"
result["field_matches"] = field_matches
result["notes"] = "patient source mismatch"
if result.get("match") and scandium_body and calcium_body:
norm_scandium_body = normalize_compare_value(scandium_body)
norm_calcium_body = normalize_compare_value(calcium_body)
if norm_scandium_body != norm_calcium_body:
if norm_scandium_body not in norm_calcium_body and norm_calcium_body not in norm_scandium_body:
result["match"] = False
field_matches = result.get("field_matches") or {}
field_matches["SUM"] = "mismatch"
result["field_matches"] = field_matches
result["notes"] = "body part mismatch"
result["source"] = "ai"
return result
except Exception as exc:
mismatched = [key for key, ok in exact.items() if not ok]
return {
"match": False,
"field_matches": {key: "mismatch" if key in mismatched else "match" for key in exact},
"notes": f"ai error: {exc}",
"source": "ai-error",
}
def input_with_default(label: str, current: str) -> str:
prompt = f"{label}: "
if HAS_PROMPT_TOOLKIT:
response = pt_prompt(prompt, default=current or "").strip()
else:
def hook() -> None:
readline.insert_text(current)
readline.redisplay()
readline.set_startup_hook(hook)
try:
response = input(prompt).strip()
finally:
readline.set_startup_hook(None)
if response == "":
return current
if response == "-":
return ""
return response
def input_sum_with_default(current: str, calcium_sum: str) -> str:
suffix = " (c=copy from calcium, -=clear)"
label = f"SUM (Description){suffix}"
prompt = f"{label}: "
if HAS_PROMPT_TOOLKIT:
response = pt_prompt(prompt, default=current or "").strip()
if response == "":
return current
else:
def hook() -> None:
readline.insert_text(current)
readline.redisplay()
readline.set_startup_hook(hook)
try:
response = input(prompt).strip()
finally:
readline.set_startup_hook(None)
if response == "":
return current
if response == "-":
return ""
if response.lower() == "c" and calcium_sum:
return calcium_sum
return response
def prompt_yes_no(prompt: str, default: bool = False) -> bool:
suffix = " [Y/n/h]: " if default else " [y/N/h]: "
while True:
response = input(prompt + suffix).strip().lower()
if response == "h":
print("Options: y=yes, n=no.")
continue
if not response:
return default
return response in {"y", "yes"}
def prompt_yes_no_accept(
prompt: str,
default: bool = False,
include_reload: bool = False,
include_clear: bool = False,
include_update_rec: bool = False,
) -> str:
if include_reload and include_clear and include_update_rec:
suffix = " [y/N/a/r/c/u/h]: " if not default else " [Y/n/a/r/c/u/h]: "
help_text = "Options: y=yes, n=no, a=accept, r=reload, c=clear SUM, u=update REC."
elif include_reload and include_clear:
suffix = " [y/N/a/r/c/h]: " if not default else " [Y/n/a/r/c/h]: "
help_text = "Options: y=yes, n=no, a=accept, r=reload, c=clear SUM."
elif include_reload and include_update_rec:
suffix = " [y/N/a/r/u/h]: " if not default else " [Y/n/a/r/u/h]: "
help_text = "Options: y=yes, n=no, a=accept, r=reload, u=update REC."
elif include_clear and include_update_rec:
suffix = " [y/N/a/c/u/h]: " if not default else " [Y/n/a/c/u/h]: "
help_text = "Options: y=yes, n=no, a=accept, c=clear SUM, u=update REC."
elif include_reload:
suffix = " [y/N/a/r/h]: " if not default else " [Y/n/a/r/h]: "
help_text = "Options: y=yes, n=no, a=accept, r=reload."
elif include_clear:
suffix = " [y/N/a/c/h]: " if not default else " [Y/n/a/c/h]: "
help_text = "Options: y=yes, n=no, a=accept, c=clear SUM."
elif include_update_rec:
suffix = " [y/N/a/u/h]: " if not default else " [Y/n/a/u/h]: "
help_text = "Options: y=yes, n=no, a=accept, u=update REC."
else:
suffix = " [y/N/a/h]: " if not default else " [Y/n/a/h]: "
help_text = "Options: y=yes, n=no, a=accept."
while True:
response = input(prompt + suffix).strip().lower()
if response == "h":
print(help_text)
continue
if response == "r" and include_reload:
return "reload"
if response == "c" and include_clear:
return "clear"
if response == "u" and include_update_rec:
return "update_rec"
if response == "a":
return "accept"
if not response:
return "yes" if default else "no"
return "yes" if response in {"y", "yes"} else "no"
def prompt_edit_scope(
is_recurring: bool,
default_all: bool = False,
) -> str:
if not is_recurring:
return "this"
while True:
prompt = "Edit scope [t]his/[a]ll/[h]: "
if default_all:
prompt = "Edit scope [t]his/[A]ll/[h]: "
response = input(prompt).strip().lower()
if response == "h":
print("Options: t=this instance, a=all instances.")
continue
if not response:
return "full" if default_all else "this"
if response.startswith("a"):
return "full"
return "this"
def update_reservation_raw(
api: BookedAPI,
reference: str,
data: Dict[str, object],
update_scope: str,
) -> Dict:
return api._request(
"POST",
f"Reservations/{reference}",
params={"updateScope": update_scope},
json=data,
)
def update_scandium_event(
api: BookedAPI,
sca: EventView,
tz: ZoneInfo,
fields: Optional[set[str]] = None,
calcium: Optional[EventView] = None,
prefill: Optional[Dict[str, str]] = None,
prompt: bool = True,
update_scope_override: Optional[str] = None,
recurrence_rule_override: Optional[Dict[str, object]] = None,
) -> Optional[EventView]:
detail = sca.scandium_detail or {}
reference = sca.reference
if not reference:
return sca
title_current = (detail.get("title") or "").strip()
desc_current = (detail.get("description") or "").strip()
typ_current = ""
pat_current = ""
body_current = ""
spi_current = ""
attr_map: Dict[str, Tuple[int, str]] = {}
for attr in detail.get("customAttributes", []):
label = (attr.get("label") or "").strip()
value = (attr.get("value") or "").strip()
if label and value:
attr_map[label.lower()] = (int(attr.get("id")), value)
if "scan type / pi" in attr_map:
typ_current = attr_map["scan type / pi"][1]
if "patient source" in attr_map:
pat_current = attr_map["patient source"][1]
if "body part" in attr_map:
body_current = attr_map["body part"][1]
if "scanned person initials" in attr_map:
spi_current = attr_map["scanned person initials"][1]
if not typ_current:
typ_current = sca.compare_fields.get("TYP", "")
if not pat_current:
pat_current = (sca.ai_payload.get("patient_source") or "")
if not body_current:
body_current = (sca.ai_payload.get("body_part") or "")
if not spi_current:
spi_current = (sca.ai_payload.get("spi") or "")
fields = fields or {"TYP", "PAT", "BODY", "SPI", "SUM"}
typ_input = typ_current
sum_input = desc_current
tit_input = title_current
pat_input = pat_current
body_input = body_current
spi_input = spi_current
par_input = ""
rec_input = sca.recurrence or "none"
if prefill and "TYP" in prefill:
typ_current = prefill["TYP"]
typ_input = prefill["TYP"]
if "TYP" in fields and prompt:
typ_input = input_with_default("TYP (Scan Type / PI)", typ_current)
if prefill and "SPI" in prefill:
spi_current = prefill["SPI"]
spi_input = prefill["SPI"]
if "SPI" in fields and prompt:
spi_input = input_with_default("SPI (Scanned Person Initials)", spi_current)
typ_is_clinical = (typ_input or "").strip().lower() == "clinical"
typ_changed_from_clinical_to_open = (
(typ_current or "").strip().lower() == "clinical"
and (typ_input or "").strip().lower() == "open"
)
if prefill and "PAT" in prefill:
pat_current = prefill["PAT"]
pat_input = prefill["PAT"]
if "PAT" in fields and typ_is_clinical and prompt:
pat_input = input_with_default("PAT (Patient Source)", pat_current)
else:
pat_input = ""
if prefill and "BODY" in prefill:
body_current = prefill["BODY"]
body_input = prefill["BODY"]
if "BODY" in fields and typ_is_clinical and prompt:
body_input = input_with_default("Body part", body_current)
else:
body_input = ""
if typ_changed_from_clinical_to_open:
body_input = ""
spi_input = ""
# Prompt SUM last.
if prefill and "SUM" in prefill:
sum_input = prefill["SUM"]
if "SUM" in fields and prompt:
calcium_sum = ""
if calcium:
calcium_sum = calcium.compare_fields.get("SUM", "")
sum_input = input_sum_with_default(sum_input, calcium_sum)
participants_current = []
for p in list(detail.get("participants", [])) + list(detail.get("invitees", [])):
if isinstance(p, dict):
email = (p.get("emailAddress") or "").strip()
if email:
participants_current.append(email)
elif isinstance(p, str) and p.strip():
participants_current.append(p.strip())
par_current = ", ".join(participants_current)
par_input = par_current
rec_current = sca.recurrence or "none"
rec_input = rec_current
if update_scope_override:
update_scope = update_scope_override
else:
update_scope = prompt_edit_scope(bool(detail.get("isRecurring")))
custom_attrs: List[Dict[str, object]] = []
label_to_id = {
"Scan Type / PI": 1,
"Patient Source": 11,
"Body part": 10,
"Scanned Person Initials": 2,
}
def apply_attr(label: str, value: str) -> None:
custom_attrs.append({"attributeId": label_to_id[label], "attributeValue": value})
apply_attr("Scan Type / PI", typ_input or "")
apply_attr("Patient Source", pat_input or "")
apply_attr("Body part", body_input or "")
apply_attr("Scanned Person Initials", spi_input or "")
participant_ids: List[int] = []
if par_input.strip():
for raw in par_input.split(","):
email = raw.strip()
if not email:
continue
user = api.find_user_by_email(email)
if user:
participant_ids.append(user["id"])
else:
print(f"Participant not found: {email}")
recurrence_rule = None
if rec_input.strip().lower() == "none":
recurrence_rule = {"type": "none"}
elif rec_input.strip() and rec_input.strip() != rec_current:
rrule = rec_input.strip()
if rrule.upper().startswith("RRULE:"):
rrule = rrule.split(":", 1)[1]
recurrence_rule = parse_rrule(rrule, sca.start.replace(tzinfo=None))
data = {
"resourceId": int(detail.get("resourceId")),
"userId": api.get_user_id(),
"startDateTime": detail.get("startDate") or detail.get("startDateTime"),
"endDateTime": detail.get("endDate") or detail.get("endDateTime"),
"title": tit_input,
"description": sum_input or "",
"participants": participant_ids,
"customAttributes": custom_attrs,
}
if recurrence_rule_override is not None:
data["recurrenceRule"] = recurrence_rule_override
elif recurrence_rule is not None:
data["recurrenceRule"] = recurrence_rule
update_reservation_raw(api, reference, data, update_scope)
new_detail = api.get_reservation(reference)
return build_scandium_event_view_from_detail(new_detail, tz)
def build_event_payload_simple(
api: BookedAPI,
event: Dict[str, str],
resource: Dict[str, object],
tz: ZoneInfo,
) -> Dict[str, object]:
dtstart = event.get("DTSTART", "")
dtend = event.get("DTEND", "")
if not dtstart or not dtend:
raise ValueError("DTSTART and DTEND are required")
start_iso = ics_datetime_to_iso(dtstart)
end_iso = ics_datetime_to_iso(dtend)
summary_raw = event.get("SUMMARY", "").strip()
summary = normalize_summary(summary_raw)
categories = event.get("CATEGORIES", "")
is_open_category = categories.strip().lower() == "open"
summary_clean = summary
title = summary_clean.splitlines()[0].strip() if summary_clean else ""
matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, categories)
if not matched_pi and categories.strip():
matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, categories.split()[0])
if not matched_pi:
matched_pi = categories.strip() or "Unknown"
initials: List[str] = []
if not is_open_category:
initials = extract_initials(summary)
custom_attributes = [
{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": matched_pi}
]
if initials:
custom_attributes.append(
{
"attributeId": SCANNED_INITIALS_ATTRIBUTE_ID,
"attributeValue": initials[0],
}
)
participant_ids: List[int] = []
seen_user_ids: set[int] = set()
if not is_open_category:
for email in extract_emails(summary):
user = api.find_user_by_email(email)
if not user:
swapped = swap_email_domain(email)
if swapped:
user = api.find_user_by_email(swapped)
if user:
user_id = int(user["id"])
if user_id not in seen_user_ids:
participant_ids.append(user_id)
seen_user_ids.add(user_id)
recurrence_rule = None
if event.get("RRULE"):
start_dt = parse_ics_datetime(dtstart, tz)
if start_dt:
recurrence_rule = parse_rrule(event.get("RRULE", ""), start_dt.replace(tzinfo=None))
is_notice_category = categories.strip().lower() == "notice"
if title and summary_clean and title.strip() == summary_clean.strip():
if is_notice_category:
title = ""
else:
summary_clean = ""
if is_open_category:
title = ""
return {
"resource_id": resource["resourceId"],
"start_datetime": start_iso,
"end_datetime": end_iso,
"title": title,
"description": summary_clean or None,
"participants": participant_ids if participant_ids else None,
"custom_attributes": custom_attributes,
"recurrence_rule": recurrence_rule,
}
def create_scandium_from_calcium(
api: BookedAPI,
resource: Dict[str, object],
tz: ZoneInfo,
calcium_event: Dict[str, str],
) -> Optional[EventView]:
categories = calcium_event.get("CATEGORIES", "") or ""
clinical_created = False
if "Clinical" in categories:
description_raw = calcium_event.get("SUMMARY", "")
pat, body, spi, description = parse_clinical_prefix(description_raw)
if spi and contains_spi_blocker(description_raw):
spi = None
if not spi:
spi = extract_scanned_initials(description_raw)
pi_value = api.find_attribute_value(1, "Clinical") or "Clinical"
custom_attrs = [{"attributeId": 1, "attributeValue": pi_value}]
if pat:
custom_attrs.append({"attributeId": 11, "attributeValue": pat})
if body:
custom_attrs.append({"attributeId": 10, "attributeValue": body})
if spi:
custom_attrs.append({"attributeId": 2, "attributeValue": spi})
if has_contrast_indicator(description):
custom_attrs.append({"attributeId": 3, "attributeValue": "Yes"})
recurrence_rule = None
if calcium_event.get("RRULE"):
start_dt = parse_ics_datetime(calcium_event.get("DTSTART", ""), tz)
if start_dt:
recurrence_rule = parse_rrule(
calcium_event.get("RRULE", ""), start_dt.replace(tzinfo=None)
)
result = api.create_reservation(
resource_id=resource["resourceId"],
start_datetime=ics_datetime_to_iso(calcium_event.get("DTSTART", "")),
end_datetime=ics_datetime_to_iso(calcium_event.get("DTEND", "")),
title="",
description=description,
custom_attributes=custom_attrs,
recurrence_rule=recurrence_rule,
)
clinical_created = True
elif "Service" in categories:
service_value = api.find_attribute_value(PI_ATTRIBUTE_ID, "Service") or "Service"
start_iso = ics_datetime_to_iso(calcium_event.get("DTSTART", ""))
end_iso = ics_datetime_to_iso(calcium_event.get("DTEND", ""))
summary = calcium_event.get("SUMMARY", "")
recurrence_rule = None
if calcium_event.get("RRULE"):
start_dt = parse_ics_datetime(calcium_event.get("DTSTART", ""), tz)
if start_dt:
recurrence_rule = parse_rrule(
calcium_event.get("RRULE", ""), start_dt.replace(tzinfo=None)
)
result = api.create_reservation(
resource_id=resource["resourceId"],
start_datetime=start_iso,
end_datetime=end_iso,
title="",
description=normalize_summary(summary),
custom_attributes=[{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": service_value}],
recurrence_rule=recurrence_rule,
)
else:
payload = build_event_payload_simple(api, calcium_event, resource, tz)
result = api.create_reservation(
resource_id=payload["resource_id"],
start_datetime=payload["start_datetime"],
end_datetime=payload["end_datetime"],
title=payload["title"],
description=payload.get("description"),
participants=payload.get("participants"),
custom_attributes=payload.get("custom_attributes"),
recurrence_rule=payload.get("recurrence_rule"),
)
reference = result.get("referenceNumber")
if not reference:
print(f"Create failed: {result}")
return None
new_detail = api.get_reservation(reference)
view = build_scandium_event_view_from_detail(new_detail, tz)
return view
def build_pairs(
calcium_events: List[EventView],
scandium_events: List[EventView],
date_value: date,
) -> List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]]:
def normalize_match_text(text: str) -> str:
cleaned = " ".join(text.strip().split())
return cleaned.lower()
def normalize_open_summary(text: str) -> str:
cleaned = normalize_match_text(text)
return re.sub(r"^open\b\s*[-:]*\s*", "", cleaned, flags=re.IGNORECASE)
def normalize_typ_for_pairing(raw: str) -> str:
cleaned = raw.strip().lower()
if not cleaned:
return ""
if "service" in cleaned and "pm" in cleaned:
return "service"
if "/" in cleaned:
cleaned = cleaned.split("/")[-1].strip()
if cleaned:
cleaned = cleaned.split()[0]
return cleaned
def match_fields(event: EventView) -> Tuple[str, str]:
typ = (event.compare_fields.get("TYP") or "").strip()
summary = (event.compare_fields.get("SUM") or "").strip()
if not typ and summary.lower().startswith("open"):
typ = "Open"
typ_key = normalize_typ_for_pairing(typ)
if typ_key == "open":
summary = normalize_open_summary(summary)
else:
summary = normalize_match_text(summary)
return (typ_key, summary)
calcium_grouped = group_by_time(calcium_events)
scandium_grouped = group_by_time(scandium_events)
all_keys = sorted(
set(calcium_grouped) | set(scandium_grouped),
key=lambda key: (key[0], key[1]),
)
pairs: List[
Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
] = []
for key in all_keys:
calcium_list = calcium_grouped.get(key, [])
scandium_list = scandium_grouped.get(key, [])
if not calcium_list or not scandium_list:
max_count = max(len(calcium_list), len(scandium_list))
for idx in range(max_count):
cal = calcium_list[idx] if idx < len(calcium_list) else None
sca = scandium_list[idx] if idx < len(scandium_list) else None
pairs.append((cal, sca, key, date_value))
continue
cal_remaining = list(range(len(calcium_list)))
sca_remaining = list(range(len(scandium_list)))
matched: List[Tuple[int, int]] = []
sca_by_exact: Dict[Tuple[str, str], List[int]] = {}
for idx in sca_remaining:
sca_key = match_fields(scandium_list[idx])
sca_by_exact.setdefault(sca_key, []).append(idx)
for cal_idx in cal_remaining[:]:
cal_key = match_fields(calcium_list[cal_idx])
candidates = sca_by_exact.get(cal_key, [])
if candidates:
sca_idx = candidates.pop(0)
matched.append((cal_idx, sca_idx))
cal_remaining.remove(cal_idx)
sca_remaining.remove(sca_idx)
sca_by_typ: Dict[str, List[int]] = {}
for idx in sca_remaining:
typ, _ = match_fields(scandium_list[idx])
sca_by_typ.setdefault(typ, []).append(idx)
for cal_idx in cal_remaining[:]:
typ, _ = match_fields(calcium_list[cal_idx])
candidates = sca_by_typ.get(typ, [])
if candidates:
sca_idx = candidates.pop(0)
matched.append((cal_idx, sca_idx))
cal_remaining.remove(cal_idx)
sca_remaining.remove(sca_idx)
for cal_idx, sca_idx in matched:
pairs.append((calcium_list[cal_idx], scandium_list[sca_idx], key, date_value))
for cal_idx in cal_remaining:
pairs.append((calcium_list[cal_idx], None, key, date_value))
for sca_idx in sca_remaining:
pairs.append((None, scandium_list[sca_idx], key, date_value))
return pairs
def precompute_ai_results(
pairs: List[
Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
],
hashes: List[str],
accepted_hashes: set[str],
use_ai: bool,
model: str,
api_key: Optional[str],
) -> Dict[int, Dict[str, object]]:
cache: Dict[str, Dict[str, object]] = {}
results: Dict[int, Dict[str, object]] = {}
to_schedule: List[Tuple[int, EventView, EventView]] = []
for idx, (cal, sca, _key, _date_value) in enumerate(pairs):
accepted_hash = resolve_accepted_hash(cal, sca, accepted_hashes)
if not cal or not sca:
if accepted_hash:
results[idx] = {
"match": True,
"field_matches": {},
"notes": "accepted",
"source": "accepted",
}
continue
if accepted_hash:
results[idx] = {
"match": True,
"field_matches": {},
"notes": "accepted",
"source": "accepted",
}
continue
prelim = compare_events(cal, sca, False, model, api_key, cache)
if not use_ai or not api_key or prelim.get("source") == "guard" or prelim.get("match") is True:
results[idx] = prelim
continue
to_schedule.append((idx, cal, sca))
if not use_ai or not api_key or not to_schedule:
return results
max_workers = min(20, len(to_schedule))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {
executor.submit(compare_events, cal, sca, True, model, api_key, cache): idx
for (idx, cal, sca) in to_schedule
}
for future in tqdm(as_completed(future_map), total=len(future_map), desc="AI compare"):
idx = future_map[future]
results[idx] = future.result()
return results
def compute_compare_results_for_pairs(
pairs: List[
Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
],
hashes: List[str],
accepted_hashes: set[str],
use_ai: bool,
model: str,
api_key: Optional[str],
cache: Optional[Dict[str, Dict[str, object]]] = None,
) -> List[Optional[Dict[str, object]]]:
if cache is None:
cache = {}
results_list: List[Optional[Dict[str, object]]] = [None] * len(pairs)
to_schedule: List[Tuple[int, EventView, EventView]] = []
for idx, (cal, sca, _key, _date_value) in enumerate(pairs):
accepted_hash = resolve_accepted_hash(cal, sca, accepted_hashes)
if not cal or not sca:
if accepted_hash:
results_list[idx] = {
"match": True,
"field_matches": {},
"notes": "accepted",
"source": "accepted",
}
continue
if accepted_hash:
results_list[idx] = {
"match": True,
"field_matches": {},
"notes": "accepted",
"source": "accepted",
}
continue
prelim = compare_events(cal, sca, False, model, api_key, cache)
if not use_ai or not api_key or prelim.get("source") == "guard" or prelim.get("match") is True:
results_list[idx] = prelim
continue
to_schedule.append((idx, cal, sca))
if not use_ai or not api_key or not to_schedule:
return results_list
max_workers = min(20, len(to_schedule))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {
executor.submit(compare_events, cal, sca, True, model, api_key, cache): idx
for (idx, cal, sca) in to_schedule
}
for future in tqdm(as_completed(future_map), total=len(future_map), desc="AI compare"):
idx = future_map[future]
results_list[idx] = future.result()
return results_list
def side_by_side_lines(
left: str,
right: str,
width: int,
gutter: int,
) -> List[str]:
left_lines = left.splitlines() or [""]
right_lines = right.splitlines() or [""]
rows = max(len(left_lines), len(right_lines))
output: List[str] = []
def pad_to_width(text: str) -> str:
visible_len = len(ANSI_RE.sub("", text))
if visible_len >= width:
return text
return text + (" " * (width - visible_len))
for idx in range(rows):
left_text = left_lines[idx] if idx < len(left_lines) else ""
right_text = right_lines[idx] if idx < len(right_lines) else ""
output.append(
f"{pad_to_width(left_text)}{' ' * gutter}{pad_to_width(right_text)}"
)
return output
def display_pairs(
dates_sorted: List[date],
pairs_by_date: Dict[
date,
List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]],
],
hashes_by_date: Dict[date, List[str]],
compare_results_by_date: Dict[date, List[Optional[Dict[str, object]]]],
calcium_by_date: Dict[date, List[EventView]],
scandium_by_date: Dict[date, List[EventView]],
accepted_hashes: set[str],
memory_path: Path,
tz: ZoneInfo,
use_ai: bool,
model: str,
api_key: Optional[str],
api: BookedAPI,
resource: Dict[str, object],
resource_name: str,
typ_filter: Optional[str],
start_time: Optional[time],
end_time: Optional[time],
) -> None:
terminal_width = shutil.get_terminal_size((120, 20)).columns
gutter = 4
column_width = max(30, (terminal_width - gutter) // 2)
if not pairs_by_date:
print("No events found on either side.")
return
def recurrence_dates_for_event(
calcium_event: Dict[str, str],
dates: Iterable[date],
) -> List[date]:
affected: List[date] = []
for candidate in dates:
occurrence = calcium_occurrence_for_date(calcium_event, candidate, tz)
if not occurrence:
continue
if start_time and end_time:
occ_start, occ_end = occurrence
if occ_start.timetz().replace(tzinfo=None) != start_time:
continue
if occ_end.timetz().replace(tzinfo=None) != end_time:
continue
affected.append(candidate)
return affected
def refresh_scandium_for_dates(dates_to_refresh: Iterable[date]) -> None:
for date_value in dates_to_refresh:
scandium_events = build_scandium_events_for_date(
api=api,
resource_name=resource_name,
tz=tz,
target_date=date_value,
start_time=start_time,
end_time=end_time,
resource=resource,
)
if typ_filter:
scandium_events = [
event
for event in scandium_events
if (event.compare_fields.get("TYP") or "").lower() == typ_filter.lower()
]
scandium_by_date[date_value] = scandium_events
pairs_by_date[date_value] = build_pairs(
calcium_by_date.get(date_value, []),
scandium_events,
date_value,
)
hashes = [
canonical_hash(cal, sca)
for cal, sca, _key, _date in pairs_by_date[date_value]
]
hashes_by_date[date_value] = hashes
compare_results_by_date[date_value] = compute_compare_results_for_pairs(
pairs_by_date[date_value],
hashes,
accepted_hashes,
use_ai=use_ai,
model=model,
api_key=api_key,
)
for date_value in dates_sorted:
pairs = pairs_by_date.get(date_value, [])
hashes = hashes_by_date.get(date_value, [])
compare_results = compare_results_by_date.get(date_value, [])
if not pairs:
continue
for idx, (cal, sca, key, _date_value) in enumerate(pairs):
start_dt, end_dt = key
compare_result = compare_results[idx] if idx < len(compare_results) else None
while True:
current_hash = canonical_hash(cal, sca)
if idx < len(hashes):
hashes[idx] = current_hash
accepted_hash = resolve_accepted_hash(cal, sca, accepted_hashes)
print("\n\n\n\n")
start_label = start_dt.astimezone(tz).strftime("%Y-%m-%d %H:%M")
end_label = end_dt.astimezone(tz).strftime("%H:%M")
header = f"{start_label} -> {end_label}"
print("=" * terminal_width)
print(header.center(terminal_width))
print("-" * terminal_width)
sca_label = "Scandium"
if sca and sca.reference:
sca_label = f"Scandium ({sca.reference})"
sca_link = f"https://scandium.mclean.harvard.edu/reservation/?rn={sca.reference}"
print(f"{''.ljust(column_width)}{' ' * gutter}{sca_link.ljust(column_width)}")
print(f"{'Calcium'.ljust(column_width)}{' ' * gutter}{sca_label.ljust(column_width)}")
print("-" * terminal_width)
left_text = format_event_label(cal, column_width) if cal else "missing"
right_text = format_event_label(sca, column_width) if sca else "missing"
for line in side_by_side_lines(
left_text, right_text, width=column_width, gutter=gutter
):
print(line)
if accepted_hash:
compare_result = {
"match": True,
"field_matches": {},
"notes": "accepted",
"source": "accepted",
}
if compare_result:
match_label = "MATCH" if compare_result.get("match") else "DIFF"
field_matches = compare_result.get("field_matches", {})
mismatched = [
key for key, status in field_matches.items() if status == "mismatch"
]
diff_line = "DIFF: none" if not mismatched else f"DIFF: {', '.join(mismatched)}"
source = compare_result.get("source", "exact")
notes = compare_result.get("notes", "")
note_suffix = ""
if source == "ai-error" and notes:
note_suffix = f" | {notes[:60]}"
is_match = compare_result.get("match") is True
is_clean = not mismatched
if source == "accepted":
color = BLUE
else:
color = GREEN if is_match and is_clean else ORANGE
compare_text = f"CHK: {match_label} | {diff_line} | {source}{note_suffix}"
left_only = f"{color}{compare_text}{RESET}"
for line in side_by_side_lines(
left_only,
"",
width=column_width,
gutter=gutter,
):
print(line)
if source == "accepted":
short_hash = accepted_hash[:8]
accepted_text = f"{BLUE}[ ACCEPTED - HASH {short_hash} ]{RESET}"
for line in side_by_side_lines(
accepted_text,
"",
width=column_width,
gutter=gutter,
):
print(line)
break
if compare_result and compare_result.get("match"):
action = prompt_yes_no_accept(
"Edit?",
default=False,
include_reload=True,
include_clear=True,
)
if action == "clear":
sca = update_scandium_event(
api,
sca,
tz,
fields={"SUM"},
calcium=cal,
prefill={"SUM": ""},
prompt=False,
update_scope_override="this",
)
if sca and cal:
compare_result = compare_events(cal, sca, False, model, api_key, {})
if idx < len(compare_results):
compare_results[idx] = compare_result
continue
if action == "reload":
if sca and sca.reference:
new_detail = api.get_reservation(sca.reference)
sca = build_scandium_event_view_from_detail(new_detail, tz)
if sca and cal:
compare_result = compare_events(cal, sca, use_ai, model, api_key, {})
if idx < len(compare_results):
compare_results[idx] = compare_result
continue
if action == "accept":
current_hash = canonical_hash(cal, sca)
if idx < len(hashes):
hashes[idx] = current_hash
if current_hash not in accepted_hashes:
store_accepted_hash(memory_path, current_hash)
accepted_hashes.add(current_hash)
compare_result = {
"match": True,
"field_matches": {},
"notes": "accepted",
"source": "accepted",
}
if idx < len(compare_results):
compare_results[idx] = compare_result
break
if action == "yes":
sca = update_scandium_event(api, sca, tz, calcium=cal)
if sca and cal:
compare_result = compare_events(cal, sca, False, model, api_key, {})
if idx < len(compare_results):
compare_results[idx] = compare_result
continue
break
if cal is None and sca is not None:
api.delete_reservation(sca.reference, update_scope="this")
sca = None
compare_result = None
if idx < len(compare_results):
compare_results[idx] = None
break
if sca is None and cal is not None:
action = prompt_yes_no_accept("Scandium missing. Create from Calcium?", default=True)
if action == "accept":
current_hash = canonical_hash(cal, sca)
if idx < len(hashes):
hashes[idx] = current_hash
if current_hash not in accepted_hashes:
store_accepted_hash(memory_path, current_hash)
accepted_hashes.add(current_hash)
compare_result = {
"match": True,
"field_matches": {},
"notes": "accepted",
"source": "accepted",
}
if idx < len(compare_results):
compare_results[idx] = compare_result
break
if action == "yes":
sca = create_scandium_from_calcium(api, resource, tz, cal.calcium_event)
if sca and cal:
compare_result = compare_events(cal, sca, use_ai, model, api_key, {})
if idx < len(compare_results):
compare_results[idx] = compare_result
if cal.calcium_event and cal.calcium_event.get("RRULE"):
future_dates = [d for d in dates_sorted if d > date_value]
affected_dates = recurrence_dates_for_event(cal.calcium_event, future_dates)
if affected_dates:
refresh_scandium_for_dates(affected_dates)
continue
break
if cal is not None and sca is not None:
field_matches = compare_result.get("field_matches", {}) if compare_result else {}
rec_mismatch = field_matches.get("REC") == "mismatch"
action = prompt_yes_no_accept(
"Edit Scandium fields?",
default=False,
include_update_rec=rec_mismatch,
)
if action == "accept":
current_hash = canonical_hash(cal, sca)
if idx < len(hashes):
hashes[idx] = current_hash
if current_hash not in accepted_hashes:
store_accepted_hash(memory_path, current_hash)
accepted_hashes.add(current_hash)
compare_result = {
"match": True,
"field_matches": {},
"notes": "accepted",
"source": "accepted",
}
if idx < len(compare_results):
compare_results[idx] = compare_result
break
if action == "update_rec":
cal_rrule = (cal.calcium_event or {}).get("RRULE", "") if cal else ""
if cal_rrule:
recurrence_override = parse_rrule(
cal_rrule,
cal.start.replace(tzinfo=None),
)
else:
recurrence_override = {"type": "none"}
is_recurring = bool(sca.scandium_detail.get("isRecurring") if sca.scandium_detail else False)
if is_recurring:
scope = prompt_edit_scope(is_recurring, default_all=True)
else:
scope = "full"
sca = update_scandium_event(
api,
sca,
tz,
calcium=cal,
recurrence_rule_override=recurrence_override,
prompt=False,
update_scope_override=scope,
)
if sca:
compare_result = compare_events(cal, sca, False, model, api_key, {})
if idx < len(compare_results):
compare_results[idx] = compare_result
continue
if action == "yes":
recurrence_override = None
update_scope_override = None
if cal and compare_result:
if rec_mismatch:
if prompt_yes_no("Update REC to match Calcium?", default=True):
cal_rrule = (cal.calcium_event or {}).get("RRULE", "")
if cal_rrule:
recurrence_override = parse_rrule(
cal_rrule,
cal.start.replace(tzinfo=None),
)
else:
recurrence_override = {"type": "none"}
is_recurring = bool(sca.scandium_detail.get("isRecurring") if sca.scandium_detail else False)
if is_recurring:
update_scope_override = prompt_edit_scope(
is_recurring,
default_all=True,
)
else:
update_scope_override = "full"
sca = update_scandium_event(
api,
sca,
tz,
calcium=cal,
recurrence_rule_override=recurrence_override,
update_scope_override=update_scope_override,
)
if sca:
compare_result = compare_events(cal, sca, False, model, api_key, {})
if idx < len(compare_results):
compare_results[idx] = compare_result
continue
break
if compare_result and compare_result.get("match"):
pass
def resolve_preferences_path(events_path: Path, override: Optional[Path]) -> Optional[Path]:
if override:
return override
candidate = events_path.with_suffix(".Preferences")
if candidate.exists():
return candidate
return None
def main() -> int:
parser = argparse.ArgumentParser(
description="Compare Calcium and Scandium events for a single date.",
)
parser.add_argument(
"dates",
nargs="*",
type=parse_date,
help="Date(s) to compare (YYYY-MM-DD).",
)
parser.add_argument(
"--start-date",
type=parse_date,
default=None,
help="Start date (YYYY-MM-DD) for inclusive range.",
)
parser.add_argument(
"--end-date",
type=parse_date,
default=None,
help="End date (YYYY-MM-DD) for inclusive range.",
)
parser.add_argument(
"--calcium-events",
default=None,
help="Path to Calcium .Events file (defaults based on --resource).",
)
parser.add_argument(
"--preferences",
type=Path,
default=None,
help="Optional path to Calcium .Preferences file.",
)
parser.add_argument(
"--resource",
default=DEFAULT_RESOURCE,
help="Scandium resource name.",
)
parser.add_argument(
"--timezone",
default=DEFAULT_TZ,
help="Timezone for display and matching.",
)
parser.add_argument(
"--typ",
default=None,
help="Filter by TYP (Calcium category/Scandium PI value).",
)
parser.add_argument(
"--start-time",
type=parse_time_value,
default=None,
help="Filter by exact start time (HH:MM). Requires --end-time.",
)
parser.add_argument(
"--end-time",
type=parse_time_value,
default=None,
help="Filter by exact end time (HH:MM). Requires --start-time.",
)
parser.add_argument(
"--no-ai",
action="store_true",
help="Disable AI matching fallback.",
)
parser.add_argument(
"--ai-model",
default="gpt-5-nano",
help="OpenAI model for AI matching.",
)
args = parser.parse_args()
tz = ZoneInfo(args.timezone)
events_path_value = args.calcium_events
if not events_path_value:
events_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
if not events_path_value:
raise SystemExit(
f"No default Calcium .Events for resource '{args.resource}'. "
"Use --calcium-events to specify a file."
)
events_path = Path(events_path_value)
preferences_path = resolve_preferences_path(events_path, args.preferences)
rsync_cmd = [
"rsync",
"-rltv",
"ddrucker@calendar-actual:/var/www/cgi-bin/CalciumDir40/data/",
"data/",
]
try:
subprocess.run(rsync_cmd, check=True)
except subprocess.CalledProcessError as exc:
raise SystemExit(f"rsync failed: {exc}") from exc
if (args.start_time and not args.end_time) or (args.end_time and not args.start_time):
raise SystemExit("Both --start-time and --end-time are required together.")
if (args.start_date and not args.end_date) or (args.end_date and not args.start_date):
raise SystemExit("Both --start-date and --end-date are required together.")
if args.start_date and args.end_date and args.dates:
raise SystemExit("Use either explicit dates or --start-date/--end-date, not both.")
if not args.dates and not (args.start_date and args.end_date):
raise SystemExit("Provide at least one date or a --start-date/--end-date range.")
if args.start_date and args.end_date:
if args.end_date < args.start_date:
raise SystemExit("--end-date must be on or after --start-date.")
dates: List[date] = []
cursor = args.start_date
while cursor <= args.end_date:
dates.append(cursor)
cursor += timedelta(days=1)
else:
dates = list(args.dates)
api = BookedAPI()
resource = api.find_resource_by_name(args.resource)
if not resource:
raise SystemExit(f"Resource not found: {args.resource}")
dates_sorted = sorted(dates)
calcium_by_date: Dict[date, List[EventView]] = {}
scandium_by_date: Dict[date, List[EventView]] = {}
pairs_by_date: Dict[
date,
List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]],
] = {}
calcium_by_date = build_calcium_events_by_date(
events_path=events_path,
preferences_path=preferences_path,
tz=tz,
dates=dates_sorted,
start_time=args.start_time,
end_time=args.end_time,
)
if args.typ:
for date_value in dates_sorted:
calcium_by_date[date_value] = [
event
for event in calcium_by_date.get(date_value, [])
if (event.compare_fields.get("TYP") or "").lower() == args.typ.lower()
]
for date_value in tqdm(dates_sorted, desc="Reading Scandium"):
scandium_events = build_scandium_events_for_date(
api=api,
resource_name=args.resource,
tz=tz,
target_date=date_value,
start_time=args.start_time,
end_time=args.end_time,
resource=resource,
)
if args.typ:
scandium_events = [
event
for event in scandium_events
if (event.compare_fields.get("TYP") or "").lower() == args.typ.lower()
]
scandium_by_date[date_value] = scandium_events
for date_value in dates_sorted:
pairs_by_date[date_value] = build_pairs(
calcium_by_date.get(date_value, []),
scandium_by_date.get(date_value, []),
date_value,
)
api_key = os.environ.get("OPENAI_API_KEY")
memory_path = Path("sync-memory.dat")
accepted_hashes = load_accepted_hashes(memory_path)
hashes_by_date: Dict[date, List[str]] = {}
compare_results_by_date: Dict[date, List[Optional[Dict[str, object]]]] = {}
cache: Dict[str, Dict[str, object]] = {}
for date_value in dates_sorted:
pairs = pairs_by_date.get(date_value, [])
hashes = [canonical_hash(cal, sca) for cal, sca, _key, _date in pairs]
hashes_by_date[date_value] = hashes
compare_results_by_date[date_value] = compute_compare_results_for_pairs(
pairs,
hashes=hashes,
accepted_hashes=accepted_hashes,
use_ai=not args.no_ai,
model=args.ai_model,
api_key=api_key,
cache=cache,
)
display_pairs(
dates_sorted,
pairs_by_date,
hashes_by_date,
compare_results_by_date,
calcium_by_date,
scandium_by_date,
accepted_hashes,
memory_path,
tz,
use_ai=not args.no_ai,
model=args.ai_model,
api_key=api_key,
api=api,
resource=resource,
resource_name=args.resource,
typ_filter=args.typ,
start_time=args.start_time,
end_time=args.end_time,
)
return 0
if __name__ == "__main__":
raise SystemExit(main())
@dmd
Copy link
Author

dmd commented Jan 14, 2026

jfc

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment