Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Visit all futures in FuturesUnordered #605

Open
tailhook opened this issue Oct 8, 2017 · 8 comments
Open

Visit all futures in FuturesUnordered #605

tailhook opened this issue Oct 8, 2017 · 8 comments
Labels
A-stream Area: futures::stream C-feature-request

Comments

@tailhook
Copy link
Contributor

tailhook commented Oct 8, 2017

My use case is: I have a set of futures each following its own stream which it processes and sends result over its own channel. But when the configuration is updated I need to visit every future and check if everything in the future's state is up to date.

The proposed API:

impl FuturesUnordered<F> {
  fn visit<G: FnMut(&mut F)>(&mut self, f: G) { ... }
}

Or alternatively, just allow to recover futures from the collection:

impl FuturesUnordered<F> {
  fn drain(&mut self) -> impl Iterator<F> { ... }
}

(note the latter would require mem::replace for FuturesUnordered for the use case, but is probably acceptable too)

Currently, the simplest solution I could find is:

  1. Put a Shared<()> into each future
  2. When Shared is ready, yield internal state as a result of the future
  3. Receive state from FuturesUnordered::poll and reconstruct the future with a new Shared<()> future (waiting for next update)

The alternative strategy might be:

  1. Add an id for each future and an input channel for config
  2. Add a collection of outgoing channels by id
  3. Wrap the collection (2) into an Arc<Mutex<>> so the future can remove itself from the collection

Both strategies look overly complicated and have large overhead compared to a visitor over a collection we already have.

Another thing to note: both strategies actually represent pub-sub kind of pattern. Which would be useful for futures on its own. But presumably, it will always have additional overhead.

What do you think?

@carllerche
Copy link
Member

Is there a reason why adding an iterator that iterates over the inner futures doesn't work (vs a callback)?

@tailhook
Copy link
Contributor Author

tailhook commented Oct 9, 2017

Ah, yes, an iterator over mutable references should work too.

@alexcrichton
Copy link
Member

Various iterators (iter_mut/into_iter/drain/etc) sound like a good idea to me!

@srijs
Copy link
Contributor

srijs commented Nov 3, 2017

@tailhook Just wanted to let you know that FuturesUnordered::iter_mut() just landed in master via #618.

@tailhook
Copy link
Contributor Author

tailhook commented Nov 3, 2017

@srijs, thank you! Now I'm waiting for a release :) This ticket might be closed indeed.

@tailhook tailhook closed this as completed Nov 3, 2017
@tailhook
Copy link
Contributor Author

Oh, sorry. I've missed one detail when reading comments: the original proposal was to use a closure because everything done in that closure must wake up an inner future not just the parent future holding the FuturesUnordered object. (The drain proposal would work because it would push futures back to the new collection).

So while iter_mut might be actually useful, it doesn't solve my task.

@tailhook tailhook reopened this Nov 12, 2017
@srijs
Copy link
Contributor

srijs commented Nov 12, 2017

@tailhook Ah, I see. So to clarify, you'd like to iterate over the futures in FuturesUnordered, and also have the possibility to make FuturesUnordered poll a future you've iterated over?

Since a new release hasn't been cut yet, we might have the chance to still change the API to support what you'd like to have.

If the futures_unordered::IterMut, instead of returning the futures directly returned a wrapper struct that permitted 're-scheduling' the future, would that work for your use-case?

Specifically, I'm thinking of something like futures_unordered::EntryMut<F> which implements AsRef<F> and AsMut<F>, but also provides some sort of ::wakeup(&mut self) to achieve a wake-up of the future as you described.

EDIT: Alternatively, the Drop impl for EntryMut could re-schedule if we'd want this as the default behaviour.

Not 100% sure if that's feasible yet, but I can take a look later today. What do you think?

@tailhook
Copy link
Contributor Author

Basically, I want Timeout::new().unwprap().poll().unwrap() to wakeup task after a timeout. This could be done in two ways: (1) by using a closure and running closure in the same context as poll(), and (2) by rescheduling future, so it's polled right after iteration (as you describe).

I think using Drop for wake up is too implicit. But otherwise, wrapper struct with a method should work. I'm not sure whether it's fine to keep iter_mut name.

Also, if we're looking for a similar operation in stdlib we can use retain which takes a closure too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-stream Area: futures::stream C-feature-request
Projects
None yet
Development

No branches or pull requests

5 participants