Update HomeserverTestCase.get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)#19871
Update HomeserverTestCase.get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)#19871MadLittleMods wants to merge 63 commits into
HomeserverTestCase.get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)#19871Conversation
get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)HomeserverTestCase.get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)
| event.room_version, | ||
| ), | ||
| exc=LimitExceededError, | ||
| by=0.5, |
There was a problem hiding this comment.
In a lot of cases, the by usage didn't seem necessary at all (test still passes) (no need to advance time in the reactor/clock)
| # whole chain to completion. | ||
| self.reactor.pump([by] * 100) | ||
|
|
||
| def get_success(self, d: Awaitable[TV], by: float = 0.0) -> TV: |
There was a problem hiding this comment.
Removed the by arg as it encourages bad behavior (people use it as a hammer to advance time without reasoning to make things work) and we arbitrarily advance time 100x this amount (imprecise).
I've instead updated the few places that we use this with a precise self.reactor.advance(...) as necessary.
There was a problem hiding this comment.
I agree that by is misleading, unintuitive and should be removed.
However, I do think we probably want some way of advancing the reactor time to handle things like the cooperator. It seems relatively common (and expected) enough, and doing it manually looks tedious and noisy. This is especially true because we're adding a similar thing in _wait_for_deferred (out of necessity). I don't think we care about the exact timings, so long as it succeeds in a suitable quick manner.
I'd vote for adding an optional advance: bool = false (but with a better name) which tries advancing the reactor a tiny step. I think it's OK for this to be off by default, and if later we find we keep needing it we can turn it on.
There was a problem hiding this comment.
It seems relatively common (and expected) enough, and doing it manually looks tedious and noisy.
Based on the refactor necessary here, I don't think it's common at all. Most places don't need any time advancement. We have 3.8k tests, 3.57k places where we call get_success(...)/get_failure(...) and only 13 of those need time advancement > 0 to drive the awaitable to completion.
- 7x
self.reactor.advance(Duration(microseconds=1).as_secs()) - 2x
self.reactor.advance(0.1) - 3x
self.reactor.advance(Duration(seconds=1).as_secs()) - 1x
self.reactor.advance(Duration(seconds=5).as_secs())
It looks like there is more changes in this PR?
There are more test changes in this PR but those other cases are where we should be waiting separately (like the fire-and-forget examples). And those were just covered up because of the sloppy nature of the 100x self.pump(by=by) hidden inside get_success(...).
The main problem I have with my suggested solution is that its non-obvious for anyone running into a timeout. As a saving grace, we do explain it in the docstring.
However, I do think we probably want some way of advancing the reactor time to handle things like the cooperator.
[...]
which tries advancing the reactor a tiny step
I think it's is agreeable that we could advance the Twisted reactor to drive the awaitable we're waiting on until completion.
But what tiny step do we choose? Let's say we loop 100x (as you proposed in another comment)
Duration(microseconds=1)-> 100us only triggers theCooperatorcases (_EPSILON)Duration(milliseconds=1)-> 100ms only triggers up to theFakeTransportstuffDuration(milliseconds=10)-> 1s triggers all but 1 test case we haveDuration(milliseconds=100)-> 10s triggers all of the test cases we have at the moment (this also matches theself._reactor.advance(0.1)we use forFakeChannel.await_result(...)- What do we suggest to someone when they have a test case that needs to advance the reactor more?
And if we add an arg for this like self.get_success(background_update_d, advance_by=Duration(milliseconds=50).as_secs()) it gets dirty to time things out like we already have with self.pump(by=xxx) because of the 100x:
Lines 715 to 718 in 2bb3aac
We could instead keep looping up to a real-time timeout like we have now. If we are advancing time, this would be my pick although picking a default will result in a bunch of empty cycles in the loop where it's doing nothing until we finally reach the scheduled time.
Overall, I feel like it's not worth it for the handful of tests we're trying to make easier.
Instead, perhaps we could adjust FakeTransport, Cooperator etc to schedule things with 0 instead of some small random small value. That would eliminate the majority of the cases (9/13) where we have to advance time.
This is especially true because we're adding a similar thing in
_wait_for_deferred(out of necessity).
Not sure what you mean here? There is no time advancement in _wait_for_deferred
Perhaps you're referring to FakeChannel.await_result(...) and #19879
There was a problem hiding this comment.
Instead, perhaps we could adjust
FakeTransport,Cooperatoretc to schedule things with0instead of some small random small value. That would eliminate the majority of the cases (9/13) where we have to advance time.
At the very least, I think we can move FakeTransport at least to _Epsilon. I'm not entirely sure why _Epsilon is non-zero, it's something that we copied from Twisted IIRC. In particular, I think we need to be sure that moving it to zero still allows IO to run. What I worry about is that if we have 0 we starve out the getting IO calls.
If we can get it down to only waiting for some microseconds we can (and I'm happy with not doing it by default for now), then I think its not going to cause any confusion.
This is especially true because we're adding a similar thing in
_wait_for_deferred(out of necessity).Not sure what you mean here? There is no time advancement in
_wait_for_deferredPerhaps you're referring to
FakeChannel.await_result(...)and #19879
I mean that to make things work with Rust we have to mess around and (real) sleep to make it work, and for the scheduler/FakeTransport/etc we're having to (fake) sleep to make it work.
There was a problem hiding this comment.
I'm not entirely sure why
_EPSILONis non-zero, it's something that we copied from Twisted IIRC
I also looked into this when patching the tests and there is just no documented reason in the code, commit, or issue on the Twisted side for the value of _EPSILON.
The docs on twisted.internet.interfaces.IReactorTime.callLater(...) don't guarantee anything about it scheduling it for the next iteration of the "loop" or even the end of the current stack (like a setTimeout(0) in JavaScript). As an example, the Clock we use in tests will happily continue calling any new scheduled calls as it loops and makes more (cascade). There is even a whole issue tracking this as "bug", twisted/twisted#5962. Aside: this does call into question why we ever had self.pump() everywhere as a single advance(0) is the same as calling it 100 times like self.pump() did (a non-zero value did do something).
I think they mean for callLater(...) to have semantics that it should be in "next reactor iteration" but its not spelled out so it would be unwise to rely on it. The Twisted codebase does have this comment around some callLater(0) usage which gives some credibility to this idea.
With a more "real" reactor implementation like ReactorBase, it does is distinguish _newTimedCalls when scheduling things with callLater(0) and only considers _newTimedCalls when you call runUntilCurrent(...) again.
It seems like we could get away with using 0 as the reactor we'd use during normal operation handles things appropriately and for tests, it's fine to just cram things to go as fast as possible.
But overall, I think a non-zero _EPSILON makes sense for things to behave as described in case of any new Twisted reactor implementation:
synapse/synapse/http/client.py
Lines 166 to 168 in 7bc2b93
If we can get it down to only waiting for some microseconds we can (and I'm happy with not doing it by default for now), then I think its not going to cause any confusion.
Can you re-phrase? I'm having trouble figuring out which direction you're leaning.
I moved _EPSILON to CLOCK_SCHEDULE_EPSILON thinking I would need to re-use it but decided we can just use callLater(0) for FakeTransport and FakeChannel.
There was a problem hiding this comment.
If we can get it down to only waiting for some microseconds we can (and I'm happy with not doing it by default for now), then I think its not going to cause any confusion.
Can you re-phrase? I'm having trouble figuring out which direction you're leaning.
I moved
_EPSILONtoCLOCK_SCHEDULE_EPSILONthinking I would need to re-use it but decided we can just usecallLater(0)forFakeTransportandFakeChannel.
In the context of adding an auto_advance: bool = False param to get_success, where if set we advance the reactor by a microsecond. If that causes the majority of the tests to pass that were previously relying on the by= parameter, then I think that would get us in a good place.
That way a) there's a easy/readable way of resolving deferreds that should resolve "immediately" (taking into account the cooperator etc), and b) we don't advance the time meaningfully (as we only advance some number of micrcoseconds, and all of our timing we actually want to test is significantly more than 1ms).
There was a problem hiding this comment.
Updated to advance by CLOCK_SCHEDULE_EPSILON by default ⏩
Overall, this only simplifies the 7 tests in tests/handlers/test_presence.py that involve the Cooperator but it's a decent upgrade in terms of things just working.
To note, the other test in tests/rest/admin/test_user.py that involves the Cooperator is a fire-and-forget case.
| sync_d = ensureDeferred( | ||
| worker_presence_handler.user_syncing( | ||
| self.user_id, self.device_id, True, PresenceState.ONLINE | ||
| ), | ||
| by=0.1, | ||
| ) | ||
| ) | ||
| # `user_syncing` proxies the presence write to the main process over an HTTP | ||
| # replication request. The request body is streamed by a `Cooperator` that uses | ||
| # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so | ||
| # we need to actually advance the clock for it to fire. | ||
| self.reactor.advance(Duration(microseconds=1).as_secs()) | ||
| self.get_success(sync_d) |
There was a problem hiding this comment.
This is the main pattern I'm recommending if you need to advance time by an non-zero increment. ensureDeferred works well but the name is a bit non-obvious to describe that we want to make the task run in the background on its own.
run_in_background(...) would also work but it's usage is a bit awkward. I guess we could use run_coroutine_in_background(...) instead 🤔
The difference between ensureDeferred(...) vs run_in_background(...)/run_coroutine_in_background(...) is all of the extra LoggingContext (log context) handling. It doesn't matter for tests though.
…te_room_membership_resume_after_restart`
| # XXX: There can be a few already dispatched database queries (from normal | ||
| # background tasks in Synapse) and the threadless `ThreadPool` that we use in | ||
| # tests uses *untracked* clock calls to pass database results back so `shutdown` | ||
| # doesn't cancel those calls. This is a quirk of our test infrastructure | ||
| # (threadless `ThreadPool`) so this kind of "hack" is fine. | ||
| self.reactor.advance(0) |
There was a problem hiding this comment.
The explanation here is slightly hand-wavey.
The other comment explanations are more concrete at-least in the fact that I've traced some scheduled reactor callback.
…ocess_join_after_server_leaves_room` `wait_for_background_updates` is not relevant
| # Process the leave and join in one go. | ||
| dir_handler.update_user_directory = True | ||
| dir_handler.notify_new_event() | ||
| self.wait_for_background_updates() |
There was a problem hiding this comment.
As far as I can tell, self.wait_for_background_updates() is totally bogus here. I assume the mistake here was because notify_new_event(...) uses run_as_background_process(...) but that's a totally separate thing (background updates != background process)
This made the test work because it does wait_for_background_updates(...) which does a get_success(..., by=0.1) which pumped and advanced the reactor/clock.
But we can replace it with something more precise.
Same thing we did in #19871
…FakeTransport.registerProducer`) pt. 2
```
[FAIL]
Traceback (most recent call last):
File "/home/runner/work/synapse/synapse/tests/crypto/test_keyring.py", line 617, in test_get_keys_from_perspectives
self.assertEqual(res.added_ts, self.reactor.seconds() * 1000)
File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.10/lib/python3.10/site-packages/twisted/trial/_synctest.py", line 444, in assertEqual
super().assertEqual(first, second, msg)
File "/opt/hostedtoolcache/Python/3.10.20/x64/lib/python3.10/unittest/case.py", line 845, in assertEqual
assertion_func(first, second, msg=msg)
File "/opt/hostedtoolcache/Python/3.10.20/x64/lib/python3.10/unittest/case.py", line 838, in _baseAssertEqual
raise self.failureException(msg)
twisted.trial.unittest.FailTest: 100000 != 100000.003
tests.crypto.test_keyring.PerspectivesKeyFetcherTestCase.test_get_keys_from_perspectives
```
…ning_task`
```
Traceback (most recent call last):
File "/home/runner/work/synapse/synapse/tests/util/test_task_scheduler.py", line 276, in test_cancel_running_task
self._test_cancel_task(task_id)
File "/home/runner/work/synapse/synapse/tests/util/test_task_scheduler.py", line 241, in _test_cancel_task
assert new_counter == current_counter
builtins.AssertionError:
tests.util.test_task_scheduler.TestTaskScheduler.test_cancel_running_task
```
| task.id, result={"counter": current_counter} | ||
| ) | ||
| await self.hs.get_clock().sleep(Duration(microseconds=1)) | ||
| await self.hs.get_clock().sleep(Duration(seconds=1)) |
There was a problem hiding this comment.
Had to update this because we were seeing the following failure after I updated get_success(...) to advance by CLOCK_SCHEDULE_EPSILON. I.e. the task was scheduled and finished before we could ever cancel it like the test wants. We increase the time the task takes so we can still cancel it.
Traceback (most recent call last):
File "/home/runner/work/synapse/synapse/tests/util/test_task_scheduler.py", line 276, in test_cancel_running_task
self._test_cancel_task(task_id)
File "/home/runner/work/synapse/synapse/tests/util/test_task_scheduler.py", line 241, in _test_cancel_task
assert new_counter == current_counter
builtins.AssertionError:
tests.util.test_task_scheduler.TestTaskScheduler.test_cancel_running_task
| self.assertIsNotNone(res) | ||
| assert res is not None | ||
| self.assertEqual(res.added_ts, self.reactor.seconds() * 1000) | ||
| self.assertEqual(res.added_ts, self.clock.time_msec()) |
There was a problem hiding this comment.
Updated this to actually use millisecond resolution. Previously, this test passed because the clock was using round numbers but we updated get_success(...) to advance by CLOCK_SCHEDULE_EPSILON, this needed to change.
[FAIL]
Traceback (most recent call last):
File "/home/runner/work/synapse/synapse/tests/crypto/test_keyring.py", line 617, in test_get_keys_from_perspectives
self.assertEqual(res.added_ts, self.reactor.seconds() * 1000)
File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.10/lib/python3.10/site-packages/twisted/trial/_synctest.py", line 444, in assertEqual
super().assertEqual(first, second, msg)
File "/opt/hostedtoolcache/Python/3.10.20/x64/lib/python3.10/unittest/case.py", line 845, in assertEqual
assertion_func(first, second, msg=msg)
File "/opt/hostedtoolcache/Python/3.10.20/x64/lib/python3.10/unittest/case.py", line 838, in _baseAssertEqual
raise self.failureException(msg)
twisted.trial.unittest.FailTest: 100000 != 100000.003
tests.crypto.test_keyring.PerspectivesKeyFetcherTestCase.test_get_keys_from_perspectives
…jwt_key`
```
[FAIL]
Traceback (most recent call last):
File "/home/runner/work/synapse/synapse/tests/handlers/test_oidc.py", line 984, in test_exchange_code_jwt_key
self.assertEqual(claims["iat"], start_time)
File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.14/lib/python3.14/site-packages/twisted/trial/_synctest.py", line 444, in assertEqual
super().assertEqual(first, second, msg)
File "/opt/hostedtoolcache/Python/3.14.6/x64/lib/python3.14/unittest/case.py", line 925, in assertEqual
assertion_func(first, second, msg=msg)
File "/opt/hostedtoolcache/Python/3.14.6/x64/lib/python3.14/unittest/case.py", line 918, in _baseAssertEqual
raise self.failureException(msg)
twisted.trial.unittest.FailTest: 1000 != 1000.000001
tests.handlers.test_oidc.OidcHandlerTestCase.test_exchange_code_jwt_key
```
| # timestamps. | ||
| self.reactor.advance(1000) | ||
| start_time = self.reactor.seconds() | ||
| start_time_s = int(self.reactor.seconds()) |
There was a problem hiding this comment.
Updated this to actually use second resolution. Previously, this test passed because the clock was using round numbers but we updated get_success(...) to advance by CLOCK_SCHEDULE_EPSILON, this needed to change.
[FAIL]
Traceback (most recent call last):
File "/home/runner/work/synapse/synapse/tests/handlers/test_oidc.py", line 984, in test_exchange_code_jwt_key
self.assertEqual(claims["iat"], start_time)
File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.14/lib/python3.14/site-packages/twisted/trial/_synctest.py", line 444, in assertEqual
super().assertEqual(first, second, msg)
File "/opt/hostedtoolcache/Python/3.14.6/x64/lib/python3.14/unittest/case.py", line 925, in assertEqual
assertion_func(first, second, msg=msg)
File "/opt/hostedtoolcache/Python/3.14.6/x64/lib/python3.14/unittest/case.py", line 918, in _baseAssertEqual
raise self.failureException(msg)
twisted.trial.unittest.FailTest: 1000 != 1000.000001
tests.handlers.test_oidc.OidcHandlerTestCase.test_exchange_code_jwt_key
Update
HomeserverTestCase.get_success(...)and friends to drive async Rust (Tokio runtime/thread pool). This means you can useget_success(...)anywhere regardless of what kind of work needs to be done.Spawning from adding some more async Rust things in #19846 and wanting something more standard instead of the custom
till_deferred_has_result(...)that has crept in to a few files.Alternative to #19867 spurred on by this comment from @erikjohnston
How does this work?
Previously,
get_success(...)just ran in a hot-loop advancing the Twisted reactor clock which didn't give any time for other threads to do some work or acquire the GIL if necessary (whenever there is a hand-off from Rust to Python, we need the GIL).Now,
get_success(...)loops until we see a result (until we hit the ~0.1s real-time timeout). In the loop, we calltime.sleep(0)which will "Suspend execution of the calling thread [...]" (CPU and GIL) to allow other threads to do some work. Then like before, we advance the Twisted reactor clock to run any scheduled callbacks which includes anything the other threads may have scheduled.Does this slow down the entire test suite?
Seems just as fast as before. There is minutes variance in what we had before and after but both are within the same range of each other.
develop)trial (3.10, sqlite, all)trial (3.10, postgres, 14, all)Dev notes
time.sleep(0)("Suspend execution of the calling thread [...]")os.sched_yield()("Voluntarily relinquish the CPU.")#19394 (comment) and #19734 (comment) discuss why you sometimes need to
self.reactor.advance(0)before you can actuallyself.reactor.advance(...)in some cases and reasoning for whypump(...)may have become a thing.Todo
till_deferred_has_resultwait_on_threadPull Request Checklist
EventStoretoEventWorkerStore.".code blocks.