Created
October 7, 2025 19:41
-
-
Save erik4github/48c5322c5bcce2d6e6c925182e9a4999 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import psycopg2 | |
| from psycopg2 import OperationalError | |
| from psycopg2.extras import RealDictCursor | |
| CONNECTION = None | |
| def _new_connection(): | |
| cfg = get_db_secrets() | |
| return psycopg2.connect( | |
| host=cfg.host, | |
| database=cfg.database, | |
| user=cfg.user, | |
| password=cfg.password, | |
| port=cfg.port, | |
| connect_timeout=5, | |
| # TCP keepalives help with freeze/thaw + idle disconnects: | |
| keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5, | |
| ) | |
| def connect_to_db(): | |
| global CONNECTION | |
| try: | |
| if CONNECTION is None or CONNECTION.closed != 0: | |
| CONNECTION = _new_connection() | |
| return CONNECTION | |
| except Exception: | |
| logger.exception("error connecting to database") | |
| return None | |
| def handler(event, context): | |
| conn = connect_to_db() | |
| if conn is None: | |
| raise RuntimeError("DB unavailable") | |
| # Defensive: if a prior invocation left the tx aborted, clear it. | |
| try: | |
| conn.rollback() | |
| except Exception: | |
| pass | |
| try: | |
| with conn: # <- per-invocation transaction; commits or rollbacks automatically | |
| with conn.cursor(cursor_factory=RealDictCursor) as cur: | |
| cur.execute( | |
| "INSERT INTO table1 (foo, bar, baz) VALUES (%s,%s,%s)", | |
| (event["foo"], event["bar"], event["baz"]), | |
| ) | |
| state_flag = fetch_state_flag(conn, event["state_code"]) | |
| return {"state_flag": state_flag} | |
| except OperationalError: | |
| # Common case: connection died while frozen; recreate once and retry this invocation. | |
| global CONNECTION | |
| CONNECTION = None | |
| conn = connect_to_db() | |
| if conn is None: # still bad | |
| raise | |
| conn.rollback() | |
| with conn: | |
| with conn.cursor(cursor_factory=RealDictCursor) as cur: | |
| cur.execute( | |
| "INSERT INTO table1 (foo, bar, baz) VALUES (%s,%s,%s)", | |
| (event["foo"], event["bar"], event["baz"]), | |
| ) | |
| return {"state_flag": fetch_state_flag(conn, event["state_code"])} | |
| def fetch_state_flag(conn, state_code): | |
| with conn.cursor() as cur: | |
| cur.execute(""" | |
| SELECT state_flag | |
| FROM states | |
| WHERE state_code = %s | |
| """, (state_code,)) | |
| row = cur.fetchone() | |
| return row[0] if row else None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment