Skip to content

Commit

Permalink
Bring back changes to services.py from 0.3.1 just fix import logic
Browse files Browse the repository at this point in the history
  • Loading branch information
wbarnha authored and lqhuang committed Dec 3, 2022
1 parent 030d69f commit f96657f
Showing 1 changed file with 24 additions and 34 deletions.
58 changes: 24 additions & 34 deletions mode/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from types import TracebackType
from typing import (
Any,
AsyncContextManager,
AsyncIterator,
Awaitable,
Callable,
ClassVar,
ContextManager,
Coroutine,
Dict,
Generator,
Iterable,
List,
Mapping,
Expand All @@ -40,26 +41,17 @@
from .utils.tracebacks import format_task_stack
from .utils.trees import Node
from .utils.types.trees import NodeT
from .utils.typing import AsyncContextManager

__all__ = [
"ServiceBase",
"Service",
"Diag",
"task",
"timer",
]
__all__ = ["ServiceBase", "Service", "Diag", "task", "timer", "crontab"]

ClockArg = Callable[[], float]

#: Future type: Different types of awaitables.
FutureT = Union[asyncio.Future, Generator[Any, None, Any], Awaitable]
FutureT = Union[asyncio.Future, Coroutine[Any, None, Any], Awaitable]

#: Argument type for ``Service.wait(*events)``
#: Wait can take any number of futures or events to wait for.
WaitArgT = Union[FutureT, asyncio.Event, Event]

EVENT_TYPES = (asyncio.Event, Event)
WaitArgT = Union[FutureT, Event]


class WaitResults(NamedTuple):
Expand Down Expand Up @@ -96,10 +88,10 @@ class ServiceBase(ServiceT):
# the None to logger.
logger: logging.Logger = cast(logging.Logger, None)

def __init_subclass__(self) -> None:
if self.abstract:
self.abstract = False
self._init_subclass_logger()
def __init_subclass__(cls) -> None:
if cls.abstract:
cls.abstract = False
cls._init_subclass_logger()

@classmethod
def _init_subclass_logger(cls) -> None:
Expand Down Expand Up @@ -146,11 +138,11 @@ def _repr_name(self) -> str:
@property
def loop(self) -> asyncio.AbstractEventLoop:
if self._loop is None:
self._loop = asyncio.get_event_loop()
self._loop = asyncio.get_event_loop_policy().get_event_loop()
return self._loop

@loop.setter
def loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
def loop(self, loop: asyncio.AbstractEventLoop) -> None:
self._loop = loop


Expand Down Expand Up @@ -231,7 +223,7 @@ class ServiceCallbacks:
When calling ``await service.start()`` this happens:
.. sourcecode:: text
.. code-block:: text
+--------------------+
| INIT (not started) |
Expand All @@ -255,7 +247,7 @@ class ServiceCallbacks:
When stopping and ``wait_for_shutdown`` is unset, this happens:
.. sourcecode:: text
.. code-block:: text
.-----------------------.
/ await service.stop() |
Expand All @@ -272,7 +264,7 @@ class ServiceCallbacks:
When stopping and ``wait_for_shutdown`` is set, the stop operation
will wait for something to set the shutdown flag ``self.set_shutdown()``:
.. sourcecode:: text
.. code-block:: text
.-----------------------.
/ await service.stop() |
Expand All @@ -293,7 +285,7 @@ class ServiceCallbacks:
When restarting the order is as follows (assuming
``wait_for_shutdown`` unset):
.. sourcecode:: text
.. code-block:: text
.-------------------------.
/ await service.restart() |
Expand Down Expand Up @@ -511,13 +503,13 @@ async def _and_transition(self: ServiceT, *args: Any, **kwargs: Any) -> Any:

return _decorate

def __init_subclass__(self) -> None:
def __init_subclass__(cls) -> None:
# Every new subclass adds @Service.task decorated methods
# to the class-local `_tasks` list.
if self.abstract:
self.abstract = False
self._init_subclass_logger()
self._init_subclass_tasks()
if cls.abstract:
cls.abstract = False
cls._init_subclass_logger()
cls._init_subclass_tasks()

@classmethod
def _init_subclass_tasks(cls) -> None:
Expand Down Expand Up @@ -640,7 +632,7 @@ def add_future(self, coro: Awaitable) -> asyncio.Future:
"""
fut = asyncio.ensure_future(self._execute_task(coro), loop=self.loop)
try:
fut.set_name(repr(coro)) # type: ignore
fut.set_name(repr(coro))
except AttributeError:
pass
fut.__wrapped__ = coro # type: ignore
Expand Down Expand Up @@ -698,13 +690,11 @@ async def join_services(self, services: Sequence[ServiceT]) -> None:
for service in reversed(services):
await service.stop()

async def sleep(
self, n: Seconds, *, loop: asyncio.AbstractEventLoop = None
) -> None:
async def sleep(self, n: Seconds) -> None:
"""Sleep for ``n`` seconds, or until service stopped."""
try:
await asyncio.wait_for(
asyncio.ensure_future(self._stopped.wait(), loop=self.loop),
self._stopped.wait(),
timeout=want_seconds(n),
)
except asyncio.TimeoutError:
Expand Down Expand Up @@ -749,7 +739,7 @@ async def wait_first(

futures = {
coro: asyncio.ensure_future(
(coro.wait() if isinstance(coro, EVENT_TYPES) else coro),
coro if isinstance(coro, Awaitable) else coro.wait(),
loop=loop,
)
for coro in coros
Expand Down

0 comments on commit f96657f

Please sign in to comment.