Quellcode für tedega_storage.rdbms.storage

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
from contextlib import contextmanager
from sqlalchemy.orm import sessionmaker
from sqlalchemy.event import listen
from sqlalchemy import create_engine


@contextmanager
[Doku]def scoped_session(): Session = sessionmaker(bind=ENGINE) session = Session() try: yield session session.commit() except: session.rollback() raise finally: session.close()
def sqlite_do_connect(dbapi_connection, connection_record): # disable pysqlite's emitting of the BEGIN statement entirely. # also stops it from emitting COMMIT before any DDL. dbapi_connection.isolation_level = None def sqlite_do_begin(conn): # emit our own BEGIN conn.execute("BEGIN") return ENGINE
[Doku]class Storage(object): """Docstring for Storage. """ def __init__(self, engine=None): """TODO: to be defined1. :engine: TODO """ self.engine = engine #RDBMSStorageBase.metadata.create_all(self.engine) Session = sessionmaker(bind=self.engine) self.session = Session() def __enter__(self): return self def __exit__(self, e_type, e_value, tb): if e_type is not None: self.session.rollback() raise else: self.session.commit() self.session.close() return True def create(self, item): item_id = None self.session.begin_nested() try: self.session.add(item) self.session.commit() item_id = item.id except: self.session.rollback() raise return item_id def read(self, clazz, id=None): if id is None: return self.session.query(clazz).all() else: return self.session.query(clazz).filter(clazz.id == id).one() def update(self, item): return self.session.flush() def delete(self, item): self.session.delete(item)
DEFAUL_DB_URI = "sqlite:///:memory:" DB_URI = os.environ.get("TEDEGA_STORAGE_URI", DEFAUL_DB_URI) ENGINE = create_engine(DB_URI, echo=False) if str(ENGINE.url).find("sqlite") > -1: # SQLite doesn not support nested transactions directly. # But there is a workaround. See # http://docs.sqlalchemy.org/en/rel_1_0/dialects/sqlite.html#pysqlite-serializable # for more details. listen(ENGINE, "connect", sqlite_do_connect) listen(ENGINE, "begin", sqlite_do_begin) STORAGE = Storage