Skip to content

Instantly share code, notes, and snippets.

@erik4github
Created October 7, 2025 19:41
Show Gist options
  • Select an option

  • Save erik4github/48c5322c5bcce2d6e6c925182e9a4999 to your computer and use it in GitHub Desktop.

Select an option

Save erik4github/48c5322c5bcce2d6e6c925182e9a4999 to your computer and use it in GitHub Desktop.
import psycopg2
from psycopg2 import OperationalError
from psycopg2.extras import RealDictCursor
CONNECTION = None
def _new_connection():
cfg = get_db_secrets()
return psycopg2.connect(
host=cfg.host,
database=cfg.database,
user=cfg.user,
password=cfg.password,
port=cfg.port,
connect_timeout=5,
# TCP keepalives help with freeze/thaw + idle disconnects:
keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5,
)
def connect_to_db():
global CONNECTION
try:
if CONNECTION is None or CONNECTION.closed != 0:
CONNECTION = _new_connection()
return CONNECTION
except Exception:
logger.exception("error connecting to database")
return None
def handler(event, context):
conn = connect_to_db()
if conn is None:
raise RuntimeError("DB unavailable")
# Defensive: if a prior invocation left the tx aborted, clear it.
try:
conn.rollback()
except Exception:
pass
try:
with conn: # <- per-invocation transaction; commits or rollbacks automatically
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"INSERT INTO table1 (foo, bar, baz) VALUES (%s,%s,%s)",
(event["foo"], event["bar"], event["baz"]),
)
state_flag = fetch_state_flag(conn, event["state_code"])
return {"state_flag": state_flag}
except OperationalError:
# Common case: connection died while frozen; recreate once and retry this invocation.
global CONNECTION
CONNECTION = None
conn = connect_to_db()
if conn is None: # still bad
raise
conn.rollback()
with conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"INSERT INTO table1 (foo, bar, baz) VALUES (%s,%s,%s)",
(event["foo"], event["bar"], event["baz"]),
)
return {"state_flag": fetch_state_flag(conn, event["state_code"])}
def fetch_state_flag(conn, state_code):
with conn.cursor() as cur:
cur.execute("""
SELECT state_flag
FROM states
WHERE state_code = %s
""", (state_code,))
row = cur.fetchone()
return row[0] if row else None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment