-
Notifications
You must be signed in to change notification settings - Fork 560
Update FakeChannel.await_result(...) to drive async Rust (Tokio runtime/thread pool)
#19879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 8 commits
f5710de
bb9e266
0fb3722
c10f8cb
0281dd2
842fd59
3565a7f
7c19a47
622ec3d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Update `HomeserverTestCase.get_success(...)` and friends to drive async Rust (Tokio runtime/thread pool). | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -566,7 +566,9 @@ def test_wait_for_sync_token(self) -> None: | |
| # timeout | ||
| with self.assertRaises(TimedOutException): | ||
| channel.await_result(timeout_ms=9900) | ||
| channel.await_result(timeout_ms=200) | ||
| # `notifier.wait_for_stream_token(from_token)` only checks every 500ms so we | ||
| # need to match that in order to make sure we hit the wake-up for sure. | ||
| channel.await_result(timeout_ms=500) | ||
|
Comment on lines
+569
to
+571
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, this worked because Now I've updated the second wait to be 500ms to match |
||
| self.assertEqual(channel.code, 200, channel.json_body) | ||
|
|
||
| # We expect the next `pos` in the result to be the same as what we requested | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -101,6 +101,7 @@ | |
| from synapse.storage.prepare_database import prepare_database | ||
| from synapse.types import ISynapseReactor, JsonDict | ||
| from synapse.util.clock import Clock | ||
| from synapse.util.duration import Duration | ||
| from synapse.util.json import json_encoder | ||
|
|
||
| from tests.utils import ( | ||
|
|
@@ -301,15 +302,85 @@ def transport(self) -> "FakeChannel": | |
| def await_result(self, timeout_ms: int = 1000) -> None: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar structure to what we've done in #19871 |
||
| """ | ||
| Wait until the request is finished. | ||
|
|
||
| Advances the Twisted reactor clock by 0.1s and suspending execution of the | ||
| Python thread (to allow other threads to do work) in a loop until we see a | ||
| result. We timeout when both the Twisted reactor clock has been advanced enough | ||
| AND we've waited the 1s of real-time before giving up. | ||
|
|
||
| The loop 1) allows `clock.call_later` scheduled callbacks to run if they are | ||
| scheduled to run now and 2) will also allow other threads to make progress. This | ||
| could be things spawned on the Twisted reactor threadpool or Tokio runtime | ||
| (async Rust code). | ||
|
|
||
| Args: | ||
| timeout_ms: The Twisted reactor time we wait until we raise a `TimedOutException` | ||
| """ | ||
| end_time = self._reactor.seconds() + timeout_ms / 1000.0 | ||
| timeout = Duration(milliseconds=timeout_ms) | ||
| start_time_seconds = self._reactor.seconds() | ||
|
|
||
| # 1s is an arbitrary small number so we don't have to wait that long when | ||
| # something is stuck and because we assume any task on another thread will be | ||
| # fast enough. | ||
| # | ||
| # We don't use the same `timeout_ms` passed in because some tests specify 20s | ||
| # and we don't want to be waiting that long unnecessarily. | ||
| real_time_timeout = Duration(seconds=1) | ||
| start_real_time_seconds = time.time() | ||
|
|
||
| # TODO: Why? | ||
| self._reactor.run() | ||
|
Comment on lines
+331
to
332
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prior art. Something to figure out in a future PR. Just noting that I don't know. The reactor should already be running and we can potentially remove it in another PR. |
||
|
|
||
| loop_count = 0 | ||
| while not self.is_finished(): | ||
| if self._reactor.seconds() > end_time: | ||
| if ( | ||
| # Exceeded the Twisted reactor time timeout | ||
| # | ||
| # We use `>=` for the reactor time condition as it's possible we advance | ||
| # exactly the `timeout` amount and we don't want to get stuck in an | ||
| # infinite loop | ||
| self._reactor.seconds() >= start_time_seconds + timeout.as_secs() | ||
| # And exceeded the real-time timeout | ||
| and time.time() > start_real_time_seconds + real_time_timeout.as_secs() | ||
| ): | ||
| raise TimedOutException("Timed out waiting for request to finish.") | ||
|
|
||
| self._reactor.advance(0.1) | ||
| # Suspend execution of this thread to allow other threads to do work. This | ||
| # could be things spawned on the Twisted reactor threadpool or Tokio thread | ||
| # pool (async Rust code). | ||
| # | ||
| # Note: Since we're waiting real-time (`timeout` duration), the tests also | ||
| # pass with `time.sleep(0)` commented out because Python has a default | ||
| # thread switch interval (5ms for cpython) (see | ||
| # `sys.setswitchinterval(interval)`). We still want this here as we're able | ||
| # to preempt and cause the thread context swtich to happen faster. | ||
| # | ||
| # After a few cycles, we use `time.sleep(0.001)` instead of `time.sleep(0)` | ||
| # to avoid tightlooping on the main thread (CPU 100%) because it's wasteful | ||
| # and may starve out other threads. 10 is arbitrary but many cases will have | ||
| # none or only a few round-trips so we can just try to go as fast as | ||
| # posssible. | ||
| if loop_count < 10: | ||
| time.sleep(0) | ||
| else: | ||
| time.sleep(0.001) | ||
|
|
||
| # Advance the Twisted reactor and run any scheduled callbacks | ||
| # | ||
| # Don't advance the Twisted reactor clock further than the timeout duration | ||
| # as someone should increase the timeout if they expect things to take | ||
| # longer. | ||
| if self._reactor.seconds() < start_time_seconds + timeout.as_secs(): | ||
| self._reactor.advance(0.1) | ||
| else: | ||
| # But we want to still keep running whatever might be getting scheduled | ||
| # to run now. | ||
| # | ||
| # For example from other threads, they may have scheduled something on | ||
| # the reactor to run (like `reactor.callFromThread(...)`) | ||
| self._reactor.advance(0) | ||
|
|
||
| loop_count += 1 | ||
|
|
||
| def extract_cookies(self, cookies: MutableMapping[str, str]) -> None: | ||
| """Process the contents of any Set-Cookie headers in the response | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same changelog as #19871 so they merge