Created
January 14, 2026 13:30
-
-
Save dmd/5fe58cbbe966c041a0ea608a9747f89a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # dependencies = ["requests"] | |
| # /// | |
| """ | |
| Booked Scheduler API client library. | |
| Common functions for interacting with the Booked Scheduler API at scandium.mclean.harvard.edu. | |
| Usage: | |
| from booked_api import BookedAPI | |
| api = BookedAPI() | |
| user = api.get_current_user() | |
| resources = api.list_resources() | |
| """ | |
| import json | |
| import os | |
| import requests | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Dict, Any | |
| class BookedAPI: | |
| """Client for Booked Scheduler API.""" | |
    def __init__(self, server: Optional[str] = None, api_id: Optional[str] = None, api_key: Optional[str] = None):
        """Initialize the API client.

        Args:
            server: Base server URL (default: BOOKED_SERVER env var, falling
                back to https://scandium.mclean.harvard.edu).
            api_id: API ID. Both api_id and api_key must be supplied together;
                if either is missing, credentials are read from the
                SCANDIUM_API_KEY env var instead.
            api_key: API key, paired with api_id.

        Raises:
            ValueError: If explicit credentials were not given and
                SCANDIUM_API_KEY is unset or not in "api_id:api_key" form.
        """
        self.server = server or os.environ.get(
            "BOOKED_SERVER", "https://scandium.mclean.harvard.edu"
        )
        if api_id and api_key:
            self._api_id = api_id
            self._api_key = api_key
        else:
            # Credentials packed into one env var as "api_id:api_key";
            # split on the first colon only so the key may contain colons.
            creds = os.environ.get("SCANDIUM_API_KEY", "")
            if ":" in creds:
                self._api_id, self._api_key = creds.split(":", 1)
            else:
                raise ValueError(
                    "SCANDIUM_API_KEY environment variable must be set (format: api_id:api_key)"
                )
        # Auth headers sent with every request.
        self.headers = {
            "X-Booked-ApiId": self._api_id,
            "X-Booked-ApiKey": self._api_key,
        }
        # Shared session gives connection pooling across calls.
        self._session = requests.Session()
        # Lazily resolved and cached by get_user_id().
        self._user_id: Optional[int] = None
| def _request( | |
| self, | |
| method: str, | |
| endpoint: str, | |
| params: Dict = None, | |
| json: Dict = None, | |
| expect_json: bool = True, | |
| ) -> Dict | str: | |
| """Make an API request.""" | |
| url = f"{self.server}/Services/{endpoint.lstrip('/')}" | |
| r = self._session.request( | |
| method, | |
| url, | |
| headers=self.headers, | |
| params=params, | |
| json=json, | |
| ) | |
| r.raise_for_status() | |
| if not expect_json: | |
| return r.text | |
| try: | |
| return r.json() | |
| except requests.exceptions.JSONDecodeError as exc: | |
| snippet = r.text[:500].strip() | |
| raise RuntimeError( | |
| f"Non-JSON response from {endpoint}: {snippet or '<empty>'}" | |
| ) from exc | |
| # ==================== Account Methods ==================== | |
    def get_current_user(self) -> Dict:
        """Get current authenticated user info.

        Tries several endpoint spellings in order, since Booked deployments
        expose the current account under different routes.

        Returns:
            The first response that looks like a user record (a dict that is
            not merely an error/message envelope).

        Raises:
            RuntimeError: If no attempt produced a usable payload; the message
                includes the last response seen, for debugging.
        """
        # Candidate (endpoint, params) pairs, tried in order.
        attempts = [
            ("Accounts/", None),
            ("Accounts/Current", None),
            ("Users/Current", None),
        ]
        last_result = None
        for endpoint, params in attempts:
            try:
                result = self._request("GET", endpoint, params=params, expect_json=True)
                last_result = result
                if isinstance(result, dict):
                    # A dict carrying only a "message" and no user identifier
                    # is an error envelope, not a user record; try next route.
                    if result.get("message") and not (
                        result.get("userId")
                        or result.get("id")
                        or result.get("user")
                    ):
                        continue
                    return result
            except (requests.HTTPError, RuntimeError):
                pass
            # Fall back to fetching the raw body and parsing it by hand
            # (reached when the JSON attempt raised or returned a non-dict).
            try:
                raw = self._request("GET", endpoint, params=params, expect_json=False)
            except requests.HTTPError:
                continue
            last_result = raw
            if isinstance(raw, str) and raw.strip():
                try:
                    parsed = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                last_result = parsed
                if isinstance(parsed, dict):
                    # Same error-envelope check as above.
                    if parsed.get("message") and not (
                        parsed.get("userId")
                        or parsed.get("id")
                        or parsed.get("user")
                    ):
                        continue
                    return parsed
        raise RuntimeError(f"Unexpected current user response: {last_result!r}")
| def get_user_id(self) -> int: | |
| """Get the current user's ID (cached).""" | |
| if self._user_id is None: | |
| env_user_id = os.environ.get("SCANDIUM_USER_ID", "").strip() | |
| if env_user_id: | |
| self._user_id = int(env_user_id) | |
| else: | |
| env_user_email = os.environ.get("SCANDIUM_USER_EMAIL", "").strip() | |
| if not env_user_email: | |
| env_user_email = "[email protected]" | |
| if env_user_email: | |
| user = self.find_user_by_email(env_user_email) | |
| if user and user.get("id"): | |
| self._user_id = int(user["id"]) | |
| if self._user_id is None: | |
| user = self.get_current_user() | |
| self._user_id = ( | |
| user.get("userId") | |
| or user.get("id") | |
| or user.get("user", {}).get("userId") | |
| or user.get("user", {}).get("id") | |
| ) | |
| if self._user_id is None: | |
| raise RuntimeError( | |
| "Unable to resolve user id from API. Set SCANDIUM_USER_ID " | |
| "or SCANDIUM_USER_EMAIL." | |
| ) | |
| return self._user_id | |
| # ==================== Resource Methods ==================== | |
| def list_resources(self) -> List[Dict]: | |
| """List all resources the user can access.""" | |
| return self._request("GET", "Resources/").get("resources", []) | |
| def get_resource(self, resource_id: int) -> Dict: | |
| """Get a single resource by ID.""" | |
| return self._request("GET", f"Resources/{resource_id}") | |
| def find_resource_by_name(self, name: str) -> Optional[Dict]: | |
| """Find a resource by name (partial match).""" | |
| resources = self.list_resources() | |
| for r in resources: | |
| if name.lower() in r.get("name", "").lower(): | |
| return r | |
| return None | |
| # ==================== User Methods ==================== | |
| def list_users(self, **filters) -> List[Dict]: | |
| """List users with optional filters (email, firstName, lastName, etc.).""" | |
| return self._request("GET", "Users/", params=filters).get("users", []) | |
| def find_user_by_email(self, email: str) -> Optional[Dict]: | |
| """Find a user by exact email match.""" | |
| users = self.list_users(email=email) | |
| for u in users: | |
| if u.get("emailAddress", "").lower() == email.lower(): | |
| return u | |
| return None | |
| def get_user(self, user_id: int) -> Dict: | |
| """Get a single user by ID.""" | |
| return self._request("GET", f"Users/{user_id}") | |
| # ==================== Schedule Methods ==================== | |
| def list_schedules(self) -> List[Dict]: | |
| """List all schedules.""" | |
| return self._request("GET", "Schedules/").get("schedules", []) | |
| def get_schedule(self, schedule_id: int) -> Dict: | |
| """Get a single schedule by ID.""" | |
| return self._request("GET", f"Schedules/{schedule_id}") | |
| def get_schedule_slots( | |
| self, | |
| schedule_id: int, | |
| start_date: str, | |
| end_date: str, | |
| resource_id: int = None, | |
| ) -> Dict: | |
| """Get availability slots for a schedule.""" | |
| params = {"startDateTime": start_date, "endDateTime": end_date} | |
| if resource_id: | |
| params["resourceId"] = resource_id | |
| return self._request("GET", f"Schedules/{schedule_id}/Slots", params=params) | |
| # ==================== Reservation Methods ==================== | |
| def list_reservations( | |
| self, | |
| start_date: str = None, | |
| end_date: str = None, | |
| resource_id: int = None, | |
| user_id: int = None, | |
| ) -> List[Dict]: | |
| """List reservations with optional filters.""" | |
| params = {} | |
| if start_date: | |
| params["startDateTime"] = start_date | |
| if end_date: | |
| params["endDateTime"] = end_date | |
| if resource_id: | |
| params["resourceId"] = resource_id | |
| if user_id: | |
| params["userId"] = user_id | |
| return self._request("GET", "Reservations/", params=params).get( | |
| "reservations", [] | |
| ) | |
| def get_reservation(self, reference_number: str) -> Dict: | |
| """Get a single reservation by reference number.""" | |
| return self._request("GET", f"Reservations/{reference_number}") | |
    def create_reservation(
        self,
        resource_id: int,
        start_datetime: str,
        end_datetime: str,
        title: str,
        description: Optional[str] = None,
        user_id: Optional[int] = None,
        participants: Optional[List[int]] = None,
        invitees: Optional[List[int]] = None,
        custom_attributes: Optional[List[Dict]] = None,
        recurrence_rule: Optional[Dict] = None,
    ) -> Dict:
        """Create a new reservation.

        Args:
            resource_id: ID of the resource to book
            start_datetime: Start time in ISO8601 format with TZ (e.g., "2026-01-15T14:00:00+0000")
            end_datetime: End time in ISO8601 format with TZ
            title: Reservation title
            description: Optional description
            user_id: Owner user ID (defaults to current user)
            participants: List of user IDs to add as participants
            invitees: List of user IDs to invite
            custom_attributes: List of {"attributeId": id, "attributeValue": value}
            recurrence_rule: Recurrence configuration (see skill.md for format)

        Returns:
            API response with referenceNumber on success, errors on failure

        Note:
            Due to an API quirk, participants may not be added on initial creation.
            This method automatically performs a follow-up update to ensure
            participants are added.
        """
        resolved_user_id = user_id or self.get_user_id()
        # Required payload fields; optional ones are added only when set.
        data = {
            "resourceId": resource_id,
            "userId": resolved_user_id,
            "startDateTime": start_datetime,
            "endDateTime": end_datetime,
            "title": title,
        }
        if description:
            data["description"] = description
        if participants:
            data["participants"] = participants
        if invitees:
            data["invitees"] = invitees
        if custom_attributes:
            data["customAttributes"] = custom_attributes
        if recurrence_rule:
            data["recurrenceRule"] = recurrence_rule
        result = self._request("POST", "Reservations/", json=data)
        # Handle participants quirk: they often don't get added on creation
        # Do a follow-up update to ensure they're added
        if participants and result.get("referenceNumber"):
            ref_num = result["referenceNumber"]
            created = self.get_reservation(ref_num)
            # Check if participants were actually added
            # NOTE(review): assumes each participant entry carries a "userId";
            # a missing/None value would make int() raise TypeError here.
            created_participant_ids = [
                int(p.get("userId")) for p in created.get("participants", [])
            ]
            if set(participants) != set(created_participant_ids):
                # Need to update to add participants
                # Use startDate/endDate from the created reservation (API returns these keys)
                self.update_reservation(
                    reference_number=ref_num,
                    resource_id=resource_id,
                    start_datetime=created.get("startDate"),
                    end_datetime=created.get("endDate"),
                    title=title,
                    update_scope="full",
                    user_id=resolved_user_id,
                    description=description,
                    participants=participants,
                    invitees=invitees,
                    custom_attributes=custom_attributes,
                )
        # The original create response is returned even if a follow-up
        # update was performed.
        return result
| def update_reservation( | |
| self, | |
| reference_number: str, | |
| resource_id: int, | |
| start_datetime: str, | |
| end_datetime: str, | |
| title: str, | |
| update_scope: str = "full", | |
| **kwargs, | |
| ) -> Dict: | |
| """Update an existing reservation. | |
| Args: | |
| reference_number: Reservation reference number | |
| update_scope: "this", "full", or "future" | |
| **kwargs: Same as create_reservation | |
| """ | |
| data = { | |
| "resourceId": resource_id, | |
| "userId": kwargs.get("user_id") or self.get_user_id(), | |
| "startDateTime": start_datetime, | |
| "endDateTime": end_datetime, | |
| "title": title, | |
| } | |
| for key in [ | |
| "description", | |
| "participants", | |
| "invitees", | |
| "custom_attributes", | |
| "recurrence_rule", | |
| ]: | |
| if key in kwargs and kwargs[key]: | |
| # Convert snake_case to camelCase for API | |
| api_key = key.replace("_", " ").title().replace(" ", "") | |
| api_key = api_key[0].lower() + api_key[1:] | |
| data[api_key] = kwargs[key] | |
| url = f"{self.server}/Services/Reservations/{reference_number}" | |
| r = requests.post( | |
| url, headers=self.headers, params={"updateScope": update_scope}, json=data | |
| ) | |
| r.raise_for_status() | |
| return r.json() | |
| def delete_reservation( | |
| self, reference_number: str, update_scope: str = "this" | |
| ) -> Dict: | |
| """Delete a reservation. | |
| Args: | |
| reference_number: Reservation reference number | |
| update_scope: "this", "full", or "future" for recurring | |
| """ | |
| url = f"{self.server}/Services/Reservations/{reference_number}" | |
| r = requests.delete( | |
| url, headers=self.headers, params={"updateScope": update_scope} | |
| ) | |
| r.raise_for_status() | |
| return r.json() if r.text else {} | |
| # ==================== Attribute Methods ==================== | |
| def get_reservation_attributes(self) -> List[Dict]: | |
| """Get custom attributes for reservations (category 1).""" | |
| return self._request("GET", "Attributes/Category/1").get("attributes", []) | |
| def find_attribute_value( | |
| self, attribute_id: int, search_value: str | |
| ) -> Optional[str]: | |
| """Find the closest matching value for a select-list attribute. | |
| Args: | |
| attribute_id: The attribute ID | |
| search_value: Value to search for (case-insensitive partial match) | |
| Returns: | |
| The matching value from possibleValues, or None | |
| """ | |
| attrs = self.get_reservation_attributes() | |
| for attr in attrs: | |
| if attr.get("id") == attribute_id: | |
| possible = attr.get("possibleValues", []) | |
| # Try exact match first | |
| for v in possible: | |
| if v.lower() == search_value.lower(): | |
| return v | |
| # Try partial match | |
| for v in possible: | |
| if search_value.lower() in v.lower(): | |
| return v | |
| return None | |
| # ==================== Utility Functions ==================== | |
def ics_datetime_to_iso(ics_dt: str) -> str:
    """Convert an ICS datetime to ISO8601 with a UTC offset.

    Args:
        ics_dt: ICS format datetime (e.g., "20260112T130000Z"). A trailing
            "Z" marks UTC; a suffix-less value is assumed to be UTC as well.

    Returns:
        ISO8601 format with timezone (e.g., "2026-01-12T13:00:00+0000")
    """
    # Both "...Z" and bare values are treated as UTC, so the offset is
    # always +0000 (the original if/else assigned the same value in both
    # branches); only the parse input needs the Z stripped.
    if ics_dt.endswith("Z"):
        ics_dt = ics_dt[:-1]
    dt = datetime.strptime(ics_dt, "%Y%m%dT%H%M%S")
    return dt.strftime("%Y-%m-%dT%H:%M:%S+0000")
def parse_rrule(rrule: str, start_dt: datetime) -> Dict:
    """Parse an ICS RRULE into Booked recurrenceRule format.

    Args:
        rrule: ICS RRULE string (e.g., "FREQ=WEEKLY;INTERVAL=4;UNTIL=20260711")
        start_dt: Start datetime (used to determine weekday)

    Returns:
        Booked recurrenceRule dictionary
    """
    fields = dict(
        chunk.split("=", 1) for chunk in rrule.split(";") if "=" in chunk
    )
    type_by_freq = {
        "DAILY": "daily",
        "WEEKLY": "weekly",
        "MONTHLY": "monthly",
        "YEARLY": "yearly",
    }
    rule = {
        "type": type_by_freq.get(fields.get("FREQ", ""), "none"),
        "interval": int(fields.get("INTERVAL", 1)),
    }
    if rule["type"] == "weekly":
        # Python weekday 0=Monday; Booked weekday 0=Sunday: shift by one.
        rule["weekdays"] = [(start_dt.weekday() + 1) % 7]
    if "UNTIL" in fields:
        until = fields["UNTIL"]
        if len(until) == 8:  # bare YYYYMMDD
            until_dt = datetime.strptime(until, "%Y%m%d")
        else:  # YYYYMMDDTHHMMSS (any trailing Z is dropped by the slice)
            until_dt = datetime.strptime(until[:15], "%Y%m%dT%H%M%S")
        rule["repeatTerminationDate"] = until_dt.strftime("%Y-%m-%dT00:00:00+0000")
    return rule
| if __name__ == "__main__": | |
| # Example usage | |
| api = BookedAPI() | |
| user = api.get_current_user() | |
| print(f"Logged in as: {user['firstName']} {user['lastName']}") | |
| resources = api.list_resources() | |
| print(f"\nAvailable resources: {len(resources)}") | |
| for r in resources[:5]: | |
| print(f" - {r['resourceId']}: {r['name']}") | |
| # Get this week's reservations | |
| today = datetime.now().strftime("%Y-%m-%d") | |
| next_week = (datetime.now() + timedelta(days=7)).strftime("%Y-%m-%d") | |
| reservations = api.list_reservations(today, next_week) | |
| print(f"\nReservations this week: {len(reservations)}") | |
| for res in reservations[:5]: | |
| print(f" - {res['referenceNumber']}: {res['title']}") | |
| #!/usr/bin/env python3 | |
| import re | |
| from dataclasses import dataclass | |
| from datetime import date, datetime, time, timedelta | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List, Optional | |
| from zoneinfo import ZoneInfo | |
| UNIT_SEP = "\x1c" | |
| CATEGORY_SEP = "\x1d" | |
| TIME_PERIOD_SEP = "\x1e" | |
| OPEN_FUTURE_DATE = date(2037, 1, 1) | |
| FIELD_MAP = { | |
| "a": "text", | |
| "b": "link", | |
| "c": "popup", | |
| "d": "export", | |
| "e": "start_time", | |
| "f": "end_time", | |
| "g": "id", | |
| "h": "owner", | |
| "i": "draw_border", | |
| "j": "bg_color", | |
| "k": "fg_color", | |
| "l": "mail_to", | |
| "m": "mail_cc", | |
| "n": "mail_bcc", | |
| "o": "mail_text", | |
| "p": "reminder_times", | |
| "q": "reminder_to", | |
| "r": "category", | |
| "s": "is_tentative", | |
| "t": "subscriptions", | |
| "u": "time_period", | |
| "A": "repeat_start", | |
| "B": "repeat_end", | |
| "C": "period", | |
| "D": "frequency", | |
| "E": "month_week", | |
| "F": "month_month", | |
| "G": "exclusions", | |
| "H": "skip_weekends", | |
| } | |
| DAY_NAMES = [None, "MO", "TU", "WE", "TH", "FR", "SA", "SU"] | |
@dataclass(frozen=True)
class CalciumEvent:
    """One event record parsed from a Calcium calendar data file."""

    text: str  # event title/summary
    popup: Optional[str]  # popup note text, if any
    link: Optional[str]  # associated URL, if any
    categories: List[str]  # category names attached to the event
    start_time: Optional[int]  # minutes after midnight, or None if untimed
    end_time: Optional[int]  # minutes after midnight, or None
    time_period: Optional[str]  # named time-period id (resolved via preferences)
    date: Optional[date]  # single-occurrence date (None for repeating events)
    repeat_start: Optional[date]  # first date of a recurrence, if repeating
    repeat_end: Optional[date]  # last date of a recurrence (open-ended if far future)
    period: Optional[str | List[int]]  # repeat kind name, or ISO weekday list
    frequency: Optional[int]  # repeat interval
    month_week: Optional[List[int]]  # week-of-month selectors for monthly rules
    month_month: Optional[int]  # month interval for monthly rules
    exclusions: List[date]  # dates removed from the recurrence
    skip_weekends: bool  # whether weekend occurrences are skipped

    @property
    def is_repeating(self) -> bool:
        """True when the event defines a recurrence (has a repeat start)."""
        return self.repeat_start is not None
@dataclass(frozen=True)
class VEventData:
    """A VEVENT as a property map plus its raw serialized lines."""

    event: Dict[str, str]  # VEVENT property name -> value
    lines: List[str]  # pre-rendered content lines (not consumed in this chunk)
| def _parse_date(value: str) -> Optional[date]: | |
| value = value.strip() | |
| if not value: | |
| return None | |
| try: | |
| year, month, day = value.split("/") | |
| return date(int(year), int(month), int(day)) | |
| except ValueError: | |
| return None | |
| def _parse_minutes(value: str) -> Optional[int]: | |
| value = value.strip() | |
| if not value: | |
| return None | |
| try: | |
| return int(value) | |
| except ValueError: | |
| return None | |
| def _parse_int_list(value: str) -> Optional[List[int]]: | |
| if not value: | |
| return None | |
| try: | |
| return [int(piece) for piece in value.split() if piece.strip()] | |
| except ValueError: | |
| return None | |
| def _unescape(value: str) -> str: | |
| return value.replace("\\n", "\n").replace("\r", "").strip() | |
def _load_time_periods(preferences_path: Path) -> Dict[str, List[str]]:
    """Read "TimePeriods" preference lines into {period_id: raw fields}.

    Returns an empty mapping when the file is missing or unreadable.
    """
    periods: Dict[str, List[str]] = {}
    if not preferences_path.exists():
        return periods
    try:
        content = preferences_path.read_text(encoding="utf-8")
    except OSError:
        return periods
    for line in content.splitlines():
        units = line.split(UNIT_SEP)
        if not units or units[0] != "TimePeriods":
            continue
        pairs = units[1:]
        # Units after the tag alternate id / definition.
        for offset in range(0, len(pairs) - 1, 2):
            period_id = pairs[offset]
            if not period_id:
                continue
            definition = pairs[offset + 1] or ""
            periods[period_id] = definition.split(TIME_PERIOD_SEP)
    return periods
| def _resolve_time_period( | |
| time_period: Optional[str], | |
| time_periods: Dict[str, List[str]], | |
| ) -> tuple[Optional[int], Optional[int]]: | |
| if not time_period: | |
| return None, None | |
| fields = time_periods.get(time_period) | |
| if not fields or len(fields) < 3: | |
| return None, None | |
| try: | |
| start = int(fields[1]) | |
| end = int(fields[2]) | |
| except ValueError: | |
| return None, None | |
| return start, end | |
def _parse_categories(value: str) -> List[str]:
    """Split a category field on CATEGORY_SEP, dropping empty entries."""
    if not value:
        return []
    return list(filter(None, value.split(CATEGORY_SEP)))
def _parse_exclusions(value: str) -> List[date]:
    """Parse whitespace-separated exclusion dates, skipping malformed tokens."""
    if not value:
        return []
    candidates = (_parse_date(token) for token in value.split())
    return [parsed for parsed in candidates if parsed]
def parse_calcium_events(
    events_path: Path,
    category_filter: Optional[str],
    exclude_categories: Optional[Iterable[str]],
    preferences_path: Optional[Path],
    debug: bool,
) -> List[CalciumEvent]:
    """Parse a Calcium events file into CalciumEvent records.

    Args:
        events_path: Path to the Calcium calendar data file.
        category_filter: If set, keep only events carrying this category.
        exclude_categories: Drop events carrying any of these categories.
        preferences_path: Optional preferences file used to resolve named
            time periods into start/end minutes.
        debug: When True, print a one-line parse summary.

    Returns:
        All events that survive the category filters.

    Raises:
        RuntimeError: If the events file cannot be read.
    """
    time_periods = _load_time_periods(preferences_path) if preferences_path else {}
    events: List[CalciumEvent] = []
    try:
        raw_text = events_path.read_text(encoding="utf-8", errors="replace")
    except OSError as exc:
        raise RuntimeError(f"Failed to read {events_path}: {exc}") from exc
    lines = raw_text.split("\n")
    exclude_set = {value for value in (exclude_categories or []) if value}
    for raw in lines:
        if not raw:
            continue
        parts = raw.split(UNIT_SEP)
        if not parts:
            continue
        # First unit is the record key: a "YYYY/MM/DD" date, the word
        # "Repeat", or the bookkeeping "LastID" record (skipped).
        key = parts[0]
        if key == "LastID":
            continue
        # Remaining units alternate field-code / value (codes per FIELD_MAP).
        raw_fields = {}
        values = parts[1:]
        for idx in range(0, len(values) - 1, 2):
            field = values[idx]
            value = _unescape(values[idx + 1])
            if not field:
                continue
            raw_fields[field] = value
        text = raw_fields.get("a", "").strip()
        popup = raw_fields.get("c") or None
        link = raw_fields.get("b") or None
        categories = _parse_categories(raw_fields.get("r", ""))
        # Apply include/exclude category filters before any further parsing.
        if category_filter and category_filter not in categories:
            continue
        if exclude_set and any(category in exclude_set for category in categories):
            continue
        start_time = _parse_minutes(raw_fields.get("e", ""))
        end_time = _parse_minutes(raw_fields.get("f", ""))
        time_period = raw_fields.get("u") or None
        # A named time period fills in whichever of start/end is missing.
        if (start_time is None or end_time is None) and time_period:
            resolved_start, resolved_end = _resolve_time_period(time_period, time_periods)
            start_time = start_time if start_time is not None else resolved_start
            end_time = end_time if end_time is not None else resolved_end
        period_raw = raw_fields.get("C")
        period = None
        if period_raw:
            # Digits-and-spaces means a weekday list; otherwise a period
            # name such as "week" or "month".
            if re.fullmatch(r"[\d\s]+", period_raw):
                period = _parse_int_list(period_raw)
            else:
                period = period_raw
        month_week = _parse_int_list(raw_fields.get("E", ""))
        month_month = _parse_minutes(raw_fields.get("F", ""))
        frequency = _parse_minutes(raw_fields.get("D", ""))
        exclusions = _parse_exclusions(raw_fields.get("G", ""))
        skip_weekends = raw_fields.get("H", "").strip() == "1"
        # "Repeat" records have no single date; dated records parse the key.
        event_date = _parse_date(key) if key != "Repeat" else None
        repeat_start = _parse_date(raw_fields.get("A", ""))
        repeat_end = _parse_date(raw_fields.get("B", ""))
        events.append(
            CalciumEvent(
                text=text,
                popup=popup,
                link=link,
                categories=categories,
                start_time=start_time,
                end_time=end_time,
                time_period=time_period,
                date=event_date,
                repeat_start=repeat_start,
                repeat_end=repeat_end,
                period=period,
                frequency=frequency,
                month_week=month_week,
                month_month=month_month,
                exclusions=exclusions,
                skip_weekends=skip_weekends,
            )
        )
    if debug:
        print(f"Parsed {len(events)} events from {events_path}")
    return events
| def _format_local_datetime(dt: datetime, use_utc: bool) -> str: | |
| if use_utc: | |
| utc = dt.astimezone(ZoneInfo("UTC")) | |
| return utc.strftime("%Y%m%dT%H%M%SZ") | |
| return dt.strftime("%Y%m%dT%H%M%S") | |
| def _minutes_to_time(minutes: int) -> time: | |
| return time(minutes // 60, minutes % 60) | |
def _build_rrule(event: CalciumEvent) -> Optional[str]:
    """Translate a CalciumEvent's recurrence fields into an ICS RRULE string.

    Returns None for non-repeating events and for period values that do not
    map onto a known frequency.
    """
    if not event.is_repeating:
        return None
    start_date = event.repeat_start
    if not start_date:
        return None
    freq = None
    interval = event.frequency or 1
    by_day = None
    by_month_day = None
    by_set_pos = None
    period = event.period
    if period:
        if isinstance(period, list):
            # A list of ISO weekday numbers (1=MO..7=SU) means a weekly rule.
            freq = "WEEKLY"
            by_day = "BYDAY=" + ",".join(DAY_NAMES[day] for day in period if 0 < day < 8)
        else:
            freq = {
                "day": "DAILY",
                "dayBanner": "DAILY",
                "week": "WEEKLY",
                "month": "MONTHLY",
                "year": "YEARLY",
            }.get(period)
            if freq == "MONTHLY":
                # Plain monthly repeats pin the start date's day-of-month.
                by_month_day = f"BYMONTHDAY={start_date.day}"
    if event.month_week:
        # "Nth weekday of the month" rule anchored on the start date's
        # weekday; Calcium's 5 maps to "last" (-1) and 6 to the fifth week.
        freq = "MONTHLY"
        interval = event.month_month or 1
        dow = start_date.isoweekday()
        # NOTE(review): this repeats the start date's weekday once per entry
        # in month_week, and the loop below keeps only the LAST entry's
        # BYSETPOS — multiple month_week values likely don't round-trip.
        by_day = "BYDAY=" + ",".join(DAY_NAMES[dow] for _ in event.month_week)
        for week_num in event.month_week:
            if week_num == 5:
                by_set_pos = "BYSETPOS=-1"
            elif week_num == 6:
                by_set_pos = "BYSETPOS=5"
            else:
                by_set_pos = f"BYSETPOS={week_num}"
    if not freq:
        return None
    parts = [f"FREQ={freq}", f"INTERVAL={interval}"]
    if by_month_day:
        parts.append(by_month_day)
    if by_day:
        parts.append(by_day)
    # Open-ended repeats (repeat_end at/after OPEN_FUTURE_DATE) get no UNTIL.
    if event.repeat_end and event.repeat_end < OPEN_FUTURE_DATE:
        parts.append(f"UNTIL={event.repeat_end.strftime('%Y%m%d')}")
    if by_set_pos:
        parts.append(by_set_pos)
    return ";".join(parts)
| def _format_rrule_with_until(rrule: str, until_date: date) -> str: | |
| parts: Dict[str, str] = {} | |
| for part in rrule.split(";"): | |
| if "=" in part: | |
| key, value = part.split("=", 1) | |
| parts[key] = value | |
| parts["UNTIL"] = until_date.strftime("%Y%m%d") | |
| ordered = ["FREQ", "INTERVAL", "BYDAY", "UNTIL"] | |
| items = [] | |
| for key in ordered: | |
| if key in parts: | |
| items.append(f"{key}={parts[key]}") | |
| for key, value in parts.items(): | |
| if key not in ordered: | |
| items.append(f"{key}={value}") | |
| return ";".join(items) | |
def _build_exdates(event: CalciumEvent) -> Optional[str]:
    """Comma-joined YYYYMMDD exclusion dates, or None when there are none."""
    if not event.exclusions:
        return None
    return ",".join(day.strftime("%Y%m%d") for day in event.exclusions)
| def _parse_rrule_simple(rrule: str, start_date: date) -> Dict[str, Optional[object]]: | |
| parts: Dict[str, str] = {} | |
| for part in rrule.split(";"): | |
| if "=" in part: | |
| key, value = part.split("=", 1) | |
| parts[key] = value | |
| freq_map = { | |
| "DAILY": "daily", | |
| "WEEKLY": "weekly", | |
| "MONTHLY": "monthly", | |
| "YEARLY": "yearly", | |
| } | |
| rule = { | |
| "type": freq_map.get(parts.get("FREQ", ""), "none"), | |
| "interval": int(parts.get("INTERVAL", 1)), | |
| "weekdays": None, | |
| "until": None, | |
| } | |
| if rule["type"] == "weekly": | |
| booked_weekday = (start_date.weekday() + 1) % 7 | |
| rule["weekdays"] = [booked_weekday] | |
| until_value = parts.get("UNTIL") | |
| if until_value: | |
| try: | |
| if len(until_value) >= 8: | |
| rule["until"] = datetime.strptime(until_value[:8], "%Y%m%d").date() | |
| except ValueError: | |
| rule["until"] = None | |
| return rule | |
| def _recurrence_occurs_on(entry_date: date, start_date: date, rule: Dict[str, Optional[object]]) -> bool: | |
| rtype = rule.get("type") | |
| if not rtype or rtype == "none": | |
| return False | |
| if entry_date < start_date: | |
| return False | |
| until = rule.get("until") | |
| if until and entry_date > until: | |
| return False | |
| interval = int(rule.get("interval") or 1) | |
| if rtype == "daily": | |
| diff_days = (entry_date - start_date).days | |
| return diff_days % interval == 0 | |
| if rtype == "weekly": | |
| weekdays = rule.get("weekdays") or [] | |
| booked_weekday = (entry_date.weekday() + 1) % 7 | |
| if booked_weekday not in weekdays: | |
| return False | |
| diff_days = (entry_date - start_date).days | |
| return diff_days % (interval * 7) == 0 | |
| return False | |
def _build_description(event: CalciumEvent) -> Optional[str]:
    """Use the popup text as the description, falling back to the link."""
    if event.popup:
        return event.popup
    return event.link
def _build_dtstart_dtend(
    event: CalciumEvent,
    tz: ZoneInfo,
    use_utc: bool,
) -> Optional[tuple[str, str]]:
    """Format the DTSTART/DTEND pair for an event, or None when impossible.

    Untimed events become all-day DATE values (start, start+1 day) and are
    skipped entirely in UTC mode; a timed event whose end precedes its start
    is assumed to cross midnight.
    """
    anchor = event.repeat_start or event.date
    if anchor is None:
        return None
    if event.start_time is None:
        if use_utc:
            return None
        first = anchor.strftime("%Y%m%d")
        after = (anchor + timedelta(days=1)).strftime("%Y%m%d")
        return first, after
    begin = datetime.combine(anchor, _minutes_to_time(event.start_time), tzinfo=tz)
    if event.end_time is None:
        finish = begin
    else:
        finish_day = anchor
        if event.end_time < event.start_time:
            finish_day = anchor + timedelta(days=1)
        finish = datetime.combine(finish_day, _minutes_to_time(event.end_time), tzinfo=tz)
    return _format_local_datetime(begin, use_utc), _format_local_datetime(finish, use_utc)
def _resolve_category_value(event: CalciumEvent, category_filter: Optional[str]) -> Optional[str]:
    """Category for the VEVENT: the active filter, else the event's first."""
    if category_filter:
        return category_filter
    if event.categories:
        return event.categories[0]
    return None
def _base_vevent_fields(event: CalciumEvent, category_value: str) -> Dict[str, str]:
    """Common VEVENT properties shared by single and recurring events."""
    fields = {
        "SUMMARY": event.text,
        "CATEGORIES": category_value,
    }
    description = _build_description(event)
    if description:
        fields["DESCRIPTION"] = description
    return fields
def _event_duration(event: CalciumEvent, tz: ZoneInfo) -> Optional[tuple[time, timedelta]]:
    """Return (start time-of-day, duration) for a timed event, else None.

    An end time earlier than the start is treated as crossing midnight; a
    missing end time yields a zero-length duration.
    """
    anchor = event.repeat_start or event.date
    if anchor is None or event.start_time is None:
        return None
    begin_time = _minutes_to_time(event.start_time)
    begin = datetime.combine(anchor, begin_time, tzinfo=tz)
    if event.end_time is None:
        finish = begin
    else:
        finish_day = anchor
        if event.end_time < event.start_time:
            finish_day = anchor + timedelta(days=1)
        finish = datetime.combine(finish_day, _minutes_to_time(event.end_time), tzinfo=tz)
    return begin_time, finish - begin
def _build_vevent_for_start(
    event: CalciumEvent,
    tz: ZoneInfo,
    category_value: str,
    use_utc: bool,
    start_date: date,
    rrule: Optional[str],
    until_date: Optional[date],
    include_exdates: bool,
) -> Optional[Dict[str, str]]:
    """Build one VEVENT dict anchored at start_date, or None if untimed.

    When until_date is given, the RRULE is rewritten with that UNTIL; an
    until_date before start_date means an empty segment (None), and one
    equal to start_date collapses to a single occurrence (no RRULE).
    """
    duration_info = _event_duration(event, tz)
    if not duration_info:
        return None
    begin_time, duration = duration_info
    begin = datetime.combine(start_date, begin_time, tzinfo=tz)
    vevent = _base_vevent_fields(event, category_value)
    vevent["DTSTART"] = _format_local_datetime(begin, use_utc)
    vevent["DTEND"] = _format_local_datetime(begin + duration, use_utc)
    if rrule:
        if until_date:
            if until_date < start_date:
                return None
            if until_date != start_date:
                vevent["RRULE"] = _format_rrule_with_until(rrule, until_date)
        else:
            vevent["RRULE"] = rrule
    if include_exdates:
        exdates = _build_exdates(event)
        if exdates:
            vevent["EXDATE"] = exdates
    return vevent
def _split_recurring_event(
    event: CalciumEvent,
    tz: ZoneInfo,
    category_value: str,
    use_utc: bool,
    debug: bool,
) -> List[Dict[str, str]]:
    """Split a repeating event with exclusions into EXDATE-free VEVENT series.

    Daily/weekly recurrences are cut at each excluded occurrence, producing
    one (start, until) segment per uninterrupted run.  Other recurrence types
    are emitted as a single VEVENT that keeps its EXDATE property.
    """
    start_date = event.repeat_start or event.date
    if not start_date:
        return []
    rrule = _build_rrule(event)
    if not rrule:
        # Not actually recurring: emit a single occurrence without RRULE.
        vevent = _build_vevent_for_start(
            event,
            tz,
            category_value,
            use_utc,
            start_date,
            rrule=None,
            until_date=None,
            include_exdates=False,
        )
        return [vevent] if vevent else []
    rule = _parse_rrule_simple(rrule, start_date)
    if rule.get("type") not in {"daily", "weekly"}:
        # Splitting is only implemented for daily/weekly; keep EXDATE as-is.
        if debug:
            print(f"Leaving EXDATE untouched for non-daily/weekly recurrence: {event.text}")
        vevent = _build_vevent_for_start(
            event,
            tz,
            category_value,
            use_utc,
            start_date,
            rrule=rrule,
            until_date=event.repeat_end if event.repeat_end and event.repeat_end < OPEN_FUTURE_DATE else None,
            include_exdates=True,
        )
        return [vevent] if vevent else []
    interval = int(rule.get("interval") or 1)
    # Days between consecutive occurrences.
    # NOTE(review): for weekly rules with multiple weekdays this assumes one
    # occurrence per interval-week — confirm against _parse_rrule_simple.
    step_days = interval if rule.get("type") == "daily" else interval * 7
    exclusions = sorted({exdate for exdate in event.exclusions})
    current_start = start_date
    # repeat_end at/after OPEN_FUTURE_DATE means "open-ended" (no UNTIL).
    last_allowed = event.repeat_end if event.repeat_end and event.repeat_end < OPEN_FUTURE_DATE else None
    segments: List[tuple[date, Optional[date]]] = []
    for exdate in exclusions:
        if last_allowed and exdate > last_allowed:
            continue
        if exdate < current_start:
            continue
        if not _recurrence_occurs_on(exdate, start_date, rule):
            # Exclusion does not land on a real occurrence; ignore it.
            continue
        prev_date = exdate - timedelta(days=step_days)
        if prev_date >= current_start:
            # Close the current run just before the excluded occurrence.
            segment_end = prev_date
            if last_allowed and segment_end > last_allowed:
                segment_end = last_allowed
            if segment_end >= current_start:
                segments.append((current_start, segment_end))
        next_date = exdate + timedelta(days=step_days)
        if last_allowed and next_date > last_allowed:
            # Nothing can occur after this exclusion; stop scanning.
            current_start = None
            break
        current_start = next_date
    if current_start:
        # Emit the trailing run (open-ended when last_allowed is None).
        if not last_allowed or current_start <= last_allowed:
            segments.append((current_start, last_allowed))
    vevents: List[Dict[str, str]] = []
    for segment_start, segment_until in segments:
        vevent = _build_vevent_for_start(
            event,
            tz,
            category_value,
            use_utc,
            segment_start,
            rrule=rrule,
            until_date=segment_until,
            include_exdates=False,
        )
        if vevent:
            vevents.append(vevent)
    return vevents
def build_vevent(
    event: CalciumEvent,
    tz: ZoneInfo,
    category_value: str,
    use_utc: bool,
    debug: bool,
) -> List[Dict[str, str]]:
    """Convert one Calcium event into zero or more VEVENT dicts.

    Repeating events with exclusions are split into separate series so no
    EXDATE has to be emitted; everything else maps to a single VEVENT.
    """
    anchor = event.repeat_start or event.date
    if not anchor:
        return []
    if event.exclusions and event.is_repeating:
        return _split_recurring_event(event, tz, category_value, use_utc, debug)
    # An open-ended repeat horizon maps to "no UNTIL" rather than a fake date.
    until = None
    if event.repeat_end and event.repeat_end < OPEN_FUTURE_DATE:
        until = event.repeat_end
    built = _build_vevent_for_start(
        event,
        tz,
        category_value,
        use_utc,
        anchor,
        rrule=_build_rrule(event),
        until_date=until,
        include_exdates=False,
    )
    return [built] if built else []
def build_vevents(
    events_path: Path,
    category_filter: Optional[str],
    tz_name: str,
    preferences_path: Optional[Path] = None,
    exclude_categories: Optional[Iterable[str]] = None,
    debug: bool = False,
    use_utc: bool = True,
) -> List[VEventData]:
    """Parse a Calcium .Events file and render each event as VEventData.

    Events with no resolvable category (or no usable datetime) are skipped,
    with a diagnostic printed when *debug* is set.
    """
    tz = ZoneInfo(tz_name)
    parsed = parse_calcium_events(
        events_path,
        category_filter,
        exclude_categories,
        preferences_path,
        debug,
    )
    rendered: List[VEventData] = []
    for event in parsed:
        category_value = _resolve_category_value(event, category_filter)
        if not category_value:
            if debug:
                print(f"Skipping event with no category: {event.text}")
            continue
        for vevent in build_vevent(event, tz, category_value, use_utc=use_utc, debug=debug):
            if not vevent:
                if debug:
                    print(f"Skipping event with missing datetime: {event.text}")
                continue
            rendered.append(VEventData(event=vevent, lines=_build_vevent_lines(vevent)))
    return rendered
def _build_vevent_lines(event: Dict[str, str]) -> List[str]:
    """Serialize a VEVENT dict to ICS lines in a fixed property order.

    Properties that are missing or empty are omitted; unknown keys in the
    dict are ignored.
    """
    ordered_keys = (
        "DTSTART",
        "DTEND",
        "DTSTAMP",
        "UID",
        "SUMMARY",
        "DESCRIPTION",
        "CATEGORIES",
        "RRULE",
        "EXDATE",
    )
    body = [f"{key}:{event[key]}" for key in ordered_keys if event.get(key)]
    return ["BEGIN:VEVENT", *body, "END:VEVENT"]
| #!/usr/bin/env python3 | |
| import argparse | |
| import re | |
| import sys | |
| from datetime import datetime, time, timedelta | |
| from pathlib import Path | |
| from typing import Optional | |
| from zoneinfo import ZoneInfo | |
| try: | |
| from prompt_toolkit import prompt as pt_prompt | |
| HAS_PROMPT_TOOLKIT = True | |
| except Exception: | |
| HAS_PROMPT_TOOLKIT = False | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402 | |
| native_root = Path(__file__).resolve().parent | |
| sys.path.insert(0, str(native_root)) | |
| from calcium_events import build_vevents # noqa: E402 | |
# Scanner resource booked when --resource is not given on the command line.
DEFAULT_RESOURCE = "P1 Prisma"
# Default Calcium .Events export used for each known scanner resource.
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
    "P1 Prisma": "data/3T_Prisma_P1.Events",
    "P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
    "4T Varian": "data/4T_Varian_Inova.Events",
}
# Calcium timestamps are interpreted in this zone unless --timezone overrides.
DEFAULT_TZ = "America/New_York"
# Booked custom-attribute ids used when creating clinical reservations.
# NOTE(review): these mirror the server's attribute configuration — confirm
# against the Booked instance before changing.
PI_ATTRIBUTE_ID = 1
SCANNED_INITIALS_ATTRIBUTE_ID = 2
PATIENT_SOURCE_ATTRIBUTE_ID = 11
BODY_PART_ATTRIBUTE_ID = 10
CONTRAST_ATTRIBUTE_ID = 3
def local_datetime_from_ics(value: str, tz: ZoneInfo) -> datetime:
    """Convert an ICS date/date-time string into a datetime in *tz*.

    An empty value maps to the minimum datetime (used as a sort-low
    sentinel); a bare YYYYMMDD date maps to local midnight.
    """
    if not value:
        return datetime.min.replace(tzinfo=tz)
    if "T" not in value:
        day = datetime.strptime(value[:8], "%Y%m%d").date()
        return datetime.combine(day, time(0, 0), tzinfo=tz)
    parsed = datetime.strptime(ics_datetime_to_iso(value), "%Y-%m-%dT%H:%M:%S%z")
    return parsed.astimezone(tz)
def format_ics_like(template: str, dt: datetime) -> str:
    """Render *dt* in the same ICS flavor as *template*.

    Date-only templates yield YYYYMMDD; a trailing "Z" yields a UTC
    timestamp with the Z suffix; anything else yields a floating local
    timestamp.
    """
    if "T" not in template:
        return dt.strftime("%Y%m%d")
    if template.endswith("Z"):
        return dt.astimezone(ZoneInfo("UTC")).strftime("%Y%m%dT%H%M%SZ")
    return dt.strftime("%Y%m%dT%H%M%S")
| def parse_termination_date(value: str | None, tz: ZoneInfo) -> Optional[datetime.date]: | |
| if not value: | |
| return None | |
| if value.endswith("Z"): | |
| value = value[:-1] + "+0000" | |
| if value[-3] == ":": | |
| value = value[:-3] + value[-2:] | |
| return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz).date() | |
def parse_exdate_dates(value: str) -> set[datetime.date]:
    """Extract the calendar dates from a comma-separated EXDATE value.

    Entries shorter than 8 characters or with unparseable dates are ignored
    rather than failing the whole parse.
    """
    if not value:
        return set()
    found: set[datetime.date] = set()
    for part in value.split(","):
        chunk = part.strip()
        if len(chunk) < 8:
            continue
        try:
            found.add(datetime.strptime(chunk[:8], "%Y%m%d").date())
        except ValueError:
            continue
    return found
def update_rrule_until(rrule: str, until_date: datetime.date) -> str:
    """Return *rrule* with its UNTIL clause replaced (or appended).

    An empty rule string passes through unchanged.
    """
    if not rrule:
        return rrule
    stamp = f"UNTIL={until_date.strftime('%Y%m%d')}"
    pieces = rrule.split(";")
    had_until = any(p.startswith("UNTIL=") for p in pieces)
    rebuilt = [stamp if p.startswith("UNTIL=") else p for p in pieces]
    if not had_until:
        rebuilt.append(stamp)
    return ";".join(rebuilt)
def recurrence_occurs_on(
    target_date: datetime.date,
    start_date: datetime.date,
    rule: dict,
) -> bool:
    """Return True when the recurrence described by *rule* fires on *target_date*.

    *rule* is a parsed RRULE dict with keys "type" ("daily"/"weekly"/
    "monthly"/"yearly"), "interval", and for weekly rules "weekdays" in the
    Booked numbering (Sunday == 0).  Dates before *start_date* never match;
    unknown types return False.
    """
    if target_date < start_date:
        return False
    interval = int(rule.get("interval", 1))
    rtype = rule.get("type")
    if rtype == "daily":
        diff_days = (target_date - start_date).days
        return diff_days % interval == 0
    if rtype == "weekly":
        weekdays = rule.get("weekdays") or []
        booked_weekday = (target_date.weekday() + 1) % 7
        if booked_weekday not in weekdays:
            return False
        # Compare Sunday-aligned week numbers (weeks start on Sunday, matching
        # the Booked weekday numbering) so occurrences on weekdays other than
        # the start date's own weekday are recognized.  The previous modulo on
        # raw day differences only ever matched the start date's weekday.
        start_week = start_date - timedelta(days=(start_date.weekday() + 1) % 7)
        target_week = target_date - timedelta(days=(target_date.weekday() + 1) % 7)
        diff_weeks = (target_week - start_week).days // 7
        return diff_weeks % interval == 0
    if rtype == "monthly":
        # Day-of-month rule: fires only when the day numbers line up.
        if target_date.day != start_date.day:
            return False
        months = (target_date.year - start_date.year) * 12 + (
            target_date.month - start_date.month
        )
        return months % interval == 0
    if rtype == "yearly":
        if (target_date.month, target_date.day) != (start_date.month, start_date.day):
            return False
        return (target_date.year - start_date.year) % interval == 0
    return False
def expand_exdates(
    events: list[dict],
    tz: ZoneInfo,
    debug: bool,
) -> list[dict]:
    """Rewrite recurring VEVENTs with EXDATEs as multiple EXDATE-free series.

    Each exclusion that lands on a real occurrence closes the current series
    (UNTIL the day before the exclusion) and restarts at the next occurrence.
    Events lacking an RRULE or EXDATEs pass through with the EXDATE property
    dropped.  Input dicts may be mutated (EXDATE is popped in place).
    """
    expanded: list[dict] = []
    for event in events:
        rrule = event.get("RRULE") or ""
        exdates = parse_exdate_dates(event.get("EXDATE", ""))
        if not rrule or not exdates:
            event.pop("EXDATE", None)
            expanded.append(event)
            continue
        start_raw = event.get("DTSTART", "")
        end_raw = event.get("DTEND", "")
        start_dt = local_datetime_from_ics(start_raw, tz)
        end_dt = local_datetime_from_ics(end_raw, tz)
        rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
        until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
        current_start = start_dt
        duration = end_dt - start_dt
        for exdate in sorted(exdates):
            if until_date and exdate > until_date:
                continue
            if not recurrence_occurs_on(exdate, start_dt.date(), rule):
                # Exclusion does not fall on an occurrence; nothing to split.
                continue
            segment_until = exdate - timedelta(days=1)
            if segment_until >= current_start.date():
                # Close the series the day before the excluded occurrence.
                segment = dict(event)
                segment["DTSTART"] = format_ics_like(start_raw, current_start)
                segment["DTEND"] = format_ics_like(end_raw, current_start + duration)
                segment["RRULE"] = update_rrule_until(rrule, segment_until)
                segment.pop("EXDATE", None)
                expanded.append(segment)
            next_dt = next_occurrence(
                start_dt,
                rule,
                datetime.combine(exdate + timedelta(days=1), time.min, tzinfo=tz),
            )
            if not next_dt:
                # No further occurrences: the whole tail is dropped.
                current_start = None
                break
            current_start = next_dt
        if current_start and (not until_date or current_start.date() <= until_date):
            # Emit the trailing series after the last exclusion.
            segment = dict(event)
            segment["DTSTART"] = format_ics_like(start_raw, current_start)
            segment["DTEND"] = format_ics_like(end_raw, current_start + duration)
            if until_date:
                segment["RRULE"] = update_rrule_until(rrule, until_date)
            segment.pop("EXDATE", None)
            expanded.append(segment)
        elif debug:
            print(f"Dropped event after EXDATE expansion: {event.get('SUMMARY', '').strip()}")
    return expanded
def add_months(date_value: datetime.date, months: int) -> datetime.date:
    """Return *date_value* shifted by *months*, clamping to the month's last day."""
    total = date_value.month - 1 + months
    year = date_value.year + total // 12
    month = total % 12 + 1
    day = date_value.day
    while True:
        try:
            return datetime(year, month, day).date()
        except ValueError:
            # e.g. Jan 31 + 1 month: walk back until the day is valid.
            day -= 1
def next_occurrence(start_dt: datetime, rule: dict, now_dt: datetime) -> Optional[datetime]:
    """Return the first occurrence of *rule* at or after *now_dt*.

    *rule* is a parsed recurrence dict ("type", "interval"); the series is
    anchored at *start_dt*.  Returns None for unknown recurrence types.
    NOTE(review): the weekly branch steps in whole interval-weeks from
    start_dt and does not consult a weekday list; each branch nudges forward
    at most once after the big jump — confirm that is always sufficient for
    interval > 1.
    """
    interval = int(rule.get("interval", 1))
    occurrence = start_dt
    if rule.get("type") == "daily":
        if now_dt > occurrence:
            # Jump close to now in one step, then nudge forward if still short.
            delta_days = (now_dt.date() - occurrence.date()).days
            steps = delta_days // interval
            occurrence = occurrence + timedelta(days=steps * interval)
            if occurrence < now_dt:
                occurrence += timedelta(days=interval)
    elif rule.get("type") == "weekly":
        if now_dt > occurrence:
            delta_days = (now_dt.date() - occurrence.date()).days
            weeks = delta_days // 7
            steps = (weeks // interval) * interval
            occurrence = occurrence + timedelta(days=steps * 7)
            if occurrence < now_dt:
                occurrence += timedelta(days=interval * 7)
    elif rule.get("type") == "monthly":
        if now_dt > occurrence:
            months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
            steps = (months // interval) * interval
            occurrence_date = add_months(occurrence.date(), steps)
            occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
            if occurrence < now_dt:
                occurrence_date = add_months(occurrence.date(), interval)
                occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
    elif rule.get("type") == "yearly":
        if now_dt > occurrence:
            years = now_dt.year - occurrence.year
            steps = (years // interval) * interval
            try:
                occurrence = occurrence.replace(year=occurrence.year + steps)
            except ValueError:
                # Feb 29 anchored series landing on a non-leap year: clamp to the 28th.
                occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
            if occurrence < now_dt:
                try:
                    occurrence = occurrence.replace(year=occurrence.year + interval)
                except ValueError:
                    occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
    else:
        return None
    return occurrence
def adjust_event_to_today(event: dict, tz: ZoneInfo, today_date: datetime.date) -> Optional[dict]:
    """Drop past events and roll recurring ones forward to today or later.

    Non-recurring events strictly before *today_date* return None.  Recurring
    events whose UNTIL is already past return None; otherwise DTSTART/DTEND
    are moved to the next occurrence on/after today, preserving the duration
    and the original ICS formatting.  The input dict is not mutated.
    """
    start_raw = event.get("DTSTART", "")
    end_raw = event.get("DTEND", "")
    if not start_raw or not end_raw:
        return None
    start_dt = local_datetime_from_ics(start_raw, tz)
    end_dt = local_datetime_from_ics(end_raw, tz)
    rrule = event.get("RRULE")
    if not rrule:
        if start_dt.date() < today_date:
            return None
        return event
    rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
    until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
    if until_date and today_date > until_date:
        # Series already terminated.
        return None
    if start_dt.date() >= today_date:
        return event
    today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
    occurrence = next_occurrence(start_dt, rule, today_dt)
    if not occurrence:
        return None
    if until_date and occurrence.date() > until_date:
        return None
    duration = end_dt - start_dt
    new_start = occurrence
    new_end = new_start + duration
    updated = dict(event)
    updated["DTSTART"] = format_ics_like(start_raw, new_start)
    updated["DTEND"] = format_ics_like(end_raw, new_end)
    return updated
def extract_scanned_initials(description: str) -> Optional[str]:
    """Pull 2-3 letter uppercase initials from the end of a description's first line.

    Returns None for empty input, for the bare clinical placeholders, or when
    neither the final word nor its last hyphen-separated part looks like
    initials.
    """
    if not description:
        return None
    first_line = description.splitlines()[0].strip()
    if not first_line or first_line in {"Clinical - IP", "Clinical IP"}:
        return None
    words = first_line.split()
    if not words:
        return None

    def _looks_like_initials(token: str) -> bool:
        return len(token) in (2, 3) and token.isalpha() and token.isupper()

    candidate = words[-1]
    if _looks_like_initials(candidate):
        return candidate
    if "-" in candidate:
        tail = candidate.rsplit("-", 1)[-1]
        if _looks_like_initials(tail):
            return tail
    return None
def parse_clinical_prefix(description: str) -> tuple[Optional[str], Optional[str], Optional[str], str]:
    """Split a clinical description's "PAT - body - SPI" first-line prefix.

    Returns (patient_source, body_part, initials, remaining_description).
    When the first line does not match the prefix pattern, the original
    description is returned unchanged with three Nones.
    """
    lines = description.splitlines() if description else []
    if not lines:
        return None, None, None, ""
    match = re.match(
        r"^\s*(?P<pat>[A-Za-z0-9]+)\s*-\s*(?P<body>.+?)\s*-\s*(?P<spi>[A-Za-z]{2,3})(?P<rest>.*)$",
        lines[0],
    )
    if match is None:
        return None, None, None, description
    leftover = match.group("rest").strip().lstrip("-").strip()
    remainder_lines = ([leftover] if leftover else []) + lines[1:]
    return (
        match.group("pat").strip(),
        match.group("body").strip(),
        match.group("spi").strip(),
        "\n".join(remainder_lines).strip(),
    )
def input_with_default(label: str, current: str) -> str:
    """Prompt for a value, keeping *current* on empty input and clearing on "-".

    Uses prompt_toolkit (pre-filled with *current*) when available, otherwise
    falls back to builtin input() showing the current value in brackets.
    """
    if HAS_PROMPT_TOOLKIT:
        answer = pt_prompt(f"{label}: ", default=current or "").strip()
    else:
        shown = f"{label} [{current}]: " if current else f"{label}: "
        answer = input(shown).strip()
    if not answer:
        return current
    return "" if answer == "-" else answer
def contains_spi_blocker(text: str) -> bool:
    """Return True when the text describes QA/cleanup work rather than a scan."""
    if not text:
        return False
    lowered = text.lower()
    blockers = ("acr qa", "hd cleanup", "hd clean up")
    return any(marker in lowered for marker in blockers)
def has_contrast_indicator(text: str) -> bool:
    """Heuristically detect whether a description calls for contrast.

    Explicit "no contrast"/"non contr" wording wins over everything else;
    otherwise the word "contrast" or any of the common with/without
    shorthand spellings (w/o & w, w/wo, wo-w, ...) counts as a yes.
    """
    if not text:
        return False
    lowered = text.lower()
    if "no contrast" in lowered or "non contr" in lowered:
        return False
    if "contrast" in lowered:
        return True
    shorthand = (
        r"w\s*/\s*o\s*&\s*w",
        r"w\s*&\s*w\s*/\s*o",
        r"w\s*/\s*o\s*\+\s*w",
        r"w\s*\+\s*w\s*/\s*o",
        r"w\s*/\s*wo",
        r"wo\s*/\s*w",
        r"w\s*-\s*wo",
        r"wo\s*-\s*w",
    )
    return any(re.search(pattern, lowered) for pattern in shorthand)
def main() -> int:
    """Import Clinical events from a Calcium .Events file into Booked.

    Parses the Calcium export, expands EXDATE-bearing recurrences into
    separate series, drops occurrences before today, then either lists the
    events (--list/--dry-run) or creates one Booked reservation per event,
    prompting for clinical attributes unless --yes is given.  Returns the
    process exit code.
    """
    parser = argparse.ArgumentParser(
        description="Import Clinical events from a Calcium .Events file"
    )
    parser.add_argument(
        "input",
        nargs="?",
        help="Input Calcium .Events file (defaults based on --resource)",
    )
    parser.add_argument(
        "--preferences",
        help="Optional Calcium .Preferences file to resolve time periods",
    )
    parser.add_argument(
        "--resource",
        default=DEFAULT_RESOURCE,
        help="Resource name (default: P1 Prisma)",
    )
    parser.add_argument(
        "--timezone",
        default=DEFAULT_TZ,
        help="Timezone for Calcium events (default: America/New_York)",
    )
    parser.add_argument(
        "--yes",
        action="store_true",
        help="Create all events without prompting",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be created without creating reservations",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List events without creating reservations",
    )
    # Fix: args.debug is read below (expand_exdates call) but the flag was
    # never declared, so every invocation crashed with AttributeError.
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Print debug details while expanding events",
    )
    args = parser.parse_args()
    input_path_value = args.input
    if not input_path_value:
        input_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
        if not input_path_value:
            raise SystemExit(
                f"No default Calcium .Events for resource '{args.resource}'. "
                "Provide an input file."
            )
        args.input = input_path_value
    events_path = Path(args.input)
    prefs_path = Path(args.preferences) if args.preferences else None
    tz = ZoneInfo(args.timezone)
    today_date = datetime.now(tz).date()
    vevents = build_vevents(
        events_path,
        category_filter="Clinical",
        tz_name=args.timezone,
        preferences_path=prefs_path,
        use_utc=True,
    )
    events = expand_exdates([item.event for item in vevents], tz, args.debug)
    # Keep only events occurring today or later, rolling recurrences forward.
    adjusted_events = []
    for event in events:
        adjusted = adjust_event_to_today(event, tz, today_date)
        if not adjusted:
            continue
        adjusted_events.append(adjusted)
    events = adjusted_events
    if not events:
        print("No matching clinical events found.")
        return 0
    if args.list or args.dry_run:
        for idx, event in enumerate(events, start=1):
            start_local = local_datetime_from_ics(event.get("DTSTART", ""), tz)
            end_local = local_datetime_from_ics(event.get("DTEND", ""), tz)
            summary = event.get("SUMMARY", "").strip()
            print(
                f"{idx}. {start_local.strftime('%Y-%m-%d %H:%M')} -> "
                f"{end_local.strftime('%H:%M')} | {summary}"
            )
        return 0
    api = BookedAPI()
    resource = api.find_resource_by_name(args.resource)
    if not resource:
        print(f"Resource '{args.resource}' not found.")
        return 1
    pi_value = api.find_attribute_value(PI_ATTRIBUTE_ID, "Clinical") or "Clinical"
    if not args.yes:
        response = input("Create these clinical events? [Y]es / [n]o: ").strip().lower()
        if response in {"n", "no"}:
            print("Aborted.")
            return 0
    for idx, event in enumerate(events, start=1):
        description = event.get("SUMMARY", "")
        pat, body, spi, description_clean = parse_clinical_prefix(description)
        if spi and contains_spi_blocker(description):
            # QA/cleanup entries have no scanned person.
            spi = None
        if not spi:
            spi = extract_scanned_initials(description)
        if not args.yes:
            print(f"\nEvent {idx}:")
            pat = input_with_default("PAT (Patient Source)", pat or "")
            body = input_with_default("Body part", body or "")
            spi = input_with_default("SPI (Scanned Person Initials)", spi or "")
            description_clean = input_with_default("SUM (Description)", description_clean or "")
        custom_attributes = [
            {"attributeId": PI_ATTRIBUTE_ID, "attributeValue": pi_value}
        ]
        if pat:
            custom_attributes.append(
                {
                    "attributeId": PATIENT_SOURCE_ATTRIBUTE_ID,
                    "attributeValue": pat,
                }
            )
        if body:
            custom_attributes.append(
                {
                    "attributeId": BODY_PART_ATTRIBUTE_ID,
                    "attributeValue": body,
                }
            )
        if spi:
            custom_attributes.append(
                {
                    "attributeId": SCANNED_INITIALS_ATTRIBUTE_ID,
                    "attributeValue": spi,
                }
            )
        if has_contrast_indicator(description):
            custom_attributes.append(
                {
                    "attributeId": CONTRAST_ATTRIBUTE_ID,
                    "attributeValue": "Yes",
                }
            )
        result = api.create_reservation(
            resource_id=resource["resourceId"],
            start_datetime=ics_datetime_to_iso(event.get("DTSTART", "")),
            end_datetime=ics_datetime_to_iso(event.get("DTEND", "")),
            title="",
            description=description_clean,
            custom_attributes=custom_attributes if custom_attributes else None,
            recurrence_rule=parse_rrule(
                event.get("RRULE", ""),
                local_datetime_from_ics(event.get("DTSTART", ""), tz).replace(tzinfo=None),
            )
            if event.get("RRULE")
            else None,
        )
        reference = result.get("referenceNumber")
        if not reference:
            errors = result.get("errors", result)
            print(f"Create failed for item {idx}: {errors}")
        else:
            print(f"Created {reference}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
| #!/usr/bin/env python3 | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| import urllib.request | |
| from dataclasses import replace | |
| from datetime import datetime, time, timedelta | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| from zoneinfo import ZoneInfo | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402 | |
| native_root = Path(__file__).resolve().parent | |
| sys.path.insert(0, str(native_root)) | |
| from calcium_events import CalciumEvent, build_vevent, parse_calcium_events # noqa: E402 | |
# Default Booked resource and Calcium export for coverage imports.
DEFAULT_RESOURCE = "Coverage"
DEFAULT_INPUT = "data/Technologist.Events"
DEFAULT_TZ = "America/New_York"
# Booked custom-attribute id that holds the coverage type.
COVERAGE_ATTRIBUTE_ID = 7
# Calcium category name -> coverage attribute value.
COVERAGE_CATEGORIES = {
    "MD Coverage": "MD",
    "NP Coverage": "NP",
}
# Placeholder slot (06:00-06:05, in minutes since midnight) applied to every
# coverage event before real times are extracted from the description.
DEFAULT_START_MINUTES = 6 * 60
DEFAULT_END_MINUTES = 6 * 60 + 5
# Model used by call_openai to extract times/participants from descriptions.
OPENAI_MODEL = "gpt-5.2"
def local_datetime_from_ics(value: str, tz: ZoneInfo) -> datetime:
    """Convert an ICS date or date-time string into a datetime in *tz*.

    Empty input yields the minimum datetime (a sort-low sentinel); a bare
    YYYYMMDD value yields local midnight of that day.
    """
    if not value:
        return datetime.min.replace(tzinfo=tz)
    if "T" not in value:
        return datetime.combine(
            datetime.strptime(value[:8], "%Y%m%d").date(), time(0, 0), tzinfo=tz
        )
    as_iso = ics_datetime_to_iso(value)
    return datetime.strptime(as_iso, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz)
def format_ics_like(template: str, dt: datetime) -> str:
    """Format *dt* using the same ICS style as *template*.

    Date-only templates yield YYYYMMDD; a trailing "Z" yields UTC with the
    Z suffix; anything else yields a floating local timestamp.
    """
    if "T" not in template:
        return dt.strftime("%Y%m%d")
    if template.endswith("Z"):
        in_utc = dt.astimezone(ZoneInfo("UTC"))
        return in_utc.strftime("%Y%m%dT%H%M%SZ")
    return dt.strftime("%Y%m%dT%H%M%S")
| def parse_termination_date(value: str | None, tz: ZoneInfo) -> Optional[datetime.date]: | |
| if not value: | |
| return None | |
| if value.endswith("Z"): | |
| value = value[:-1] + "+0000" | |
| if value[-3] == ":": | |
| value = value[:-3] + value[-2:] | |
| return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz).date() | |
def add_months(date_value: datetime.date, months: int) -> datetime.date:
    """Shift *date_value* by *months*, clamping to the target month's last day."""
    shifted = date_value.month - 1 + months
    year = date_value.year + shifted // 12
    month = shifted % 12 + 1
    day = date_value.day
    while True:
        try:
            return datetime(year, month, day).date()
        except ValueError:
            # Day overflows the target month (e.g. Jan 31 -> Feb): back up.
            day -= 1
def next_occurrence(start_dt: datetime, rule: dict, now_dt: datetime) -> Optional[datetime]:
    """Return the first occurrence of *rule* at or after *now_dt*.

    *rule* is a parsed recurrence dict ("type", "interval"); the series is
    anchored at *start_dt*.  Returns None for unknown recurrence types.
    NOTE(review): the weekly branch steps in whole interval-weeks from
    start_dt and does not consult a weekday list; each branch nudges forward
    at most once after the big jump — confirm that is always sufficient for
    interval > 1.
    """
    interval = int(rule.get("interval", 1))
    occurrence = start_dt
    if rule.get("type") == "daily":
        if now_dt > occurrence:
            # Jump close to now in one step, then nudge forward if still short.
            delta_days = (now_dt.date() - occurrence.date()).days
            steps = delta_days // interval
            occurrence = occurrence + timedelta(days=steps * interval)
            if occurrence < now_dt:
                occurrence += timedelta(days=interval)
    elif rule.get("type") == "weekly":
        if now_dt > occurrence:
            delta_days = (now_dt.date() - occurrence.date()).days
            weeks = delta_days // 7
            steps = (weeks // interval) * interval
            occurrence = occurrence + timedelta(days=steps * 7)
            if occurrence < now_dt:
                occurrence += timedelta(days=interval * 7)
    elif rule.get("type") == "monthly":
        if now_dt > occurrence:
            months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
            steps = (months // interval) * interval
            occurrence_date = add_months(occurrence.date(), steps)
            occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
            if occurrence < now_dt:
                occurrence_date = add_months(occurrence.date(), interval)
                occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
    elif rule.get("type") == "yearly":
        if now_dt > occurrence:
            years = now_dt.year - occurrence.year
            steps = (years // interval) * interval
            try:
                occurrence = occurrence.replace(year=occurrence.year + steps)
            except ValueError:
                # Feb 29 anchored series landing on a non-leap year: clamp to the 28th.
                occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
            if occurrence < now_dt:
                try:
                    occurrence = occurrence.replace(year=occurrence.year + interval)
                except ValueError:
                    occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
    else:
        return None
    return occurrence
def adjust_event_to_today(event: dict, tz: ZoneInfo, today_date: datetime.date) -> Optional[dict]:
    """Drop past events and roll recurring ones forward to today or later.

    Non-recurring events strictly before *today_date* return None.  Recurring
    events whose UNTIL is already past return None; otherwise DTSTART/DTEND
    are moved to the next occurrence on/after today, preserving the duration
    and the original ICS formatting.  The input dict is not mutated.
    """
    start_raw = event.get("DTSTART", "")
    end_raw = event.get("DTEND", "")
    if not start_raw or not end_raw:
        return None
    start_dt = local_datetime_from_ics(start_raw, tz)
    end_dt = local_datetime_from_ics(end_raw, tz)
    rrule = event.get("RRULE")
    if not rrule:
        if start_dt.date() < today_date:
            return None
        return event
    rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
    until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
    if until_date and today_date > until_date:
        # Series already terminated.
        return None
    if start_dt.date() >= today_date:
        return event
    today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
    occurrence = next_occurrence(start_dt, rule, today_dt)
    if not occurrence:
        return None
    if until_date and occurrence.date() > until_date:
        return None
    duration = end_dt - start_dt
    new_start = occurrence
    new_end = new_start + duration
    updated = dict(event)
    updated["DTSTART"] = format_ics_like(start_raw, new_start)
    updated["DTEND"] = format_ics_like(end_raw, new_end)
    return updated
def coverage_type_for_event(event: CalciumEvent) -> Optional[str]:
    """Map an event's categories to a coverage type ("MD"/"NP"), or None."""
    return next(
        (
            coverage
            for category, coverage in COVERAGE_CATEGORIES.items()
            if category in event.categories
        ),
        None,
    )
def normalize_coverage_event(event: CalciumEvent) -> CalciumEvent:
    """Return a copy of *event* pinned to the standard placeholder time slot."""
    overrides = {
        "start_time": DEFAULT_START_MINUTES,
        "end_time": DEFAULT_END_MINUTES,
    }
    return replace(event, **overrides)
def build_items(
    events: List[CalciumEvent],
    tz: ZoneInfo,
    today_date: datetime.date,
    debug: bool,
) -> List[Tuple[dict, str]]:
    """Turn coverage-category Calcium events into (vevent, coverage_type) pairs.

    Events without an MD/NP coverage category are skipped; the rest are
    pinned to the placeholder slot, rendered to VEVENTs, and rolled forward
    so only today-or-later occurrences remain.
    """
    items: List[Tuple[dict, str]] = []
    for event in events:
        coverage_type = coverage_type_for_event(event)
        if not coverage_type:
            continue
        normalized = normalize_coverage_event(event)
        category_value = "MD Coverage" if coverage_type == "MD" else "NP Coverage"
        vevents = build_vevent(normalized, tz, category_value, use_utc=True, debug=debug)
        for vevent in vevents:
            adjusted = adjust_event_to_today(vevent, tz, today_date)
            if not adjusted:
                continue
            items.append((adjusted, coverage_type))
    return items
def format_list_entry(index: int, event: dict, coverage_type: str, tz: ZoneInfo) -> str:
    """Render one numbered listing line: date, time range, type, summary."""
    begins = local_datetime_from_ics(event.get("DTSTART", ""), tz)
    ends = local_datetime_from_ics(event.get("DTEND", ""), tz)
    label = event.get("SUMMARY", "").strip()
    when = f"{begins.strftime('%Y-%m-%d %H:%M')} -> {ends.strftime('%H:%M')}"
    return f"{index}. {when} | {coverage_type} | {label}"
def parse_time_value(value: Optional[str]) -> Optional[time]:
    """Parse "HH:MM" or "HHMM" into a time, returning None when unparseable."""
    raw = (value or "").strip()
    if not raw:
        return None
    for fmt in ("%H:%M", "%H%M"):
        try:
            return datetime.strptime(raw, fmt).time()
        except ValueError:
            continue
    return None
def normalize_name(value: str) -> str:
    """Lower-case a name, dropping commas and collapsing runs of whitespace."""
    words = value.replace(",", " ").split()
    return " ".join(words).strip().lower()
def build_user_maps(users: List[Dict]) -> tuple[Dict[str, int], Dict[str, int]]:
    """Index users by lower-cased email and by normalized full name.

    Names are indexed both as "first last" and "last, first" so either
    spelling resolves to the same user id.  Records without an id are
    skipped.
    """
    by_email: Dict[str, int] = {}
    by_name: Dict[str, int] = {}
    for record in users:
        uid = record.get("id")
        if uid is None:
            continue
        email = (record.get("emailAddress") or "").strip().lower()
        if email:
            by_email[email] = uid
        first = (record.get("firstName") or "").strip()
        last = (record.get("lastName") or "").strip()
        if not (first or last):
            continue
        full = normalize_name(f"{first} {last}")
        if full:
            by_name[full] = uid
        if first and last:
            flipped = normalize_name(f"{last}, {first}")
            if flipped:
                by_name[flipped] = uid
    return by_email, by_name
def resolve_participants(
    participants: List[str],
    email_map: Dict[str, int],
    name_map: Dict[str, int],
) -> Tuple[List[int], List[str]]:
    """Resolve participant tokens (emails or names) to user ids.

    Returns (resolved ids in first-seen order, tokens that matched nothing).
    Duplicate matches are dropped; blank tokens are ignored.
    """
    matched: List[int] = []
    unknown: List[str] = []
    already = set()
    for raw in participants:
        token = raw.strip()
        if not token:
            continue
        uid = None
        if "@" in token:
            uid = email_map.get(token.lower())
        if uid is None:
            uid = name_map.get(normalize_name(token))
        if uid is None:
            unknown.append(token)
        elif uid not in already:
            already.add(uid)
            matched.append(uid)
    return matched, unknown
def call_openai(description: str) -> Dict:
    """Ask the OpenAI chat API to extract coverage details from *description*.

    Returns the JSON object parsed from the model's reply (empty dict when
    the response has no choices or empty content).  Raises RuntimeError when
    OPENAI_API_KEY is unset; network and JSON errors propagate to the caller.
    """
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set in the environment.")
    system_msg = (
        "Extract coverage details from the description. Return JSON only with keys: "
        "start_time (HH:MM 24h), end_time (HH:MM 24h), participants (list of names/emails), "
        "description_time_removed (string), description_participants_removed (string), "
        "description_time_and_participants_removed (string). "
        "Remove only what is clearly a time range for time_removed, only names/emails for "
        "participants_removed, and both for time_and_participants_removed. Preserve all other notes. "
        "If no time is present, set start_time/end_time to null and description_time_removed should "
        "match the original. If nothing remains, return an empty string."
    )
    # NOTE(review): "\\n" puts a literal backslash-n into the prompt rather
    # than a newline — confirm which was intended.
    user_msg = f"Description:\\n{description}"
    payload = {
        "model": OPENAI_MODEL,
        # Deterministic extraction; response_format forces a JSON object.
        "temperature": 0,
        "response_format": {"type": "json_object"},
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
    }
    data = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        "https://api.openai.com/v1/chat/completions",
        data=data,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
    )
    with urllib.request.urlopen(request, timeout=60) as response:
        raw = response.read().decode("utf-8")
    parsed = json.loads(raw)
    choices = parsed.get("choices") or []
    if not choices:
        return {}
    text = choices[0].get("message", {}).get("content", "")
    return json.loads(text) if text else {}
def update_event_times(event: dict, tz: ZoneInfo, start: time, end: time) -> dict:
    """Return a copy of *event* with DTSTART/DTEND replaced by the given local times.

    The event's original date is kept; when *end* is not after *start* the
    end rolls over to the next day (overnight coverage).
    """
    dtstart_raw = event.get("DTSTART", "")
    dtend_raw = event.get("DTEND", "")
    day = local_datetime_from_ics(dtstart_raw, tz).date()
    new_start = datetime.combine(day, start, tzinfo=tz)
    new_end = datetime.combine(day, end, tzinfo=tz)
    if new_end <= new_start:
        new_end += timedelta(days=1)
    patched = dict(event)
    patched["DTSTART"] = format_ics_like(dtstart_raw, new_start)
    patched["DTEND"] = format_ics_like(dtend_raw, new_end)
    return patched
def format_time_12h(value: time) -> str:
    """Render a time as 12-hour "H:MM AM/PM" without a leading zero."""
    rendered = value.strftime("%I:%M %p")
    # %I always produces two digits (01-12); drop a single leading zero.
    return rendered[1:] if rendered.startswith("0") else rendered
def main() -> int:
    """Interactively import MD/NP coverage events into Booked.

    Parses Calcium events, asks OpenAI to extract per-event coverage
    times/participants, previews each proposal, and (unless --list/--dry-run)
    creates reservations through the Booked API. Returns a process exit code.
    """
    parser = argparse.ArgumentParser(
        description="Import MD/NP Coverage events from a Calcium .Events file"
    )
    parser.add_argument(
        "--timezone",
        default=DEFAULT_TZ,
        help="Timezone for Calcium events (default: America/New_York)",
    )
    parser.add_argument(
        "--yes",
        action="store_true",
        help="Create all reservations without prompting",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be created without creating reservations",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List events without creating reservations",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Log progress details to stderr",
    )
    args = parser.parse_args()
    # Input path and preferences are fixed for this importer.
    events_path = Path(DEFAULT_INPUT)
    prefs_path = None
    tz = ZoneInfo(args.timezone)
    today_date = datetime.now(tz).date()
    events = parse_calcium_events(
        events_path,
        category_filter=None,
        exclude_categories=None,
        preferences_path=prefs_path,
        debug=args.debug,
    )
    items = build_items(events, tz, today_date, args.debug)
    if not items:
        print("No matching coverage events found.")
        return 0
    # --list / --dry-run: show the candidates and stop before any API writes.
    if args.list or args.dry_run:
        for idx, (event, coverage_type) in enumerate(items, start=1):
            print(format_list_entry(idx, event, coverage_type, tz))
        return 0
    api = BookedAPI()
    resource = api.find_resource_by_name(DEFAULT_RESOURCE)
    if not resource:
        print(f"Resource '{DEFAULT_RESOURCE}' not found.")
        return 1
    users = api.list_users()
    email_map, name_map = build_user_maps(users)
    rejected_ids: List[str] = []
    for idx, (event, coverage_type) in enumerate(items, start=1):
        description = event.get("SUMMARY", "")
        # Best-effort AI extraction: a failure is recorded and shown, not fatal.
        ai_data = {}
        ai_error = None
        try:
            ai_data = call_openai(description)
        except Exception as exc:
            ai_error = str(exc)
        ai_start = parse_time_value(ai_data.get("start_time"))
        ai_end = parse_time_value(ai_data.get("end_time"))
        ai_participants = ai_data.get("participants") or []
        ai_desc_time = ai_data.get("description_time_removed")
        ai_desc_participants = ai_data.get("description_participants_removed")
        ai_desc_full = ai_data.get("description_time_and_participants_removed")
        # Defensive normalization of model output (it may violate the schema).
        if not isinstance(ai_participants, list):
            ai_participants = []
        if ai_desc_time is not None:
            ai_desc_time = str(ai_desc_time).strip()
        if ai_desc_participants is not None:
            ai_desc_participants = str(ai_desc_participants).strip()
        if ai_desc_full is not None:
            ai_desc_full = str(ai_desc_full).strip()
        # Only retime the event when the model produced BOTH endpoints.
        if ai_start and ai_end:
            proposed_event = update_event_times(event, tz, ai_start, ai_end)
        else:
            proposed_event = event
        resolved_ids, unresolved = resolve_participants(
            [str(p) for p in ai_participants], email_map, name_map
        )
        # Pick the description variant matching what was actually extracted,
        # falling back to the original text whenever a stripped form is empty.
        ai_description = None
        if ai_start and ai_end:
            if resolved_ids:
                ai_description = ai_desc_full or ai_desc_time or description
            else:
                ai_description = ai_desc_time or description
        else:
            if resolved_ids:
                ai_description = ai_desc_participants or description
            else:
                ai_description = description
        proposed_start = local_datetime_from_ics(proposed_event.get("DTSTART", ""), tz)
        proposed_rrule = proposed_event.get("RRULE")
        # Preview panel (bold via ANSI escapes).
        print("\n" + "=" * 72)
        print(f"Event {idx} of {len(items)}")
        print("-" * 72)
        print(f"\033[1mCoverage Type:\033[0m {coverage_type}")
        print(f"\033[1mDate:\033[0m {proposed_start.strftime('%Y-%m-%d')}")
        if proposed_rrule:
            print(f"\033[1mRecurrence:\033[0m {proposed_rrule}")
        else:
            print("\033[1mRecurrence:\033[0m (none)")
        if ai_error:
            print(f"\033[1mAI error:\033[0m {ai_error}")
        if ai_start and ai_end:
            print(
                f"\033[1mAI times:\033[0m {format_time_12h(ai_start)} -> {format_time_12h(ai_end)}"
            )
        else:
            print(
                "\033[1mAI times:\033[0m (none found; will use 06:00 -> 06:05 if rejected)"
            )
        if ai_participants:
            print(
                f"\033[1mAI participants:\033[0m {', '.join(str(p) for p in ai_participants)}"
            )
        else:
            print("\033[1mAI participants:\033[0m (none found)")
        if resolved_ids:
            print(
                f"\033[1mResolved participants:\033[0m {', '.join(str(pid) for pid in resolved_ids)}"
            )
        if unresolved:
            print(
                f"\033[1mUnresolved participants:\033[0m {', '.join(unresolved)}"
            )
        print("\033[1mOriginal description:\033[0m")
        print(description)
        print("\033[1mAI description:\033[0m")
        if ai_description is None:
            print("(none)")
        else:
            print(ai_description)
        # --yes auto-accepts; otherwise prompt per event.
        response = "y" if args.yes else input("Accept AI selection? [Y]es / [n]o / [q]uit: ").strip().lower()
        if response in {"q", "quit"}:
            print("Stopping.")
            break
        use_ai = response not in {"n", "no"}
        if use_ai:
            final_event = proposed_event
            participants = resolved_ids
            final_description = ai_description if ai_description is not None else description
        else:
            # Rejected: create with placeholder default times and no participants.
            final_event = update_event_times(
                event,
                tz,
                time(DEFAULT_START_MINUTES // 60, DEFAULT_START_MINUTES % 60),
                time(DEFAULT_END_MINUTES // 60, DEFAULT_END_MINUTES % 60),
            )
            participants = []
            final_description = description
        start_iso = ics_datetime_to_iso(final_event.get("DTSTART", ""))
        end_iso = ics_datetime_to_iso(final_event.get("DTEND", ""))
        result = api.create_reservation(
            resource_id=resource["resourceId"],
            start_datetime=start_iso,
            end_datetime=end_iso,
            title="",
            description=final_description,
            participants=participants if participants else None,
            custom_attributes=[
                {"attributeId": COVERAGE_ATTRIBUTE_ID, "attributeValue": coverage_type}
            ],
            recurrence_rule=parse_rrule(
                final_event.get("RRULE", ""),
                local_datetime_from_ics(final_event.get("DTSTART", ""), tz).replace(tzinfo=None),
            )
            if final_event.get("RRULE")
            else None,
        )
        reference = result.get("referenceNumber")
        if not reference:
            errors = result.get("errors", result)
            print(f"Create failed for item {idx}: {errors}")
            continue
        print(f"Created {reference}")
        # Track rejected-AI reservations so they can be reported at the end.
        if not use_ai:
            rejected_ids.append(reference)
    if rejected_ids:
        print("\nRejected AI selections (created with defaults):")
        print(", ".join(rejected_ids))
    return 0
# Script entry point: propagate main()'s return code to the shell.
if __name__ == "__main__":
    raise SystemExit(main())
| #!/usr/bin/env python3 | |
| import argparse | |
| import sys | |
| from datetime import datetime, time, timedelta | |
| from pathlib import Path | |
| from typing import List, Optional, Tuple | |
| from zoneinfo import ZoneInfo | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402 | |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) | |
| from create_scandium_event import ( # noqa: E402 | |
| PI_ATTRIBUTE_ID, | |
| SCANNED_INITIALS_ATTRIBUTE_ID, | |
| build_description, | |
| find_resource, | |
| parse_start_datetime, | |
| parse_summary, | |
| ) | |
| from create_scandium_event_multiple import ( # noqa: E402 | |
| dedupe_events, | |
| format_event_summary, | |
| log_debug, | |
| ) | |
| from native.calcium_events import build_vevents | |
# Default Booked resource to import into.
DEFAULT_RESOURCE = "P1 Prisma"
# Per-resource default Calcium .Events input files.
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
    "P1 Prisma": "data/3T_Prisma_P1.Events",
    "P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
    "4T Varian": "data/4T_Varian_Inova.Events",
}
# Local timezone the Calcium data is written in.
DEFAULT_TZ = "America/New_York"
# Paired institutional domains; used to retry user lookups under the
# sibling domain when the literal address is not found.
EMAIL_DOMAIN_SWAPS = {
    "mgb.org": "mclean.harvard.edu",
    "mclean.harvard.edu": "mgb.org",
}
def swap_email_domain(email: str) -> Optional[str]:
    """Return *email* with its domain swapped to the paired institution.

    Returns None when the address has no '@' or its domain is not one of the
    known MGB/McLean aliases.
    """
    if "@" not in email:
        return None
    local_part, _, domain = email.rpartition("@")
    replacement = EMAIL_DOMAIN_SWAPS.get(domain.lower())
    return f"{local_part}@{replacement}" if replacement else None
def local_datetime_from_ics(value: str, tz: ZoneInfo) -> datetime:
    """Convert an ICS DTSTART/DTEND string to an aware datetime in *tz*.

    Empty input maps to datetime.min (with *tz* attached); date-only values
    become local midnight; full timestamps go through ics_datetime_to_iso
    and are converted into *tz*.
    """
    if not value:
        return datetime.min.replace(tzinfo=tz)
    if "T" not in value:
        day = datetime.strptime(value[:8], "%Y%m%d").date()
        return datetime.combine(day, time(0, 0), tzinfo=tz)
    parsed = datetime.strptime(ics_datetime_to_iso(value), "%Y-%m-%dT%H:%M:%S%z")
    return parsed.astimezone(tz)
def format_ics_like(template: str, dt: datetime) -> str:
    """Serialize *dt* in the same ICS flavor as *template*.

    Date-only templates (no 'T') yield YYYYMMDD; templates ending in 'Z'
    yield UTC basic format with a Z suffix; anything else yields a floating
    local timestamp.
    """
    if "T" not in template:
        return dt.strftime("%Y%m%d")
    if template.endswith("Z"):
        return dt.astimezone(ZoneInfo("UTC")).strftime("%Y%m%dT%H%M%SZ")
    return dt.strftime("%Y%m%dT%H%M%S")
| def parse_termination_date(value: str | None, tz: ZoneInfo) -> Optional[datetime.date]: | |
| if not value: | |
| return None | |
| if value.endswith("Z"): | |
| value = value[:-1] + "+0000" | |
| if value[-3] == ":": | |
| value = value[:-3] + value[-2:] | |
| return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z").astimezone(tz).date() | |
def add_months(date_value: datetime.date, months: int) -> datetime.date:
    """Shift *date_value* by *months* (may be negative), clamping the day
    down to the target month's last valid day (e.g. Jan 31 + 1 -> Feb 28/29)."""
    month_index = date_value.month - 1 + months
    year = date_value.year + month_index // 12
    month = month_index % 12 + 1
    for day in range(date_value.day, 0, -1):
        try:
            return datetime(year, month, day).date()
        except ValueError:
            continue
def next_occurrence(start_dt: datetime, rule: dict, now_dt: datetime) -> Optional[datetime]:
    """Return the first occurrence of a recurring event at/after *now_dt*.

    *rule* is a parsed RRULE dict with keys "type" (daily/weekly/monthly/
    yearly) and "interval". Returns *start_dt* unchanged when it is already
    in the future, or None for an unrecognized rule type.
    """
    interval = int(rule.get("interval", 1))
    occurrence = start_dt
    if rule.get("type") == "daily":
        if now_dt > occurrence:
            # Jump forward in whole-interval steps to just before now, then
            # take one more step if still in the past.
            delta_days = (now_dt.date() - occurrence.date()).days
            steps = delta_days // interval
            occurrence = occurrence + timedelta(days=steps * interval)
            if occurrence < now_dt:
                occurrence += timedelta(days=interval)
    elif rule.get("type") == "weekly":
        if now_dt > occurrence:
            delta_days = (now_dt.date() - occurrence.date()).days
            weeks = delta_days // 7
            # Round the elapsed weeks down to a multiple of the interval.
            steps = (weeks // interval) * interval
            occurrence = occurrence + timedelta(days=steps * 7)
            if occurrence < now_dt:
                occurrence += timedelta(days=interval * 7)
    elif rule.get("type") == "monthly":
        if now_dt > occurrence:
            months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
            steps = (months // interval) * interval
            # add_months clamps the day when the target month is shorter.
            occurrence_date = add_months(occurrence.date(), steps)
            occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
            if occurrence < now_dt:
                occurrence_date = add_months(occurrence.date(), interval)
                occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
    elif rule.get("type") == "yearly":
        if now_dt > occurrence:
            years = now_dt.year - occurrence.year
            steps = (years // interval) * interval
            try:
                occurrence = occurrence.replace(year=occurrence.year + steps)
            except ValueError:
                # Feb 29 landing on a non-leap year: fall back to Feb 28.
                occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
            if occurrence < now_dt:
                try:
                    occurrence = occurrence.replace(year=occurrence.year + interval)
                except ValueError:
                    occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
    else:
        return None
    return occurrence
def adjust_event_to_today(event: dict, tz: ZoneInfo, today_date: datetime.date) -> Optional[dict]:
    """Roll a past recurring event forward to its next occurrence on/after today.

    Returns the event unchanged when it already starts today or later, a
    copy with DTSTART/DTEND moved forward (duration preserved) for recurring
    events, or None when the event is past/expired or lacks DTSTART/DTEND.
    """
    start_raw = event.get("DTSTART", "")
    end_raw = event.get("DTEND", "")
    if not start_raw or not end_raw:
        return None
    start_dt = local_datetime_from_ics(start_raw, tz)
    end_dt = local_datetime_from_ics(end_raw, tz)
    rrule = event.get("RRULE")
    if not rrule:
        # Non-recurring: keep only if it has not already happened.
        if start_dt.date() < today_date:
            return None
        return event
    rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
    until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
    if until_date and today_date > until_date:
        return None
    if start_dt.date() >= today_date:
        return event
    today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
    occurrence = next_occurrence(start_dt, rule, today_dt)
    if not occurrence:
        return None
    if until_date and occurrence.date() > until_date:
        return None
    # Re-anchor on the next occurrence, preserving the event's duration and
    # the original DTSTART/DTEND formatting (date-only vs UTC vs floating).
    duration = end_dt - start_dt
    new_start = occurrence
    new_end = new_start + duration
    updated = dict(event)
    updated["DTSTART"] = format_ics_like(start_raw, new_start)
    updated["DTEND"] = format_ics_like(end_raw, new_end)
    return updated
def build_reservation_payload(
    api: BookedAPI, event: dict, resource: dict
) -> Tuple[dict, dict]:
    """Translate a Calcium event into a Booked create_reservation payload.

    Returns (payload, preview): payload holds the keyword arguments for
    api.create_reservation; preview is a display-only summary of what was
    matched (PI, initials, participants, recurrence).

    Raises:
        ValueError: when DTSTART/DTEND or CATEGORIES are missing, or when
            CATEGORIES cannot be matched to a Principal Investigator value.
    """
    dtstart = event.get("DTSTART", "")
    dtend = event.get("DTEND", "")
    if not dtstart or not dtend:
        raise ValueError("DTSTART and DTEND are required")
    start_iso = ics_datetime_to_iso(dtstart)
    end_iso = ics_datetime_to_iso(dtend)
    start_dt = parse_start_datetime(dtstart)
    summary = event.get("SUMMARY", "Untitled")
    title, participant_info, confirmed_initials = parse_summary(summary)
    categories = event.get("CATEGORIES", "")
    if not categories:
        raise ValueError("CATEGORIES is required to map Principal Investigator")
    # Try the whole CATEGORIES value first, then fall back to its first word.
    matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, categories)
    if not matched_pi and categories.strip():
        first_word = categories.strip().split()[0]
        matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, first_word)
    if not matched_pi:
        raise ValueError(
            f"CATEGORIES '{categories}' did not match a Principal Investigator value"
        )
    custom_attributes = [
        {"attributeId": PI_ATTRIBUTE_ID, "attributeValue": matched_pi}
    ]
    if confirmed_initials:
        custom_attributes.append(
            {
                "attributeId": SCANNED_INITIALS_ATTRIBUTE_ID,
                "attributeValue": confirmed_initials,
            }
        )
    # Resolve each participant email to a Booked user id, retrying under the
    # paired institutional domain; unmatched addresses are kept for display
    # and folded into the description instead.
    participant_ids: List[int] = []
    seen_user_ids: set[int] = set()
    matched_emails: List[str] = []
    unmatched_emails: List[str] = []
    for _, email in participant_info:
        user = api.find_user_by_email(email)
        if not user:
            swapped = swap_email_domain(email)
            if swapped:
                user = api.find_user_by_email(swapped)
        if user:
            user_id = int(user["id"])
            if user_id not in seen_user_ids:
                participant_ids.append(user_id)
                seen_user_ids.add(user_id)
            matched_emails.append(email)
        else:
            unmatched_emails.append(email)
    description = build_description(event.get("DESCRIPTION", ""), unmatched_emails)
    description = description if description else None
    recurrence_rule = None
    rrule = event.get("RRULE", "")
    if rrule:
        recurrence_rule = parse_rrule(rrule, start_dt)
    payload = {
        "resource_id": resource["resourceId"],
        "start_datetime": start_iso,
        "end_datetime": end_iso,
        "title": title,
        "description": description,
        "participants": participant_ids if participant_ids else None,
        "custom_attributes": custom_attributes,
        "recurrence_rule": recurrence_rule,
    }
    preview = {
        "resource_name": resource["name"],
        "resource_id": resource["resourceId"],
        "title": title,
        "start_iso": start_iso,
        "end_iso": end_iso,
        "matched_pi": matched_pi,
        "confirmed_initials": confirmed_initials,
        "matched_emails": matched_emails,
        "unmatched_emails": unmatched_emails,
        "recurrence_rule": recurrence_rule,
    }
    return payload, preview
def print_dry_run(preview: dict) -> None:
    """Print a human-readable summary of a reservation preview without creating it."""
    print("Dry run: reservation not created")
    print(f"Resource: {preview['resource_name']} (ID {preview['resource_id']})")
    print(f"Title: {preview['title']}")
    print(f"Start: {preview['start_iso']}")
    print(f"End: {preview['end_iso']}")
    print(f"Principal Investigator: {preview['matched_pi']}")
    # Optional fields are only printed when present/non-empty.
    initials = preview.get("confirmed_initials")
    if initials:
        print(f"Scanned Person Initials: {initials}")
    participant_list = preview.get("matched_emails") or []
    if participant_list:
        print(f"Participants: {', '.join(participant_list)}")
    unregistered = preview.get("unmatched_emails") or []
    if unregistered:
        print(f"Unreg: {', '.join(unregistered)}")
    rule = preview.get("recurrence_rule")
    if rule:
        print(f"Recurrence: {rule}")
def main() -> int:
    """Import Calcium events for one resource into Booked, with dedupe and preview.

    Returns a process exit code (0 on success, 1 when no events match).
    """
    parser = argparse.ArgumentParser(
        description=(
            "Import Calcium .Events data with dedupe, preview each with --dry-run, "
            "and optionally create it in Scandium."
        )
    )
    parser.add_argument(
        "input",
        nargs="?",
        help="Input Calcium .Events file (defaults based on --resource)",
    )
    parser.add_argument(
        "--category",
        help="Calcium category to import (exact match)",
    )
    parser.add_argument(
        "--exclude-category",
        action="append",
        help=(
            "Calcium category to exclude (repeatable). If set without --category, "
            "imports all categories except these."
        ),
    )
    parser.add_argument(
        "--preferences",
        help="Optional Calcium .Preferences file to resolve time periods",
    )
    parser.add_argument(
        "--resource",
        default=DEFAULT_RESOURCE,
        help="Resource name (default: P1 Prisma)",
    )
    parser.add_argument(
        "--timezone",
        default=DEFAULT_TZ,
        help="Timezone for Calcium events (default: America/New_York)",
    )
    parser.add_argument(
        "--yes",
        action="store_true",
        help="Create all events without prompting",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show deduped events without creating reservations",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List deduped events without creating reservations",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Log progress details to stderr",
    )
    args = parser.parse_args()
    # Fall back to the per-resource default .Events file when no input given.
    input_path_value = args.input
    if not input_path_value:
        input_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
        if not input_path_value:
            raise SystemExit(
                f"No default Calcium .Events for resource '{args.resource}'. "
                "Provide an input file."
            )
        args.input = input_path_value
    if args.category and args.exclude_category:
        parser.error("Use either --category or --exclude-category, not both.")
    if not args.category and not args.exclude_category:
        parser.error("Provide --category or at least one --exclude-category.")
    events_path = Path(args.input)
    prefs_path = Path(args.preferences) if args.preferences else None
    # --exclude-category values may be repeated and/or comma-separated.
    exclude_categories: List[str] = []
    if args.exclude_category:
        for value in args.exclude_category:
            for piece in value.split(","):
                item = piece.strip()
                if item:
                    exclude_categories.append(item)
    log_debug(args.debug, f"Reading Calcium events: {events_path}")
    vevents = build_vevents(
        events_path,
        category_filter=args.category,
        tz_name=args.timezone,
        preferences_path=prefs_path,
        exclude_categories=exclude_categories,
        debug=args.debug,
        use_utc=True,
    )
    if not vevents:
        print("No matching events found.")
        return 1
    tz = ZoneInfo(args.timezone)
    today_date = datetime.now(tz).date()
    # Drop past events; roll recurring ones forward to their next occurrence.
    events = []
    for item in vevents:
        adjusted = adjust_event_to_today(item.event, tz, today_date)
        if not adjusted:
            continue
        events.append({"event": adjusted, "lines": item.lines})
    log_debug(args.debug, f"Parsed {len(events)} events")
    log_debug(args.debug, "Starting dedupe (API-resolved)")
    api = BookedAPI()
    kept, removed = dedupe_events(events, api, args.resource, args.debug)
    if removed:
        print(f"Removed {len(removed)} duplicate events:")
        for item in removed:
            print(f"- {format_event_summary(item['event'])}")
    log_debug(args.debug, f"Deduped events: kept={len(kept)}, removed={len(removed)}")
    # --dry-run: show full previews (payload resolution included) and exit.
    if args.dry_run:
        resource = find_resource(api, args.resource)
        for idx, item in enumerate(kept, start=1):
            print("\n" + "=" * 72)
            print(f"Event {idx} of {len(kept)}")
            print("-" * 72)
            try:
                _, preview = build_reservation_payload(api, item["event"], resource)
            except ValueError as exc:
                print(f"Skipping event: {exc}")
                continue
            print_dry_run(preview)
        log_debug(args.debug, "Dry run enabled; exiting before creation")
        return 0
    # --list: one-line summaries only, no API payload resolution.
    if args.list:
        for item in kept:
            print(f"- {format_event_summary(item['event'])}")
        return 0
    resource = find_resource(api, args.resource)
    for idx, item in enumerate(kept, start=1):
        event = item["event"]
        print("\n" + "=" * 72)
        print(f"Event {idx} of {len(kept)}")
        print("-" * 72)
        try:
            payload, preview = build_reservation_payload(api, event, resource)
        except ValueError as exc:
            print(f"Skipping event: {exc}")
            continue
        print_dry_run(preview)
        # Per-event confirmation unless --yes was given.
        if not args.yes:
            response = input("Create this event? [Y]es / [n]o / [q]uit: ").strip().lower()
            if response in {"q", "quit"}:
                print("Stopping.")
                return 0
            if response in {"n", "no"}:
                print("Skipped.")
                continue
        log_debug(args.debug, f"Creating event {idx}")
        result = api.create_reservation(
            resource_id=payload["resource_id"],
            start_datetime=payload["start_datetime"],
            end_datetime=payload["end_datetime"],
            title=payload["title"],
            description=payload["description"],
            participants=payload["participants"],
            custom_attributes=payload["custom_attributes"],
            recurrence_rule=payload["recurrence_rule"],
        )
        reference = result.get("referenceNumber")
        if not reference:
            errors = result.get("errors", result)
            print(f"Create failed: {errors}")
        else:
            print(f"Created ({reference}).")
    return 0
# Script entry point: propagate main()'s return code to the shell.
if __name__ == "__main__":
    raise SystemExit(main())
| #!/usr/bin/env python3 | |
| import argparse | |
| import html | |
| import sys | |
| from datetime import datetime, time, timedelta | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| from zoneinfo import ZoneInfo | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402 | |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) | |
| from create_scandium_event import parse_start_datetime # noqa: E402 | |
| from create_scandium_event_multiple import expand_exdates, get_resource_timezone, parse_exdate_dates # noqa: E402 | |
| from native.calcium_events import build_vevents | |
# Default Booked resource to import into.
DEFAULT_RESOURCE = "P1 Prisma"
# Per-resource default Calcium .Events input files.
DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = {
    "P1 Prisma": "data/3T_Prisma_P1.Events",
    "P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events",
    "4T Varian": "data/4T_Varian_Inova.Events",
}
# Custom-attribute id of the Principal Investigator field in Booked.
PI_ATTRIBUTE_ID = 1
def normalize_summary(summary: str) -> str:
    """Unescape an ICS SUMMARY: ICS backslash escapes first, then HTML entities."""
    text = summary.replace("\\n", "\n")
    for escaped, plain in (("\\,", ","), ("\\;", ";"), ("\\\\", "\\")):
        text = text.replace(escaped, plain)
    return html.unescape(text).strip()
def local_datetime_from_ics(value: str, tz: ZoneInfo) -> datetime:
    """Convert an ICS DTSTART/DTEND string into an aware datetime in *tz*.

    Returns datetime.min (tz-attached) for empty input, local midnight for
    date-only values, and a tz-converted timestamp otherwise (parsed via
    ics_datetime_to_iso).
    """
    if not value:
        return datetime.min.replace(tzinfo=tz)
    if "T" not in value:
        day = datetime.strptime(value[:8], "%Y%m%d").date()
        return datetime.combine(day, datetime.min.time(), tzinfo=tz)
    aware = datetime.strptime(ics_datetime_to_iso(value), "%Y-%m-%dT%H:%M:%S%z")
    return aware.astimezone(tz)
def local_iso_datetime(value: str, tz: ZoneInfo) -> str:
    """Render an ICS datetime string as a local ISO-8601 timestamp ('' for empty input)."""
    if not value:
        return ""
    return local_datetime_from_ics(value, tz).strftime("%Y-%m-%dT%H:%M:%S%z")
def format_ics_like(template: str, dt: datetime) -> str:
    """Format *dt* to match the ICS style of *template*: date-only when the
    template has no 'T', UTC basic format when it ends in 'Z', floating
    local otherwise."""
    if "T" not in template:
        return dt.strftime("%Y%m%d")
    if template.endswith("Z"):
        in_utc = dt.astimezone(ZoneInfo("UTC"))
        return in_utc.strftime("%Y%m%dT%H%M%SZ")
    return dt.strftime("%Y%m%dT%H%M%S")
def truncate_end(start_iso: str, end_iso: str, max_end_iso: Optional[str]) -> str:
    """Clamp *end_iso* to *max_end_iso* when the cap falls strictly inside the interval.

    The original end is kept when there is no cap, when the end is already at
    or before the cap, or when the cap would not leave a positive duration.
    """
    if not max_end_iso:
        return end_iso
    fmt = "%Y-%m-%dT%H:%M:%S%z"
    start_dt = datetime.strptime(start_iso, fmt)
    end_dt = datetime.strptime(end_iso, fmt)
    cap_dt = datetime.strptime(max_end_iso, fmt)
    return max_end_iso if start_dt < cap_dt < end_dt else end_iso
def format_dry_run_entry(index: int, event: Dict[str, str], payload: Dict[str, str]) -> str:
    """Build a one-line ' | '-joined summary of a payload for --dry-run output."""
    rrule = event.get("RRULE")
    segments = [
        f"{index}. {payload['startDateTime']} -> {payload['endDateTime']}",
        f"recurring={'yes' if rrule else 'no'}",
    ]
    if rrule:
        segments.append(f"RRULE={rrule}")
    return " | ".join(segments)
def resolve_service_value(api: BookedAPI, override_value: Optional[str]) -> str:
    """Pick the PI attribute value for service events.

    Precedence: explicit override, the server's matching "Service" option,
    then the literal string "Service" as a last resort.
    """
    if override_value:
        return override_value
    matched = api.find_attribute_value(PI_ATTRIBUTE_ID, "Service")
    return matched or "Service"
def add_months(date_value: datetime.date, months: int) -> datetime.date:
    """Return *date_value* shifted by *months*, clamping the day-of-month
    down when the target month is shorter (Jan 31 + 1 month -> Feb 28/29)."""
    shifted = date_value.month - 1 + months
    target_year = date_value.year + shifted // 12
    target_month = shifted % 12 + 1
    candidate_day = date_value.day
    while candidate_day > 0:
        try:
            return datetime(target_year, target_month, candidate_day).date()
        except ValueError:
            candidate_day -= 1
def next_occurrence(
    start_dt: datetime,
    rule: Dict,
    now_dt: datetime,
    excluded_dates: set[datetime.date],
) -> Optional[datetime]:
    """Return the first occurrence at/after *now_dt*, skipping excluded dates.

    *rule* is a parsed RRULE dict with keys "type" (daily/weekly/monthly/
    yearly) and "interval"; *excluded_dates* come from EXDATE. Returns None
    for an unrecognized rule type. The exclusion scan is capped at 500 steps
    so a pathological rule cannot loop forever.
    """
    interval = int(rule.get("interval", 1))
    occurrence = start_dt
    if rule.get("type") == "daily":
        if now_dt > occurrence:
            # Jump forward in whole-interval steps to just before now, then
            # take one more step if still in the past.
            delta_days = (now_dt.date() - occurrence.date()).days
            steps = delta_days // interval
            occurrence = occurrence + timedelta(days=steps * interval)
            if occurrence < now_dt:
                occurrence += timedelta(days=interval)
    elif rule.get("type") == "weekly":
        if now_dt > occurrence:
            delta_days = (now_dt.date() - occurrence.date()).days
            weeks = delta_days // 7
            # Round the elapsed weeks down to a multiple of the interval.
            steps = (weeks // interval) * interval
            occurrence = occurrence + timedelta(days=steps * 7)
            if occurrence < now_dt:
                occurrence += timedelta(days=interval * 7)
    elif rule.get("type") == "monthly":
        if now_dt > occurrence:
            months = (now_dt.year - occurrence.year) * 12 + (now_dt.month - occurrence.month)
            steps = (months // interval) * interval
            # add_months clamps the day when the target month is shorter.
            occurrence_date = add_months(occurrence.date(), steps)
            occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
            if occurrence < now_dt:
                occurrence_date = add_months(occurrence.date(), interval)
                occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
    elif rule.get("type") == "yearly":
        if now_dt > occurrence:
            years = now_dt.year - occurrence.year
            steps = (years // interval) * interval
            try:
                occurrence = occurrence.replace(year=occurrence.year + steps)
            except ValueError:
                # Feb 29 landing on a non-leap year: fall back to Feb 28.
                occurrence = occurrence.replace(year=occurrence.year + steps, day=28)
            if occurrence < now_dt:
                try:
                    occurrence = occurrence.replace(year=occurrence.year + interval)
                except ValueError:
                    occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
    else:
        return None
    # Step past occurrences that fall on EXDATE dates (bounded at 500 tries).
    attempts = 0
    while occurrence.date() in excluded_dates and attempts < 500:
        if rule.get("type") == "daily":
            occurrence += timedelta(days=interval)
        elif rule.get("type") == "weekly":
            occurrence += timedelta(days=interval * 7)
        elif rule.get("type") == "monthly":
            occurrence_date = add_months(occurrence.date(), interval)
            occurrence = datetime.combine(occurrence_date, occurrence.time(), occurrence.tzinfo)
        elif rule.get("type") == "yearly":
            try:
                occurrence = occurrence.replace(year=occurrence.year + interval)
            except ValueError:
                occurrence = occurrence.replace(year=occurrence.year + interval, day=28)
        attempts += 1
    return occurrence
def adjust_event_to_today(
    event: Dict[str, str],
    tz: ZoneInfo,
    today_date: datetime.date,
) -> Optional[Dict[str, str]]:
    """Roll a past recurring event forward to its next occurrence on/after today.

    Returns the event unchanged when it already starts today or later, a
    copy with DTSTART/DTEND moved forward (duration preserved, EXDATEs
    honored) for recurring events, or None when the event is past/expired
    or lacks DTSTART/DTEND.
    """
    start_raw = event.get("DTSTART", "")
    end_raw = event.get("DTEND", "")
    if not start_raw or not end_raw:
        return None
    start_dt = local_datetime_from_ics(start_raw, tz)
    end_dt = local_datetime_from_ics(end_raw, tz)
    rrule = event.get("RRULE")
    exdates = parse_exdate_dates(event.get("EXDATE", ""), tz)
    if not rrule:
        # Non-recurring: keep only if it has not already happened.
        if start_dt.date() < today_date:
            return None
        return event
    rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
    termination = rule.get("repeatTerminationDate")
    until_date = None
    if termination:
        # Termination timestamps carry an explicit UTC offset (%z).
        until_date = datetime.strptime(
            termination, "%Y-%m-%dT%H:%M:%S%z"
        ).astimezone(tz).date()
    if until_date and today_date > until_date:
        return None
    if start_dt.date() >= today_date:
        return event
    today_dt = datetime.combine(today_date, time.min, tzinfo=tz)
    occurrence = next_occurrence(start_dt, rule, today_dt, exdates)
    if not occurrence:
        return None
    if until_date and occurrence.date() > until_date:
        return None
    # Re-anchor on the next occurrence, preserving the event's duration and
    # the original DTSTART/DTEND formatting (date-only vs UTC vs floating).
    duration = end_dt - start_dt
    new_start = occurrence
    new_end = new_start + duration
    updated = dict(event)
    updated["DTSTART"] = format_ics_like(start_raw, new_start)
    updated["DTEND"] = format_ics_like(end_raw, new_end)
    return updated
def event_has_future_occurrence(event: Dict[str, str], now_dt: datetime) -> bool:
    """Return True when the event has at least one occurrence at/after now_dt."""
    tz = now_dt.tzinfo or ZoneInfo("America/New_York")
    raw_start = event.get("DTSTART", "")
    if not raw_start:
        return False
    local_start = local_datetime_from_ics(raw_start, tz)
    rrule = event.get("RRULE")
    excluded = parse_exdate_dates(event.get("EXDATE", ""), tz)
    if not rrule:
        # Single event: future iff its start has not passed yet.
        return local_start >= now_dt
    rule = parse_rrule(rrule, local_start.replace(tzinfo=None))
    until_date = None
    termination = rule.get("repeatTerminationDate")
    if termination:
        until_date = (
            datetime.strptime(termination, "%Y-%m-%dT%H:%M:%S%z")
            .astimezone(tz)
            .date()
        )
    occurrence = next_occurrence(local_start, rule, now_dt, excluded)
    if occurrence is None:
        return False
    return not (until_date and occurrence.date() > until_date)
def build_payload(
    event: Dict[str, str],
    resource_id: int,
    user_id: int,
    service_value: str,
    tz: ZoneInfo,
    max_end_iso: Optional[str] = None,
) -> Dict:
    """Build a Booked create-reservation payload from a VEVENT dict.

    The end time is clamped to max_end_iso when given; a recurrenceRule is
    attached when the event carries an RRULE.
    """
    start_iso = local_iso_datetime(event.get("DTSTART", ""), tz)
    end_iso = truncate_end(
        start_iso, local_iso_datetime(event.get("DTEND", ""), tz), max_end_iso
    )
    payload = {
        "resourceId": resource_id,
        "userId": user_id,
        "startDateTime": start_iso,
        "endDateTime": end_iso,
        "title": "",
        "description": normalize_summary(event.get("SUMMARY", "")),
        "customAttributes": [
            {"attributeId": PI_ATTRIBUTE_ID, "attributeValue": service_value}
        ],
    }
    rrule = event.get("RRULE")
    if rrule:
        raw_start = event.get("DTSTART", "")
        if raw_start:
            local_start = local_datetime_from_ics(raw_start, tz).replace(tzinfo=None)
        else:
            local_start = parse_start_datetime(raw_start)
        rule = parse_rrule(rrule, local_start)
        payload["recurrenceRule"] = rule
        if rule["type"] == "monthly":
            # Booked needs an explicit monthly anchor; default to the start day.
            rule.setdefault("monthlyType", "dayOfMonth")
            rule.setdefault("dayOfMonth", local_start.day)
    return payload
def get_last_reservable_end(
    api: BookedAPI,
    schedule_id: int,
    resource_id: int,
    date_value: datetime.date,
    cache: dict,
) -> Optional[str]:
    """Return the latest reservable slot end for a resource/date, memoized in `cache`."""
    key = (schedule_id, resource_id, date_value)
    if key in cache:
        return cache[key]
    day = date_value.strftime("%Y-%m-%d")
    slots = api.get_schedule_slots(schedule_id, day, day, resource_id=resource_id)
    reservable_ends = (
        slot.get("endDateTime")
        for date_entry in slots.get("dates", [])
        for resource in date_entry.get("resources", [])
        for slot in resource.get("slots", [])
        if slot.get("isReservable")
    )
    latest: Optional[str] = None
    for end_time in reservable_ends:
        # ISO-formatted strings compare chronologically.
        if end_time and (latest is None or end_time > latest):
            latest = end_time
    cache[key] = latest
    return latest
def delete_service_reservations(
    api: BookedAPI,
    start_date: str,
    end_date: str,
    target_pi_value: str,
) -> None:
    """Delete every reservation in the range whose PI attribute equals target_pi_value."""
    for resource in api.list_resources():
        resource_id = resource.get("resourceId")
        if not resource_id:
            continue
        matches = api.list_reservations(
            resource_id=resource_id,
            start_date=start_date,
            end_date=end_date,
        )
        for reservation in matches:
            pi_value = next(
                (
                    attr.get("value")
                    for attr in (reservation.get("customAttributes") or [])
                    if attr.get("id") == PI_ATTRIBUTE_ID
                ),
                None,
            )
            if pi_value != target_pi_value:
                continue
            ref = reservation.get("referenceNumber")
            if not ref:
                continue
            # Recurring reservations are removed in full, not one instance at a time.
            scope = "full" if reservation.get("isRecurring") else "this"
            api.delete_reservation(ref, update_scope=scope)
            print(
                f"Deleted {ref} | {reservation.get('resourceName')} | {reservation.get('startDate')} -> {reservation.get('endDate')} | scope={scope}"
            )
def main() -> int:
    """Import Calcium .Events into a Booked resource as Service reservations.

    Returns a process exit code (0 on success, 1 on bad arguments or an
    unknown resource).
    """
    parser = argparse.ArgumentParser(
        description="Import Calcium .Events into a resource as Service reservations."
    )
    parser.add_argument(
        "input",
        nargs="?",
        help="Input Calcium .Events file (defaults based on --resource)",
    )
    parser.add_argument(
        "--category",
        required=True,
        help="Calcium category to import (exact match)",
    )
    parser.add_argument(
        "--resource",
        default=DEFAULT_RESOURCE,
        help="Resource name (default: P1 Prisma)",
    )
    parser.add_argument(
        "--preferences",
        help="Optional Calcium .Preferences file to resolve time periods",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be created without creating reservations",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Log progress details to stderr",
    )
    parser.add_argument(
        "--delete-existing",
        action="store_true",
        help="Delete existing PI reservations in the pmservice date range",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List events without creating reservations",
    )
    parser.add_argument(
        "--pi",
        dest="pi_value",
        help="Override PI attribute value (default: Service)",
    )
    args = parser.parse_args()
    # Fall back to the per-resource default events file when no input given.
    if not args.input:
        input_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
        if not input_path_value:
            print(
                f"No default Calcium .Events for resource '{args.resource}'. "
                "Provide an input file.",
                file=sys.stderr,
            )
            return 1
        args.input = input_path_value
    # Resolve the target resource, its schedule, and the acting user.
    api = BookedAPI()
    resource = api.find_resource_by_name(args.resource)
    if not resource:
        print(f"Resource '{args.resource}' not found.", file=sys.stderr)
        return 1
    resource_id = int(resource["resourceId"])
    schedule_id = int(resource["scheduleId"])
    user_id = api.get_user_id()
    tz = get_resource_timezone(api, args.resource, {}, args.debug)
    # Parse the Calcium export into VEVENT dicts for the requested category.
    events_path = Path(args.input)
    prefs_path = Path(args.preferences) if args.preferences else None
    vevents = build_vevents(
        events_path,
        category_filter=args.category,
        tz_name=tz.key,
        preferences_path=prefs_path,
        debug=args.debug,
        use_utc=True,
    )
    events = [{"event": item.event, "lines": item.lines} for item in vevents]
    expanded = expand_exdates(events, tz, args.debug)
    # Drop past events and roll recurring ones forward to today.
    today_date = datetime.now(tz).date()
    adjusted = []
    for item in expanded:
        updated = adjust_event_to_today(item["event"], tz, today_date)
        if not updated:
            continue
        adjusted.append({"event": updated, "lines": item["lines"]})
    expanded = adjusted
    service_value = resolve_service_value(api, args.pi_value)
    # Optionally clear existing PI reservations over the covered date range
    # (skipped for dry-run/list so previews never mutate the server).
    if args.delete_existing and expanded and not args.dry_run and not args.list:
        start_dates = [local_datetime_from_ics(item["event"]["DTSTART"], tz).date() for item in expanded]
        end_dates = [local_datetime_from_ics(item["event"]["DTEND"], tz).date() for item in expanded]
        min_date = min(start_dates)
        max_date = max(end_dates) + timedelta(days=1)
        delete_service_reservations(
            api,
            start_date=min_date.strftime("%Y-%m-%d"),
            end_date=max_date.strftime("%Y-%m-%d"),
            target_pi_value=service_value,
        )
    # Create (or preview) one reservation per event, clamping each end time
    # to the last reservable slot of its day.
    slot_cache: dict = {}
    for idx, item in enumerate(expanded, start=1):
        start_local = local_datetime_from_ics(item["event"]["DTSTART"], tz)
        max_end = get_last_reservable_end(
            api, schedule_id, resource_id, start_local.date(), slot_cache
        )
        payload = build_payload(
            item["event"],
            resource_id,
            user_id,
            service_value,
            tz,
            max_end_iso=max_end,
        )
        if args.list:
            print(format_dry_run_entry(idx, item["event"], payload))
            print(payload["description"])
            continue
        if args.dry_run:
            print(format_dry_run_entry(idx, item["event"], payload))
            print(payload["description"])
            continue
        result = api.create_reservation(
            resource_id=payload["resourceId"],
            start_datetime=payload["startDateTime"],
            end_datetime=payload["endDateTime"],
            title=payload["title"],
            description=payload["description"],
            user_id=payload["userId"],
            custom_attributes=payload["customAttributes"],
            recurrence_rule=payload.get("recurrenceRule"),
        )
        if result.get("referenceNumber"):
            print(f"Created {idx}: {result['referenceNumber']}")
        else:
            # Per-event failures are reported but do not abort the run.
            print(f"Error creating {idx}: {result.get('errors')}", file=sys.stderr)
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())
| #!/usr/bin/env python3 | |
| import argparse | |
| import csv | |
| import re | |
| import sys | |
| from dataclasses import dataclass | |
| from datetime import date, datetime, time, timedelta | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from zoneinfo import ZoneInfo | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI, parse_rrule # noqa: E402 | |
| from calcium_events import _build_rrule, build_vevents, parse_calcium_events | |
# Booked resource that staff reservations are created under.
DEFAULT_RESOURCE_NAME = "Staff"
# CSV mapping staff initials -> email (columns: initials, name, email).
INITIALS_FILE = "initials.csv"
# Booked custom-attribute id used for the reservation "location" value.
LOCATION_ATTRIBUTE_ID = 6
DEFAULT_TZ = "America/New_York"
# Calcium export the staff schedule is parsed from.
DEFAULT_EVENTS_PATH = Path(__file__).resolve().parent / "data/3T_Prisma_P1.Events"
DEFAULT_OWNER_EMAIL = "[email protected]"
# Calcium categories treated as staff schedules.
STAFF_CATEGORIES = ("Staff Hours", "Front Desk", "Tech_Aide")
# Termination applied to recurrences that carry no end date of their own.
DEFAULT_REPEAT_TERMINATION = "2026-07-30T00:00:00+0000"
# A full time range, e.g. "9-5", "9:30a-2p" ('a'/'p' suffixes optional).
TIME_RANGE_RE = re.compile(
    r"^(?P<start>\d{1,2}(?::\d{2})?)(?P<start_suffix>[ap])?\s*-?\s*(?P<end>\d{1,2}(?::\d{2})?)(?P<end_suffix>[ap])?$"
)
# A "<time range> (LOCATION)" segment inside a summary line.
SEGMENT_RE = re.compile(
    r"(?P<range>\d{1,2}(?::\d{2})?[ap]?\s*-?\s*\d{1,2}(?::\d{2})?[ap]?)\s*\((?P<loc>[^)]+)\)"
)
# A bare time range with no "(LOCATION)" suffix.
RANGE_RE = re.compile(
    r"\d{1,2}(?::\d{2})?[ap]?\s*-?\s*\d{1,2}(?::\d{2})?[ap]?"
)
# Location assumed when a category's summary lines omit "(LOC)".
DEFAULT_LOCATION_BY_CATEGORY = {
    "Front Desk": "FD",
    "Tech_Aide": "TA",
}
@dataclass(frozen=True)
class StaffEntry:
    """One staff shift segment parsed from a Calcium summary line."""

    initials: str  # staff member's initials as written in the summary
    date: date  # calendar day of the shift
    start: time  # shift start (naive local time)
    end: time  # shift end (naive local time)
    location: str  # location code, e.g. "FD" or "TA"
def load_initials_map(path: str) -> dict[str, str]:
    """Load an initials -> email mapping from a CSV file.

    Expects columns (initials, name, email) with a header row, which is
    skipped. Rows with fewer than three columns or blank initials are
    ignored; a repeated initial keeps the last row's email.
    """
    mapping: dict[str, str] = {}
    # newline="" is required by the csv module so quoted fields containing
    # newlines are parsed correctly.
    with open(path, "r", encoding="utf-8", newline="") as handle:
        reader = csv.reader(handle)
        next(reader, None)  # skip the header row
        for row in reader:
            if len(row) < 3:
                continue
            initials = row[0].strip()
            email = row[2].strip()
            if initials:
                mapping[initials] = email
    return mapping
def local_datetime_to_iso(day: date, t: time, tz_name: str = DEFAULT_TZ) -> str:
    """Combine a local date/time in tz_name and render it in UTC as '%Y-%m-%dT%H:%M:%S%z'."""
    local_dt = datetime.combine(day, t, tzinfo=ZoneInfo(tz_name))
    utc_dt = local_dt.astimezone(ZoneInfo("UTC"))
    return utc_dt.strftime("%Y-%m-%dT%H:%M:%S%z")
| def _parse_hhmm(token: str) -> time: | |
| if ":" in token: | |
| hour_str, minute_str = token.split(":", 1) | |
| hour = int(hour_str) | |
| minute = int(minute_str) | |
| else: | |
| hour = int(token) | |
| minute = 0 | |
| return time(hour, minute) | |
| def _apply_meridiem(base: time, suffix: str) -> time: | |
| hour = base.hour | |
| minute = base.minute | |
| if suffix == "p" and hour < 12: | |
| hour += 12 | |
| if suffix == "a" and hour == 12: | |
| hour = 0 | |
| return time(hour, minute) | |
def parse_time_range(value: str) -> tuple[time, time]:
    """Parse e.g. '9-5' or '9:30a-2p' into (start, end) times.

    A missing end meridiem defaults to 'p'; a missing start meridiem is
    inferred so that the range runs forward when possible. Raises
    ValueError for unparseable input.
    """
    normalized = " ".join(value.strip().lower().split())
    match = TIME_RANGE_RE.match(normalized)
    if match is None:
        raise ValueError(f"Invalid time range: {value}")
    end_suffix = match.group("end_suffix") or "p"
    start_suffix = match.group("start_suffix")
    start_raw = _parse_hhmm(match.group("start"))
    end_time = _apply_meridiem(_parse_hhmm(match.group("end")), end_suffix)
    if start_suffix:
        return _apply_meridiem(start_raw, start_suffix), end_time
    # No explicit start meridiem: assume the end's, flipping it if that
    # would make the range run backwards.
    start_time = _apply_meridiem(start_raw, end_suffix)
    if start_time >= end_time:
        start_time = _apply_meridiem(start_raw, "a" if end_suffix == "p" else "p")
    return start_time, end_time
def split_summary_lines(summary: str) -> list[str]:
    """Split a summary into stripped, non-empty lines; literal '\\n' counts as a newline."""
    normalized = summary.replace("\\n", "\n")
    stripped = (piece.strip() for piece in normalized.split("\n"))
    return [piece for piece in stripped if piece]
def parse_staff_line(
    line: str, default_location: Optional[str] = None
) -> Optional[tuple[str, list[tuple[time, time, str]]]]:
    """Parse a line like 'AB 9-5 (FD) 6-8p (TA)' into (initials, segments).

    Each segment is (start, end, location). Returns None for 'off' lines
    and for lines with no parseable segments; raises ValueError when the
    line is empty.
    """
    text = line.strip()
    if not text:
        raise ValueError("Empty summary line")
    pieces = text.split(maxsplit=1)
    initials = pieces[0]
    remainder = pieces[1] if len(pieces) > 1 else ""
    if remainder.strip().lower().startswith("off"):
        return None  # explicitly off that day
    segments: list[tuple[time, time, str]] = []
    for found in SEGMENT_RE.finditer(remainder):
        try:
            start, end = parse_time_range(found.group("range"))
        except ValueError:
            continue  # malformed range: skip only this segment
        segments.append((start, end, found.group("loc").strip()))
    if not segments and default_location:
        # No "(LOC)" segments at all: fall back to bare time ranges
        # paired with the category's default location.
        for found in RANGE_RE.finditer(remainder):
            try:
                start, end = parse_time_range(found.group(0))
            except ValueError:
                continue
            segments.append((start, end, default_location))
    return (initials, segments) if segments else None
def build_staff_entries(
    summary: str, day: date, default_location: Optional[str] = None
) -> list[StaffEntry]:
    """Expand a multi-line summary into one StaffEntry per (person, segment)."""
    entries: list[StaffEntry] = []
    for line in split_summary_lines(summary):
        parsed = parse_staff_line(line, default_location=default_location)
        if parsed is None:
            # Keep going; a single bad line shouldn't sink the whole summary.
            print(f"Warning: skipped summary line (no segments or off): {line}", file=sys.stderr)
            continue
        initials, segments = parsed
        entries.extend(
            StaffEntry(
                initials=initials,
                date=day,
                start=start,
                end=end,
                location=location,
            )
            for start, end, location in segments
        )
    return entries
def create_reservations(
    api: BookedAPI,
    payloads: list[dict],
    initials_map: dict[str, str],
    owner_id: Optional[int],
) -> list[str]:
    """Create one Booked reservation per payload and return their reference numbers.

    Resolves location attribute values and participant users via the API,
    and raises RuntimeError on the first failed creation.
    """
    references: list[str] = []
    for payload in payloads:
        # Translate human-entered location strings into the attribute values
        # the server expects (mutates the payload's attribute dicts in place).
        if payload.get("customAttributes"):
            for attr in payload["customAttributes"]:
                if attr.get("attributeId") == LOCATION_ATTRIBUTE_ID and attr.get("attributeValue"):
                    resolved = api.find_attribute_value(
                        LOCATION_ATTRIBUTE_ID, attr["attributeValue"]
                    )
                    if resolved:
                        attr["attributeValue"] = resolved
        resource_id = payload.get("resource_id")
        if resource_id is None:
            raise ValueError("Payload missing resource_id")
        # Attach the staff member as a participant when their initials map
        # to a known user ("?" marks an unknown person).
        participant_ids = None
        initials = payload.get("initials", "")
        if initials and initials != "?":
            email = initials_map.get(initials)
            if email:
                user = api.find_user_by_email(email)
                if user:
                    participant_ids = [user["id"]]
        result = api.create_reservation(
            resource_id=resource_id,
            start_datetime=payload["start"],
            end_datetime=payload["end"],
            title="",
            user_id=owner_id,
            participants=participant_ids,
            custom_attributes=payload["customAttributes"],
            recurrence_rule=_apply_default_termination(payload.get("recurrence")),
        )
        reference = result.get("referenceNumber")
        if not reference:
            # Fail fast with enough context to identify the offending entry.
            detail = {
                "initials": payload.get("initials"),
                "start": payload.get("start"),
                "end": payload.get("end"),
                "location": payload.get("location"),
                "recurrence": payload.get("recurrence"),
            }
            raise RuntimeError(f"API error: {result} for {detail}")
        references.append(reference)
    return references
| def _apply_default_termination(recurrence: Optional[dict]) -> Optional[dict]: | |
| if not recurrence: | |
| return recurrence | |
| if recurrence.get("type") in {None, "none"}: | |
| return recurrence | |
| if recurrence.get("repeatTerminationDate"): | |
| return recurrence | |
| updated = dict(recurrence) | |
| updated["repeatTerminationDate"] = DEFAULT_REPEAT_TERMINATION | |
| return updated | |
def parse_start_datetime(dtstart: str) -> datetime:
    """Parse an ICS DTSTART ('YYYYMMDD' or 'YYYYMMDDTHHMMSS[Z]') into a naive datetime."""
    value = dtstart.rstrip("Z")
    fmt = "%Y%m%d" if len(value) == 8 else "%Y%m%dT%H%M%S"
    return datetime.strptime(value, fmt)
| def parse_termination_date(value: str | None, tz: Optional[ZoneInfo] = None) -> date | None: | |
| if not value: | |
| return None | |
| if value.endswith("Z"): | |
| value = value[:-1] + "+0000" | |
| if value[-3] == ":": | |
| value = value[:-3] + value[-2:] | |
| parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z") | |
| if tz: | |
| return parsed.astimezone(tz).date() | |
| return parsed.date() | |
def recurrence_signature(rule: dict) -> tuple:
    """Hashable fingerprint of a recurrence rule, used for dedupe comparisons."""
    if not rule:
        return (None,)
    weekdays = rule.get("weekdays") or []
    return (
        rule.get("type"),
        int(rule.get("interval", 1)),
        tuple(weekdays),
        parse_termination_date(rule.get("repeatTerminationDate")),
    )
def recurrence_occurs_on(entry_date: date, start_date: date, rule: dict) -> bool:
    """Return True when `rule`, anchored at start_date, generates entry_date.

    Only daily and weekly rules are supported; any other type returns False.
    """
    rtype = rule.get("type")
    if not rtype or rtype == "none":
        return False
    if entry_date < start_date:
        return False
    termination = parse_termination_date(rule.get("repeatTerminationDate"))
    if termination and entry_date > termination:
        return False
    interval = int(rule.get("interval", 1))
    if rtype == "daily":
        diff_days = (entry_date - start_date).days
        return diff_days % interval == 0
    if rtype == "weekly":
        weekdays = rule.get("weekdays") or []
        if not weekdays:
            return False
        # Weekday numbering here is shifted by one from Python's
        # (Python: Monday == 0; shifted: Sunday == 0) to match the
        # convention used in the rule's "weekdays" list.
        booked_weekday = (entry_date.weekday() + 1) % 7
        if booked_weekday not in weekdays:
            return False
        start_weekday = (start_date.weekday() + 1) % 7
        # First occurrence falling on entry_date's weekday at/after the anchor.
        delta_days = (booked_weekday - start_weekday) % 7
        first_occurrence = start_date + timedelta(days=delta_days)
        if entry_date < first_occurrence:
            return False
        diff_days = (entry_date - first_occurrence).days
        if diff_days % 7 != 0:
            return False
        # The week-interval check counts from that first same-weekday
        # occurrence, not from the anchor week itself.
        weeks = diff_days // 7
        return weeks % interval == 0
    return False
def recurrence_end_date(rule: dict) -> date:
    """Termination date of a rule, falling back to the global default."""
    explicit = parse_termination_date(rule.get("repeatTerminationDate"))
    return explicit if explicit else parse_termination_date(DEFAULT_REPEAT_TERMINATION)
def format_termination_date(day: date) -> str:
    """Render a date as a midnight-UTC termination timestamp string."""
    return f"{day.isoformat()}T00:00:00+0000"
def next_occurrence_after(
    start_date: date, rule: dict, after_date: date, tz: ZoneInfo
) -> Optional[date]:
    """First occurrence date strictly after after_date, or None when the rule yields none."""
    anchor = datetime.combine(start_date, time.min, tzinfo=tz)
    # Midnight of the following day makes the search strictly-after.
    cutoff = datetime.combine(after_date + timedelta(days=1), time.min, tzinfo=tz)
    hit = next_occurrence(anchor, rule, cutoff)
    return None if hit is None else hit.date()
def add_months(date_value: date, months: int) -> date:
    """Shift a date by `months` (may be negative), clamping the day to the target month."""
    total = date_value.month - 1 + months
    year = date_value.year + total // 12
    month = total % 12 + 1
    day = date_value.day
    # Walk the day down until it fits the target month (28 always fits).
    while day > 28:
        try:
            return date(year, month, day)
        except ValueError:
            day -= 1
    return date(year, month, day)
def next_occurrence(start_dt: datetime, rule: dict, now_dt: datetime) -> Optional[datetime]:
    """Return the first occurrence of `rule` (anchored at start_dt) at/after now_dt.

    Supports daily/weekly/monthly/yearly types; any other type yields None.
    When now_dt is not after start_dt, start_dt itself is returned.
    """

    def shift_year(dt: datetime, target_year: int) -> datetime:
        # Feb 29 in a non-leap target year clamps to the 28th.
        try:
            return dt.replace(year=target_year)
        except ValueError:
            return dt.replace(year=target_year, day=28)

    step = int(rule.get("interval", 1))
    rtype = rule.get("type")
    current = start_dt
    if rtype == "daily":
        if now_dt > current:
            elapsed_days = (now_dt.date() - current.date()).days
            current += timedelta(days=(elapsed_days // step) * step)
            if current < now_dt:
                current += timedelta(days=step)
        return current
    if rtype == "weekly":
        if now_dt > current:
            elapsed_days = (now_dt.date() - current.date()).days
            whole_steps = ((elapsed_days // 7) // step) * step
            current += timedelta(weeks=whole_steps)
            if current < now_dt:
                current += timedelta(weeks=step)
        return current
    if rtype == "monthly":
        if now_dt > current:
            elapsed_months = (now_dt.year - current.year) * 12 + (now_dt.month - current.month)
            jump = (elapsed_months // step) * step
            current = datetime.combine(add_months(current.date(), jump), current.time(), current.tzinfo)
            if current < now_dt:
                current = datetime.combine(add_months(current.date(), step), current.time(), current.tzinfo)
        return current
    if rtype == "yearly":
        if now_dt > current:
            jump = ((now_dt.year - current.year) // step) * step
            current = shift_year(current, current.year + jump)
            if current < now_dt:
                current = shift_year(current, current.year + step)
        return current
    return None
def adjust_entries_to_today(entries: List[dict], tz: ZoneInfo) -> List[dict]:
    """Drop past entries and roll recurring ones forward to their next occurrence."""
    today = datetime.now(tz).date()
    midnight = datetime.combine(today, time.min, tzinfo=tz)
    kept: List[dict] = []
    for entry in entries:
        rule = entry.get("recurrence_rule")
        if not rule:
            # One-off entries survive only when not already past.
            if entry["date"] >= today:
                kept.append(entry)
            continue
        until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
        if until_date and today > until_date:
            continue  # recurrence has fully expired
        if entry["date"] < today:
            anchor = datetime.combine(entry["date"], entry["start_time"], tzinfo=tz)
            upcoming = next_occurrence(anchor, rule, midnight)
            if not upcoming or (until_date and upcoming.date() > until_date):
                continue
            # Re-anchor the entry (copy, don't mutate the caller's dict).
            entry = dict(entry)
            entry["date"] = upcoming.date()
            entry["recurrence_start"] = upcoming.date()
        kept.append(entry)
    return kept
def entry_key(entry: dict) -> tuple:
    """Identity key for dedupe: who, where, and the start/end times."""
    fields = ("initials", "location", "start_time", "end_time")
    return tuple(entry.get(field) for field in fields)
def split_recurrence_on_overrides(
    entry: dict,
    override_dates: List[date],
    tz: ZoneInfo,
) -> List[dict]:
    """Split a recurring entry into segments that skip the override dates.

    Each override date that the recurrence actually generates punches a
    hole in the series: the entry is replaced by segments whose
    repeatTerminationDate ends just before each override and whose next
    segment restarts at the following occurrence. Entries without a rule,
    start date, or applicable overrides are returned unchanged.
    """
    if not override_dates:
        return [entry]
    rule = entry.get("recurrence_rule")
    if not rule:
        return [entry]
    start_date = entry.get("recurrence_start") or entry.get("date")
    if not start_date:
        return [entry]
    end_date = recurrence_end_date(rule)
    # Only overrides inside the recurrence window that fall on an actual
    # occurrence require splitting.
    valid_overrides = [
        day
        for day in override_dates
        if day >= start_date and day <= end_date and recurrence_occurs_on(day, start_date, rule)
    ]
    if not valid_overrides:
        return [entry]
    segments: List[dict] = []
    segment_start = start_date
    for override in sorted(valid_overrides):
        # Close the current segment the day before the override (skipped
        # entirely when the override lands on the segment's first day).
        segment_end = override - timedelta(days=1)
        if segment_end >= segment_start:
            segment = dict(entry)
            segment["date"] = segment_start
            segment["recurrence_start"] = segment_start
            segment_rule = dict(rule)
            segment_rule["repeatTerminationDate"] = format_termination_date(segment_end)
            segment["recurrence_rule"] = segment_rule
            segments.append(segment)
        # Resume at the first occurrence strictly after the override.
        next_start = next_occurrence_after(segment_start, rule, override, tz)
        if not next_start:
            segment_start = None
            break
        segment_start = next_start
    # Tail segment running to the recurrence's own end date.
    if segment_start and segment_start <= end_date:
        segment = dict(entry)
        segment["date"] = segment_start
        segment["recurrence_start"] = segment_start
        segment_rule = dict(rule)
        segment_rule["repeatTerminationDate"] = format_termination_date(end_date)
        segment["recurrence_rule"] = segment_rule
        segments.append(segment)
    return segments
def dedupe_entries(entries: List[dict], tz: ZoneInfo) -> tuple[List[dict], int, List[dict]]:
    """Split recurring entries around single-day overrides and drop duplicates.

    Returns (kept entries, duplicate count, the dropped duplicates).
    Single (non-recurring) entries are never deduped.
    """
    recurring = [entry for entry in entries if entry.get("recurrence_rule")]
    singles = [entry for entry in entries if not entry.get("recurrence_rule")]
    # Single entries act as per-category overrides that punch holes in recurrences.
    overrides_by_category: dict[str, set[date]] = {}
    for entry in singles:
        day = entry.get("date")
        if day:
            overrides_by_category.setdefault(entry.get("category") or "", set()).add(day)
    pieces: List[dict] = []
    for entry in recurring:
        days = sorted(overrides_by_category.get(entry.get("category") or "", set()))
        pieces.extend(split_recurrence_on_overrides(entry, days, tz))
    seen: set = set()
    unique: List[dict] = []
    duplicates: List[dict] = []
    for entry in pieces:
        signature = (
            entry_key(entry),
            recurrence_signature(entry["recurrence_rule"]),
            entry["recurrence_start"],
        )
        if signature in seen:
            duplicates.append(entry)
            continue
        seen.add(signature)
        unique.append(entry)
    return unique + singles, len(duplicates), duplicates
def build_entries_for_event(event: dict) -> List[dict]:
    """Convert a VEVENT dict into flat staff-entry dicts (one per person/segment)."""
    summary = event.get("SUMMARY", "")
    dtstart = event.get("DTSTART", "")
    if not summary.strip() or not dtstart:
        raise ValueError("SUMMARY and DTSTART are required")
    start_dt = parse_start_datetime(dtstart)
    day = start_dt.date()
    category = event.get("CATEGORIES", "")
    staff_entries = build_staff_entries(
        summary, day, default_location=DEFAULT_LOCATION_BY_CATEGORY.get(category)
    )
    rrule = event.get("RRULE", "")
    recurrence_rule = parse_rrule(rrule, start_dt) if rrule else None
    recurrence_start = day if rrule else None
    return [
        {
            "initials": entry.initials,
            "date": entry.date,
            "start_time": entry.start,
            "end_time": entry.end,
            "location": entry.location,
            "recurrence_rule": recurrence_rule,
            "recurrence_start": recurrence_start,
            "category": category,
            "summary": summary,
        }
        for entry in staff_entries
    ]
def build_entries_for_calcium_event(event: "CalciumEvent") -> List[dict]:
    """Convert a parsed CalciumEvent into flat staff-entry dicts."""
    summary = event.text or ""
    start_date = event.repeat_start or event.date
    if not summary.strip() or not start_date:
        raise ValueError("SUMMARY and date are required")
    category = event.categories[0] if event.categories else ""
    staff_entries = build_staff_entries(
        summary, start_date, default_location=DEFAULT_LOCATION_BY_CATEGORY.get(category)
    )
    rrule = _build_rrule(event)
    if rrule:
        recurrence_rule = parse_rrule(rrule, datetime.combine(start_date, time.min))
        recurrence_start = start_date
    else:
        recurrence_rule = None
        recurrence_start = None
    return [
        {
            "initials": entry.initials,
            "date": entry.date,
            "start_time": entry.start,
            "end_time": entry.end,
            "location": entry.location,
            "recurrence_rule": recurrence_rule,
            "recurrence_start": recurrence_start,
            "category": category,
            "summary": summary,
        }
        for entry in staff_entries
    ]
def should_process_vevent(event: dict, today: date) -> bool:
    """True when the VEVENT starts today/later or its recurrence is still active."""
    dtstart = event.get("DTSTART", "")
    if not dtstart:
        return False
    start_dt = parse_start_datetime(dtstart)
    if start_dt.date() >= today:
        return True
    rrule = event.get("RRULE", "")
    if not rrule:
        return False  # single event that has already passed
    rule = parse_rrule(rrule, start_dt)
    until = parse_termination_date(rule.get("repeatTerminationDate"))
    # An open-ended recurrence is always still active.
    return until is None or until >= today
def should_process_calcium_event(event: "CalciumEvent", today: date) -> bool:
    """True when the Calcium event is today/future or a still-active repeat."""
    if event.is_repeating:
        # Keep the repeat unless its window has already closed.
        if event.repeat_start and event.repeat_start > today:
            return True
        if event.repeat_end and event.repeat_end < today:
            return False
        return True
    return bool(event.date and event.date >= today)
def build_payload_from_entry(entry: dict) -> dict:
    """Turn a staff-entry dict into the payload shape create_reservations expects."""
    location = entry["location"]
    return {
        "start": local_datetime_to_iso(entry["date"], entry["start_time"]),
        "end": local_datetime_to_iso(entry["date"], entry["end_time"]),
        "location": location,
        "customAttributes": [
            {"attributeId": LOCATION_ATTRIBUTE_ID, "attributeValue": location}
        ],
        "initials": entry["initials"],
        "recurrence": entry.get("recurrence_rule"),
    }
def summarize_plan(entries: List[dict], removed: int, initials_map: dict) -> None:
    """Print a human-readable creation plan to stdout."""
    print("Plan summary")
    print(f"Total entries: {len(entries) + removed}")
    print(f"Removed as duplicates: {removed}")
    print(f"To create: {len(entries)}")
    for entry in entries:
        recurrence = entry.get("recurrence_rule")
        label = recurrence.get("type") if recurrence else "single"
        start = datetime.combine(entry["date"], entry["start_time"]).strftime("%Y-%m-%d %H:%M")
        end = datetime.combine(entry["date"], entry["end_time"]).strftime("%Y-%m-%d %H:%M")
        parts = [f"- {entry.get('initials')} {start} -> {end} ({entry.get('location')})"]
        email = initials_map.get(entry.get("initials", ""))
        if email:
            parts.append(email)
        parts.append(f"[{label}]")
        print(" ".join(parts))
def summarize_removed(removed_entries: List[dict], initials_map: dict) -> None:
    """Print the duplicate entries that dedupe dropped; silent when there are none."""
    if not removed_entries:
        return
    print("\nRemoved duplicates")
    for entry in removed_entries:
        recurrence = entry.get("recurrence_rule")
        label = recurrence.get("type") if recurrence else "single"
        start = datetime.combine(entry["date"], entry["start_time"]).strftime("%Y-%m-%d %H:%M")
        end = datetime.combine(entry["date"], entry["end_time"]).strftime("%Y-%m-%d %H:%M")
        parts = [f"- {entry.get('initials')} {start} -> {end} ({entry.get('location')})"]
        email = initials_map.get(entry.get("initials", ""))
        if email:
            parts.append(email)
        parts.append(f"[{label}]")
        print(" ".join(parts))
def main() -> int:
    """Plan and create Staff reservations from the Calcium .Events file.

    Reads staff-category events, filters them to future occurrences,
    optionally narrows the plan to a single date, dedupes, prints a
    summary, and (unless --dry-run/--list) creates the reservations via
    the Booked API. Returns 0 on success/no-op, 1 on error.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Create Staff reservations from Calcium .Events file with dedupe "
            f"(source: {DEFAULT_EVENTS_PATH})"
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--yes",
        action="store_true",
        help="Create all reservations without prompting",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show plan without creating reservations",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List entries without creating reservations",
    )
    parser.add_argument(
        "--only-date",
        help="Create entries only for a specific date (YYYY-MM-DD)",
    )
    args = parser.parse_args()
    events_path = DEFAULT_EVENTS_PATH
    tz = ZoneInfo(DEFAULT_TZ)
    today = datetime.now(tz).date()
    # Collect both the generated VEVENTs and the raw Calcium events for
    # every staff category.
    vevents = []
    raw_events = []
    for category in STAFF_CATEGORIES:
        vevents.extend(
            build_vevents(
                events_path,
                category_filter=category,
                tz_name=DEFAULT_TZ,
                preferences_path=None,
                use_utc=False,
            )
        )
        raw_events.extend(
            parse_calcium_events(
                events_path,
                category_filter=category,
                exclude_categories=None,
                preferences_path=None,
                debug=False,
            )
        )
    if not vevents and not raw_events:
        print("No matching events found.")
        return 1
    entries: List[dict] = []
    for item in vevents:
        if not should_process_vevent(item.event, today):
            continue
        entries.extend(build_entries_for_event(item.event))
    for event in raw_events:
        if not should_process_calcium_event(event, today):
            continue
        # NOTE(review): timed raw events appear to be covered by the
        # VEVENT pass above, so only untimed ones are expanded here —
        # confirm against build_vevents' behavior.
        if event.start_time is not None:
            continue
        entries.extend(build_entries_for_calcium_event(event))
    if args.only_date:
        try:
            target_date = datetime.strptime(args.only_date, "%Y-%m-%d").date()
        except ValueError:
            print("Invalid --only-date value (expected YYYY-MM-DD).", file=sys.stderr)
            return 1
        # Bucket entries per category into singles dated exactly on the
        # target and recurring rules that generate an occurrence on it.
        singles_by_category: dict[str, List[dict]] = {}
        recurring_by_category: dict[str, List[dict]] = {}
        for entry in entries:
            category = entry.get("category") or ""
            if entry.get("recurrence_rule"):
                if recurrence_occurs_on(
                    target_date, entry["recurrence_start"], entry["recurrence_rule"]
                ):
                    recurring_by_category.setdefault(category, []).append(entry)
            else:
                if entry["date"] == target_date:
                    singles_by_category.setdefault(category, []).append(entry)
        entries = []
        categories = set(singles_by_category) | set(recurring_by_category)
        for category in sorted(categories):
            # Explicit single entries win; recurring rules are only
            # materialized as one-off entries when no single exists for
            # the category on that date.
            singles = singles_by_category.get(category, [])
            if singles:
                entries.extend(singles)
                continue
            for entry in recurring_by_category.get(category, []):
                single = dict(entry)
                single["date"] = target_date
                single["recurrence_rule"] = None
                single["recurrence_start"] = None
                entries.append(single)
    else:
        entries = adjust_entries_to_today(entries, tz)
    if not entries:
        print("No future staff entries found.")
        return 0
    deduped, removed, removed_entries = dedupe_entries(entries, tz)
    initials_map = load_initials_map(INITIALS_FILE)
    summarize_plan(deduped, removed, initials_map)
    summarize_removed(removed_entries, initials_map)
    if args.dry_run or args.list:
        return 0
    if not args.yes:
        # Default (empty) answer proceeds; only explicit "n"/"no" aborts.
        response = input("Create these entries? [Y]es / [n]o: ").strip().lower()
        if response in {"n", "no"}:
            print("Aborted.")
            return 0
    api = BookedAPI()
    # Resolve the optional reservation owner and the target resource.
    owner_id = None
    if DEFAULT_OWNER_EMAIL:
        owner = api.find_user_by_email(DEFAULT_OWNER_EMAIL)
        if owner:
            owner_id = int(owner["id"])
        else:
            print(f"Owner '{DEFAULT_OWNER_EMAIL}' not found.", file=sys.stderr)
            return 1
    resource = api.find_resource_by_name(DEFAULT_RESOURCE_NAME)
    if not resource:
        print(f"Resource '{DEFAULT_RESOURCE_NAME}' not found.", file=sys.stderr)
        return 1
    payloads = []
    for entry in deduped:
        payload = build_payload_from_entry(entry)
        payload["resource_id"] = resource["resourceId"]
        payloads.append(payload)
    refs = create_reservations(api, payloads, initials_map, owner_id)
    print(f"Created {len(refs)} reservations.")
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())
| #!/usr/bin/env python3 | |
| import argparse | |
| import sys | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI # noqa: E402 | |
| PI_ATTRIBUTE_ID = 1 | |
| DEFAULT_RESOURCE = "P1 Prisma" | |
def parse_date(value: str) -> str:
    """argparse type-checker accepting only YYYY-MM-DD strings.

    Returns the value unchanged when it parses; raises ArgumentTypeError
    (chained from the underlying ValueError) otherwise.
    """
    try:
        datetime.strptime(value, "%Y-%m-%d")
        return value
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"Invalid date: {value}") from exc
def has_pi(reservation: dict, pi_value: str) -> bool:
    """Return True when the reservation carries the PI custom attribute
    (id PI_ATTRIBUTE_ID) with exactly the requested value."""
    attributes = reservation.get("customAttributes") or []
    return any(
        attr.get("id") == PI_ATTRIBUTE_ID and attr.get("value") == pi_value
        for attr in attributes
    )
def main() -> int:
    """Delete every reservation on a resource whose PI custom attribute
    equals the requested value.

    Returns 0 on success (including "nothing matched" and user abort),
    1 when the resource cannot be found.
    """
    parser = argparse.ArgumentParser(
        description="Delete reservations for a specific PI value on a resource."
    )
    parser.add_argument(
        "--pi",
        required=True,
        help="Principal Investigator attribute value to delete",
    )
    parser.add_argument(
        "--resource",
        default=DEFAULT_RESOURCE,
        help="Resource name (default: P1 Prisma)",
    )
    parser.add_argument(
        "--start-date",
        type=parse_date,
        default="2000-01-01",
        help="Start date (YYYY-MM-DD, default: 2000-01-01)",
    )
    parser.add_argument(
        "--end-date",
        type=parse_date,
        default="2100-01-01",
        help="End date (YYYY-MM-DD, default: 2100-01-01)",
    )
    parser.add_argument(
        "--yes",
        action="store_true",
        help="Delete without confirmation",
    )
    args = parser.parse_args()
    api = BookedAPI()
    resource = api.find_resource_by_name(args.resource)
    if not resource:
        print(f"Resource '{args.resource}' not found.")
        return 1
    reservations = api.list_reservations(
        resource_id=resource["resourceId"],
        start_date=args.start_date,
        end_date=args.end_date,
    )
    # Keep only reservations whose PI attribute matches exactly.
    matches = [r for r in reservations if has_pi(r, args.pi)]
    print(f"Found {len(matches)} reservations with PI '{args.pi}' on {resource['name']}.")
    if not matches:
        return 0
    if not args.yes:
        # Default (empty) answer proceeds; only explicit "n"/"no" aborts.
        response = input("Delete these reservations? [Y]es / [n]o: ").strip().lower()
        if response in {"n", "no"}:
            print("Aborted.")
            return 0
    for res in matches:
        ref = res.get("referenceNumber")
        if not ref:
            continue
        result = api.delete_reservation(ref)
        # A dict with "errors" signals a failed delete; anything else is
        # treated as success.
        if result and isinstance(result, dict) and result.get("errors"):
            print(f"{ref}: delete failed ({result['errors']})")
        else:
            print(f"{ref}: deleted")
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())
| #!/usr/bin/env python3 | |
| import argparse | |
| import sys | |
| from pathlib import Path | |
| from typing import List | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI # noqa: E402 | |
| ALLOWED_RESOURCES = ("Staff", "Coverage", "P1 Prisma", "P2 Prisma Fit", "4T Varian") | |
| DEFAULT_START_DATE = "2000-01-01" | |
| DEFAULT_END_DATE = "2100-01-01" | |
| def normalize_choice(response: str) -> str | None: | |
| if response in {"", "n", "no"}: | |
| return "no" | |
| if response in {"y", "yes"}: | |
| return "yes" | |
| return None | |
def unique_refs(reservations: List[dict]) -> List[str]:
    """Collect non-empty reservation reference numbers, de-duplicated
    while preserving first-seen order."""
    seen: set = set()
    ordered: List[str] = []
    for reservation in reservations:
        reference = reservation.get("referenceNumber")
        if reference and reference not in seen:
            seen.add(reference)
            ordered.append(reference)
    return ordered
def find_resource_by_exact_name(api: BookedAPI, name: str) -> dict:
    """Return the first resource whose name equals *name* case-insensitively.

    Raises ValueError when no resource matches.
    """
    wanted = name.lower()
    for resource in api.list_resources():
        if resource.get("name", "").lower() == wanted:
            return resource
    raise ValueError(f"Resource '{name}' not found")
def main() -> int:
    """Purge all reservations from one allowed Scandium resource.

    Lists every reservation in the wide default date window, prints the
    counts, supports --dry-run (list only), and asks for confirmation
    (defaulting to "no") before deleting each unique reference.
    Returns 0 in all non-exceptional paths.
    """
    parser = argparse.ArgumentParser(
        description="Delete all reservations for a specific Scandium resource."
    )
    parser.add_argument(
        "--resource",
        required=True,
        choices=ALLOWED_RESOURCES,
        help="Resource name to purge",
    )
    parser.add_argument(
        "--yes",
        action="store_true",
        help="Skip confirmation prompt",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="List reservations without deleting",
    )
    args = parser.parse_args()
    api = BookedAPI()
    resource = find_resource_by_exact_name(api, args.resource)
    reservations = api.list_reservations(
        DEFAULT_START_DATE,
        DEFAULT_END_DATE,
        resource_id=resource["resourceId"],
    )
    refs = unique_refs(reservations)
    print(f"Resource: {resource['name']} (ID {resource['resourceId']})")
    print(f"Found {len(reservations)} reservations, {len(refs)} unique references")
    if args.dry_run:
        for res in reservations:
            ref = res.get("referenceNumber")
            if not ref:
                continue
            title = res.get("title")
            start = res.get("startDate")
            end = res.get("endDate")
            pi_value = None
            # The PI attribute (id 1) may appear under either key pair:
            # attributeId/attributeValue or id/value.
            for attr in res.get("customAttributes", []) or []:
                attr_id = attr.get("attributeId") if "attributeId" in attr else attr.get("id")
                if attr_id == 1:
                    pi_value = attr.get("attributeValue") or attr.get("value")
                    break
            print(f"{ref} | title={title!r} | PI={pi_value} | {start} -> {end}")
        return 0
    if not args.yes:
        response = input("Delete all reservations? [y/N]: ").strip().lower()
        choice = normalize_choice(response)
        if choice != "yes":
            print("Cancelled.")
            return 0
    # NOTE(review): update_scope="full" presumably removes an entire
    # recurring series per reference — confirm against the Booked API.
    for idx, ref in enumerate(refs, start=1):
        api.delete_reservation(ref, update_scope="full")
        print(f"Deleted {idx}/{len(refs)}: {ref}")
    print("Done")
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| from dataclasses import dataclass | |
| from datetime import date, datetime, time | |
| from pathlib import Path | |
| from typing import Iterable | |
| from dateutil.rrule import rrulestr | |
| from icalendar import Event as ICalEvent | |
@dataclass
class Event:
    """A single VEVENT block from the input file."""

    raw_lines: list[str]  # verbatim lines, BEGIN:VEVENT through END:VEVENT
    ical: ICalEvent | None  # parsed icalendar component; None if parsing failed
def parse_event(lines: list[str]) -> Event:
    """Build an Event from raw VEVENT lines, parsing them with icalendar.

    Parsing is best-effort: any parser error leaves the ical field as
    None so the caller can still keep (and conservatively retain) the
    raw text.
    """
    text = "\n".join(lines) + "\n"
    try:
        component = ICalEvent.from_ical(text)
    except Exception:  # icalendar raises a variety of error types
        component = None
    return Event(lines, component)
def iter_ical_datetimes(prop) -> Iterable[date | datetime]:
    """Flatten an icalendar date property into a list of date/datetime.

    Accepts a single property object or a list of them; each property may
    expose a multi-value ``dts`` sequence or a single ``dt`` value. Falsy
    input yields an empty list.
    """
    if not prop:
        return []
    items = [prop] if not isinstance(prop, list) else prop
    collected: list[date | datetime] = []
    for item in items:
        if hasattr(item, "dts"):
            collected.extend(entry.dt for entry in item.dts)
        elif hasattr(item, "dt"):
            collected.append(item.dt)
    return collected
def split_exdates(prop) -> tuple[set[date], set[datetime]]:
    """Partition EXDATE values into date-level and datetime-level sets.

    Every datetime also contributes its calendar date to the date set so
    whole-day exclusion checks work for timed exceptions too.
    """
    by_date: set[date] = set()
    by_datetime: set[datetime] = set()
    for value in iter_ical_datetimes(prop):
        if isinstance(value, datetime):
            by_datetime.add(value)
            by_date.add(value.date())
        elif isinstance(value, date):
            by_date.add(value)
    return by_date, by_datetime
def split_rdates(prop) -> list[date | datetime]:
    """Return all RDATE values as a flat list (dates and/or datetimes)."""
    return [*iter_ical_datetimes(prop)]
def as_date(value: date | datetime) -> date:
    """Reduce a datetime to its calendar date; pass plain dates through."""
    if isinstance(value, datetime):
        return value.date()
    return value
def event_has_remaining_occurrence(event: Event, today: date) -> bool:
    """Return True when *event* still has an occurrence on/after *today*.

    Conservative: any event that cannot be interpreted (no parsed ical,
    missing DTSTART, an RRULE object without to_ical, rrulestr failure)
    is kept (True). Otherwise checks explicit RDATEs, the single DTSTART
    for non-recurring events, and expands the RRULE for recurring ones,
    honoring EXDATE exclusions at both date and datetime granularity.
    """
    if not event.ical:
        return True
    dtstart_prop = event.ical.get("DTSTART")
    if not dtstart_prop or not hasattr(dtstart_prop, "dt"):
        return True
    dtstart = dtstart_prop.dt
    exdates_date, exdates_dt = split_exdates(event.ical.get("EXDATE"))
    rdates = split_rdates(event.ical.get("RDATE"))
    rrule_prop = event.ical.get("RRULE")
    def is_excluded(occ: date | datetime) -> bool:
        # Datetime occurrences are excluded by exact match or by their day.
        if isinstance(occ, datetime):
            if occ in exdates_dt:
                return True
            return occ.date() in exdates_date
        return occ in exdates_date
    def is_future(occ: date | datetime) -> bool:
        return as_date(occ) >= today
    # Explicit RDATE occurrences count regardless of any RRULE.
    for occ in rdates:
        if is_future(occ) and not is_excluded(occ):
            return True
    if not rrule_prop:
        # Non-recurring: only the single DTSTART can remain.
        dtstart_date = as_date(dtstart)
        if dtstart_date >= today:
            return not is_excluded(dtstart)
        return False
    if not hasattr(rrule_prop, "to_ical"):
        return True
    try:
        rrule_str = rrule_prop.to_ical().decode("utf-8")
        dtstart_dt = dtstart if isinstance(dtstart, datetime) else datetime.combine(dtstart, time.min)
        rule = rrulestr(rrule_str, dtstart=dtstart_dt)
    except Exception:
        return True
    # Walk candidates from today forward until one is not excluded; the
    # EXDATE sets are finite, so this terminates for unbounded rules too.
    search_start = datetime.combine(today, time.min)
    occ = rule.after(search_start, inc=True)
    while occ is not None:
        if is_future(occ) and not is_excluded(occ):
            return True
        occ = rule.after(occ, inc=False)
    return False
def main() -> None:
    """Copy an .ics file, dropping VEVENT blocks with no occurrence on or
    after today; all non-VEVENT lines pass through unchanged."""
    parser = argparse.ArgumentParser(
        description="Filter VEVENT blocks with no remaining future occurrences."
    )
    parser.add_argument("input", help="Input VCALENDAR (.ics) file")
    parser.add_argument("output", help="Output VCALENDAR (.ics) file")
    args = parser.parse_args()
    src = Path(args.input)
    dst = Path(args.output)
    today = date.today()
    lines = src.read_text().splitlines()
    out_lines: list[str] = []
    i = 0
    while i < len(lines):
        line = lines[i]
        if line == "BEGIN:VEVENT":
            # Gather the whole VEVENT block, inclusive of END:VEVENT
            # (or to end-of-file if the block is unterminated).
            block = [line]
            i += 1
            while i < len(lines):
                block.append(lines[i])
                if lines[i] == "END:VEVENT":
                    break
                i += 1
            # Keep the block only if it still has a future occurrence.
            event = parse_event(block)
            if event_has_remaining_occurrence(event, today):
                out_lines.extend(block)
        else:
            out_lines.append(line)
        i += 1
    dst.write_text("\n".join(out_lines) + "\n")
if __name__ == "__main__":
    # Script entry point.
    main()
| #!/usr/bin/env python3 | |
| import argparse | |
| import html | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import textwrap | |
| import readline | |
| from bisect import bisect_left, bisect_right | |
| try: | |
| from prompt_toolkit import prompt as pt_prompt | |
| HAS_PROMPT_TOOLKIT = True | |
| except Exception: | |
| HAS_PROMPT_TOOLKIT = False | |
| import subprocess | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from dataclasses import dataclass | |
| from datetime import date, datetime, time, timedelta | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List, Optional, Tuple | |
| from zoneinfo import ZoneInfo | |
| import shutil | |
| from threading import Lock | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| from booked_api import BookedAPI, ics_datetime_to_iso, parse_rrule # noqa: E402 | |
| from calcium_events import build_vevent, build_vevents, parse_calcium_events | |
| from tqdm import tqdm | |
| from create_scandium_clinical import extract_scanned_initials, has_contrast_indicator | |
| DEFAULT_RESOURCE = "P1 Prisma" | |
| DEFAULT_CALCIUM_EVENTS_BY_RESOURCE = { | |
| "P1 Prisma": "data/3T_Prisma_P1.Events", | |
| "P2 Prisma Fit": "data/3T_Prisma_P2_Fit.Events", | |
| "4T Varian": "data/4T_Varian_Inova.Events", | |
| } | |
| DEFAULT_TZ = "America/New_York" | |
| PI_ATTRIBUTE_ID = 1 | |
| SCANNED_INITIALS_ATTRIBUTE_ID = 2 | |
| BOLD = "\033[1m" | |
| RESET = "\033[0m" | |
| ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") | |
| GREEN = "\033[92m" | |
| ORANGE = "\033[93m" | |
| BLUE = "\033[94m" | |
| CACHE_LOCK = Lock() | |
| EMAIL_RE = re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.IGNORECASE) | |
| INITIALS_RE = re.compile(r"(?:Confirmed[- ]|[^A-Z])([A-Z]{2,3})(?:[^A-Z]|$)") | |
| EMAIL_DOMAIN_SWAPS = { | |
| "mgb.org": "mclean.harvard.edu", | |
| "mclean.harvard.edu": "mgb.org", | |
| } | |
@dataclass(frozen=True)
class EventView:
    """Normalized, comparison-ready view of a single event occurrence.

    Built from a Calcium calendar event (see
    build_event_view_from_calcium_occurrence) — and presumably also from
    Scandium reservations elsewhere — so the two sides can be diffed
    uniformly.
    """

    start: datetime  # occurrence start (timezone-aware)
    end: datetime  # occurrence end (timezone-aware)
    title: str  # normalized summary text
    description: Optional[str]  # raw DESCRIPTION value, when present
    details: List[Tuple[str, str]]  # (label, value) rows for display/comparison
    recurrence: str  # human-readable recurrence label; "none" for single events
    reference: Optional[str]  # reservation reference; None for Calcium occurrences
    compare_fields: Dict[str, str]  # canonical fields (TYP/SUM/REC) used for matching
    ai_payload: Dict[str, object]  # structured fields (summary, emails, initials, ...)
    calcium_event: Optional[Dict[str, str]]  # source VEVENT mapping, if Calcium-side
    scandium_detail: Optional[Dict[str, object]]  # raw reservation detail, if Scandium-side
@dataclass(frozen=True)
class ParsedCalciumVEvent:
    """A Calcium VEVENT together with its parsed timing and recurrence."""

    event: Dict[str, str]  # raw ICS property mapping (DTSTART, RRULE, ...)
    start_dt: datetime  # parsed start datetime
    end_dt: datetime  # parsed end datetime
    duration: timedelta  # occurrence length (presumably end_dt - start_dt — confirm at construction site)
    rrule: Optional[str]  # raw RRULE string; None for single events
    rule: Optional[Dict]  # parsed recurrence rule; None for single events
    until_date: Optional[date]  # recurrence end date, when bounded
    exdates: set[date]  # dates excluded from the recurrence
def parse_date(value: str) -> date:
    """Parse a YYYY-MM-DD string into a date (raises ValueError if malformed)."""
    parsed = datetime.strptime(value, "%Y-%m-%d")
    return parsed.date()
def parse_time_value(value: str) -> time:
    """Parse an HH:MM string into a time (raises ValueError if malformed)."""
    parsed = datetime.strptime(value, "%H:%M")
    return parsed.time()
def normalize_summary(summary: str) -> str:
    """Undo ICS text escaping and HTML entities in a SUMMARY value.

    Converts literal "\\n" sequences into real newlines, unescapes the
    ICS comma/semicolon/backslash escapes, decodes HTML entities, and
    trims surrounding whitespace.
    """
    text = summary.replace("\\n", "\n")
    for escaped, plain in (("\\,", ","), ("\\;", ";"), ("\\\\", "\\")):
        text = text.replace(escaped, plain)
    return html.unescape(text).strip()
def extract_emails(text: str) -> List[str]:
    """Return every email address found in *text* ([] for falsy input)."""
    haystack = text or ""
    return EMAIL_RE.findall(haystack)
def contains_spi_blocker(text: str) -> bool:
    """Detect phrases ("acr qa", "hd cleanup"/"hd clean up") marking a
    slot as non-personnel, so no SPI initials should be extracted."""
    if not text:
        return False
    lowered = text.lower()
    blockers = ("acr qa", "hd cleanup", "hd clean up")
    return any(phrase in lowered for phrase in blockers)
def swap_email_domain(email: str) -> Optional[str]:
    """Return *email* rewritten to its paired domain (mgb.org <->
    mclean.harvard.edu), or None when there is no mapping or no '@'."""
    if "@" not in email:
        return None
    local_part, _, domain = email.rpartition("@")
    replacement = EMAIL_DOMAIN_SWAPS.get(domain.lower())
    return f"{local_part}@{replacement}" if replacement else None
def extract_initials(text: str) -> List[str]:
    """Pull 2-3 letter SPI initials out of *text*.

    Returns [] for empty text or when a blocker phrase (ACR QA / HD
    cleanup) marks the slot as non-personnel; the literal
    "fbirn & hd cleanup" phrase is stripped before matching so its
    letters are not mistaken for initials.
    """
    if not text or contains_spi_blocker(text):
        return []
    scrubbed = re.sub(r"fbirn\s*&\s*hd\s*cleanup", "", text, flags=re.IGNORECASE)
    return [m.group(1) for m in INITIALS_RE.finditer(scrubbed)]
def extract_patient_source(text: str) -> Optional[str]:
    """Extract the patient-source token (e.g. "OP", "IP") leading *text*.

    Recognizes "OP-"/"IP-" and "OP "/"IP " prefixes; otherwise falls back
    to an alphanumeric token before the first dash. Returns None when no
    source can be identified.
    """
    if not text:
        return None
    stripped = text.strip()
    for marker in ("OP", "IP"):
        if stripped.startswith(marker + "-"):
            return marker
    for marker in ("OP", "IP"):
        if stripped.startswith(marker + " "):
            return marker
    if "-" not in stripped:
        return None
    lead = stripped.split("-", 1)[0].strip()
    return lead if lead and lead.isalnum() else None
def parse_clinical_prefix(summary: str) -> tuple[Optional[str], Optional[str], Optional[str], str]:
    """Split a clinical summary's first line into (patient source, body
    part, SPI initials, remaining text).

    Matches the "PAT - body - SPI [rest]" pattern on the first line only;
    on failure returns (None, None, None, original summary). The cleaned
    remainder keeps any trailing text from the first line plus all
    subsequent lines.
    """
    all_lines = summary.splitlines() if summary else []
    if not all_lines:
        return None, None, None, ""
    match = re.match(
        r"^\s*(?P<pat>[A-Za-z0-9]+)\s*-\s*(?P<body>.+?)\s*-\s*(?P<spi>[A-Za-z]{2,3})(?P<rest>.*)$",
        all_lines[0],
    )
    if match is None:
        return None, None, None, summary
    leftover = match.group("rest").strip().lstrip("-").strip()
    remainder = [leftover] if leftover else []
    remainder.extend(all_lines[1:])
    return (
        match.group("pat").strip(),
        match.group("body").strip(),
        match.group("spi").strip(),
        "\n".join(remainder).strip(),
    )
def extract_body_part(text: str) -> Optional[str]:
    """Return the middle "-"-separated token (the body part) from a
    summary like "OP-Brain-AB", or None when no dash-delimited middle
    token exists.

    Note: the original's special-cased "OP-"/"IP-" branch performed the
    identical split, so a single dash-based path preserves behavior.
    """
    if not text:
        return None
    cleaned = text.strip()
    if "-" not in cleaned:
        return None
    parts = cleaned.split("-", 2)
    if len(parts) < 2:
        return None
    return parts[1].strip()
def normalize_compare_value(value: str) -> str:
    """Canonicalize a field for fuzzy comparison: lowercase, drop the
    standalone word "and", and strip every non-alphanumeric character."""
    lowered = (value or "").lower()
    without_and = re.sub(r"\band\b", "", lowered)
    return re.sub(r"[^a-z0-9]+", "", without_and)
def canonical_lines(
    details: Optional[List[Tuple[str, str]]],
    swap_body_label: bool = False,
) -> List[str]:
    """Flatten (label, value) detail pairs into comparable text lines.

    Multi-line values become one output line per value line, with the
    label prefixed only on the first. When swap_body_label is set, the
    "BOD" and "Body part" labels are exchanged so both naming
    conventions canonicalize identically. Missing details collapse to
    the sentinel ["missing"].
    """
    if not details:
        return ["missing"]
    swap = {"BOD": "Body part", "Body part": "BOD"}
    flattened: List[str] = []
    for label, value in details:
        shown = swap.get(label, label) if swap_body_label else label
        value_lines = (value or "").splitlines() or [""]
        first, *others = value_lines
        flattened.append(f"{shown}: {first}".strip())
        flattened.extend(text.strip() for text in others)
    return flattened
def canonical_hash(
    cal: Optional[EventView],
    sca: Optional[EventView],
    swap_body_label: bool = False,
) -> str:
    """SHA-256 over the canonicalized Calcium/Scandium detail pair.

    Used as a stable key for remembering accepted diffs; either side may
    be None (rendered as the "missing" sentinel).
    """
    import hashlib

    left_text = "\n".join(canonical_lines(cal.details if cal else None, swap_body_label))
    right_text = "\n".join(canonical_lines(sca.details if sca else None, swap_body_label))
    combined = f"LEFT\n{left_text}\nRIGHT\n{right_text}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
def resolve_accepted_hash(
    cal: Optional[EventView],
    sca: Optional[EventView],
    accepted_hashes: set[str],
) -> Optional[str]:
    """Return whichever canonical hash (plain first, then label-swapped)
    is already in *accepted_hashes*, or None when neither variant has
    been accepted before."""
    for swapped in (False, True):
        candidate = canonical_hash(cal, sca, swap_body_label=swapped)
        if candidate in accepted_hashes:
            return candidate
    return None
def load_accepted_hashes(path: Path) -> set[str]:
    """Read previously-accepted hashes (one per line) from *path*.

    Missing or unreadable files yield an empty set; blank lines are
    skipped and surrounding whitespace is stripped.
    """
    if not path.exists():
        return set()
    try:
        content = path.read_text(encoding="utf-8")
    except OSError:
        return set()
    hashes = set()
    for raw in content.splitlines():
        stripped = raw.strip()
        if stripped:
            hashes.add(stripped)
    return hashes
def store_accepted_hash(path: Path, hash_value: str) -> None:
    """Append *hash_value* on its own line to the accepted-hash file."""
    with path.open("a", encoding="utf-8") as out:
        out.write(f"{hash_value}\n")
def parse_exdate_dates(value: str) -> set[date]:
    """Parse a comma-separated EXDATE string into a set of dates.

    Only the leading YYYYMMDD of each chunk is used (time suffixes are
    ignored); short or malformed chunks are silently skipped.
    """
    if not value:
        return set()
    collected: set[date] = set()
    for raw in value.split(","):
        chunk = raw.strip()
        if len(chunk) < 8:
            continue
        try:
            collected.add(datetime.strptime(chunk[:8], "%Y%m%d").date())
        except ValueError:
            continue
    return collected
def parse_ics_datetime(value: str, tz: ZoneInfo) -> Optional[datetime]:
    """Convert an ICS DTSTART/DTEND string into an aware datetime in *tz*.

    UTC values ("...Z") are converted via their ISO form and shifted into
    *tz*; floating values are interpreted as wall-clock time in *tz*.
    Date-only or empty values return None.
    """
    if not value or "T" not in value:
        return None
    if value.endswith("Z"):
        iso_form = ics_datetime_to_iso(value)
        utc_dt = datetime.strptime(iso_form, "%Y-%m-%dT%H:%M:%S%z")
        return utc_dt.astimezone(tz)
    naive = datetime.strptime(value, "%Y%m%dT%H%M%S")
    return naive.replace(tzinfo=tz)
def parse_api_datetime(value: str, tz: ZoneInfo) -> Optional[datetime]:
    """Parse a Booked API timestamp (ISO 8601, trailing "Z" or an
    explicit UTC offset) and convert it into *tz*.

    Returns None for empty or unparseable values.
    """
    if not value:
        return None
    normalized = value
    if normalized.endswith("Z"):
        normalized = normalized[:-1] + "+0000"
    # Collapse "+HH:MM" offsets to "+HHMM" so %z accepts them everywhere.
    if len(normalized) >= 6 and normalized[-3] == ":":
        normalized = normalized[:-3] + normalized[-2:]
    try:
        parsed = datetime.strptime(normalized, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    return parsed.astimezone(tz)
def parse_termination_date(value: Optional[str], tz: ZoneInfo) -> Optional[date]:
    """Parse a recurrence-termination timestamp into a date in *tz*.

    Accepts ISO 8601 values ending in "Z" or an explicit UTC offset
    (":"-separated offsets are collapsed for %z). Returns None for empty
    or malformed input instead of raising — consistent with
    parse_api_datetime and parse_booked_until_date.

    Fixes: the previous version indexed cleaned[-3] without a length
    guard (IndexError on short garbage) and let ValueError escape on
    malformed timestamps despite the Optional return type.
    """
    if not value:
        return None
    cleaned = value
    if cleaned.endswith("Z"):
        cleaned = cleaned[:-1] + "+0000"
    # Length guard prevents IndexError on short garbage input.
    if len(cleaned) >= 6 and cleaned[-3] == ":":
        cleaned = cleaned[:-3] + cleaned[-2:]
    try:
        parsed = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    return parsed.astimezone(tz).date()
def parse_booked_until_date(value: Optional[str]) -> Optional[date]:
    """Parse a Booked "until" timestamp to a date, without tz conversion.

    Accepts ISO 8601 values ending in "Z" or an explicit UTC offset
    (":"-separated offsets are collapsed for %z); the resulting date
    keeps the timestamp's own offset. Returns None for empty or
    malformed input.

    Fixes: the previous version indexed cleaned[-3] without a length
    guard, raising IndexError for short non-empty strings.
    """
    if not value:
        return None
    cleaned = value
    if cleaned.endswith("Z"):
        cleaned = cleaned[:-1] + "+0000"
    # Length guard prevents IndexError on short garbage input.
    if len(cleaned) >= 6 and cleaned[-3] == ":":
        cleaned = cleaned[:-3] + cleaned[-2:]
    try:
        dt = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    return dt.date()
def parse_ics_until(value: str) -> Optional[date]:
    """Parse an RRULE UNTIL value's leading YYYYMMDD into a date.

    Any time suffix is ignored; empty, short, or malformed values give
    None.
    """
    cleaned = (value or "").strip()
    if len(cleaned) < 8:
        return None
    try:
        return datetime.strptime(cleaned[:8], "%Y%m%d").date()
    except ValueError:
        return None
def format_weekdays(weekdays: Iterable[int]) -> str:
    """Render Booked weekday indices (0=Sun .. 6=Sat) as "Sun, Mon, ...",
    silently skipping out-of-range values."""
    names = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]
    return ", ".join(names[day] for day in weekdays if 0 <= day < len(names))
def format_ics_recurrence(rrule: Optional[str], start_dt: datetime) -> str:
    """Render an ICS RRULE string as a short human-readable phrase.

    Produces e.g. "weekly every 1 on Mon, Wed until 2024-06-01"; returns
    "none" when there is no rule or the FREQ is unrecognized. Weekly
    rules without a usable BYDAY default to the weekday of *start_dt*
    (Booked indexing, Sun=0).
    """
    if not rrule:
        return "none"
    fields: dict = {}
    for segment in rrule.split(";"):
        if "=" in segment:
            name, raw = segment.split("=", 1)
            fields[name] = raw
    labels = {
        "DAILY": "daily",
        "WEEKLY": "weekly",
        "MONTHLY": "monthly",
        "YEARLY": "yearly",
    }
    label = labels.get(fields.get("FREQ", "").upper(), "none")
    if label == "none":
        return "none"
    interval = fields.get("INTERVAL") or "1"
    pieces = [f"{label} every {interval}"]
    if label == "weekly":
        code_to_index = {"SU": 0, "MO": 1, "TU": 2, "WE": 3, "TH": 4, "FR": 5, "SA": 6}
        indices = []
        byday = fields.get("BYDAY", "")
        if byday:
            for token in byday.split(","):
                token = token.strip().upper()
                if token in code_to_index:
                    indices.append(code_to_index[token])
        if not indices:
            indices = [(start_dt.weekday() + 1) % 7]
        day_names = format_weekdays(indices)
        if day_names:
            pieces.append(f"on {day_names}")
    until_value = parse_ics_until(fields.get("UNTIL", ""))
    if until_value:
        pieces.append(f"until {until_value.isoformat()}")
    return " ".join(pieces)
def recurrence_occurs_on(
    target_date: date,
    start_date: date,
    rule: Dict,
) -> bool:
    """Return True when a Booked recurrence rule generates an occurrence
    on *target_date*.

    *rule* carries "type" (daily/weekly/monthly/yearly), "interval", and
    for weekly rules a "weekdays" list in Booked indexing (Sun=0). Dates
    before the recurrence start never match; unknown types never match.

    Bug fix: weekly rules previously required the target to be an exact
    multiple of interval*7 days from the start date, so with multiple
    listed weekdays only the start date's own weekday could ever match.
    Week membership is now computed from the Sunday-based week containing
    each date, letting every listed weekday of a matching week occur.
    """
    if target_date < start_date:
        return False
    interval = int(rule.get("interval", 1))
    rtype = rule.get("type")
    if rtype == "daily":
        return (target_date - start_date).days % interval == 0
    if rtype == "weekly":
        weekdays = rule.get("weekdays") or []
        booked_weekday = (target_date.weekday() + 1) % 7
        if booked_weekday not in weekdays:
            return False
        # Align both dates to the Sunday starting their week so that any
        # listed weekday within an "on" week counts as an occurrence.
        start_week = start_date - timedelta(days=(start_date.weekday() + 1) % 7)
        target_week = target_date - timedelta(days=booked_weekday)
        return ((target_week - start_week).days // 7) % interval == 0
    if rtype == "monthly":
        # Same day-of-month, a whole number of interval-months apart.
        if target_date.day != start_date.day:
            return False
        months = (target_date.year - start_date.year) * 12 + (
            target_date.month - start_date.month
        )
        return months % interval == 0
    if rtype == "yearly":
        if (target_date.month, target_date.day) != (start_date.month, start_date.day):
            return False
        return (target_date.year - start_date.year) % interval == 0
    return False
def calcium_occurrence_for_date(
    event: Dict[str, str],
    target_date: date,
    tz: ZoneInfo,
) -> Optional[Tuple[datetime, datetime]]:
    """Return the (start, end) datetimes of *event* on *target_date*.

    Non-recurring events match only their own start date. Recurring
    events are checked against the parsed RRULE, its termination date,
    and EXDATE exclusions; the returned occurrence keeps the original
    wall-clock time (with tz) shifted onto *target_date*. Returns None
    when the event does not occur that day or its timestamps cannot be
    parsed.
    """
    start_raw = event.get("DTSTART", "")
    end_raw = event.get("DTEND", "")
    start_dt = parse_ics_datetime(start_raw, tz)
    end_dt = parse_ics_datetime(end_raw, tz)
    if not start_dt or not end_dt:
        return None
    duration = end_dt - start_dt
    rrule = event.get("RRULE")
    exdates = parse_exdate_dates(event.get("EXDATE", ""))
    if not rrule:
        # Single event: only the literal start date matches.
        if start_dt.date() != target_date:
            return None
        return start_dt, end_dt
    rule = parse_rrule(rrule, start_dt.replace(tzinfo=None))
    until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz)
    if until_date and target_date > until_date:
        return None
    if target_date in exdates:
        return None
    if not recurrence_occurs_on(target_date, start_dt.date(), rule):
        return None
    # Re-anchor the start's wall-clock time (tz included via timetz) on
    # the target date; the end is derived from the original duration.
    new_start = datetime.combine(target_date, start_dt.timetz())
    new_end = new_start + duration
    return new_start, new_end
def build_event_view_from_calcium_occurrence(
    event: Dict[str, str],
    start_dt: datetime,
    end_dt: datetime,
) -> EventView:
    """Build a comparison-ready EventView from a Calcium VEVENT occurrence.

    Assembles the display details (type/summary/recurrence), the
    canonical compare fields, and the derived patient source, body part,
    and SPI initials. "Open" slots clear the derived fields; "Clinical"
    summaries prefer the structured "PAT - body - SPI" prefix when it
    parses.
    """
    summary = normalize_summary(event.get("SUMMARY", "")) or "(no summary)"
    description = event.get("DESCRIPTION")
    categories = event.get("CATEGORIES")
    rrule = event.get("RRULE")
    recurrence_label = format_ics_recurrence(rrule, start_dt)
    details: List[Tuple[str, str]] = []
    if categories:
        details.append(("TYP", categories))
    details.append(("SUM", summary))
    if recurrence_label != "none":
        details.append(("REC", recurrence_label))
    compare_fields = {
        "TYP": categories or "",
        "SUM": summary or "",
        "REC": recurrence_label or "none",
    }
    # Heuristic extraction first; the category-specific rules below may
    # override or clear these values.
    patient_source = extract_patient_source(summary or "") or ""
    body_part = extract_body_part(summary or "") or ""
    spi_values = extract_initials(summary or "")
    if categories and categories.strip().lower() == "open":
        # Open slots carry no patient/body/initials semantics.
        patient_source = ""
        body_part = ""
        spi_values = []
    elif categories and "Clinical" in categories:
        # Prefer the structured "PAT - body - SPI" prefix when it parses.
        pat_parsed, body_parsed, spi_parsed, _cleaned = parse_clinical_prefix(summary or "")
        if pat_parsed:
            patient_source = pat_parsed
        if body_parsed:
            body_part = body_parsed
        if spi_parsed:
            spi_values = [spi_parsed]
    ai_payload = {
        "categories": categories or "",
        "summary": summary or "",
        "description": description or "",
        "emails": extract_emails(summary or ""),
        "initials": spi_values,
        "patient_source": patient_source,
        "body_part": body_part,
        "recurrence": recurrence_label or "none",
    }
    return EventView(
        start=start_dt,
        end=end_dt,
        title=summary,
        description=description,
        details=details,
        recurrence=recurrence_label,
        reference=None,
        compare_fields=compare_fields,
        ai_payload=ai_payload,
        calcium_event=event,
        scandium_detail=None,
    )
def build_calcium_events_for_date(
    events_path: Path,
    preferences_path: Optional[Path],
    tz: ZoneInfo,
    target_date: date,
    start_time: Optional[time],
    end_time: Optional[time],
) -> List[EventView]:
    """Collect Calcium EventViews occurring on *target_date*.

    Combines categorized VEVENTs from build_vevents with uncategorized
    events whose text begins with "open" (emitted as category "Open").
    When both start_time and end_time are given, only occurrences whose
    local wall-clock times match exactly are kept.
    """
    vevents = build_vevents(
        events_path,
        category_filter=None,
        tz_name=str(tz.key),
        preferences_path=preferences_path,
        exclude_categories=None,
        debug=False,
        use_utc=True,
    )
    results: List[EventView] = []
    extra_open: List[Dict[str, str]] = []
    # Uncategorized events whose text starts with "open" become synthetic
    # "Open" VEVENTs so open slots are also represented.
    for calcium_event in parse_calcium_events(
        events_path,
        category_filter=None,
        exclude_categories=None,
        preferences_path=preferences_path,
        debug=False,
    ):
        if calcium_event.categories:
            continue
        summary_text = (calcium_event.text or "").lstrip()
        if summary_text[:4].lower() != "open":
            continue
        for vevent in build_vevent(
            calcium_event,
            tz,
            category_value="Open",
            use_utc=True,
            debug=False,
        ):
            if vevent:
                extra_open.append(vevent)
    def append_event(event: Dict[str, str]) -> None:
        # Resolve the event's occurrence on the target date and apply the
        # optional exact start/end wall-time filter.
        occurrence = calcium_occurrence_for_date(event, target_date, tz)
        if not occurrence:
            return
        start_dt, end_dt = occurrence
        if start_time and end_time:
            if start_dt.timetz().replace(tzinfo=None) != start_time:
                return
            if end_dt.timetz().replace(tzinfo=None) != end_time:
                return
        results.append(build_event_view_from_calcium_occurrence(event, start_dt, end_dt))
    for item in vevents:
        append_event(item.event)
    for event in extra_open:
        append_event(event)
    return results
def build_calcium_events_by_date(
    events_path: Path,
    preferences_path: Optional[Path],
    tz: ZoneInfo,
    dates: Iterable[date],
    start_time: Optional[time],
    end_time: Optional[time],
) -> Dict[date, List[EventView]]:
    """Batch variant of :func:`build_calcium_events_for_date` for many dates.

    Parses the Calcium file once, builds all VEVENTs (including synthetic
    "Open" ones), pre-parses recurrence data, then expands each event across
    the sorted target dates using bisect to limit the scan window. Returns a
    dict mapping every requested date (even empty ones) to its EventViews.
    """
    dates_sorted = sorted(dates)
    results: Dict[date, List[EventView]] = {value: [] for value in dates_sorted}
    if not dates_sorted:
        return results
    calcium_events = parse_calcium_events(
        events_path,
        category_filter=None,
        exclude_categories=None,
        preferences_path=preferences_path,
        debug=False,
    )
    vevents: List[Dict[str, str]] = []
    for calcium_event in calcium_events:
        # Categorized events use their first category; uncategorized events
        # whose summary starts with "open" become synthetic "Open" events.
        category_value = calcium_event.categories[0] if calcium_event.categories else None
        if category_value:
            for vevent in build_vevent(
                calcium_event,
                tz,
                category_value=category_value,
                use_utc=True,
                debug=False,
            ):
                if vevent:
                    vevents.append(vevent)
        if calcium_event.categories:
            continue
        summary_text = (calcium_event.text or "").lstrip()
        if summary_text[:4].lower() != "open":
            continue
        for vevent in build_vevent(
            calcium_event,
            tz,
            category_value="Open",
            use_utc=True,
            debug=False,
        ):
            if vevent:
                vevents.append(vevent)
    # Pre-parse timestamps, recurrence rules, and EXDATEs once per event so
    # the per-date expansion loop below stays cheap.
    parsed: List[ParsedCalciumVEvent] = []
    for event in vevents:
        start_dt = parse_ics_datetime(event.get("DTSTART", ""), tz)
        end_dt = parse_ics_datetime(event.get("DTEND", ""), tz)
        if not start_dt or not end_dt:
            continue
        rrule = event.get("RRULE")
        rule = parse_rrule(rrule, start_dt.replace(tzinfo=None)) if rrule else None
        until_date = parse_termination_date(rule.get("repeatTerminationDate"), tz) if rule else None
        parsed.append(
            ParsedCalciumVEvent(
                event=event,
                start_dt=start_dt,
                end_dt=end_dt,
                duration=end_dt - start_dt,
                rrule=rrule,
                rule=rule,
                until_date=until_date,
                exdates=parse_exdate_dates(event.get("EXDATE", "")),
            )
        )
    min_date = dates_sorted[0]
    max_date = dates_sorted[-1]
    for item in parsed:
        start_date = item.start_dt.date()
        if not item.rrule:
            # Non-recurring: only its own start date can match.
            if start_date not in results:
                continue
            if start_time and end_time:
                if item.start_dt.timetz().replace(tzinfo=None) != start_time:
                    continue
                if item.end_dt.timetz().replace(tzinfo=None) != end_time:
                    continue
            results[start_date].append(
                build_event_view_from_calcium_occurrence(item.event, item.start_dt, item.end_dt)
            )
            continue
        # Recurring: clamp the candidate window to [first start, UNTIL] and
        # use bisect to iterate only the requested dates inside that window.
        range_start = max(start_date, min_date)
        range_end = min(item.until_date, max_date) if item.until_date else max_date
        if range_end < range_start:
            continue
        start_idx = bisect_left(dates_sorted, range_start)
        end_idx = bisect_right(dates_sorted, range_end)
        for target_date in dates_sorted[start_idx:end_idx]:
            if target_date in item.exdates:
                continue
            if not item.rule or not recurrence_occurs_on(target_date, start_date, item.rule):
                continue
            # Shift the original start time onto the occurrence date; timetz()
            # carries the tzinfo so the combined datetime stays aware.
            new_start = datetime.combine(target_date, item.start_dt.timetz())
            new_end = new_start + item.duration
            if start_time and end_time:
                if new_start.timetz().replace(tzinfo=None) != start_time:
                    continue
                if new_end.timetz().replace(tzinfo=None) != end_time:
                    continue
            results[target_date].append(
                build_event_view_from_calcium_occurrence(item.event, new_start, new_end)
            )
    return results
def format_recurrence(rule: Optional[Dict], start_dt: datetime) -> str:
    """Render a Booked recurrence-rule dict as a short human-readable label.

    Examples: "none", "daily every 2", "weekly on Mon until 2025-01-01".
    Falls back to the start date's weekday when a weekly rule lists none.
    """
    if not rule:
        return "none"
    kind = rule.get("type") or "none"
    if kind == "none":
        return "none"
    pieces: List[str] = [kind]
    every = rule.get("interval")
    if every:
        pieces.append(f"every {every}")
    if kind == "weekly":
        days: List[int] = []
        for raw_day in rule.get("weekdays") or []:
            try:
                days.append(int(raw_day))
            except (TypeError, ValueError):
                pass
        if not days:
            # No explicit weekdays: default to start_dt's weekday, shifted to
            # a Sunday=0 index (Python's weekday() is Monday=0).
            days = [(start_dt.weekday() + 1) % 7]
        day_label = format_weekdays(days)
        if day_label:
            pieces.append(f"on {day_label}")
    termination = rule.get("repeatTerminationDate")
    if termination:
        term_date = parse_booked_until_date(termination)
        if term_date:
            pieces.append(f"until {term_date.isoformat()}")
    return " ".join(pieces)
def build_scandium_event_view_from_detail(
    detail: Dict[str, object],
    tz: ZoneInfo,
) -> Optional[EventView]:
    """Convert a Scandium reservation detail dict into an EventView.

    Extracts times, title/description, participants/invitees, custom
    attributes (relabelled PAT/SPI/BOD/CON), and a recurrence label, and
    assembles the display details, comparison fields, and the AI payload.
    Returns None when the start or end timestamp cannot be parsed.
    """
    start_raw = detail.get("startDate") or detail.get("startDateTime")
    end_raw = detail.get("endDate") or detail.get("endDateTime")
    start_dt = parse_api_datetime(start_raw, tz) if start_raw else None
    end_dt = parse_api_datetime(end_raw, tz) if end_raw else None
    if not start_dt or not end_dt:
        return None
    title = (detail.get("title") or "").strip()
    description = detail.get("description")
    reference = detail.get("referenceNumber") or None
    participant_items = detail.get("participants", [])
    invitee_items = detail.get("invitees", [])
    participants: List[str] = []
    # Participants and invitees may be dicts (name/email fields) or plain
    # strings; normalize each to a "First Last <email>" style label.
    for entry in list(participant_items) + list(invitee_items):
        if isinstance(entry, dict):
            first = (entry.get("firstName") or "").strip()
            last = (entry.get("lastName") or "").strip()
            email = (entry.get("emailAddress") or "").strip()
            label = " ".join(piece for piece in [first, last] if piece).strip()
            if email:
                if label:
                    label = f"{label} <{email}>"
                else:
                    label = email
            if label:
                participants.append(label)
        elif isinstance(entry, str):
            label = entry.strip()
            if label:
                participants.append(label)
    participant_text = ", ".join(p.strip() for p in participants if p.strip())
    # Short display codes for the well-known custom attributes; anything
    # unrecognized keeps its original label.
    label_map = {
        "patient source": "PAT",
        "scanned person initials": "SPI",
        "body part": "BOD",
        "contrast": "CON",
    }
    attributes: List[Tuple[str, str]] = []
    scan_type_value: Optional[str] = None
    spi_value: Optional[str] = None
    patient_source: Optional[str] = None
    body_part: Optional[str] = None
    for attr in detail.get("customAttributes", []):
        label = (attr.get("label") or "").strip()
        value = (attr.get("value") or "").strip()
        label_key = label.lower()
        if not label or not value:
            continue
        # "Scan Type / PI" is captured separately (TYP) and excluded from the
        # generic attribute list; the first non-empty value wins for each.
        if label_key == "scan type / pi" and not scan_type_value:
            scan_type_value = value
            continue
        if label_key == "scanned person initials" and not spi_value:
            spi_value = value
        if label_key == "patient source" and not patient_source:
            patient_source = value
        if label_key == "body part" and not body_part:
            body_part = value
        attributes.append((label_map.get(label_key, label), value))
    recurrence = format_recurrence(detail.get("recurrenceRule"), start_dt)
    details: List[Tuple[str, str]] = []
    compare_fields: Dict[str, str] = {}
    if scan_type_value:
        details.append(("TYP", scan_type_value))
        compare_fields["TYP"] = scan_type_value
    # De-duplicate identical title/description: for "Notice" events the
    # description wins (title dropped); otherwise the title wins.
    if scan_type_value and scan_type_value.strip().lower() == "notice":
        if description and title and description.strip() == title.strip():
            title = ""
    else:
        if description and title and description.strip() == title.strip():
            description = ""
    if description:
        details.append(("SUM", description))
        compare_fields["SUM"] = description
    if title:
        details.append(("TIT", title))
        # Title stands in for SUM in comparisons when there is no description.
        if not compare_fields.get("SUM"):
            compare_fields["SUM"] = title
    if recurrence != "none":
        details.append(("REC", recurrence))
        compare_fields["REC"] = recurrence
    if participant_text:
        details.append(("PAR", participant_text))
    for label, value in attributes:
        details.append((label, value))
    # Structured payload handed to the AI comparator.
    ai_payload = {
        "scan_type_pi": scan_type_value or "",
        "description": description or "",
        "title": title,
        "recurrence": recurrence or "none",
        "participants": participants,
        "attributes": {label: value for label, value in attributes},
        "spi": spi_value or "",
        "patient_source": patient_source or "",
        "body_part": body_part or "",
    }
    return EventView(
        start=start_dt,
        end=end_dt,
        title=title,
        description=description,
        details=details,
        recurrence=recurrence,
        reference=reference,
        compare_fields=compare_fields,
        ai_payload=ai_payload,
        calcium_event=None,
        scandium_detail=detail,
    )
def build_scandium_events_for_date(
    api: BookedAPI,
    resource_name: str,
    tz: ZoneInfo,
    target_date: date,
    start_time: Optional[time],
    end_time: Optional[time],
    resource: Optional[Dict[str, object]] = None,
) -> List[EventView]:
    """Fetch Scandium reservations for one resource on ``target_date``.

    Lists reservations in the [target_date, target_date+1) window, fetching
    the full reservation detail when the listing lacks fields the EventView
    needs. When both ``start_time`` and ``end_time`` are given, only events
    with exactly matching wall-clock times are kept.

    Raises:
        RuntimeError: if ``resource_name`` cannot be resolved and no
            ``resource`` dict was supplied.
    """
    if resource is None:
        resource = api.find_resource_by_name(resource_name)
    if not resource:
        raise RuntimeError(f"Resource not found: {resource_name}")
    start_date = target_date.strftime("%Y-%m-%d")
    end_date = (target_date + timedelta(days=1)).strftime("%Y-%m-%d")
    reservations = api.list_reservations(
        start_date=start_date,
        end_date=end_date,
        resource_id=resource["resourceId"],
    )
    results: List[EventView] = []
    for reservation in reservations:
        reference = reservation.get("referenceNumber", "")
        detail = reservation
        needs_detail = False
        # The list endpoint may omit (or return empty) fields that the view
        # builder needs; fall back to the per-reservation detail endpoint.
        if reference:
            for field in ("customAttributes", "participants", "invitees", "recurrenceRule"):
                if field not in reservation:
                    needs_detail = True
                    break
            if not needs_detail:
                if not reservation.get("customAttributes") or reservation.get("recurrenceRule") is None:
                    needs_detail = True
        if needs_detail:
            try:
                detail = api.get_reservation(reference)
            except Exception:
                # Best-effort: use the (possibly incomplete) listing record.
                detail = reservation
        view = build_scandium_event_view_from_detail(detail, tz)
        if not view:
            continue
        # The listing window spans two calendar days; keep only the target.
        if view.start.date() != target_date:
            continue
        if start_time and end_time:
            if view.start.timetz().replace(tzinfo=None) != start_time:
                continue
            if view.end.timetz().replace(tzinfo=None) != end_time:
                continue
        results.append(view)
    return results
def group_by_time(events: Iterable[EventView]) -> Dict[Tuple[datetime, datetime], List[EventView]]:
    """Bucket events by their exact (start, end) pair, preserving input order."""
    buckets: Dict[Tuple[datetime, datetime], List[EventView]] = {}
    for view in events:
        slot = (view.start, view.end)
        bucket = buckets.get(slot)
        if bucket is None:
            bucket = buckets[slot] = []
        bucket.append(view)
    return buckets
def format_event_label(event: EventView, width: int) -> str:
    """Render an event as a multi-line display label.

    Events without structured details fall back to title (or "(no title)")
    plus an optional description line. Detailed events get one bold, wrapped
    line per (key, value) pair, with continuation lines aligned under the
    value column.
    """
    if not event.details:
        fallback = event.title.strip() if event.title else "(no title)"
        if event.description:
            return f"{fallback}\n{event.description.strip()}"
        return fallback
    rendered: List[str] = []
    for key, value in event.details:
        tag = f"{key}: "
        wrap_width = max(10, width - len(tag))
        chunks = textwrap.wrap(value, width=wrap_width) or [""]
        first = True
        for chunk in chunks:
            lead = tag if first else " " * len(tag)
            first = False
            rendered.append(f"{lead}{BOLD}{chunk}{RESET}")
    return "\n".join(rendered)
def build_compare_key(calcium: EventView, scandium: EventView) -> str:
    """Stable cache key for a Calcium/Scandium pair (sorted-key JSON)."""
    combined = {
        "calcium": calcium.ai_payload,
        "scandium": scandium.ai_payload,
    }
    return json.dumps(combined, sort_keys=True)
def build_ai_prompt(calcium: EventView, scandium: EventView) -> str:
    """Compose the model prompt for comparing one Calcium/Scandium pair.

    Embeds both AI payloads as pretty-printed JSON between the matching
    guidance and the required JSON response schema.
    """
    guidance = (
        "You are comparing a Calcium calendar event (unstructured) to a Scandium event "
        "(structured). Determine whether they refer to the same booking. The Scandium "
        "side has structured fields (PAT, Body part, SPI, Participants, Title) that may "
        "appear embedded in Calcium SUM. The first words of Calcium SUM often map to "
        "Scandium TIT. Scanned person initials might be in Calcium SUM and Scandium SPI. "
        "Body part wording can differ (e.g., 'L/Spine' vs 'L Spine'). "
        "Emails or names in Calcium SUM often correspond to Scandium Participants. "
        "If Scandium fields are contained in Calcium SUM, treat that as a match even if "
        "the strings are not identical. If both sides provide PAT or Body part and they "
        "conflict, mark as mismatch.\n\n"
    )
    response_schema = (
        "Respond with JSON:\n"
        "{\n"
        ' "match": true|false,\n'
        ' "field_matches": {"TYP": "match|mismatch|unclear", "SUM": "match|mismatch|unclear", "REC": "match|mismatch|unclear"},\n'
        ' "notes": "short reason"\n'
        "}\n"
    )
    return (
        guidance
        + "Calcium event:\n"
        + json.dumps(calcium.ai_payload, indent=2)
        + "\n\nScandium event:\n"
        + json.dumps(scandium.ai_payload, indent=2)
        + "\n\n"
        + response_schema
    )
def ai_compare(
    calcium: EventView,
    scandium: EventView,
    model: str,
    api_key: str,
    cache: Dict[str, Dict[str, object]],
) -> Dict[str, object]:
    """Ask the OpenAI Responses API whether the two events match.

    Results are memoized in ``cache`` keyed by the serialized payload pair.
    Cache reads and writes are guarded by CACHE_LOCK; the network call runs
    outside the lock, so concurrent misses may duplicate work (harmless).

    Raises:
        RuntimeError: when the API returns a non-200 status.
    """
    key = build_compare_key(calcium, scandium)
    with CACHE_LOCK:
        if key in cache:
            return cache[key]
    import requests
    prompt = build_ai_prompt(calcium, scandium)
    response = requests.post(
        "https://api.openai.com/v1/responses",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json={
            "model": model,
            "input": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": prompt,
                        }
                    ],
                }
            ],
        },
        timeout=30,
    )
    if response.status_code != 200:
        raise RuntimeError(f"OpenAI error {response.status_code}: {response.text}")
    payload = response.json()
    # Concatenate every output_text chunk from the Responses API payload.
    text = ""
    for item in payload.get("output", []):
        if item.get("type") == "message":
            for content in item.get("content", []):
                if content.get("type") == "output_text":
                    text += content.get("text", "")
    try:
        result = json.loads(text)
    except json.JSONDecodeError:
        # Model replied with something other than JSON: treat as no-match.
        result = {"match": False, "field_matches": {}, "notes": "AI response parse error"}
    with CACHE_LOCK:
        cache[key] = result
    return result
def compare_events(
    calcium: EventView,
    scandium: EventView,
    use_ai: bool,
    model: str,
    api_key: Optional[str],
    cache: Dict[str, Dict[str, object]],
) -> Dict[str, object]:
    """Decide whether a Calcium and a Scandium event are the same booking.

    Cascade: (1) hard guard on scanned-person initials; (2) cheap exact
    comparison of TYP/SUM/REC; (3) optional AI comparison, whose "match"
    verdict is vetoed when patient-source or body-part values conflict.

    Returns a dict with "match" (bool), "field_matches", "notes", and
    "source" (one of "guard", "exact", "ai", "ai-error").
    """
    # Guard: when both sides supply initials and Scandium's are not among
    # Calcium's, this cannot be the same booking — skip the AI entirely.
    calcium_spi = set(calcium.ai_payload.get("initials", []) or [])
    scandium_spi = (scandium.ai_payload.get("spi") or "").strip()
    if scandium_spi and calcium_spi and scandium_spi not in calcium_spi:
        return {
            "match": False,
            "field_matches": {"SUM": "mismatch"},
            "notes": "initials mismatch",
            "source": "guard",
        }
    calcium_typ = calcium.compare_fields.get("TYP", "")
    scandium_typ = scandium.compare_fields.get("TYP", "")
    # Normalize Calcium's "PM / Service" spelling variants to Scandium's "Service".
    if calcium_typ in {"PM / Service", "PM-Service", "PM Service", "PM/Service"} and scandium_typ == "Service":
        calcium_typ = "Service"
    exact = {
        "TYP": calcium_typ == scandium_typ,
        "SUM": calcium.compare_fields.get("SUM", "") == scandium.compare_fields.get("SUM", ""),
        "REC": calcium.compare_fields.get("REC", "") == scandium.compare_fields.get("REC", ""),
    }
    if all(exact.values()):
        return {
            "match": True,
            "field_matches": {key: "match" for key in exact},
            "notes": "exact match",
            "source": "exact",
        }
    if not use_ai or not api_key:
        mismatched = [key for key, ok in exact.items() if not ok]
        return {
            "match": False,
            "field_matches": {key: "mismatch" if key in mismatched else "match" for key in exact},
            "notes": "exact mismatch",
            "source": "exact",
        }
    try:
        result = ai_compare(calcium, scandium, model, api_key, cache)
        # Post-check: PAT/body mismatches override AI match when both sides provide values.
        calcium_pat = (calcium.ai_payload.get("patient_source") or "").strip()
        scandium_pat = (scandium.ai_payload.get("patient_source") or "").strip()
        calcium_body = (calcium.ai_payload.get("body_part") or "").strip()
        scandium_body = (scandium.ai_payload.get("body_part") or "").strip()
        if result.get("match"):
            if scandium_pat and calcium_pat:
                if normalize_compare_value(scandium_pat) != normalize_compare_value(calcium_pat):
                    result["match"] = False
                    field_matches = result.get("field_matches") or {}
                    field_matches["SUM"] = "mismatch"
                    result["field_matches"] = field_matches
                    result["notes"] = "patient source mismatch"
        if result.get("match") and scandium_body and calcium_body:
            norm_scandium_body = normalize_compare_value(scandium_body)
            norm_calcium_body = normalize_compare_value(calcium_body)
            if norm_scandium_body != norm_calcium_body:
                # Substring containment either way still counts as agreement
                # (e.g. "L Spine" vs "Spine").
                if norm_scandium_body not in norm_calcium_body and norm_calcium_body not in norm_scandium_body:
                    result["match"] = False
                    field_matches = result.get("field_matches") or {}
                    field_matches["SUM"] = "mismatch"
                    result["field_matches"] = field_matches
                    result["notes"] = "body part mismatch"
        result["source"] = "ai"
        return result
    except Exception as exc:
        # AI failure degrades to the exact-comparison verdict, tagged ai-error.
        mismatched = [key for key, ok in exact.items() if not ok]
        return {
            "match": False,
            "field_matches": {key: "mismatch" if key in mismatched else "match" for key in exact},
            "notes": f"ai error: {exc}",
            "source": "ai-error",
        }
def input_with_default(label: str, current: str) -> str:
    """Prompt for a value, pre-filled with ``current``.

    An empty response keeps ``current``; entering "-" clears the value.
    Uses prompt_toolkit when available, otherwise a readline startup hook
    to pre-insert the current value for in-place editing.
    """
    prompt = f"{label}: "
    if HAS_PROMPT_TOOLKIT:
        response = pt_prompt(prompt, default=current or "").strip()
    else:
        # readline fallback: inject the current value into the input buffer
        # so the user can edit it rather than retype it.
        def hook() -> None:
            readline.insert_text(current)
            readline.redisplay()
        readline.set_startup_hook(hook)
        try:
            response = input(prompt).strip()
        finally:
            readline.set_startup_hook(None)
    if response == "":
        return current
    if response == "-":
        return ""
    return response
def input_sum_with_default(current: str, calcium_sum: str) -> str:
    """Prompt for the SUM (description) field, pre-filled with ``current``.

    Special responses: empty keeps ``current``, "-" clears the field, and
    "c" copies ``calcium_sum`` when it is non-empty.
    """
    suffix = " (c=copy from calcium, -=clear)"
    label = f"SUM (Description){suffix}"
    prompt = f"{label}: "
    if HAS_PROMPT_TOOLKIT:
        response = pt_prompt(prompt, default=current or "").strip()
        if response == "":
            return current
    else:
        # readline fallback: pre-insert the current value for in-place editing.
        def hook() -> None:
            readline.insert_text(current)
            readline.redisplay()
        readline.set_startup_hook(hook)
        try:
            response = input(prompt).strip()
        finally:
            readline.set_startup_hook(None)
    if response == "":
        return current
    if response == "-":
        return ""
    if response.lower() == "c" and calcium_sum:
        return calcium_sum
    return response
def prompt_yes_no(prompt: str, default: bool = False) -> bool:
    """Ask a yes/no question; "h" shows help, empty answer takes the default."""
    suffix = " [Y/n/h]: " if default else " [y/N/h]: "
    while True:
        reply = input(prompt + suffix).strip().lower()
        if reply != "h":
            break
        print("Options: y=yes, n=no.")
    return default if not reply else reply in {"y", "yes"}
def prompt_yes_no_accept(
    prompt: str,
    default: bool = False,
    include_reload: bool = False,
    include_clear: bool = False,
    include_update_rec: bool = False,
) -> str:
    """Prompt for yes/no/accept with optional extra actions.

    Returns one of "yes", "no", "accept", "reload", "clear", or "update_rec".
    The ``include_*`` flags add the reload / clear-SUM / update-REC choices;
    "h" prints the available options and re-prompts; an empty response
    resolves to ``default``.
    """
    # Compose the "[y/N/a/.../h]" suffix and matching help text from the
    # enabled options instead of enumerating all eight flag combinations
    # (the original if/elif ladder produced exactly these strings).
    keys = ["Y" if default else "y", "n" if default else "N", "a"]
    help_items = ["y=yes", "n=no", "a=accept"]
    if include_reload:
        keys.append("r")
        help_items.append("r=reload")
    if include_clear:
        keys.append("c")
        help_items.append("c=clear SUM")
    if include_update_rec:
        keys.append("u")
        help_items.append("u=update REC")
    keys.append("h")
    suffix = " [" + "/".join(keys) + "]: "
    help_text = "Options: " + ", ".join(help_items) + "."
    while True:
        response = input(prompt + suffix).strip().lower()
        if response == "h":
            print(help_text)
            continue
        if response == "r" and include_reload:
            return "reload"
        if response == "c" and include_clear:
            return "clear"
        if response == "u" and include_update_rec:
            return "update_rec"
        if response == "a":
            return "accept"
        if not response:
            return "yes" if default else "no"
        return "yes" if response in {"y", "yes"} else "no"
def prompt_edit_scope(
    is_recurring: bool,
    default_all: bool = False,
) -> str:
    """Ask whether an edit applies to this instance or the full series.

    Non-recurring events always return "this". Otherwise "a..." selects
    "full", "h" prints help, and an empty response takes the default.
    """
    if not is_recurring:
        return "this"
    label = (
        "Edit scope [t]his/[A]ll/[h]: "
        if default_all
        else "Edit scope [t]his/[a]ll/[h]: "
    )
    while True:
        choice = input(label).strip().lower()
        if choice == "h":
            print("Options: t=this instance, a=all instances.")
            continue
        if choice == "":
            return "full" if default_all else "this"
        return "full" if choice.startswith("a") else "this"
def update_reservation_raw(
    api: BookedAPI,
    reference: str,
    data: Dict[str, object],
    update_scope: str,
) -> Dict:
    """POST an update for reservation ``reference`` with the given scope.

    Thin wrapper over the client's private ``_request`` helper; ``data`` is
    sent as the JSON body and ``update_scope`` as a query parameter.
    """
    endpoint = f"Reservations/{reference}"
    scope_params = {"updateScope": update_scope}
    return api._request("POST", endpoint, params=scope_params, json=data)
def update_scandium_event(
    api: BookedAPI,
    sca: EventView,
    tz: ZoneInfo,
    fields: Optional[set[str]] = None,
    calcium: Optional[EventView] = None,
    prefill: Optional[Dict[str, str]] = None,
    prompt: bool = True,
    update_scope_override: Optional[str] = None,
    recurrence_rule_override: Optional[Dict[str, object]] = None,
) -> Optional[EventView]:
    """Interactively (or via ``prefill``) update a Scandium reservation.

    Gathers current values from the reservation detail (custom attributes,
    falling back to the EventView's compare/ai fields), optionally prompts
    for each requested field, posts the update, then re-fetches and rebuilds
    the EventView. Returns ``sca`` unchanged when it has no reference number.

    Args:
        fields: which fields to edit (default: TYP, PAT, BODY, SPI, SUM).
        calcium: paired Calcium event; its SUM can be copied via "c".
        prefill: field values applied before (and instead of) prompting.
        prompt: when False, skip all interactive prompts.
        update_scope_override: skip the this/all scope prompt with this value.
        recurrence_rule_override: recurrence rule to send verbatim.
    """
    detail = sca.scandium_detail or {}
    reference = sca.reference
    if not reference:
        return sca
    title_current = (detail.get("title") or "").strip()
    desc_current = (detail.get("description") or "").strip()
    typ_current = ""
    pat_current = ""
    body_current = ""
    spi_current = ""
    # Map lowercase attribute label -> (attribute id, current value).
    attr_map: Dict[str, Tuple[int, str]] = {}
    for attr in detail.get("customAttributes", []):
        label = (attr.get("label") or "").strip()
        value = (attr.get("value") or "").strip()
        if label and value:
            attr_map[label.lower()] = (int(attr.get("id")), value)
    if "scan type / pi" in attr_map:
        typ_current = attr_map["scan type / pi"][1]
    if "patient source" in attr_map:
        pat_current = attr_map["patient source"][1]
    if "body part" in attr_map:
        body_current = attr_map["body part"][1]
    if "scanned person initials" in attr_map:
        spi_current = attr_map["scanned person initials"][1]
    # Fall back to values already extracted into the EventView.
    if not typ_current:
        typ_current = sca.compare_fields.get("TYP", "")
    if not pat_current:
        pat_current = (sca.ai_payload.get("patient_source") or "")
    if not body_current:
        body_current = (sca.ai_payload.get("body_part") or "")
    if not spi_current:
        spi_current = (sca.ai_payload.get("spi") or "")
    fields = fields or {"TYP", "PAT", "BODY", "SPI", "SUM"}
    typ_input = typ_current
    sum_input = desc_current
    tit_input = title_current
    pat_input = pat_current
    body_input = body_current
    spi_input = spi_current
    par_input = ""
    rec_input = sca.recurrence or "none"
    # Prefill values replace both the "current" default shown in the prompt
    # and the value used when prompting is skipped.
    if prefill and "TYP" in prefill:
        typ_current = prefill["TYP"]
        typ_input = prefill["TYP"]
    if "TYP" in fields and prompt:
        typ_input = input_with_default("TYP (Scan Type / PI)", typ_current)
    if prefill and "SPI" in prefill:
        spi_current = prefill["SPI"]
        spi_input = prefill["SPI"]
    if "SPI" in fields and prompt:
        spi_input = input_with_default("SPI (Scanned Person Initials)", spi_current)
    typ_is_clinical = (typ_input or "").strip().lower() == "clinical"
    typ_changed_from_clinical_to_open = (
        (typ_current or "").strip().lower() == "clinical"
        and (typ_input or "").strip().lower() == "open"
    )
    if prefill and "PAT" in prefill:
        pat_current = prefill["PAT"]
        pat_input = prefill["PAT"]
    # PAT/BODY only apply to Clinical scans; otherwise they are cleared.
    # NOTE(review): this else also clears a prefill PAT/BODY whenever
    # prompt=False or the field is not in `fields` — confirm that is intended.
    if "PAT" in fields and typ_is_clinical and prompt:
        pat_input = input_with_default("PAT (Patient Source)", pat_current)
    else:
        pat_input = ""
    if prefill and "BODY" in prefill:
        body_current = prefill["BODY"]
        body_input = prefill["BODY"]
    if "BODY" in fields and typ_is_clinical and prompt:
        body_input = input_with_default("Body part", body_current)
    else:
        body_input = ""
    # Switching a Clinical scan to Open drops the clinical-only fields.
    if typ_changed_from_clinical_to_open:
        body_input = ""
        spi_input = ""
    # Prompt SUM last.
    if prefill and "SUM" in prefill:
        sum_input = prefill["SUM"]
    if "SUM" in fields and prompt:
        calcium_sum = ""
        if calcium:
            calcium_sum = calcium.compare_fields.get("SUM", "")
        sum_input = input_sum_with_default(sum_input, calcium_sum)
    # Preserve the existing participant/invitee emails on the update.
    participants_current = []
    for p in list(detail.get("participants", [])) + list(detail.get("invitees", [])):
        if isinstance(p, dict):
            email = (p.get("emailAddress") or "").strip()
            if email:
                participants_current.append(email)
        elif isinstance(p, str) and p.strip():
            participants_current.append(p.strip())
    par_current = ", ".join(participants_current)
    par_input = par_current
    rec_current = sca.recurrence or "none"
    rec_input = rec_current
    if update_scope_override:
        update_scope = update_scope_override
    else:
        update_scope = prompt_edit_scope(bool(detail.get("isRecurring")))
    custom_attrs: List[Dict[str, object]] = []
    # Fixed Booked attribute ids for this server's custom fields.
    label_to_id = {
        "Scan Type / PI": 1,
        "Patient Source": 11,
        "Body part": 10,
        "Scanned Person Initials": 2,
    }
    def apply_attr(label: str, value: str) -> None:
        # Append one attribute entry in the API's update format.
        custom_attrs.append({"attributeId": label_to_id[label], "attributeValue": value})
    apply_attr("Scan Type / PI", typ_input or "")
    apply_attr("Patient Source", pat_input or "")
    apply_attr("Body part", body_input or "")
    apply_attr("Scanned Person Initials", spi_input or "")
    # Resolve participant emails back to user ids; unknown ones are reported
    # and skipped rather than failing the update.
    participant_ids: List[int] = []
    if par_input.strip():
        for raw in par_input.split(","):
            email = raw.strip()
            if not email:
                continue
            user = api.find_user_by_email(email)
            if user:
                participant_ids.append(user["id"])
            else:
                print(f"Participant not found: {email}")
    recurrence_rule = None
    if rec_input.strip().lower() == "none":
        recurrence_rule = {"type": "none"}
    elif rec_input.strip() and rec_input.strip() != rec_current:
        # Accept a raw RRULE string, with or without the "RRULE:" prefix.
        rrule = rec_input.strip()
        if rrule.upper().startswith("RRULE:"):
            rrule = rrule.split(":", 1)[1]
        recurrence_rule = parse_rrule(rrule, sca.start.replace(tzinfo=None))
    data = {
        "resourceId": int(detail.get("resourceId")),
        "userId": api.get_user_id(),
        "startDateTime": detail.get("startDate") or detail.get("startDateTime"),
        "endDateTime": detail.get("endDate") or detail.get("endDateTime"),
        "title": tit_input,
        "description": sum_input or "",
        "participants": participant_ids,
        "customAttributes": custom_attrs,
    }
    if recurrence_rule_override is not None:
        data["recurrenceRule"] = recurrence_rule_override
    elif recurrence_rule is not None:
        data["recurrenceRule"] = recurrence_rule
    update_reservation_raw(api, reference, data, update_scope)
    # Re-fetch so the returned view reflects the server's post-update state.
    new_detail = api.get_reservation(reference)
    return build_scandium_event_view_from_detail(new_detail, tz)
def build_event_payload_simple(
    api: BookedAPI,
    event: Dict[str, str],
    resource: Dict[str, object],
    tz: ZoneInfo,
) -> Dict[str, object]:
    """Translate a Calcium VEVENT dict into a reservation-creation payload.

    Maps CATEGORIES to the PI attribute, extracts initials and participant
    emails from the summary (skipped for "Open" events), and converts any
    RRULE into the API's recurrence-rule dict.

    Raises:
        ValueError: when DTSTART or DTEND is missing.
    """
    dtstart = event.get("DTSTART", "")
    dtend = event.get("DTEND", "")
    if not dtstart or not dtend:
        raise ValueError("DTSTART and DTEND are required")
    start_iso = ics_datetime_to_iso(dtstart)
    end_iso = ics_datetime_to_iso(dtend)
    summary_raw = event.get("SUMMARY", "").strip()
    summary = normalize_summary(summary_raw)
    categories = event.get("CATEGORIES", "")
    is_open_category = categories.strip().lower() == "open"
    summary_clean = summary
    # The first line of the summary serves as the reservation title.
    title = summary_clean.splitlines()[0].strip() if summary_clean else ""
    # Resolve the PI attribute value from the category (retrying with just
    # the first word), falling back to the raw category text or "Unknown".
    matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, categories)
    if not matched_pi and categories.strip():
        matched_pi = api.find_attribute_value(PI_ATTRIBUTE_ID, categories.split()[0])
    if not matched_pi:
        matched_pi = categories.strip() or "Unknown"
    initials: List[str] = []
    if not is_open_category:
        initials = extract_initials(summary)
    custom_attributes = [
        {"attributeId": PI_ATTRIBUTE_ID, "attributeValue": matched_pi}
    ]
    if initials:
        custom_attributes.append(
            {
                "attributeId": SCANNED_INITIALS_ATTRIBUTE_ID,
                "attributeValue": initials[0],
            }
        )
    # Resolve summary emails to user ids (trying a swapped domain when the
    # direct lookup fails), de-duplicating while preserving order.
    participant_ids: List[int] = []
    seen_user_ids: set[int] = set()
    if not is_open_category:
        for email in extract_emails(summary):
            user = api.find_user_by_email(email)
            if not user:
                swapped = swap_email_domain(email)
                if swapped:
                    user = api.find_user_by_email(swapped)
            if user:
                user_id = int(user["id"])
                if user_id not in seen_user_ids:
                    participant_ids.append(user_id)
                    seen_user_ids.add(user_id)
    recurrence_rule = None
    if event.get("RRULE"):
        start_dt = parse_ics_datetime(dtstart, tz)
        if start_dt:
            recurrence_rule = parse_rrule(event.get("RRULE", ""), start_dt.replace(tzinfo=None))
    is_notice_category = categories.strip().lower() == "notice"
    # De-duplicate identical title/description: Notice events keep the
    # description, everything else keeps the title; Open events get no title.
    if title and summary_clean and title.strip() == summary_clean.strip():
        if is_notice_category:
            title = ""
        else:
            summary_clean = ""
    if is_open_category:
        title = ""
    return {
        "resource_id": resource["resourceId"],
        "start_datetime": start_iso,
        "end_datetime": end_iso,
        "title": title,
        "description": summary_clean or None,
        "participants": participant_ids if participant_ids else None,
        "custom_attributes": custom_attributes,
        "recurrence_rule": recurrence_rule,
    }
def create_scandium_from_calcium(
    api: BookedAPI,
    resource: Dict[str, object],
    tz: ZoneInfo,
    calcium_event: Dict[str, str],
) -> Optional[EventView]:
    """Create a Scandium reservation mirroring a Calcium event.

    Events categorized "Clinical" or "Service" get dedicated custom-attribute
    handling; everything else is translated via build_event_payload_simple().

    Args:
        api: Authenticated Booked Scheduler client.
        resource: Scandium resource record (must contain "resourceId").
        tz: Timezone used to interpret the event's ICS timestamps.
        calcium_event: Raw Calcium event fields (SUMMARY, DTSTART, ...).

    Returns:
        An EventView built from the freshly created reservation, or None if
        the create call did not return a reference number.
    """

    def calcium_recurrence():
        # Translate the event's RRULE (if any) into a Scandium recurrence
        # rule; shared by the Clinical and Service branches.
        if not calcium_event.get("RRULE"):
            return None
        start_dt = parse_ics_datetime(calcium_event.get("DTSTART", ""), tz)
        if not start_dt:
            return None
        return parse_rrule(calcium_event.get("RRULE", ""), start_dt.replace(tzinfo=None))

    categories = calcium_event.get("CATEGORIES", "") or ""
    if "Clinical" in categories:
        description_raw = calcium_event.get("SUMMARY", "")
        pat, body, spi, description = parse_clinical_prefix(description_raw)
        # A blocker phrase in the raw text invalidates any parsed initials.
        if spi and contains_spi_blocker(description_raw):
            spi = None
        if not spi:
            spi = extract_scanned_initials(description_raw)
        # NOTE(review): literal attribute IDs (1, 11, 10, 2, 3) — id 1 is
        # presumably PI_ATTRIBUTE_ID; confirm and prefer the named constants.
        pi_value = api.find_attribute_value(1, "Clinical") or "Clinical"
        custom_attrs = [{"attributeId": 1, "attributeValue": pi_value}]
        if pat:
            custom_attrs.append({"attributeId": 11, "attributeValue": pat})
        if body:
            custom_attrs.append({"attributeId": 10, "attributeValue": body})
        if spi:
            custom_attrs.append({"attributeId": 2, "attributeValue": spi})
        if has_contrast_indicator(description):
            custom_attrs.append({"attributeId": 3, "attributeValue": "Yes"})
        result = api.create_reservation(
            resource_id=resource["resourceId"],
            start_datetime=ics_datetime_to_iso(calcium_event.get("DTSTART", "")),
            end_datetime=ics_datetime_to_iso(calcium_event.get("DTEND", "")),
            title="",
            description=description,
            custom_attributes=custom_attrs,
            recurrence_rule=calcium_recurrence(),
        )
    elif "Service" in categories:
        service_value = api.find_attribute_value(PI_ATTRIBUTE_ID, "Service") or "Service"
        result = api.create_reservation(
            resource_id=resource["resourceId"],
            start_datetime=ics_datetime_to_iso(calcium_event.get("DTSTART", "")),
            end_datetime=ics_datetime_to_iso(calcium_event.get("DTEND", "")),
            title="",
            description=normalize_summary(calcium_event.get("SUMMARY", "")),
            custom_attributes=[{"attributeId": PI_ATTRIBUTE_ID, "attributeValue": service_value}],
            recurrence_rule=calcium_recurrence(),
        )
    else:
        payload = build_event_payload_simple(api, calcium_event, resource, tz)
        result = api.create_reservation(
            resource_id=payload["resource_id"],
            start_datetime=payload["start_datetime"],
            end_datetime=payload["end_datetime"],
            title=payload["title"],
            description=payload.get("description"),
            participants=payload.get("participants"),
            custom_attributes=payload.get("custom_attributes"),
            recurrence_rule=payload.get("recurrence_rule"),
        )
    reference = result.get("referenceNumber")
    if not reference:
        print(f"Create failed: {result}")
        return None
    # Re-fetch the reservation so the returned view reflects server state.
    new_detail = api.get_reservation(reference)
    return build_scandium_event_view_from_detail(new_detail, tz)
def build_pairs(
    calcium_events: List[EventView],
    scandium_events: List[EventView],
    date_value: date,
) -> List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]]:
    """Pair Calcium and Scandium events occupying the same time slot.

    Events are grouped by their (start, end) times.  Within each slot,
    matching is greedy and two-pass: first on the exact (TYP, summary) key,
    then on TYP alone.  Anything left unmatched on either side is emitted
    with None as its partner, so every event appears in exactly one pair.

    Returns:
        A list of (calcium_event_or_None, scandium_event_or_None,
        (start, end), date_value) tuples.
    """
    def normalize_match_text(text: str) -> str:
        # Collapse runs of whitespace and lowercase for tolerant comparison.
        cleaned = " ".join(text.strip().split())
        return cleaned.lower()

    def normalize_open_summary(text: str) -> str:
        # Drop a leading "open" marker (with optional -/: separators).
        cleaned = normalize_match_text(text)
        return re.sub(r"^open\b\s*[-:]*\s*", "", cleaned, flags=re.IGNORECASE)

    def normalize_typ_for_pairing(raw: str) -> str:
        # Reduce a raw TYP value to a single comparable token.
        cleaned = raw.strip().lower()
        if not cleaned:
            return ""
        # Any value containing both "service" and "pm" collapses to "service".
        if "service" in cleaned and "pm" in cleaned:
            return "service"
        # For slash-separated values, keep only the last segment's first word.
        if "/" in cleaned:
            cleaned = cleaned.split("/")[-1].strip()
        if cleaned:
            cleaned = cleaned.split()[0]
        return cleaned

    def match_fields(event: EventView) -> Tuple[str, str]:
        # Build the (typ, summary) key used for exact matching.
        typ = (event.compare_fields.get("TYP") or "").strip()
        summary = (event.compare_fields.get("SUM") or "").strip()
        # A summary beginning with "open" implies the Open type when TYP is blank.
        if not typ and summary.lower().startswith("open"):
            typ = "Open"
        typ_key = normalize_typ_for_pairing(typ)
        if typ_key == "open":
            summary = normalize_open_summary(summary)
        else:
            summary = normalize_match_text(summary)
        return (typ_key, summary)

    calcium_grouped = group_by_time(calcium_events)
    scandium_grouped = group_by_time(scandium_events)
    # Visit every time slot present on either side, in chronological order.
    all_keys = sorted(
        set(calcium_grouped) | set(scandium_grouped),
        key=lambda key: (key[0], key[1]),
    )
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ] = []
    for key in all_keys:
        calcium_list = calcium_grouped.get(key, [])
        scandium_list = scandium_grouped.get(key, [])
        if not calcium_list or not scandium_list:
            # One side is empty for this slot: pair positionally against None.
            max_count = max(len(calcium_list), len(scandium_list))
            for idx in range(max_count):
                cal = calcium_list[idx] if idx < len(calcium_list) else None
                sca = scandium_list[idx] if idx < len(scandium_list) else None
                pairs.append((cal, sca, key, date_value))
            continue
        cal_remaining = list(range(len(calcium_list)))
        sca_remaining = list(range(len(scandium_list)))
        matched: List[Tuple[int, int]] = []
        # Pass 1: exact (typ, summary) matches, consumed first-come-first-served.
        sca_by_exact: Dict[Tuple[str, str], List[int]] = {}
        for idx in sca_remaining:
            sca_key = match_fields(scandium_list[idx])
            sca_by_exact.setdefault(sca_key, []).append(idx)
        for cal_idx in cal_remaining[:]:
            cal_key = match_fields(calcium_list[cal_idx])
            candidates = sca_by_exact.get(cal_key, [])
            if candidates:
                sca_idx = candidates.pop(0)
                matched.append((cal_idx, sca_idx))
                cal_remaining.remove(cal_idx)
                sca_remaining.remove(sca_idx)
        # Pass 2: fall back to matching on typ alone.
        sca_by_typ: Dict[str, List[int]] = {}
        for idx in sca_remaining:
            typ, _ = match_fields(scandium_list[idx])
            sca_by_typ.setdefault(typ, []).append(idx)
        for cal_idx in cal_remaining[:]:
            typ, _ = match_fields(calcium_list[cal_idx])
            candidates = sca_by_typ.get(typ, [])
            if candidates:
                sca_idx = candidates.pop(0)
                matched.append((cal_idx, sca_idx))
                cal_remaining.remove(cal_idx)
                sca_remaining.remove(sca_idx)
        for cal_idx, sca_idx in matched:
            pairs.append((calcium_list[cal_idx], scandium_list[sca_idx], key, date_value))
        # Leftovers pair with None so both sides are always represented.
        for cal_idx in cal_remaining:
            pairs.append((calcium_list[cal_idx], None, key, date_value))
        for sca_idx in sca_remaining:
            pairs.append((None, scandium_list[sca_idx], key, date_value))
    return pairs
def precompute_ai_results(
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ],
    hashes: List[str],
    accepted_hashes: set[str],
    use_ai: bool,
    model: str,
    api_key: Optional[str],
) -> Dict[int, Dict[str, object]]:
    """Compute comparison results for every pair, batching the AI calls.

    Pairs whose canonical hash was previously accepted are marked as matches
    without comparison.  For the rest, a cheap non-AI comparison runs first;
    only unresolved differences are fanned out to the AI comparator on a
    thread pool.

    Args:
        pairs: (calcium, scandium, (start, end), date) tuples to compare.
        hashes: Pre-computed canonical hashes.  Unused here (acceptance is
            re-resolved via resolve_accepted_hash); kept for interface
            compatibility with compute_compare_results_for_pairs.
        accepted_hashes: Hashes the operator has already accepted.
        use_ai: Whether the AI fallback may be used at all.
        model: Model name passed to compare_events.
        api_key: OpenAI API key; AI comparison is skipped when missing.

    Returns:
        Mapping of pair index -> comparison-result dict.  One-sided,
        non-accepted pairs have no entry.
    """
    cache: Dict[str, Dict[str, object]] = {}
    results: Dict[int, Dict[str, object]] = {}
    to_schedule: List[Tuple[int, EventView, EventView]] = []
    for idx, (cal, sca, _key, _date_value) in enumerate(pairs):
        # Accepted pairs short-circuit whether or not both sides exist
        # (the original duplicated this branch in two arms).
        if resolve_accepted_hash(cal, sca, accepted_hashes):
            results[idx] = {
                "match": True,
                "field_matches": {},
                "notes": "accepted",
                "source": "accepted",
            }
            continue
        if not cal or not sca:
            # One-sided pair with no acceptance: nothing to compare.
            continue
        prelim = compare_events(cal, sca, False, model, api_key, cache)
        if not use_ai or not api_key or prelim.get("source") == "guard" or prelim.get("match") is True:
            results[idx] = prelim
            continue
        to_schedule.append((idx, cal, sca))
    # Nothing is ever scheduled when AI is disabled or the key is missing,
    # so this single check covers the original's extra conditions.
    if not to_schedule:
        return results
    max_workers = min(20, len(to_schedule))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {
            executor.submit(compare_events, cal, sca, True, model, api_key, cache): idx
            for (idx, cal, sca) in to_schedule
        }
        for future in tqdm(as_completed(future_map), total=len(future_map), desc="AI compare"):
            idx = future_map[future]
            results[idx] = future.result()
    return results
def compute_compare_results_for_pairs(
    pairs: List[
        Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]
    ],
    hashes: List[str],
    accepted_hashes: set[str],
    use_ai: bool,
    model: str,
    api_key: Optional[str],
    cache: Optional[Dict[str, Dict[str, object]]] = None,
) -> List[Optional[Dict[str, object]]]:
    """Compute a comparison result per pair, in pair order.

    Same strategy as precompute_ai_results — accepted hashes short-circuit,
    cheap comparison first, AI fan-out on a thread pool for the rest — but
    returns a positional list (None for one-sided, non-accepted pairs) and
    accepts an external comparison cache shared across calls.

    Args:
        pairs: (calcium, scandium, (start, end), date) tuples to compare.
        hashes: Pre-computed canonical hashes.  Unused here (acceptance is
            re-resolved via resolve_accepted_hash); kept for interface
            compatibility.
        accepted_hashes: Hashes the operator has already accepted.
        use_ai: Whether the AI fallback may be used at all.
        model: Model name passed to compare_events.
        api_key: OpenAI API key; AI comparison is skipped when missing.
        cache: Optional shared comparison cache; a fresh one is created
            when omitted.

    Returns:
        A list parallel to `pairs`; entries are result dicts or None.
    """
    if cache is None:
        cache = {}
    results_list: List[Optional[Dict[str, object]]] = [None] * len(pairs)
    to_schedule: List[Tuple[int, EventView, EventView]] = []
    for idx, (cal, sca, _key, _date_value) in enumerate(pairs):
        # Accepted pairs short-circuit whether or not both sides exist
        # (the original duplicated this branch in two arms).
        if resolve_accepted_hash(cal, sca, accepted_hashes):
            results_list[idx] = {
                "match": True,
                "field_matches": {},
                "notes": "accepted",
                "source": "accepted",
            }
            continue
        if not cal or not sca:
            # One-sided pair with no acceptance: leave the slot as None.
            continue
        prelim = compare_events(cal, sca, False, model, api_key, cache)
        if not use_ai or not api_key or prelim.get("source") == "guard" or prelim.get("match") is True:
            results_list[idx] = prelim
            continue
        to_schedule.append((idx, cal, sca))
    # Nothing is ever scheduled when AI is disabled or the key is missing,
    # so this single check covers the original's extra conditions.
    if not to_schedule:
        return results_list
    max_workers = min(20, len(to_schedule))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {
            executor.submit(compare_events, cal, sca, True, model, api_key, cache): idx
            for (idx, cal, sca) in to_schedule
        }
        for future in tqdm(as_completed(future_map), total=len(future_map), desc="AI compare"):
            idx = future_map[future]
            results_list[idx] = future.result()
    return results_list
def side_by_side_lines(
    left: str,
    right: str,
    width: int,
    gutter: int,
) -> List[str]:
    """Render two multi-line strings as aligned left/right columns.

    Each column is padded to `width` visible characters (ANSI escape
    sequences are excluded from the length calculation so colored text
    still lines up) and the columns are separated by `gutter` spaces.
    The shorter side is filled with blank rows.
    """
    lhs = left.splitlines() or [""]
    rhs = right.splitlines() or [""]

    def padded(text: str) -> str:
        # Measure only visible characters so color codes don't skew alignment.
        visible = len(ANSI_RE.sub("", text))
        if visible >= width:
            return text
        return text + " " * (width - visible)

    spacer = " " * gutter
    rows: List[str] = []
    for row in range(max(len(lhs), len(rhs))):
        left_cell = lhs[row] if row < len(lhs) else ""
        right_cell = rhs[row] if row < len(rhs) else ""
        rows.append(f"{padded(left_cell)}{spacer}{padded(right_cell)}")
    return rows
def display_pairs(
    dates_sorted: List[date],
    pairs_by_date: Dict[
        date,
        List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]],
    ],
    hashes_by_date: Dict[date, List[str]],
    compare_results_by_date: Dict[date, List[Optional[Dict[str, object]]]],
    calcium_by_date: Dict[date, List[EventView]],
    scandium_by_date: Dict[date, List[EventView]],
    accepted_hashes: set[str],
    memory_path: Path,
    tz: ZoneInfo,
    use_ai: bool,
    model: str,
    api_key: Optional[str],
    api: BookedAPI,
    resource: Dict[str, object],
    resource_name: str,
    typ_filter: Optional[str],
    start_time: Optional[time],
    end_time: Optional[time],
) -> None:
    """Interactively walk every pair, date by date, letting the operator
    edit, accept, create, or delete Scandium events to reconcile them with
    Calcium.

    Each pair is rendered side-by-side; a `while True` loop re-displays the
    pair after every action until the operator breaks out (accepting,
    declining, or finishing an edit).  Accepted pairs are persisted to
    `memory_path` as canonical hashes.  Mutates hashes_by_date,
    compare_results_by_date, pairs_by_date, and scandium_by_date in place.
    """
    terminal_width = shutil.get_terminal_size((120, 20)).columns
    gutter = 4
    column_width = max(30, (terminal_width - gutter) // 2)
    if not pairs_by_date:
        print("No events found on either side.")
        return

    def recurrence_dates_for_event(
        calcium_event: Dict[str, str],
        dates: Iterable[date],
    ) -> List[date]:
        # Dates among `dates` on which this recurring Calcium event occurs,
        # respecting the active start/end-time filter if one is set.
        affected: List[date] = []
        for candidate in dates:
            occurrence = calcium_occurrence_for_date(calcium_event, candidate, tz)
            if not occurrence:
                continue
            if start_time and end_time:
                occ_start, occ_end = occurrence
                # Compare wall-clock times only (tz stripped) against filter.
                if occ_start.timetz().replace(tzinfo=None) != start_time:
                    continue
                if occ_end.timetz().replace(tzinfo=None) != end_time:
                    continue
            affected.append(candidate)
        return affected

    def refresh_scandium_for_dates(dates_to_refresh: Iterable[date]) -> None:
        # Re-fetch Scandium events and rebuild pairs/hashes/compare results
        # for the given dates (used after creating a recurring reservation,
        # which can add occurrences on later dates).
        for date_value in dates_to_refresh:
            scandium_events = build_scandium_events_for_date(
                api=api,
                resource_name=resource_name,
                tz=tz,
                target_date=date_value,
                start_time=start_time,
                end_time=end_time,
                resource=resource,
            )
            if typ_filter:
                scandium_events = [
                    event
                    for event in scandium_events
                    if (event.compare_fields.get("TYP") or "").lower() == typ_filter.lower()
                ]
            scandium_by_date[date_value] = scandium_events
            pairs_by_date[date_value] = build_pairs(
                calcium_by_date.get(date_value, []),
                scandium_events,
                date_value,
            )
            hashes = [
                canonical_hash(cal, sca)
                for cal, sca, _key, _date in pairs_by_date[date_value]
            ]
            hashes_by_date[date_value] = hashes
            compare_results_by_date[date_value] = compute_compare_results_for_pairs(
                pairs_by_date[date_value],
                hashes,
                accepted_hashes,
                use_ai=use_ai,
                model=model,
                api_key=api_key,
            )

    for date_value in dates_sorted:
        pairs = pairs_by_date.get(date_value, [])
        hashes = hashes_by_date.get(date_value, [])
        compare_results = compare_results_by_date.get(date_value, [])
        if not pairs:
            continue
        for idx, (cal, sca, key, _date_value) in enumerate(pairs):
            start_dt, end_dt = key
            compare_result = compare_results[idx] if idx < len(compare_results) else None
            # Re-display loop: `continue` re-renders after an edit,
            # `break` moves on to the next pair.
            while True:
                current_hash = canonical_hash(cal, sca)
                if idx < len(hashes):
                    hashes[idx] = current_hash
                accepted_hash = resolve_accepted_hash(cal, sca, accepted_hashes)
                print("\n\n\n\n")
                start_label = start_dt.astimezone(tz).strftime("%Y-%m-%d %H:%M")
                end_label = end_dt.astimezone(tz).strftime("%H:%M")
                header = f"{start_label} -> {end_label}"
                print("=" * terminal_width)
                print(header.center(terminal_width))
                print("-" * terminal_width)
                sca_label = "Scandium"
                if sca and sca.reference:
                    # Show the reservation link above the column headers.
                    sca_label = f"Scandium ({sca.reference})"
                    sca_link = f"https://scandium.mclean.harvard.edu/reservation/?rn={sca.reference}"
                    print(f"{''.ljust(column_width)}{' ' * gutter}{sca_link.ljust(column_width)}")
                print(f"{'Calcium'.ljust(column_width)}{' ' * gutter}{sca_label.ljust(column_width)}")
                print("-" * terminal_width)
                left_text = format_event_label(cal, column_width) if cal else "missing"
                right_text = format_event_label(sca, column_width) if sca else "missing"
                for line in side_by_side_lines(
                    left_text, right_text, width=column_width, gutter=gutter
                ):
                    print(line)
                if accepted_hash:
                    # A stored acceptance overrides whatever was computed.
                    compare_result = {
                        "match": True,
                        "field_matches": {},
                        "notes": "accepted",
                        "source": "accepted",
                    }
                if compare_result:
                    match_label = "MATCH" if compare_result.get("match") else "DIFF"
                    field_matches = compare_result.get("field_matches", {})
                    mismatched = [
                        key for key, status in field_matches.items() if status == "mismatch"
                    ]
                    diff_line = "DIFF: none" if not mismatched else f"DIFF: {', '.join(mismatched)}"
                    source = compare_result.get("source", "exact")
                    notes = compare_result.get("notes", "")
                    note_suffix = ""
                    if source == "ai-error" and notes:
                        # Surface a truncated error note from the AI comparator.
                        note_suffix = f" | {notes[:60]}"
                    is_match = compare_result.get("match") is True
                    is_clean = not mismatched
                    if source == "accepted":
                        color = BLUE
                    else:
                        color = GREEN if is_match and is_clean else ORANGE
                    compare_text = f"CHK: {match_label} | {diff_line} | {source}{note_suffix}"
                    left_only = f"{color}{compare_text}{RESET}"
                    for line in side_by_side_lines(
                        left_only,
                        "",
                        width=column_width,
                        gutter=gutter,
                    ):
                        print(line)
                    if source == "accepted":
                        # Accepted pairs are informational only: show the hash
                        # banner and move straight on to the next pair.
                        short_hash = accepted_hash[:8]
                        accepted_text = f"{BLUE}[ ACCEPTED - HASH {short_hash} ]{RESET}"
                        for line in side_by_side_lines(
                            accepted_text,
                            "",
                            width=column_width,
                            gutter=gutter,
                        ):
                            print(line)
                        break
                if compare_result and compare_result.get("match"):
                    # Matching pair: offer optional edit/reload/clear/accept.
                    action = prompt_yes_no_accept(
                        "Edit?",
                        default=False,
                        include_reload=True,
                        include_clear=True,
                    )
                    if action == "clear":
                        # Blank out the Scandium summary for this occurrence only.
                        sca = update_scandium_event(
                            api,
                            sca,
                            tz,
                            fields={"SUM"},
                            calcium=cal,
                            prefill={"SUM": ""},
                            prompt=False,
                            update_scope_override="this",
                        )
                        if sca and cal:
                            compare_result = compare_events(cal, sca, False, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    if action == "reload":
                        # Re-fetch from the server and re-compare (AI allowed).
                        if sca and sca.reference:
                            new_detail = api.get_reservation(sca.reference)
                            sca = build_scandium_event_view_from_detail(new_detail, tz)
                        if sca and cal:
                            compare_result = compare_events(cal, sca, use_ai, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    if action == "accept":
                        # Persist the acceptance so future runs skip this pair.
                        current_hash = canonical_hash(cal, sca)
                        if idx < len(hashes):
                            hashes[idx] = current_hash
                        if current_hash not in accepted_hashes:
                            store_accepted_hash(memory_path, current_hash)
                            accepted_hashes.add(current_hash)
                        compare_result = {
                            "match": True,
                            "field_matches": {},
                            "notes": "accepted",
                            "source": "accepted",
                        }
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                        break
                    if action == "yes":
                        # Interactive edit of the Scandium event.
                        sca = update_scandium_event(api, sca, tz, calcium=cal)
                        if sca and cal:
                            compare_result = compare_events(cal, sca, False, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    break
                if cal is None and sca is not None:
                    # Scandium-only event with no Calcium counterpart: delete it.
                    api.delete_reservation(sca.reference, update_scope="this")
                    sca = None
                    compare_result = None
                    if idx < len(compare_results):
                        compare_results[idx] = None
                    break
                if sca is None and cal is not None:
                    action = prompt_yes_no_accept("Scandium missing. Create from Calcium?", default=True)
                    if action == "accept":
                        current_hash = canonical_hash(cal, sca)
                        if idx < len(hashes):
                            hashes[idx] = current_hash
                        if current_hash not in accepted_hashes:
                            store_accepted_hash(memory_path, current_hash)
                            accepted_hashes.add(current_hash)
                        compare_result = {
                            "match": True,
                            "field_matches": {},
                            "notes": "accepted",
                            "source": "accepted",
                        }
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                        break
                    if action == "yes":
                        sca = create_scandium_from_calcium(api, resource, tz, cal.calcium_event)
                        if sca and cal:
                            compare_result = compare_events(cal, sca, use_ai, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        # A recurring event may have created occurrences on
                        # later dates; refresh those so they pair correctly.
                        if cal.calcium_event and cal.calcium_event.get("RRULE"):
                            future_dates = [d for d in dates_sorted if d > date_value]
                            affected_dates = recurrence_dates_for_event(cal.calcium_event, future_dates)
                            if affected_dates:
                                refresh_scandium_for_dates(affected_dates)
                        continue
                    break
                if cal is not None and sca is not None:
                    # Both sides present but not a match: offer an edit,
                    # with a dedicated REC-sync shortcut when recurrence differs.
                    field_matches = compare_result.get("field_matches", {}) if compare_result else {}
                    rec_mismatch = field_matches.get("REC") == "mismatch"
                    action = prompt_yes_no_accept(
                        "Edit Scandium fields?",
                        default=False,
                        include_update_rec=rec_mismatch,
                    )
                    if action == "accept":
                        current_hash = canonical_hash(cal, sca)
                        if idx < len(hashes):
                            hashes[idx] = current_hash
                        if current_hash not in accepted_hashes:
                            store_accepted_hash(memory_path, current_hash)
                            accepted_hashes.add(current_hash)
                        compare_result = {
                            "match": True,
                            "field_matches": {},
                            "notes": "accepted",
                            "source": "accepted",
                        }
                        if idx < len(compare_results):
                            compare_results[idx] = compare_result
                        break
                    if action == "update_rec":
                        # Copy Calcium's recurrence rule onto the Scandium event.
                        cal_rrule = (cal.calcium_event or {}).get("RRULE", "") if cal else ""
                        if cal_rrule:
                            recurrence_override = parse_rrule(
                                cal_rrule,
                                cal.start.replace(tzinfo=None),
                            )
                        else:
                            recurrence_override = {"type": "none"}
                        is_recurring = bool(sca.scandium_detail.get("isRecurring") if sca.scandium_detail else False)
                        if is_recurring:
                            scope = prompt_edit_scope(is_recurring, default_all=True)
                        else:
                            scope = "full"
                        sca = update_scandium_event(
                            api,
                            sca,
                            tz,
                            calcium=cal,
                            recurrence_rule_override=recurrence_override,
                            prompt=False,
                            update_scope_override=scope,
                        )
                        if sca:
                            compare_result = compare_events(cal, sca, False, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    if action == "yes":
                        recurrence_override = None
                        update_scope_override = None
                        if cal and compare_result:
                            if rec_mismatch:
                                # Optionally fold the REC sync into the edit.
                                if prompt_yes_no("Update REC to match Calcium?", default=True):
                                    cal_rrule = (cal.calcium_event or {}).get("RRULE", "")
                                    if cal_rrule:
                                        recurrence_override = parse_rrule(
                                            cal_rrule,
                                            cal.start.replace(tzinfo=None),
                                        )
                                    else:
                                        recurrence_override = {"type": "none"}
                                    is_recurring = bool(sca.scandium_detail.get("isRecurring") if sca.scandium_detail else False)
                                    if is_recurring:
                                        update_scope_override = prompt_edit_scope(
                                            is_recurring,
                                            default_all=True,
                                        )
                                    else:
                                        update_scope_override = "full"
                        sca = update_scandium_event(
                            api,
                            sca,
                            tz,
                            calcium=cal,
                            recurrence_rule_override=recurrence_override,
                            update_scope_override=update_scope_override,
                        )
                        if sca:
                            compare_result = compare_events(cal, sca, False, model, api_key, {})
                            if idx < len(compare_results):
                                compare_results[idx] = compare_result
                        continue
                    break
                # NOTE(review): unreachable leftover — every branch above ends
                # in break/continue, and build_pairs never yields (None, None).
                if compare_result and compare_result.get("match"):
                    pass
def resolve_preferences_path(events_path: Path, override: Optional[Path]) -> Optional[Path]:
    """Pick the Calcium .Preferences file to use, if any.

    An explicit override always wins.  Otherwise, look for a sibling of the
    events file carrying a ``.Preferences`` suffix and return it only when
    it actually exists on disk; return None when nothing applies.
    """
    if override:
        return override
    sibling = events_path.with_suffix(".Preferences")
    return sibling if sibling.exists() else None
def main() -> int:
    """CLI entry point: sync Calcium data, then compare/reconcile with Scandium.

    Parses dates (explicit or a --start-date/--end-date range), rsyncs the
    Calcium data directory, builds per-date event views for both systems,
    precomputes comparison results, and hands off to the interactive
    display_pairs() loop.

    Returns:
        0 on success.  Argument problems and rsync failures raise SystemExit.
    """
    parser = argparse.ArgumentParser(
        description="Compare Calcium and Scandium events for a single date.",
    )
    parser.add_argument(
        "dates",
        nargs="*",
        type=parse_date,
        help="Date(s) to compare (YYYY-MM-DD).",
    )
    parser.add_argument(
        "--start-date",
        type=parse_date,
        default=None,
        help="Start date (YYYY-MM-DD) for inclusive range.",
    )
    parser.add_argument(
        "--end-date",
        type=parse_date,
        default=None,
        help="End date (YYYY-MM-DD) for inclusive range.",
    )
    parser.add_argument(
        "--calcium-events",
        default=None,
        help="Path to Calcium .Events file (defaults based on --resource).",
    )
    parser.add_argument(
        "--preferences",
        type=Path,
        default=None,
        help="Optional path to Calcium .Preferences file.",
    )
    parser.add_argument(
        "--resource",
        default=DEFAULT_RESOURCE,
        help="Scandium resource name.",
    )
    parser.add_argument(
        "--timezone",
        default=DEFAULT_TZ,
        help="Timezone for display and matching.",
    )
    parser.add_argument(
        "--typ",
        default=None,
        help="Filter by TYP (Calcium category/Scandium PI value).",
    )
    parser.add_argument(
        "--start-time",
        type=parse_time_value,
        default=None,
        help="Filter by exact start time (HH:MM). Requires --end-time.",
    )
    parser.add_argument(
        "--end-time",
        type=parse_time_value,
        default=None,
        help="Filter by exact end time (HH:MM). Requires --start-time.",
    )
    parser.add_argument(
        "--no-ai",
        action="store_true",
        help="Disable AI matching fallback.",
    )
    parser.add_argument(
        "--ai-model",
        default="gpt-5-nano",
        help="OpenAI model for AI matching.",
    )
    args = parser.parse_args()
    tz = ZoneInfo(args.timezone)
    # Validate argument combinations up front, before the expensive rsync
    # (the original validated only after syncing).
    if (args.start_time and not args.end_time) or (args.end_time and not args.start_time):
        raise SystemExit("Both --start-time and --end-time are required together.")
    if (args.start_date and not args.end_date) or (args.end_date and not args.start_date):
        raise SystemExit("Both --start-date and --end-date are required together.")
    if args.start_date and args.end_date and args.dates:
        raise SystemExit("Use either explicit dates or --start-date/--end-date, not both.")
    if not args.dates and not (args.start_date and args.end_date):
        raise SystemExit("Provide at least one date or a --start-date/--end-date range.")
    events_path_value = args.calcium_events
    if not events_path_value:
        events_path_value = DEFAULT_CALCIUM_EVENTS_BY_RESOURCE.get(args.resource)
    if not events_path_value:
        raise SystemExit(
            f"No default Calcium .Events for resource '{args.resource}'. "
            "Use --calcium-events to specify a file."
        )
    events_path = Path(events_path_value)
    # Sync the Calcium data BEFORE resolving the preferences path:
    # resolve_preferences_path() probes the filesystem, so it must see the
    # freshly synced files (the original resolved it before the rsync and
    # could miss a .Preferences file that only arrives with the sync).
    rsync_cmd = [
        "rsync",
        "-rltv",
        "ddrucker@calendar-actual:/var/www/cgi-bin/CalciumDir40/data/",
        "data/",
    ]
    try:
        subprocess.run(rsync_cmd, check=True)
    except subprocess.CalledProcessError as exc:
        raise SystemExit(f"rsync failed: {exc}") from exc
    preferences_path = resolve_preferences_path(events_path, args.preferences)
    if args.start_date and args.end_date:
        if args.end_date < args.start_date:
            raise SystemExit("--end-date must be on or after --start-date.")
        span_days = (args.end_date - args.start_date).days
        dates: List[date] = [
            args.start_date + timedelta(days=offset) for offset in range(span_days + 1)
        ]
    else:
        dates = list(args.dates)
    api = BookedAPI()
    resource = api.find_resource_by_name(args.resource)
    if not resource:
        raise SystemExit(f"Resource not found: {args.resource}")
    dates_sorted = sorted(dates)
    scandium_by_date: Dict[date, List[EventView]] = {}
    pairs_by_date: Dict[
        date,
        List[Tuple[Optional[EventView], Optional[EventView], Tuple[datetime, datetime], date]],
    ] = {}
    # (The original initialized calcium_by_date to {} and immediately
    # overwrote it; the redundant init is dropped.)
    calcium_by_date: Dict[date, List[EventView]] = build_calcium_events_by_date(
        events_path=events_path,
        preferences_path=preferences_path,
        tz=tz,
        dates=dates_sorted,
        start_time=args.start_time,
        end_time=args.end_time,
    )
    if args.typ:
        # Apply the TYP filter to the Calcium side.
        for date_value in dates_sorted:
            calcium_by_date[date_value] = [
                event
                for event in calcium_by_date.get(date_value, [])
                if (event.compare_fields.get("TYP") or "").lower() == args.typ.lower()
            ]
    for date_value in tqdm(dates_sorted, desc="Reading Scandium"):
        scandium_events = build_scandium_events_for_date(
            api=api,
            resource_name=args.resource,
            tz=tz,
            target_date=date_value,
            start_time=args.start_time,
            end_time=args.end_time,
            resource=resource,
        )
        if args.typ:
            scandium_events = [
                event
                for event in scandium_events
                if (event.compare_fields.get("TYP") or "").lower() == args.typ.lower()
            ]
        scandium_by_date[date_value] = scandium_events
    for date_value in dates_sorted:
        pairs_by_date[date_value] = build_pairs(
            calcium_by_date.get(date_value, []),
            scandium_by_date.get(date_value, []),
            date_value,
        )
    api_key = os.environ.get("OPENAI_API_KEY")
    memory_path = Path("sync-memory.dat")
    accepted_hashes = load_accepted_hashes(memory_path)
    hashes_by_date: Dict[date, List[str]] = {}
    compare_results_by_date: Dict[date, List[Optional[Dict[str, object]]]] = {}
    # One comparison cache shared across all dates.
    cache: Dict[str, Dict[str, object]] = {}
    for date_value in dates_sorted:
        pairs = pairs_by_date.get(date_value, [])
        hashes = [canonical_hash(cal, sca) for cal, sca, _key, _date in pairs]
        hashes_by_date[date_value] = hashes
        compare_results_by_date[date_value] = compute_compare_results_for_pairs(
            pairs,
            hashes=hashes,
            accepted_hashes=accepted_hashes,
            use_ai=not args.no_ai,
            model=args.ai_model,
            api_key=api_key,
            cache=cache,
        )
    display_pairs(
        dates_sorted,
        pairs_by_date,
        hashes_by_date,
        compare_results_by_date,
        calcium_by_date,
        scandium_by_date,
        accepted_hashes,
        memory_path,
        tz,
        use_ai=not args.no_ai,
        model=args.ai_model,
        api_key=api_key,
        api=api,
        resource=resource,
        resource_name=args.resource,
        typ_filter=args.typ,
        start_time=args.start_time,
        end_time=args.end_time,
    )
    return 0
| if __name__ == "__main__": | |
| raise SystemExit(main()) |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
jfc