This package provides an easy way to configure and use SQLAlchemy engines and sessions without depending on frameworks.
It is composed by two main components:
- A manager class for SQLAlchemy engine and session configuration
- A repository/unit-of-work pattern implementation for model retrieval and persistence
pip install sqlalchemy-bind-manager
- SQLAlchemy manager: Implementation is mostly finalised, needs testing in production.
- Repository: Implementation is mostly finalised, needs testing in production.
- Unit of work: The implementation is working but limited to repositories using the same engine. Distributed transactions across different engines are not yet supported.
The complete documentation can be found here
Initialise the manager providing an instance of SQLAlchemyConfig
from sqlalchemy_bind_manager import SQLAlchemyConfig, SQLAlchemyBindManager
config = SQLAlchemyConfig(
engine_url="sqlite:///./sqlite.db",
engine_options=dict(connect_args={"check_same_thread": False}, echo=True),
session_options=dict(expire_on_commit=False),
)
sa_manager = SQLAlchemyBindManager(config)
🚨 NOTE: Using global variables is not thread-safe, please read the Documentation if your application uses multi-threading.
The engine_url
and engine_options
dictionaries accept the same parameters as SQLAlchemy create_engine()
The session_options
dictionary accepts the same parameters as SQLALchemy sessionmaker()
The SQLAlchemyBindManager
provides some helper methods for common operations:
get_bind
: returns aSQLAlchemyBind
orSQLAlchemyAsyncBind
objectget_session
: returns aSession
object, which works also as a context managerget_mapper
: returns the mapper associated with the bind
Example:
bind = sa_manager.get_bind()
class MyModel(bind.declarative_base):
pass
# Persist an object
o = MyModel()
o.name = "John"
with sa_manager.get_session() as session:
session.add(o)
session.commit()
Imperative model declaration is also supported.
SQLAlchemyBindManager
accepts also multiple databases configuration, provided as a dictionary. The dictionary keys are used as a reference name for each bind.
from sqlalchemy_bind_manager import SQLAlchemyConfig, SQLAlchemyBindManager
config = {
"default": SQLAlchemyConfig(
engine_url="sqlite:///./sqlite.db",
engine_options=dict(connect_args={"check_same_thread": False}, echo=True),
session_options=dict(expire_on_commit=False),
),
"secondary": SQLAlchemyConfig(
engine_url="sqlite:///./secondary.db",
engine_options=dict(connect_args={"check_same_thread": False}, echo=True),
session_options=dict(expire_on_commit=False),
),
}
sa_manager = SQLAlchemyBindManager(config)
All the SQLAlchemyBindManager
helper methods accept the bind_name
optional parameter:
get_bind(bind_name="default")
: returns aSQLAlchemyBind
orSQLAlchemyAsyncBind
objectget_session(bind_name="default")
: returns aSession
orAsyncSession
object, which works also as a context managerget_mapper(bind_name="default")
: returns the mapper associated with the bind
Is it possible to supply configurations for asyncio supported engines.
config = SQLAlchemyAsyncConfig(
engine_url="postgresql+asyncpg://scott:tiger@localhost/test",
)
This will make sure we have an AsyncEngine
and an AsyncSession
are initialised, as an asynchronous context manager.
async with sa_manager.get_session() as session:
session.add(o)
await session.commit()
Note that async implementation has several differences from the sync one, make sure to check SQLAlchemy asyncio documentation
The SQLAlchemyRepository
and SQLAlchemyAsyncRepository
class can be used directly or by extending them.
from sqlalchemy_bind_manager.repository import SQLAlchemyRepository
class MyModel(declarative_base):
pass
# Direct usage
repo_instance = SQLAlchemyRepository(sqlalchemy_bind_manager.get_bind(), model_class=MyModel)
class ModelRepository(SQLAlchemyRepository[MyModel]):
_model = MyModel
def _some_custom_method_implemented(self):
...
# Extended class usage
extended_repo_instance = ModelRepository(sqlalchemy_bind_manager.get_bind())
The repository classes provides methods for common use case:
get
: Retrieve a model by primary keysave
: Persist a modelsave_many
: Persist multiple models in a single transactiondelete
: Delete a modelfind
: Search for a list of models (basically an adapter for SELECT queries)paginated_find
: Search for a list of models, with pagination supportcursor_paginated_find
: Search for a list of models, with cursor based pagination support
It is possible we need to run several operations in a single database transaction. While a single repository provide by itself an isolated session for single operations, we have to use a different approach for multiple operations.
We can use the UnitOfWork
or the AsyncUnitOfWork
class to provide a shared session to
be used for repository operations, assumed the same bind is used for all the repositories.
class MyRepo(SQLAlchemyRepository):
_model = MyModel
bind = sa_manager.get_bind()
uow = UnitOfWork(bind)
uow.register_repository("repo_a", MyRepo)
uow.register_repository("repo_b", SQLAlchemyRepository, MyOtherModel)
with uow.transaction():
uow.repository("repo_a").save(some_model)
uow.repository("repo_b").save(some_other_model)