Skip to content

Instantly share code, notes, and snippets.

@DKbyo
Last active March 24, 2022 20:24
Show Gist options
  • Select an option

  • Save DKbyo/ea89f8478134ca9a038c7a6354af4a0a to your computer and use it in GitHub Desktop.

Select an option

Save DKbyo/ea89f8478134ca9a038c7a6354af4a0a to your computer and use it in GitHub Desktop.
Thread Safe Sessions on SQLAlchemy on Cloud Run
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.10-slim
# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True
# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./
# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt
# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
#ENTRYPOINT ["gunicorn", "--bind", ":8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app" ]
#!/bin/bash
expressions=("Daniel" "Sundar" "Bill" "Thomas" "Steve")
# Seed random generator
RANDOM=$$$(date +%s)
token=$(gcloud auth print-identity-token)
CLOUD_RUN_DOMAIN=
# Loop loop loop loop loop loop ...
while [ 1 ]
do
selectedexpression=${expressions[ $RANDOM % ${#expressions[@]} ]}
echo $selectedexpression
curl https://$CLOUD_RUN_DOMAIN/vote/$selectedexpression -H "Authorization: Bearer ${token}"
done
import os
from flask import Flask, jsonify
from sqlalchemy.orm import sessionmaker, scoped_session, close_all_sessions
from sqlalchemy import engine as sqlalchemy_engine, create_engine, Table, Column, Integer, String, MetaData, ForeignKey, select, func
import contextlib
import logging
import signal
import sys
metadata_obj = MetaData()
vote = Table('votes', metadata_obj,
Column('id', Integer, primary_key=True),
Column('vote', String),
)
logger = logging.getLogger(__name__)
engine = None
thread_safe_session_factory = None
def __get_engine__(): # noqa: N807
global engine
db_config = {
"echo": True,
"pool_pre_ping": True,
"pool_size": 5,
"pool_recycle": True, # ENG-6640
"connect_args": {'connect_timeout': 10},
}
engine = _create_engine( **db_config)
logger.info("Created database engine", engine_url=repr(engine.url), **db_config)
def _create_engine(**db_config):
db_user = os.environ["DB_USER"]
db_pass = os.environ["DB_PASS"]
db_name = os.environ["DB_NAME"]
db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")
instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]
pool = create_engine(
sqlalchemy_engine.url.URL.create(
drivername="mysql+pymysql",
username=db_user,
password=db_pass,
database=db_name,
query={
"unix_socket": #db_socket_dir
"{}/{}".format(
db_socket_dir, # e.g. "/cloudsql"
instance_connection_name) # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
}
),
**db_config
)
return pool
def _create_session_factory():
global engine, thread_safe_session_factory
if engine is None:
raise ValueError("Initialize engine by calling init_engine before calling init_session_factory!")
if thread_safe_session_factory is None:
thread_safe_session_factory = scoped_session(sessionmaker(bind=engine))
logger.info("New scoped session factory created")
return thread_safe_session_factory
@contextlib.contextmanager
def ManagedSession():
global thread_safe_session_factory
if thread_safe_session_factory is None:
raise ValueError("Call init_session_factory before using ManagedSession!")
with thread_safe_session_factory() as session:
try:
yield session
session.commit()
session.flush()
except Exception:
session.rollback()
raise
thread_safe_session_factory.remove()
def create_app():
app = Flask(__name__)
with app.app_context():
__get_engine__()
_create_session_factory()
return app
app = create_app()
@app.route("/vote/<string:person>")
def do_vote(person):
with ManagedSession() as session:
stmt = vote.insert().values(vote=person)
session.execute(stmt)
return "OK"
@app.route("/")
def results():
votes = []
with ManagedSession() as session:
votes = session.query(
vote.c.vote,
func.count(vote.c.vote).label('votes')).group_by(vote.c.vote).all()
data = {}
for row in votes:
data[row['vote']] = row['votes']
return jsonify({
"data": data
})
@app.before_first_request
def create_tables():
with ManagedSession() as session:
session.execute(
"CREATE TABLE IF NOT EXISTS votes "
"( id SERIAL NOT NULL, "
"vote CHAR(100) NOT NULL, PRIMARY KEY (id) );"
)
def shutdown_handler(signal, frame):
logger.info("Signal received, safely shutting down.")
# This could be needed if we aren't closing the sessions properly
# close_all_sessions()
if engine:
print("Closing database")
engine.dispose()
print("Exiting process.", flush=True)
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, shutdown_handler)
app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
else:
signal.signal(signal.SIGTERM, shutdown_handler)
Flask==2.0.2
gunicorn==20.1.0
sqlalchemy==1.4.32
PyMySQL==1.0.2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment