refactor: pipeline flow in check.rs (streams and fewer queues) #2136
katrinafyi wants to merge 37 commits into lycheeverse:master
…tate from its environment
--> lychee-bin/src/commands/check.rs:80:29
|
80 | let recursive_sink_go = async move |(), (guard, req)| -> Result<(), Never> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: required for `Unfold<(), {async closure@check.rs:80:29}, ...>` to implement `futures::Sink<(WaitGuard, lychee_lib::Request)>`
= note: required for `SendAll<'_, Pin<&mut Unfold<(), {async closure@check.rs:80:29}, ...>>, ...>` to implement `futures::Future`
= note: the full name for the type has been written to '/home/rina/progs/lychee/target/debug/deps/lychee-d3a56bf874abf3ea.long-type-17452620968307805126.txt'
= note: consider using `--verbose` to print the full type name to the console
any intermediate channels are hidden away in `.partition_result()` and we just get two nice streams out of it. this avoids the need for separate request error and early return bypass channels, making the initial input request processing and early return logic much simpler.
thread 'cli::test_formatted_file_output' (3775) panicked at /rustc/e408947bfd200af42db322daf0fadfe7e26d3bd1/library/core/src/ops/function.rs:250:5:
Unexpected failure.
code=2
stderr=```""```
command=`"/home/runner/work/lychee/lychee/target/debug/lychee" "--output" "006f0eb8-1b4e-46df-bffc-4cbead8df7f0.json" "--format" "json" "/home/runner/work/lychee/lychee/fixtures/TEST.md"`
code=2
stdout=""
stderr=""
let (send_req, recv_req) = mpsc::channel(max_concurrency);
let (send_resp, recv_resp) = mpsc::channel(max_concurrency);
let (waiter, wait_guard) = WaitGroup::new();
#[allow(clippy::match_bool, reason = "more readable and compact")]
Love the additional context in reason. I will try to adapt that pattern in my own code.
params: CommandParams<impl Stream<Item = Result<Request, RequestError>>>,
) -> Result<(ResponseStats, Cache, ExitCode, Arc<HostPool>), ErrorKind> {
These types, especially the response type, get quite unwieldy. What do you think about introducing a struct for that? Of course, the tricky part is naming it. But maybe that's an opportunity to align the response types for all commands. 🤔 If it bloats the PR too much, we can also do it later.
Interesting module. Could even be a standalone crate at some point. Something like itertools, but for streams. I checked StreamExt and it doesn't provide that functionality yet.
/// [online]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
pub trait StreamExt: Stream {
I'm confused. This links to StreamExt, but the upstream version doesn't have these methods? Unless I overlooked them? 🤔 Should we name the trait differently in case we'll import StreamExt in the future?
ah, I was just linking it as a related trait, not because it implements these functions. the idea being "see here" for basic combinators, and we have these extra ones. I'll reword
they don't clash because you can import ...::StreamExt as _ to get both in scope, as we do in this file. but I could also rename it if you want to avoid that entirely.
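As a side note, the "import both traits `as _`" pattern can be shown with a small self-contained sketch. The module and method names here are purely illustrative (not lychee's actual traits); the point is that two same-named traits can coexist when imported anonymously:

```rust
// Two traits that happen to share the name `StreamExt` in different modules,
// mimicking a local extension trait alongside futures::StreamExt.
mod upstream {
    pub trait StreamExt {
        fn upstream_len(&self) -> usize;
    }
    impl StreamExt for Vec<u32> {
        fn upstream_len(&self) -> usize {
            self.len()
        }
    }
}

mod local {
    pub trait StreamExt {
        fn local_sum(&self) -> u32;
    }
    impl StreamExt for Vec<u32> {
        fn local_sum(&self) -> u32 {
            self.iter().sum()
        }
    }
}

// `as _` brings the trait methods into scope without binding the names,
// so the two `StreamExt`s never collide.
use local::StreamExt as _;
use upstream::StreamExt as _;

fn main() {
    let v: Vec<u32> = vec![1, 2, 3];
    // Methods from both traits are callable side by side.
    assert_eq!(v.upstream_len(), 3);
    assert_eq!(v.local_sum(), 6);
    println!("ok");
}
```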
What would be kinda cool is if there was an itertools, but for streams. So like "streamtools" or so. But, of course, that's not a great name for the trait itself. Just trying to nerd-snipe you into building that crate. 😆
Err(Err(fatal_error)) => Err(Err((guard, fatal_error))),
},
)
.partition_result::<(WaitGuard, Request), Result<_, _>>();
Not a huge fan of partition_result here because it introduces a fairly subtle and brittle coupling between multiple streams that must be polled concurrently to avoid deadlock.
We’re not just using it once, but nesting it (requests -> partition_result -> partition_result), which effectively builds a small network of channels + driver futures. This works today because all resulting streams are ultimately polled together via select/buffer_unordered, but the correctness relies on that structure staying intact. A future refactor that, for example, stops polling one branch (even unintentionally) could cause the whole pipeline to stall I think. 😬
The main issue is that this invariant is non-local and easy to break:
- both sides of every partition_result must always be polled, and
- nesting compounds the risk and makes reasoning harder
How about instead of partitioning twice, we classify once into an enum and then split with filter_map? Something like:
enum Initial {
    Valid(WaitGuard, Request),
    RequestError(WaitGuard, Response),
    Fatal(WaitGuard, ErrorKind),
}

Then map into Initial and create the three streams via filter_map.
let classified = requests.map(|(request, guard)| {
    match request.map_err(RequestError::into_response) {
        Ok(req) => Initial::Valid(guard, req),
        Err(Ok(resp)) => Initial::RequestError(guard, resp),
        Err(Err(err)) => Initial::Fatal(guard, err),
    }
});

This removes the co-polling requirement, and is much harder to misuse in the future.
(Coincidentally, it also gets rid of some of the ad-hoc tuples involving WaitGuards.)
I haven't fully thought this through, but I wanted to mention it somewhere so that we don't forget to discuss this before merging.
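To make the classify-once idea concrete, here is a self-contained sketch of the single classification match. `Request`, `Response`, `RequestError`, and `WaitGuard` below are simplified stand-ins, not lychee's real types, and the fatal-error payload is a plain string rather than `ErrorKind`; in the real pipeline the same `match` would sit inside a stream `map`:

```rust
// Simplified stand-ins for the pipeline types (illustrative only).
#[derive(Debug, PartialEq)]
struct Request(&'static str);
#[derive(Debug, PartialEq)]
struct Response(&'static str);
#[derive(Debug, PartialEq)]
struct WaitGuard;

// Non-fatal errors convert to a Response; fatal ones propagate.
enum RequestError {
    NonFatal(&'static str),
    Fatal(&'static str),
}

impl RequestError {
    fn into_response(self) -> Result<Response, &'static str> {
        match self {
            RequestError::NonFatal(msg) => Ok(Response(msg)),
            RequestError::Fatal(msg) => Err(msg),
        }
    }
}

#[derive(Debug, PartialEq)]
enum Initial {
    Valid(WaitGuard, Request),
    RequestError(WaitGuard, Response),
    Fatal(WaitGuard, &'static str),
}

// The single classification step: one match, three cases.
fn classify(item: (Result<Request, RequestError>, WaitGuard)) -> Initial {
    let (request, guard) = item;
    match request.map_err(RequestError::into_response) {
        Ok(req) => Initial::Valid(guard, req),
        Err(Ok(resp)) => Initial::RequestError(guard, resp),
        Err(Err(err)) => Initial::Fatal(guard, err),
    }
}

fn main() {
    assert_eq!(
        classify((Ok(Request("a")), WaitGuard)),
        Initial::Valid(WaitGuard, Request("a"))
    );
    assert_eq!(
        classify((Err(RequestError::NonFatal("404")), WaitGuard)),
        Initial::RequestError(WaitGuard, Response("404"))
    );
    assert_eq!(
        classify((Err(RequestError::Fatal("io")), WaitGuard)),
        Initial::Fatal(WaitGuard, "io")
    );
    println!("ok");
}
```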
yeah, it's a really good point. I've been deep in async lately so I thought the constraint was not too onerous, but maybe it could cause trouble.
when working with streams, I think it's just much nicer to have separate Stream<Request>, Stream<...> rather than one Stream<Initial>. whenever you transform the Stream<Initial>, you have to deal with all the cases simultaneously at each step. this is not insurmountable, but it certainly ties together the paths more than I would like.
I can imagine bandaids like must_use or trying to panic if one is dropped early, but these can never catch all cases. deadlocks in async rust are tricky, and not splitting the streams is an obvious way (the only way?) to fully prevent deadlock.
we could also make the channels unbounded (or very large bounded). but it's more bandaids :(
I did some research on how the ecosystem handles this.
The most relevant crates:
- https://github.com/drewkett/split-stream-by does a binary stream split via predicate/map. Clean API, but has the same co-polling requirement as our current partition_result: both output streams must be polled or the one-element buffer fills and stalls. Only handles binary splits, so three-way classification would still require nesting.
- https://crates.io/crates/stream-partition provides N-way split via a shared partitioner struct. More flexible, but the shared mutable state doesn't really fit our pipeline style.
- https://crates.io/crates/and-then-concurrent is not a splitting crate, but documents exactly why this problem is hard: whenever you fan out a stream, one branch can block waiting for the other to be driven. Their fix is FuturesUnordered; the general ecosystem fix is channels.
The broader pattern seems to be: spawn a task, classify into unbounded senders, receivers implement Stream. No co-polling required, but we lose backpressure on the classification step.
Alternatively, can't we just do a single binary split on Result<Request, RequestError> -- valid vs. error -- and handle the fatal/non-fatal distinction imperatively on the error side via into_response? That reduces the nested partition_result chain to one split with one co-polling invariant to uphold, and fatal errors (of which there should be zero or one in practice) are just propagated normally rather than requiring a third stream. So like
let (valid_requests, error_requests) = requests
    .split_by_map(|result| match result {
        Ok(req) => Either::Left(req),
        Err(e) => Either::Right(e),
    });
Thanks for doing the research! I think that's a good summary of the related projects, it's a tricky problem. I agree that the N-way split wouldn't fit this use case, and the conflict is between wanting backpressure but also the user invariant of co-polling.
I'm not sure what you mean when you say "propagated normally". Are you suggesting that the RequestError stream should just turn into a Response stream (for the non-fatal errors) and the fatal errors should go somewhere else?
If so, it might be similar to an older commit where the fatal errors are just sent into a SetOnce instead of splitting into a stream. But, really, this is just an inlined version of partition_result and it has the same co-poll requirement, it's just even more hidden because there's no .partition_result function that directly returns a tuple.
I was thinking about it a bit and the co-polling requirement might be okay in practice, because the pipeline is defined statically so the polling hierarchy is all fixed and unchanging. This means that if it works in one case, it's likely to work in other cases because the pipeline structure doesn't change. There is also only one main .await in the function, so everything is polled simultaneously by necessity. We should try and keep that structure.
If the nested partitions are confusing, we can also split up the map that returns a nested Result into two separate map calls and two separate partition_result calls.
Yeah, you might be right. Maybe if we carefully document the invariants, it could be fine. There's a risk that we will otherwise forget those details and hit a deadlock while refactoring. That would be pretty sad. But I don't have any better idea for structuring the code right now. It's just a hard problem.
I'm not sure what you mean when you say "propagated normally". Are you suggesting that the RequestError stream should just turn into a Response stream (for the non-fatal errors) and the fatal errors should go somewhere else?
Yeah, exactly. I forgot about the older commit. You're right that it's not helping much.
Is there a way to make sure that streams get polled at all? Can we use #[must_use] in that case? If so, that would eliminate the biggest footgun imho.
...well, we probably already have #[must_use] on the stream itself because of the futures crate, I guess. So there ain't much we can do on top of that.
I was working on the ergonomics a little bit. My goal was to make the API as robust as possible.
Here's the diff:
rina-forks/lychee@refactor-check...lycheeverse:lychee:refactor-check-must-use
The PartitionedStream<T, SenderFut> newtype is the core of the change. Because type aliases can't carry #[must_use], the only way to get the lint is a newtype wrapper. The Stream and FusedStream impls are forwarded via a manual poll_next using map_unchecked_mut. There's one field and no movement, so the unsafe is sound. (But I could understand why we don't want to use unsafe for that.)
Added a comment that pin_project would be cleaner if we ever wanted to add the dependency.
At the end of the day, this only helps with not forgetting that we use the return value.
Other than that, just a bunch of comments we could copy over to your branch.
Oh! Thanks for the commits. I did run into the type aliases issue just now. However, it looks like the #[must_use] does propagate from the inner stream type (which already has #[must_use]), so I don't think the newtype is strictly necessary. It would be necessary if we wanted to add must_use to a type which didn't already have it.
I just saw this so I've already tweaked the comments a little bit, but I'll copy some out.
I also did these experiments with must_use and it appears it does propagate through tuples as well. However, it doesn't warn in some cases you might want it to and mostly applies to dropping the value on the LHS of a ;.
// unused variable warning only
let (a, b) = stream::empty::<Result<(), ()>>().partition_result();
// must_use warning
stream::empty::<Result<(), ()>>().partition_result();
stream::empty::<Result<(), ()>>().partition_result().0;
// no warning even if we throw away one side
let _ = stream::empty::<Result<(), ()>>().partition_result().0;
// no warning
let (_a, _b) = stream::empty::<Result<(), ()>>().partition_result();
Ok, idk, it should be fine then? Seems like the newtype doesn't improve the ergonomics and we can spare ourselves the unsafe then. :)
Yeah. I have explored down the path of custom Future/Stream before, but every time I've abandoned it and walked it back. It's just way too complicated ;-;
However... if we ever do it, pin_project and the delegate macros would be the way to go, I think.
katrinafyi left a comment
thanks for the detailed review! I've just replied to a few now and I'll do the code changes when I have time :)
Nice benchmark! I looked into it a bit. The peak RSS increase is most plausibly explained by the combination of a few factors. The fact that we're using an unbounded channel means no backpressure from the sender side, so if the input stream outpaces the checker, items can queue without bound. :/
The most practical fix I can see is to remove entries from active_requests once they're no longer needed. So, like, this is a hacky way to do it to explain what I mean:
async fn lock_uri_mutex(&self, uri: Uri) -> tokio::sync::OwnedMutexGuard<()> {
    let uri_mutex = self
        .active_requests
        .entry(uri.clone())
        .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
        .clone();
    // Remove immediately so the map doesn't accumulate stale entries.
    // The Arc keeps the mutex alive until the guard is dropped.
    self.active_requests.remove(&uri);
    uri_mutex.lock_owned().await
}
(Not sure if the code is exactly correct, but you get the idea.)
Though we have to be careful! That introduces a race where two concurrent requests for the same URI each get their own mutex and both proceed. The proper fix is probably to remove the entry after the request completes. I would be in favor of the latter option. Caching should happen closer to the hosts.
The unbounded channel is worth noting too. It's worth checking whether a bounded channel with backpressure (sized to max_concurrency) would help.
Co-authored-by: Matthias Endler <matthias@endler.dev>
Hmm about the memory usage, I don't know if I do see the point about
Conflicts: lychee-lib/Cargo.toml
I think I've got to all of the review comments now. I didn't copy over all the comments from rina-forks/lychee@refactor-check...lycheeverse:lychee:refactor-check-must-use, but I think I got the important points. Let me know if anything was missed.
Conflicts: lychee-bin/src/commands/check.rs
Has this failing test case been seen in other PRs? It seems flaky but it also seems somewhat common in this PR, so it makes me worry. I can't reproduce on my computer.
mre left a comment
This seems fine to me. Only added one minor remark.
I would like to test this locally before merging. Will try to run it next and report back if I found any issues. :)
Co-authored-by: Matthias Endler <matthias@endler.dev>
Yes, that has happened before. It's flaky because WayBackMachine is flaky. It doesn't always respond on time. Not sure what to do here. Maybe we should mark with
While testing, I did not find any bugs with this branch specifically, but I noticed that we panic on the following call: lychee --max-concurrency=0 https://en.wikipedia.org/wiki/Tony_Hoare
thread 'main' (32032398) panicked at lychee-bin/src/commands/check.rs:90:60:
mpsc bounded channel requires buffer > 0
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

I think we should show a more user-friendly message. If you like, you can add a check for that, but maybe it's too big for this PR, and we rather create a separate PR for that. Either is fine with me.
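One way to do that is a small up-front validation instead of letting `mpsc::channel` panic. This is only a sketch with a hypothetical helper name; lychee's real flag parsing lives elsewhere (clap), so the actual fix might hook into argument validation instead:

```rust
// Hypothetical validation helper, illustrating the shape of a
// user-facing error in place of the "buffer > 0" panic.
fn validate_max_concurrency(max_concurrency: usize) -> Result<usize, String> {
    if max_concurrency == 0 {
        Err("--max-concurrency must be at least 1".to_string())
    } else {
        Ok(max_concurrency)
    }
}

fn main() {
    assert_eq!(validate_max_concurrency(4), Ok(4));
    assert!(validate_max_concurrency(0).is_err());
    println!("ok");
}
```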
This starts to make the outline of the architecture diagram in #1898 (comment), by rewriting check.rs to use streams as much as possible and fewer queues. The red arrows show the pipeline as implemented by this PR.

Where the arrows go over or skip some nodes in the diagram, it's either not implemented yet (like the outcome interpreter), no longer necessary (the outcome queue isn't needed, I think), or not moved into that position yet (the extractor is invoked somewhere else atm). The recursion channel in this PR is also a stub and not used yet.
The pipeline logic is written in the fn check function, which calls out to helper functions where available. The function is big atm (130 lines), but I think it reads really well from top to bottom and the flow of the diagram is clear. The flow is mostly top to bottom, aside from the recursion channel (which goes back to the top). As one example, compared to the old code, I think the splits and merges in the data flow are much more obvious. The splits correspond to partition_result calls and the joins are stream::select calls.
I've kept the scope focused on the check.rs part in this PR so we can discuss it first before doing further changes. This PR should do no functionality changes; it just changes how things are wired together inside check.rs.
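The split shape can be illustrated with a plain-iterator analogy. The real `.partition_result()` is a lazy stream combinator; this eager version over `Vec` (a hypothetical helper written for this sketch) only shows the data-flow idea of one `Result` stream fanning out into an Ok branch and an Err branch:

```rust
// Eager iterator analogy of the stream-level partition_result:
// split a sequence of Results into its Ok side and Err side.
fn partition_result<T, E>(
    items: impl IntoIterator<Item = Result<T, E>>,
) -> (Vec<T>, Vec<E>) {
    let mut oks = Vec::new();
    let mut errs = Vec::new();
    for item in items {
        match item {
            Ok(t) => oks.push(t),
            Err(e) => errs.push(e),
        }
    }
    (oks, errs)
}

fn main() {
    let input: Vec<Result<u32, &str>> = vec![Ok(1), Err("bad"), Ok(2)];
    let (oks, errs) = partition_result(input);
    assert_eq!(oks, vec![1, 2]);
    assert_eq!(errs, vec!["bad"]);
    println!("ok");
}
```

The crucial difference in the streaming version is that both halves are produced lazily and concurrently, which is where the co-polling requirement discussed above comes from; the eager version has no such constraint.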
Some notes about this new stream approach vs old channels:
- .partition_results() stream extension function.
- impl Stream on local variables to annotate types. i've put extra type annotations in some places to try and make it clearer.
- buffer_unordered function to run async tasks concurrently.

for fun, i also did some rudimentary benchmarks with poop:
i can't really explain why peak memory goes up but everything else looks good. tested on 1.5M local links.