import contextlib
import os
import traceback
from contextvars import ContextVar
from sheraf.exceptions import ConnectionAlreadyOpened
# Isolated contexts state
global_context_connections_state = ContextVar("global_context_connections_state")
global_context_last_connection_state = ContextVar(
"global_context_last_connection_state"
)
class LocalData:
instance = None
def __init__(self, pid=None):
self.pid = pid or os.getpid()
self.databases = {}
self.last_database_context = {}
self.zodb_databases = {}
class GlobalThreadContext:
@property
def connections(self):
try:
return global_context_connections_state.get()
except LookupError:
global_context_connections_state.set([])
return global_context_connections_state.get()
@property
def last_connection_context(self):
try:
return global_context_last_connection_state.get()
except LookupError:
global_context_last_connection_state.set(None)
return global_context_last_connection_state.get()
@last_connection_context.setter
def last_connection_context(self, value):
global_context_last_connection_state.set(value)
def reset_connections_state(self):
global_context_connections_state.set([])
global_context_last_connection_state.set(None)
self.thread_context = GlobalThreadContext()
@classmethod
def get(cls):
# TODO: put a lock on pid
pid = os.getpid()
if not cls.instance or cls.instance.pid != pid:
cls.instance = LocalData(pid)
return cls.instance
# Isolated context state
database_context_connections_state = ContextVar("database_context_connections_state")
[docs]class Database:
"""A ZODB :class:`ZODB.DB` wrapper with a :class:`ZODB.interfaces.IStorage`
factory.
The storage factory will either create a
:class:`~ZEO.ClientStorage.ClientStorage`, a
:class:`~ZODB.FileStorage.FileStorage.FileStorage`, a
:class:`~ZODB.DemoStorage.DemoStorage`, a Relstorage
:class:`~relstorage.adapters.postgresql.adapter.PostgreSQLAdapter` or use a
user defined storage depending on the argument passed at the initialization
of the object.
A Storage object is created and pass it to the :class:`ZODB.DB` constructor
Several connections can be used at the same time. The connections are
identified by their name.
:param database_name: The name of the connection.
:param storage: If set, this user defined storage will be used.
:param uri: A zodburi to the database.
:type uri: An URI that will be parsed by :func:`zodburi.resolve_uri`.
:param db_args: Arguments to pass to the :class:`ZODB.DB`.
:param nestable: If `False`, will raise a
:class:`~sheraf.exceptions.ConnectionAlreadyOpened` if a connection has
already been opened.
"""
DEFAULT_DATABASE_NAME = "unnamed"
def __init__(self, uri=None, storage=None, nestable=False, db_args=None):
self.nestable = nestable
self.uri = uri
self.db = None
self.storage = None
self.db_args = db_args or {}
class DatabaseThreadContext:
@property
def connections(self):
try:
return database_context_connections_state.get()
except LookupError:
database_context_connections_state.set([])
return database_context_connections_state.get()
def reset_connections_state(self):
database_context_connections_state.set([])
self.thread_context = DatabaseThreadContext()
self.db_args["databases"] = LocalData.get().zodb_databases
self.reset(storage, uri)
stack = traceback.extract_stack()[-2]
LocalData.get().last_database_context[self.name] = (
stack.filename,
stack.lineno,
)
def __repr__(self):
description = f"<Database database_name='{self.name}'"
if self.db_args.get("read_only", False):
description += " ro"
if self.nestable:
description += " nestable"
description += ">"
return description
[docs] def reset(self, storage=None, uri=None):
"""Close and reopen a database connection."""
import zodburi
import ZODB.DB
from ZODB.DemoStorage import DemoStorage
if self.db:
self.close()
if storage is not None:
self.storage = storage
elif uri:
storage_factory, db_args = zodburi.resolve_uri(uri)
self.storage = storage_factory()
db_args.update(self.db_args)
self.db_args = db_args
else:
self.storage = DemoStorage()
self.name = self.db_args.get("database_name", Database.DEFAULT_DATABASE_NAME)
if self.name in LocalData.get().databases:
last_context = LocalData.get().last_database_context.get(self.name)
raise KeyError(
"A database named '{}' already exists. Last opening was on {} at line {}".format(
self.name, last_context[0], last_context[1]
)
if last_context
else f"A database named '{self.name}' already exists."
)
self.db = ZODB.DB(self.storage, **self.db_args)
LocalData.get().databases[self.name] = self
[docs] def connection_open(self):
"""Opens a connection. Returns a connection to this database.
If `nestable` is set and a connection has already been opened,
raises a :class:`~sheraf.exceptions.ConnectionAlreadyOpened` exception.
If `nestable` is False
and a connection has already been opened, it returns a new connection
with a new transaction_manager.
:return: A :class:`~ZODB.Connection.Connection` object.
"""
import transaction
data = LocalData.get()
# No other connection exists
if not Database.last_connection():
connection = self.db.open()
# A connection to this database exists, and the second one is not allowed.
elif not self.nestable:
message = (
"First connection was {} on {} at line {}".format(
Database.last_connection(),
*data.thread_context.last_connection_context,
)
if data.thread_context.last_connection_context
else f"First connection was {Database.last_connection()}"
)
raise ConnectionAlreadyOpened(message)
# A connection to this database exists, and the second one is allowed, but
# with a new transaction manager.
else:
connection = self.db.open(
transaction_manager=transaction.TransactionManager()
)
self.thread_context.connections.append(connection)
data.thread_context.connections.append(connection)
return connection
[docs] def connection_close(self, connection=None):
"""Closes a connection opened on the database.
:param connection: The connection to close, if `None` the last
connection opened on the database is closed.
"""
connection = connection or Database.last_connection(self)
if connection.opened:
connection.close()
if connection in LocalData.get().thread_context.connections:
LocalData.get().thread_context.connections.remove(connection)
if connection in self.thread_context.connections:
self.thread_context.connections.remove(connection)
[docs] def close(self):
"""Closes the database."""
data = LocalData.get()
for connection in list(self.thread_context.connections):
if connection and connection.opened:
connection.close()
data.thread_context.connections.remove(connection)
self.thread_context.connections.remove(connection)
if self.db:
self.db.close()
if self.name in data.databases:
del data.databases[self.name]
if self.name in data.zodb_databases:
del data.zodb_databases[self.name]
if self.name in data.last_database_context:
del data.last_database_context[self.name]
self.db = None
self.storage = None
[docs] @contextlib.contextmanager
def connection(
self, commit=False, cache_minimize=False, reuse=False, _trackeback_shift=0
):
"""A context manager opening a connection on this database.
:param commit: Whether to commit the transaction when leaving the
context manager.
:type commit: boolean
:param cache_minimize: Whether to call
:func:`ZODB.Connection.Connection.cache_minimize` when leaving the
context manager.
:type cache_minimize: boolean
:param reuse: If a connection is already opened, reuse it.
:type reuse: boolean
>>> database = sheraf.Database()
>>> with database.connection() as connection:
... sheraf.Database.current_connection() is connection
True
"""
if reuse and Database.last_connection(self):
yield Database.last_connection(self)
return
_connection = self.connection_open()
if not self.nestable:
stack = traceback.extract_stack()[-3 - _trackeback_shift]
LocalData.get().thread_context.last_connection_context = (
stack.filename,
stack.lineno,
)
try:
yield _connection
if commit:
_connection.transaction_manager.commit()
except BaseException:
if commit and _connection.transaction_manager:
_connection.transaction_manager.abort()
raise
finally:
# TODO: to be changed with try/except NoTransaction when upgrading transaction>2.0 @cedric
try:
if not commit and _connection.transaction_manager:
_connection.transaction_manager.abort()
if cache_minimize:
for conn in _connection.connections.values():
conn.cacheMinimize()
finally:
# Always close the connection even if the abort raises an OperationalError.
# An OperationalError can be raised with RelStorage for example when the
# PostgreSQL server is restaring or is in recovery mode.
self.connection_close(_connection)
@classmethod
def last_connection(cls, database=None):
if database:
return (
database.thread_context.connections[-1]
if database.thread_context.connections
else None
)
return (
LocalData.get().thread_context.connections[-1]
if LocalData.get().thread_context.connections
else None
)
@classmethod
def current_connection(cls, database_name=None):
if not Database.last_connection():
return None
if not database_name:
return Database.last_connection()
return Database.last_connection().get_connection(database_name)
@classmethod
def current_name(cls):
if Database.last_connection():
return Database.last_connection().db().database_name
return None
[docs] @classmethod
def all(cls):
"""
:return: A list containing all the existing :class:`Database` in a
tuple `(name, Database)`.
"""
return LocalData.get().databases.items()
[docs] @classmethod
def get(cls, database_name=None):
"""
:param database_name: The name of the queried database.
:return: The database object if it exists. A :class:`KeyError` is raised elsewise.
"""
database_name = database_name or Database.DEFAULT_DATABASE_NAME
try:
return LocalData.get().databases[database_name]
except KeyError:
raise KeyError(f"No database named '{database_name}'.")
[docs] @classmethod
def get_or_create(cls, **kwargs):
"""
:return: The database object if it exists. If the database does not exist, it is created with the `kwargs` arguments.
"""
try:
return Database.get(database_name=kwargs.get("database_name"))
except KeyError:
return Database(**kwargs)
[docs]@contextlib.contextmanager
def connection(database_name=None, commit=False, cache_minimize=False, reuse=False):
"""
Shortcut for :meth:`sheraf.databases.Database.connection`
:param database_name: The name of the database on which to open a connection.
If not set, the default database will be used.
:param *kwargs: See :meth:`sheraf.databases.Database.connection` arguments.
"""
database = Database.get(database_name)
with database.connection(
commit=commit,
cache_minimize=cache_minimize,
reuse=reuse,
_trackeback_shift=2,
) as conn:
yield conn