Last active
March 24, 2022 20:24
-
-
Save DKbyo/ea89f8478134ca9a038c7a6354af4a0a to your computer and use it in GitHub Desktop.
Thread-Safe SQLAlchemy Sessions on Cloud Run
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.10-slim

# Allow statements and log messages to immediately appear in the Knative logs.
# The `ENV key=value` form is used because the space-separated `ENV key value`
# form is discouraged by the Dockerfile reference.
ENV PYTHONUNBUFFERED=True

# Directory inside the image that holds the application code.
ENV APP_HOME=/app
WORKDIR $APP_HOME

# Install production dependencies before copying the rest of the code, so
# this layer is rebuilt only when requirements.txt itself changes.
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

# Copy local code to the container image.
COPY . ./

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app

# Alternative exec-form entry point with a fixed port, kept for reference:
#ENTRYPOINT ["gunicorn", "--bind", ":8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app" ]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Load generator: repeatedly casts a vote for a randomly chosen name by
# calling the /vote/<name> endpoint of the Cloud Run service.

expressions=("Daniel" "Sundar" "Bill" "Thomas" "Steve")

# Seed random generator
RANDOM=$$$(date +%s)

# Identity token sent as the bearer credential on every request.
# NOTE(review): the token is fetched once, before the loop; refresh it if the
# script is expected to outlive the token's validity.
token=$(gcloud auth print-identity-token)

# Host name of the Cloud Run service. Export CLOUD_RUN_DOMAIN before running
# (or assign it here); an empty value would make every request go to an
# invalid URL, so fail fast instead.
CLOUD_RUN_DOMAIN="${CLOUD_RUN_DOMAIN:-}"
if [ -z "$CLOUD_RUN_DOMAIN" ]; then
  echo "CLOUD_RUN_DOMAIN is not set" >&2
  exit 1
fi

# Vote forever; stop with Ctrl-C.
while true
do
  selectedexpression=${expressions[ $RANDOM % ${#expressions[@]} ]}
  echo "$selectedexpression"
  curl "https://${CLOUD_RUN_DOMAIN}/vote/${selectedexpression}" -H "Authorization: Bearer ${token}"
done
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from flask import Flask, jsonify | |
| from sqlalchemy.orm import sessionmaker, scoped_session, close_all_sessions | |
| from sqlalchemy import engine as sqlalchemy_engine, create_engine, Table, Column, Integer, String, MetaData, ForeignKey, select, func | |
| import contextlib | |
| import logging | |
| import signal | |
| import sys | |
# Container for the Core table definitions declared in this module.
metadata_obj = MetaData()

# Core description of the ``votes`` table: an integer primary key and the
# voted-for name.
vote = Table(
    'votes',
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column('vote', String),
)

logger = logging.getLogger(__name__)

# Process-wide singletons; they stay None until __get_engine__() and
# _create_session_factory() populate them.
engine = None
thread_safe_session_factory = None
def __get_engine__():  # noqa: N807
    """Create the SQLAlchemy engine and store it in the module-level ``engine``.

    The engine is built by ``_create_engine`` with a fixed pool configuration
    (statement echo, pre-ping, pool size 5, connection recycling and a
    10 second connect timeout).

    Returns:
        None. The result is published through the ``engine`` global.
    """
    global engine
    db_config = {
        "echo": True,
        "pool_pre_ping": True,
        "pool_size": 5,
        # pool_recycle is a connection age in seconds, not a flag; a boolean
        # True would be read as 1 second. Recycle after 30 minutes. ENG-6640
        "pool_recycle": 1800,
        "connect_args": {'connect_timeout': 10},
    }
    engine = _create_engine(**db_config)
    # Standard-library loggers reject arbitrary keyword arguments, so the
    # details are passed as lazy %-style arguments instead.
    logger.info(
        "Created database engine engine_url=%r config=%r",
        engine.url,
        db_config,
    )
def _create_engine(**db_config):
    """Build a MySQL (PyMySQL) engine that connects through a Unix socket.

    Connection settings come from the environment: ``DB_USER``, ``DB_PASS``,
    ``DB_NAME`` and ``INSTANCE_CONNECTION_NAME`` are required, while
    ``DB_SOCKET_DIR`` is optional and defaults to ``/cloudsql``.

    Args:
        **db_config: Extra keyword arguments forwarded to ``create_engine``.

    Returns:
        The engine returned by ``create_engine``.

    Raises:
        KeyError: If a required environment variable is missing.
    """
    env = os.environ
    user = env["DB_USER"]
    password = env["DB_PASS"]
    database = env["DB_NAME"]
    socket_dir = env.get("DB_SOCKET_DIR", "/cloudsql")  # e.g. "/cloudsql"
    # i.e "<PROJECT-NAME>:<INSTANCE-REGION>:<INSTANCE-NAME>"
    instance = env["INSTANCE_CONNECTION_NAME"]

    # The socket file lives at <socket dir>/<instance connection name>.
    socket_path = f"{socket_dir}/{instance}"
    url = sqlalchemy_engine.url.URL.create(
        drivername="mysql+pymysql",
        username=user,
        password=password,
        database=database,
        query={"unix_socket": socket_path},
    )
    return create_engine(url, **db_config)
| def _create_session_factory(): | |
| global engine, thread_safe_session_factory | |
| if engine is None: | |
| raise ValueError("Initialize engine by calling init_engine before calling init_session_factory!") | |
| if thread_safe_session_factory is None: | |
| thread_safe_session_factory = scoped_session(sessionmaker(bind=engine)) | |
| logger.info("New scoped session factory created") | |
| return thread_safe_session_factory | |
@contextlib.contextmanager
def ManagedSession():
    """Provide a session that is committed on success and rolled back on error.

    Usage: ``with ManagedSession() as session: ...``. When the body finishes
    normally the session is committed; when it raises, the session is rolled
    back and the exception propagates. In both cases the session is removed
    from the scoped-session registry afterwards.

    Yields:
        The session obtained from ``thread_safe_session_factory``.

    Raises:
        ValueError: If the session factory has not been created yet.
    """
    if thread_safe_session_factory is None:
        raise ValueError(
            "Call _create_session_factory before using ManagedSession!"
        )
    try:
        with thread_safe_session_factory() as session:
            try:
                yield session
                # commit() flushes pending changes itself, so no separate
                # flush() is needed.
                session.commit()
            except Exception:
                session.rollback()
                raise
    finally:
        # Always drop the registry entry, including on the error path.
        thread_safe_session_factory.remove()
def create_app():
    """Build the Flask application and initialise the database layer.

    The engine and the scoped session factory are created inside the
    application context before the application object is handed back.

    Returns:
        The configured ``Flask`` application.
    """
    flask_app = Flask(__name__)
    with flask_app.app_context():
        __get_engine__()
        _create_session_factory()
    return flask_app


# Module-level application object used by the route decorators below.
app = create_app()
@app.route("/vote/<string:person>")
def do_vote(person):
    """Record one vote for ``person``.

    Args:
        person: Name taken from the URL path.

    Returns:
        The literal string ``"OK"`` once the insert has been committed.
    """
    insert_vote = vote.insert().values(vote=person)
    with ManagedSession() as db:
        db.execute(insert_vote)
    return "OK"
@app.route("/")
def results():
    """Return the current tally as JSON.

    Returns:
        A JSON response of the form ``{"data": {<name>: <vote count>}}``.
    """
    with ManagedSession() as session:
        tally = session.query(
            vote.c.vote,
            func.count(vote.c.vote).label('votes'),
        ).group_by(vote.c.vote).all()
    # Each row is a (vote, votes) pair. Positional unpacking works for any
    # tuple-like row and does not rely on string-key indexing.
    data = {name: count for name, count in tally}
    return jsonify({
        "data": data
    })
@app.before_first_request
def create_tables():
    """Create the ``votes`` table if it does not exist yet.

    Registered with ``before_first_request`` so the DDL is issued ahead of
    the first request this process serves.
    """
    # Imported locally so this function stays self-contained. Raw SQL must be
    # wrapped in text(): passing a plain string to Session.execute() is
    # deprecated in SQLAlchemy 1.4 and rejected in 2.0.
    from sqlalchemy import text

    ddl = text(
        "CREATE TABLE IF NOT EXISTS votes "
        "( id SERIAL NOT NULL, "
        "vote CHAR(100) NOT NULL, PRIMARY KEY (id) );"
    )
    with ManagedSession() as session:
        session.execute(ddl)
def shutdown_handler(signal, frame):
    """Signal handler: release database connections, then exit with status 0.

    Args:
        signal: Signal number delivered to the process (unused).
        frame: Stack frame active when the signal arrived (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    logger.info("Signal received, safely shutting down.")
    # This could be needed if we aren't closing the sessions properly
    # close_all_sessions()
    db_engine = engine
    if db_engine:
        print("Closing database")
        db_engine.dispose()
    print("Exiting process.", flush=True)
    sys.exit(0)
if __name__ == "__main__":
    # Executed directly: clean up on SIGINT and serve with Flask's built-in
    # server on $PORT (default 8080).
    # NOTE(review): debug=True together with host 0.0.0.0 is suitable for
    # local development only.
    signal.signal(signal.SIGINT, shutdown_handler)
    listen_port = int(os.environ.get("PORT", 8080))
    app.run(debug=True, host="0.0.0.0", port=listen_port)
else:
    # Imported by a WSGI server: clean up on SIGTERM instead.
    signal.signal(signal.SIGTERM, shutdown_handler)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Flask==2.0.2 | |
| gunicorn==20.1.0 | |
| sqlalchemy==1.4.32 | |
| PyMySQL==1.0.2 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment