Skip to content

Commit 1f40a26

Browse files
authored
#70 Introducing a host of listeners in the bulkhead component (#73)
* #70 Introducing a host of listeners in the bulkhead component * #70 Covered bulkhead listeners by tests
1 parent d9d73ff commit 1f40a26

File tree

5 files changed

+79
-12
lines changed

5 files changed

+79
-12
lines changed

hyx/bulkhead/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from hyx.bulkhead.api import bulkhead
2+
from hyx.bulkhead.listeners import BulkheadListener, register_bulkhead_listener
23

3-
__all__ = ("bulkhead",)
4+
__all__ = ("bulkhead", "BulkheadListener", "register_bulkhead_listener")

hyx/bulkhead/api.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import functools
22
from types import TracebackType
3-
from typing import Any, Optional, Type, cast
3+
from typing import Any, Optional, Sequence, Type, cast
44

5+
from hyx.bulkhead.listeners import BulkheadListener
56
from hyx.bulkhead.manager import BulkheadManager
7+
from hyx.common.events import EventDispatcher
68
from hyx.common.typing import FuncT
79

810

@@ -22,10 +24,20 @@ class bulkhead:
2224

2325
__slots__ = ("_manager",)
2426

25-
def __init__(self, max_concurrency: int, max_capacity: int) -> None:
27+
def __init__(
28+
self,
29+
max_concurrency: int,
30+
max_capacity: int,
31+
*,
32+
name: Optional[str] = None,
33+
listeners: Optional[Sequence[BulkheadListener]] = None,
34+
) -> None:
35+
2636
self._manager = BulkheadManager(
2737
max_concurrency=max_concurrency,
2838
max_capacity=max_capacity,
39+
name=name,
40+
event_dispatcher=EventDispatcher(listeners).as_listener,
2941
)
3042

3143
async def __aenter__(self) -> "bulkhead":

hyx/bulkhead/listeners.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from typing import TYPE_CHECKING
2+
3+
from hyx.common.events import ListenerRegistry
4+
5+
if TYPE_CHECKING:
6+
from hyx.bulkhead.manager import BulkheadManager
7+
8+
_BULKHEAD_LISTENERS: ListenerRegistry["BulkheadListener"] = ListenerRegistry()
9+
10+
11+
class BulkheadListener:
12+
async def on_bulkhead_full(self, bulkhead: "BulkheadManager") -> None:
13+
...
14+
15+
16+
def register_bulkhead_listener(listener: BulkheadListener) -> None:
17+
"""
18+
Register a listener that will listen to all fallback components in the system
19+
"""
20+
global _BULKHEAD_LISTENERS
21+
22+
_BULKHEAD_LISTENERS.register(listener)

hyx/bulkhead/manager.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import asyncio
2-
from typing import Any
2+
from typing import Any, Optional
33

44
from hyx.bulkhead.exceptions import BulkheadFull
5+
from hyx.bulkhead.listeners import BulkheadListener
56
from hyx.common.typing import FuncT
67

78

@@ -10,9 +11,15 @@ class BulkheadManager:
1011
Semaphore-based bulkhead implementation
1112
"""
1213

13-
__slots__ = ("_total_execs_limiter", "_concurrency_limiter")
14+
__slots__ = ("_total_execs_limiter", "_concurrency_limiter", "_name", "_event_dispatcher")
1415

15-
def __init__(self, max_concurrency: int, max_capacity: int) -> None:
16+
def __init__(
17+
self,
18+
max_concurrency: int,
19+
max_capacity: int,
20+
event_dispatcher: BulkheadListener,
21+
name: Optional[str] = None,
22+
) -> None:
1623
if max_concurrency <= 0:
1724
raise ValueError(f'max_concurrency should be greater than zero ("{max_concurrency}" given)')
1825

@@ -25,12 +32,17 @@ def __init__(self, max_concurrency: int, max_capacity: int) -> None:
2532
self._concurrency_limiter = asyncio.Semaphore(max_concurrency)
2633
self._total_execs_limiter = asyncio.Semaphore(max_capacity)
2734

28-
def _raise_on_exceed(self) -> None:
35+
self._name = name
36+
self._event_dispatcher = event_dispatcher
37+
38+
async def _raise_on_exceed(self) -> None:
2939
if self._total_execs_limiter.locked():
40+
await self._event_dispatcher.on_bulkhead_full(self)
41+
3042
raise BulkheadFull
3143

3244
async def acquire(self) -> None:
33-
self._raise_on_exceed()
45+
await self._raise_on_exceed()
3446

3547
await self._total_execs_limiter.acquire()
3648
await self._concurrency_limiter.acquire()
@@ -40,7 +52,7 @@ async def release(self) -> None:
4052
self._total_execs_limiter.release()
4153

4254
async def __call__(self, func: FuncT) -> Any:
43-
self._raise_on_exceed()
55+
await self._raise_on_exceed()
4456

4557
async with self._total_execs_limiter:
4658
async with self._concurrency_limiter:

tests/test_bulkhead/test_api.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,35 @@
11
import asyncio
2+
from unittest.mock import Mock
23

34
import pytest
45

5-
from hyx.bulkhead import bulkhead
6+
from hyx.bulkhead import BulkheadListener, bulkhead
67
from hyx.bulkhead.exceptions import BulkheadFull
8+
from hyx.bulkhead.manager import BulkheadManager
9+
from tests.conftest import event_manager
10+
11+
12+
class Listener(BulkheadListener):
13+
def __init__(self) -> None:
14+
self.is_full = Mock()
15+
16+
async def on_bulkhead_full(self, bulkhead: "BulkheadManager") -> None:
17+
self.is_full()
718

819

920
async def test__bulkhead__decorator() -> None:
10-
@bulkhead(max_capacity=3, max_concurrency=2)
21+
listener = Listener()
22+
23+
@bulkhead(max_capacity=3, max_concurrency=2, listeners=(listener,))
1124
async def calculations() -> float:
1225
await asyncio.sleep(0.5)
1326
return 42
1427

1528
assert await calculations() == 42
1629

30+
await event_manager.wait_for_tasks()
31+
listener.is_full.assert_not_called()
32+
1733

1834
async def test__bulkhead__context() -> None:
1935
async with bulkhead(max_capacity=3, max_concurrency=2):
@@ -22,14 +38,18 @@ async def test__bulkhead__context() -> None:
2238

2339

2440
async def test__bulkhead__capacity_exceeded() -> None:
25-
bh = bulkhead(max_capacity=2, max_concurrency=2)
41+
listener = Listener()
42+
bh = bulkhead(max_capacity=2, max_concurrency=2, listeners=(listener,))
2643

2744
with pytest.raises(BulkheadFull):
2845
async with bh:
2946
async with bh:
3047
async with bh:
3148
assert True
3249

50+
await event_manager.wait_for_tasks()
51+
listener.is_full.assert_called()
52+
3353

3454
async def test__bulkhead__capacity_exceeded_from_different_coroutines() -> None:
3555
bh = bulkhead(max_capacity=3, max_concurrency=2)

0 commit comments

Comments
 (0)