# Source code for dbapix.connection
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Connection(object):
    """A normalized connection to a database.

    See `the DB-API 2.0 specs <https://www.python.org/dev/peps/pep-0249/#connection-objects>`_
    for the minimal methods provided there.

    """

    def __init__(self, engine, con):
        """Wrap the driver-level connection ``con`` on behalf of ``engine``.

        :param engine: The engine this connection belongs to; its
            ``cursor_class`` is used by :meth:`cursor`.
        :param con: The underlying connection, exposed as ``wrapped``.
        """
        self._engine = engine
        self.wrapped = con

        # For tracking state around `begin()`: the autocommit value that
        # `_end()` must restore, or None when there is nothing to restore.
        self._autocommit = None

    # The next 3 are here mostly for the docs.

    @property
    def closed(self):
        """Has this connection been closed?"""
        return self.wrapped.closed

    def fileno(self):
        """Return the result of ``fileno()`` on the wrapped connection."""
        return self.wrapped.fileno()

    def close(self):
        """Close the connection immediately.

        This prevents the engine from reusing connections, and so is discouraged
        unless you are sure the connection should not be reused.

        """
        self.wrapped.close()

    def reset_session(self, autocommit=False):
        """Reset the connection to an initial clean state.

        This is automatically called when connections are retrieved from an engine,
        and is designed so that every connection feels like a new one, even
        though they are reused.

        :param autocommit: The value assigned to :attr:`autocommit`.

        """
        self.autocommit = autocommit

    def _should_put_close(self):
        # No-op in the base class (returns None).
        # NOTE(review): presumably a hook drivers override and the engine
        # consults when a connection is handed back -- confirm.
        pass

    def _get_nonidle_status(self):
        # No-op in the base class (returns None).
        # NOTE(review): presumably a hook drivers override to report a
        # connection that is not idle -- confirm.
        pass

    def __getattr__(self, key):
        """Attributes that are not provided by dbapix are passed through to the wrapped connection."""
        # `__getattr__` only runs after normal lookup has failed, so being
        # asked for `wrapped` means it was never set (e.g. the instance was
        # created without `__init__`, as `copy` and `pickle` do). Delegating
        # would re-enter this method until the recursion limit is hit.
        if key == 'wrapped':
            raise AttributeError(key)
        return getattr(self.wrapped, key)

    def cursor(self):
        """Get a :class:`.Cursor` for this connection.

        .. testcode::

            cur = con.cursor()
            cur.execute('SELECT 1')
            assert next(cur)[0] == 1

        """
        raw_cur = self.wrapped.cursor()
        # Wrap the driver cursor in the engine's own cursor class.
        return self._engine.cursor_class(self._engine, raw_cur)

    @abc.abstractmethod
    def _can_disable_autocommit(self):
        """Return whether `_begin()` may turn autocommit off.

        Consulted by `_begin()`; a falsy result makes it fall back to an
        explicit ``BEGIN``. Must be implemented by subclasses.
        """
        pass

    @property
    def autocommit(self):
        """Is every query executed in an implicit transaction that is auto committed?

        Defaults to ``True`` on new connections.

        """
        return self.wrapped.autocommit

    @autocommit.setter
    def autocommit(self, value):
        self.wrapped.autocommit = value

    def __enter__(self):
        # DB-API2 Connection.__enter__ doesn't actually do anything, and
        # relies on the implicit start of transactions.
        # Assert that we're not autocommitting.
        self._begin()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Commit on a clean exit, roll back if the block raised. Nothing is
        # returned, so the exception (if any) keeps propagating.
        if exc_type is None:
            self.commit()
        else:
            self.rollback()

    def begin(self):
        """Assert that we are or will be in a transaction.

        :return: A context manager that will automatically
            rollback or commit the current transaction.

        If it can be avoided, this does not actually start a transaction,
        but instead asserts that the connection is not in autocommit mode.
        If autocommit cannot be disabled, then an explicit ``BEGIN``
        will be executed.

        :meth:`commit` and :meth:`rollback` restore autocommit to
        its value when :meth:`begin` was called.

        """
        self._begin()
        return TransactionContext(self)

    def _begin(self):
        # Optimize to use the included methods if we can. We will drop back
        # into autocommit mode later (see `_end()`).
        if self.autocommit and self._can_disable_autocommit():
            self._autocommit = True
            self.autocommit = False

        # If we can't drop autocommit, then issue an explicit BEGIN.
        # NOTE(review): on this path `autocommit` is still true, so the base
        # `commit()`/`rollback()` below would raise RuntimeError unless a
        # subclass overrides them or the property -- confirm intended.
        if self.autocommit:
            self.execute('BEGIN')

    def _end(self):
        # Restore the autocommit value stashed by `_begin()`, if any, and
        # clear the stash so it is only restored once.
        if self._autocommit is not None:
            self.autocommit = self._autocommit
            self._autocommit = None

    def commit(self):
        """Commit changes made since the transaction started.

        :raises RuntimeError: if the connection is in autocommit mode.

        """
        if self.autocommit:
            raise RuntimeError("Connection is in autocommit mode.")
        self.wrapped.commit()
        self._end()

    def rollback(self):
        """Rollback changes made since the transaction started.

        :raises RuntimeError: if the connection is in autocommit mode.

        """
        if self.autocommit:
            raise RuntimeError("Connection is in autocommit mode.")
        self.wrapped.rollback()
        self._end()

    def execute(self, query, params=None):
        """Create a cursor, and execute a query on it in one step.

        :return: The created :class:`.Cursor`.

        .. seealso:: :meth:`.Cursor.execute` for parameters and examples.

        """
        # No cursor context here since it needs to be read.
        cur = self.cursor()
        # NOTE(review): the trailing 1 is presumably the same stack depth
        # that `select`/`update` pass as `_stack_depth` -- confirm against
        # `Cursor.execute`.
        cur.execute(query, params, 1)
        return cur

    def select(self, *args, **kwargs):
        """Pythonic wrapper for selecting.

        .. seealso:: :meth:`.Cursor.select` for parameters and examples.

        """
        kwargs['_stack_depth'] = 1
        # No cursor context here since it needs to be read.
        cur = self.cursor()
        return cur.select(*args, **kwargs)

    def insert(self, *args, **kwargs):
        """Pythonic wrapper for inserting.

        .. seealso:: :meth:`.Cursor.insert` for parameters and examples.

        """
        # Unlike `select`, the cursor is used as a context manager here and
        # only the result of `Cursor.insert` is handed back.
        with self.cursor() as cur:
            return cur.insert(*args, **kwargs)

    def update(self, *args, **kwargs):
        """Pythonic wrapper for updating.

        .. seealso:: :meth:`.Cursor.update` for parameters and examples.

        """
        kwargs['_stack_depth'] = 1
        with self.cursor() as cur:
            return cur.update(*args, **kwargs)
class TransactionContext(object):
    """Context manager that commits or rolls back a connection on exit.

    Entering yields the connection itself. Leaving the block calls
    ``rollback()`` on the connection if an exception was raised, and
    ``commit()`` otherwise. Any other attribute access is passed through to
    the connection.

    """

    def __init__(self, con):
        """:param con: The connection whose transaction is being managed."""
        self._con = con

    def __enter__(self):
        return self._con

    def __exit__(self, exc_type=None, *args):
        # Nothing is returned, so an exception raised in the block keeps
        # propagating after the rollback.
        if exc_type:
            self._con.rollback()
        else:
            self._con.commit()

    def __getattr__(self, key):
        # `__getattr__` only runs after normal lookup has failed, so being
        # asked for `_con` means it was never set (e.g. the instance was
        # created without `__init__`, as `copy` and `pickle` do). Delegating
        # would re-enter this method until the recursion limit is hit.
        if key == '_con':
            raise AttributeError(key)
        return getattr(self._con, key)