Skip to content

Commit

Permalink
Bring back custom Event and fix improper usage of `_wakeup_timer_in…
Browse files Browse the repository at this point in the history
…_thread` (#39)


* fix import logic again to address #38

* fix thread test import Event

* fix thread test import Event

* commit potential fix?

* replace asyncio.sleep with self.sleep

* readd get_event_loop_policy()

* readd loop kwargs until we figure out what to do

* fix server test import

* fix erroneous loops because im illiterate

* Add changes by @lqhuang
  • Loading branch information
wbarnha committed Dec 16, 2022
1 parent 5ef6c48 commit cd40e6f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 18 deletions.
10 changes: 5 additions & 5 deletions mode/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from types import TracebackType
from typing import (
Any,
AsyncContextManager,
AsyncIterator,
Awaitable,
Callable,
Expand Down Expand Up @@ -41,6 +40,7 @@
from .utils.tracebacks import format_task_stack
from .utils.trees import Node
from .utils.types.trees import NodeT
from .utils.typing import AsyncContextManager

__all__ = ["ServiceBase", "Service", "Diag", "task", "timer", "crontab"]

Expand Down Expand Up @@ -567,16 +567,16 @@ def __init__(
super().__init__(loop=self._loop)

def _new_started_event(self) -> Event:
return Event()
return Event(loop=self.loop)

def _new_stopped_event(self) -> Event:
return Event()
return Event(loop=self.loop)

def _new_shutdown_event(self) -> Event:
return Event()
return Event(loop=self.loop)

def _new_crashed_event(self) -> Event:
return Event()
return Event(loop=self.loop)

async def transition_with(
self, flag: str, fut: Awaitable, *args: Any, **kwargs: Any
Expand Down
17 changes: 9 additions & 8 deletions mode/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import sys
import threading
import traceback
from asyncio.locks import Event
from time import monotonic
from typing import (
Any,
Expand All @@ -26,6 +25,7 @@

from .services import Service
from .utils.futures import maybe_async, maybe_set_exception, maybe_set_result, notify
from .utils.locks import Event

__all__ = [
"QueuedMethod",
Expand Down Expand Up @@ -104,16 +104,18 @@ def __init__(
**kwargs: Any,
) -> None:
# cannot share loop between threads, so create a new one
assert asyncio.get_event_loop_policy().get_event_loop()
if executor is not None:
raise NotImplementedError("executor argument no longer supported")
self.parent_loop = loop or asyncio.get_event_loop_policy().get_event_loop()
self.thread_loop = (
thread_loop or asyncio.get_event_loop_policy().new_event_loop()
)
self._thread_started = Event()
self._thread_started = Event(loop=self.parent_loop)
if Worker is not None:
self.Worker = Worker
super().__init__(loop=self.thread_loop, **kwargs)
assert self._shutdown.loop is self.parent_loop

async def on_thread_started(self) -> None:
...
Expand Down Expand Up @@ -148,7 +150,7 @@ async def on_thread_stop(self) -> None:
# thread calls _shutdown.set(), parent calls _shutdown.wait()

def _new_shutdown_event(self) -> Event:
return Event()
return Event(loop=self.parent_loop)

async def maybe_start(self) -> bool:
if not self._thread_started.is_set():
Expand Down Expand Up @@ -179,13 +181,12 @@ async def start(self) -> None:

async def _keepalive2(self) -> None:
while not self.should_stop:
await self.sleep(1.1)
await self.sleep(2.0)
if self.last_wakeup_at:
if monotonic() - self.last_wakeup_at > 3.0:
self.log.error("Thread keepalive is not responding...")
asyncio.run_coroutine_threadsafe(
self._wakeup_timer_in_thread(), self.thread_loop
)
await asyncio.sleep(0.0) # for unittest to invoke `call_soon`
await self._wakeup_timer_in_thread()

async def _wakeup_timer_in_thread(self) -> None:
self.last_wakeup_at = monotonic()
Expand Down Expand Up @@ -323,7 +324,7 @@ class MethodQueue(Service):
def __init__(self, num_workers: int = 2, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._queue = asyncio.Queue()
self._queue_ready = Event()
self._queue_ready = Event(loop=self.loop)
self.num_workers = num_workers
self._workers = []

Expand Down
6 changes: 3 additions & 3 deletions mode/utils/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import asyncio
import math
import typing
from asyncio.locks import Event
from collections import deque
from typing import Any, Callable, List, Set, TypeVar, cast, no_type_check
from weakref import WeakSet

from .locks import Event
from .objects import cached_property
from .typing import Deque

Expand Down Expand Up @@ -60,8 +60,8 @@ def __init__(
loop: asyncio.AbstractEventLoop = None
) -> None:
self.loop = loop
self._resume = Event()
self._suspend = Event()
self._resume = Event(loop=self.loop)
self._suspend = Event(loop=self.loop)
if initially_suspended:
self._suspend.set()
self._queues = WeakSet()
Expand Down
2 changes: 1 addition & 1 deletion t/functional/test_service.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio
import logging
from asyncio.locks import Event
from typing import AsyncContextManager, ContextManager
from unittest.mock import Mock

import pytest

import mode
from mode.utils.locks import Event


class X(mode.Service):
Expand Down
19 changes: 18 additions & 1 deletion t/unit/test_threads.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import sys
import threading
from asyncio.locks import Event
from unittest.mock import ANY, Mock, patch

if sys.version_info < (3, 8):
Expand All @@ -13,6 +12,7 @@

from mode.threads import MethodQueue, QueueServiceThread, ServiceThread, WorkerThread
from mode.utils.futures import done_future
from mode.utils.locks import Event


class test_WorkerThread:
Expand Down Expand Up @@ -263,6 +263,23 @@ async def test_crash(self, *, thread):
await asyncio.sleep(0.1) # wait for call_soon_threadsafe
thread._thread_running.set_exception.assert_called_with(exc)

@pytest.mark.asyncio
async def test__wakeup_timer_in_thread(self, *, thread, event_loop):
thread.add_future = Mock(name="thread.add_future")
thread._wakeup_timer_in_thread = AsyncMock()
thread._stopped.is_set = Mock(return_value=False)
thread._crashed.is_set = Mock(return_value=False)
thread.sleep = AsyncMock()

def cb():
thread._stopped.is_set.return_value = True
assert thread.should_stop

event_loop.call_soon(cb)
await thread._keepalive2()

thread._wakeup_timer_in_thread.assert_awaited()


class test_MethodQueue:
@pytest.mark.asyncio
Expand Down

0 comments on commit cd40e6f

Please sign in to comment.