Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = mayim
version = 1.2.0
version = 1.3.0
description = The NOT ORM hydrator
long_description = file: README.md
long_description_content_type = text/markdown
Expand Down
33 changes: 20 additions & 13 deletions src/mayim/base/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from abc import ABC, abstractmethod
from collections import namedtuple
from contextvars import ContextVar
from typing import Any, Optional, Set, Type
from urllib.parse import urlparse

Expand Down Expand Up @@ -62,8 +61,10 @@ def __init__(
password (str, optional): DB password
db (int, optional): DB db. Defaults to 1
query (str, optional): DB query parameters. Defaults to None
min_size (int, optional): Minimum number of connections in pool. Defaults to 1
max_size (int, optional): Maximum number of connections in pool. Defaults to None
min_size (int, optional): Minimum number of connections in pool.
Defaults to 1
max_size (int, optional): Maximum number of connections in pool.
Defaults to None
"""

if dsn and host:
Expand Down Expand Up @@ -99,13 +100,8 @@ def __init__(
self._min_size = min_size
self._max_size = max_size
self._full_dsn: Optional[str] = None
self._connection: ContextVar[Any] = ContextVar(
"connection", default=None
)
self._transaction: ContextVar[bool] = ContextVar(
"transaction", default=False
)
self._commit: ContextVar[bool] = ContextVar("commit", default=True)
# Transaction connection (set by transaction coordinator)
self._transaction_connection: Optional[Any] = None

self._populate_connection_args()
self._populate_dsn()
Expand Down Expand Up @@ -199,10 +195,21 @@ def max_size(self):
return self._max_size

def existing_connection(self):
return self._connection.get()
"""Get existing connection (transaction connection if available)"""
return self._transaction_connection

def in_transaction(self) -> bool:
return self._transaction.get()
"""Check if in transaction"""
return self._transaction_connection is not None

def do_commit(self) -> bool:
return self._commit.get()
"""Check if should commit (always True for simplified system)"""
return True

def _set_transaction_connection(self, connection) -> None:
"""Set transaction connection (used by transaction coordinator)"""
self._transaction_connection = connection

def _clear_transaction_connection(self) -> None:
"""Clear transaction connection"""
self._transaction_connection = None
9 changes: 6 additions & 3 deletions src/mayim/exception.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
class MayimError(Exception): ...
class MayimError(Exception):
pass


class RecordNotFound(MayimError): ...
class RecordNotFound(MayimError):
pass


class MissingSQL(MayimError): ...
class MissingSQL(MayimError):
pass
3 changes: 2 additions & 1 deletion src/mayim/extension/quart_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
Quart = type("Quart", (), {}) # type: ignore


class Default: ...
class Default:
pass


_default = Default()
Expand Down
3 changes: 2 additions & 1 deletion src/mayim/extension/starlette_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
Starlette = type("Starlette", (), {}) # type: ignore


class Default: ...
class Default:
pass


_default = Default()
Expand Down
95 changes: 76 additions & 19 deletions src/mayim/mayim.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from asyncio import get_running_loop
from inspect import isclass
from typing import Optional, Sequence, Type, TypeVar, Union
from typing import Literal, Optional, Sequence, Type, TypeVar, Union
from urllib.parse import urlparse

from mayim.base import Executor, Hydrator
Expand All @@ -11,6 +11,7 @@
from mayim.sql.executor import SQLExecutor
from mayim.sql.postgres.interface import PostgresPool
from mayim.transaction import TransactionCoordinator
from mayim.transaction.interfaces import IsolationLevel

T = TypeVar("T", bound=Executor)
DEFAULT_INTERFACE = PostgresPool
Expand Down Expand Up @@ -252,39 +253,90 @@ def transaction(
*executors: Union[SQLExecutor, Type[SQLExecutor]],
use_2pc: bool = False,
timeout: Optional[float] = None,
isolation_level: Union[
Literal[
"READ UNCOMMITTED",
"READ COMMITTED",
"REPEATABLE READ",
"SERIALIZABLE",
],
IsolationLevel,
str,
] = IsolationLevel.READ_COMMITTED,
):
"""
Create a transaction across multiple executors.

Can be used as either:
1. Old style context manager: async with Mayim.transaction(exec1, exec2):
2. New style: txn = await Mayim.transaction(exec1, exec2); await txn.begin()
3. New style context: async with await Mayim.transaction(exec1, exec2) as txn:
async with Mayim.transaction(exec1, exec2):
or:
txn = await Mayim.transaction(exec1, exec2); await txn.begin()
or:
async with await Mayim.transaction(exec1, exec2) as txn:

Args:
executors: Executor classes or instances to include in transaction.
If not provided, includes all registered SQL executors.
use_2pc: Whether to use two-phase commit protocol if available.
timeout: Maximum duration in seconds before transaction is automatically rolled back.
timeout: Maximum duration in seconds before transaction
is automatically rolled back.
isolation_level: SQL isolation level.
Can be a string like "SERIALIZABLE" or IsolationLevel enum.

Returns:
_TransactionWrapper that provides backward compatibility
"""
return _TransactionWrapper(cls, executors, use_2pc, timeout)
return _TransactionWrapper(
cls, executors, use_2pc, timeout, isolation_level
)


class _TransactionWrapper:
"""
Wrapper to provide backward compatibility for Mayim.transaction().
Can be used both as an awaitable (new style) and as async context manager (old style).
Can be used both as an awaitable (new style)
and as async context manager (old style).
"""

def __init__(self, mayim_cls, executors, use_2pc=False, timeout=None):
def __init__(
self,
mayim_cls,
executors,
use_2pc=False,
timeout=None,
isolation_level=IsolationLevel.READ_COMMITTED,
):
self._mayim_cls = mayim_cls
self._executors = executors
self._coordinator = None
self._use_2pc = use_2pc
self._timeout = timeout
self._isolation_level = self._normalize_isolation_level(
isolation_level
)

def _normalize_isolation_level(self, isolation_level):
"""Convert string isolation levels to IsolationLevel enum"""
# Check if it's already an IsolationLevel enum
if isinstance(isolation_level, IsolationLevel):
return isolation_level

if isinstance(isolation_level, str):
isolation_upper = isolation_level.upper()
try:
return IsolationLevel[isolation_upper.replace(" ", "_")]
except KeyError:
# If no exact match, raise an error
valid_levels = [level.value for level in IsolationLevel]
raise MayimError(
f"Invalid isolation level '{isolation_level}'. "
f"Valid levels: {valid_levels}"
)

raise MayimError(
f"isolation_level must be str or IsolationLevel, "
f"got {type(isolation_level)}"
)

def __await__(self):
"""Support: txn = await Mayim.transaction(...)"""
Expand All @@ -294,10 +346,8 @@ async def _create():

return _create().__await__()

async def _create_coordinator(self) -> TransactionCoordinator:
"""Create the actual TransactionCoordinator"""
async def _create_coordinator(self):
if not self._executors:
# Default to all registered SQL executors
executors = tuple(
executor
for executor in Registry().values()
Expand All @@ -309,7 +359,6 @@ async def _create_coordinator(self) -> TransactionCoordinator:
else:
executors = self._executors

# Convert classes to instances and validate
resolved_executors = []
for maybe_executor in executors:
if maybe_executor is None:
Expand All @@ -319,7 +368,8 @@ async def _create_coordinator(self) -> TransactionCoordinator:
# First check if it's a SQL executor class
if not issubclass(maybe_executor, SQLExecutor):
raise MayimError(
f"All executors must be SQL executors, got {maybe_executor}"
f"All executors must be SQL executors, "
f"got {maybe_executor}"
)
try:
executor = self._mayim_cls.get(maybe_executor)
Expand All @@ -332,26 +382,33 @@ async def _create_coordinator(self) -> TransactionCoordinator:
# Validate it's a SQL executor instance
if not isinstance(executor, SQLExecutor):
raise MayimError(
f"All executors must be SQL executors, got {type(executor)}"
f"All executors must be SQL executors, "
f"got {type(executor)}"
)
# For instances, check if they're registered by checking if we can get the class
# For instances, check if they're registered by checking
# if we can get the class
try:
registered_instance = self._mayim_cls.get(
executor.__class__
)
# If the registered instance is different, the passed instance is not registered
# If the registered instance is different, the passed
# instance is not registered
if registered_instance is not executor:
raise MayimError(f"Executor {executor} not registered")
except MayimError:
raise MayimError(f"Executor {executor} not registered")

resolved_executors.append(executor)

# Return the transaction coordinator
return TransactionCoordinator(
resolved_executors, use_2pc=self._use_2pc, timeout=self._timeout
coordinator = TransactionCoordinator(
executors=resolved_executors,
use_2pc=self._use_2pc,
timeout=self._timeout,
isolation_level=self._isolation_level,
)

return coordinator

async def __aenter__(self):
"""Support old style: async with Mayim.transaction(...)"""
self._coordinator = await self._create_coordinator()
Expand Down
5 changes: 3 additions & 2 deletions src/mayim/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ def reset(cls):

class PoolRegistry:
"""
Registry to ensure executors with the same DSN share the same pool instance.
This prevents duplicate connections and enables proper transaction coordination.
Registry to ensure executors with the same DSN share the same pool
instance. This prevents duplicate connections and enables proper
transaction coordination.
"""

_singleton = None
Expand Down
Loading