Last active
February 7, 2017 22:59
-
-
Save dillonhicks/3cee9f3f5edb79be361756ae525a24c5 to your computer and use it in GitHub Desktop.
Less Magical Flask-SQLAlchemy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import contextlib | |
| import inspect | |
| import logging | |
| import os | |
| import flask | |
| import six | |
| from sqlalchemy import create_engine | |
| from sqlalchemy.orm import scoped_session, sessionmaker | |
# From werkzeug.local. Internally, Flask-SQLAlchemy borrows this mechanism to
# scope its sessions.
#
# Since each thread has its own greenlet, we can just use greenlets as
# identifiers for the context. If greenlets are not available, we fall back to
# the current thread ident, imported from whichever module provides it.
# Resolve ``get_ident``, the callable used to identify the current execution
# context, trying the available providers in order of preference.
try:
    # Preferred: identify the context by the current greenlet.
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        # Python 2 name of the low-level threading module.
        from thread import get_ident
    except ImportError:
        # Python 3 name of the same module.
        from _thread import get_ident

# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
def Minutes(min):
    """Convert a duration given in minutes to the equivalent number of seconds."""
    seconds_per_minute = 60
    return min * seconds_per_minute
class DatabaseSessionManager(object):
    """Pair a SQLAlchemy engine with the session registry bound to it.

    ``engine`` and ``Session`` are stored as given. ``Session`` must be
    callable (returning a session object) and expose ``remove()``; the
    ``create`` factory builds it as a ``scoped_session``.
    """

    def __init__(self, engine, Session):
        self.engine = engine
        self.Session = Session

    @contextlib.contextmanager
    def session_context(self, existing_session=None):
        """Yield a session, cleaning it up on exit only if it was created here.

        :param existing_session: a session owned by the caller. When given it
            is yielded unchanged and left untouched on exit.
        :returns: a context manager yielding the session to use.
        """
        owns_session = existing_session is None
        session = self.Session() if owns_session else existing_session

        try:
            yield session
        finally:
            if owns_session:
                # NOTE(review): invalidate() discards the session's connections
                # instead of returning them to the pool -- confirm that this is
                # intended rather than a plain close().
                try:
                    session.invalidate()
                finally:
                    # Always drop the session from the registry, even when
                    # invalidate() raises; otherwise the next caller in the
                    # same scope would be handed the same stale session.
                    self.Session.remove()

    def inject_session(self, view):
        """Decorate ``view`` so every call receives a fresh ``session`` keyword.

        :param view: a callable declaring a parameter named ``session``
            (positional-or-keyword, or keyword-only).
        :returns: the wrapped view; the session lives only for one call.
        :raises ValueError: if ``view`` declares no ``session`` parameter.
        """
        # inspect.getargspec() was removed in Python 3.11 and rejects
        # keyword-only parameters. Prefer getfullargspec() where it exists and
        # keep getargspec() as the Python 2 fallback.
        describe = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        spec = describe(view)
        accepted = list(spec.args) + list(getattr(spec, 'kwonlyargs', None) or [])

        if 'session' not in accepted:
            raise ValueError(
                'inject_session decorated function {} does not accept '
                'required session keyword'.format(view.__name__))

        @six.wraps(view)
        def wrapped_view(*args, **kwargs):
            assert 'session' not in kwargs, 'Someone else set our session!'
            with self.session_context() as session:
                return view(*args, session=session, **kwargs)

        return wrapped_view

    @classmethod
    def create(cls, url, **kwargs):
        """Build a manager with an engine for ``url`` and a scoped session.

        :param url: database URL handed to ``create_engine``.
        :param kwargs: extra ``create_engine`` options. ``pool_recycle``
            defaults to 15 minutes but may be overridden by the caller.
        :returns: a new instance of ``cls``.
        """
        # Aggressively recycle connections to prevent mysql timeout
        # http://docs.sqlalchemy.org/en/latest/core/engines.html#sqlalchemy.create_engine.params.pool_recycle
        # setdefault() lets the caller override the value instead of failing
        # with a duplicate-keyword TypeError.
        kwargs.setdefault('pool_recycle', Minutes(15))
        engine = create_engine(url, **kwargs)
        Session = scoped_session(sessionmaker(bind=engine), scopefunc=get_ident)
        return cls(engine, Session)
# Build the shared session manager at import time; a missing DATABASE_URL
# environment variable raises KeyError here.
db_manager = DatabaseSessionManager.create(os.environ['DATABASE_URL'])

# The Flask application that the route handlers in this module register on.
app = flask.Flask(__name__)
@app.route('/')
@db_manager.inject_session
def hello_world(session=None):
    """Return the row count of the ``med`` table as JSON: ``{"count": <n>}``.

    :param session: database session supplied by ``inject_session``; callers
        never pass it themselves.
    """
    # Raw SQL strings passed to execute() are deprecated in SQLAlchemy 1.4 and
    # rejected in 2.0; text() is accepted by old and new releases alike. The
    # import is local so this handler does not rely on the module's import
    # list being changed.
    from sqlalchemy import text

    result = session.execute(text('select count(*) from med;')).first()
    return flask.jsonify(dict(count=result[0]))
if __name__ == '__main__':
    # Verbose logging everywhere, including SQLAlchemy, so that connection
    # checkouts from the pool can be observed.
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('sqlalchemy').setLevel(logging.DEBUG)

    # NOTE(review): debug=True on host 0.0.0.0 exposes Flask's interactive
    # debugger on every network interface -- confirm this is only ever run in
    # a trusted development environment.
    app.run(host='0.0.0.0', port=5005, debug=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment