FoldWithStream stream adapter #989

Open
Ekleog opened this issue Apr 26, 2018 · 1 comment
Labels
A-stream Area: futures::stream C-feature-request

Comments

@Ekleog

Ekleog commented Apr 26, 2018

Next in the series “here is a stream adapter I wanted and thus wrote, so do you want it?” comes the FoldWithStream stream adapter, which:

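  1. Consumes the stream
  2. Passes the accumulator, the stream's first element, and the remainder of the stream to a folding function
  3. Recovers the stream to continue with, along with the new accumulator, from the future the folding function returns
  4. Repeats until the stream ends, then yields the accumulator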
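
To make the steps above concrete, here is a minimal synchronous sketch of the same idea using only std iterators. It is not the adapter itself: `fold_with_iter` and `sum_length_prefixed` are hypothetical names made up for illustration, and the folding function returns the remainder directly instead of through a future.

```rust
/// Synchronous analogue of the fold described above (hypothetical helper,
/// not part of the futures crate): `f` receives the accumulator, the next
/// item, and the rest of the iterator, and hands back the iterator to
/// continue with plus the new accumulator.
fn fold_with_iter<I, Acc, F>(mut iter: I, init: Acc, mut f: F) -> Acc
where
    I: Iterator,
    F: FnMut(Acc, I::Item, I) -> (I, Acc),
{
    let mut acc = init;
    loop {
        match iter.next() {
            Some(item) => {
                // Give ownership of the remainder to `f`, then take it back.
                let (rest, next_acc) = f(acc, item, iter);
                iter = rest;
                acc = next_acc;
            }
            // Iterator exhausted: the accumulator is the result.
            None => return acc,
        }
    }
}

/// Example use: each leading element says how many of the following
/// elements belong to its group; the result is the sum of every group.
fn sum_length_prefixed(data: Vec<u32>) -> Vec<u32> {
    fold_with_iter(data.into_iter(), Vec::new(), |mut sums, len, mut rest| {
        // The folding function consumes part of the remainder itself.
        let total: u32 = rest.by_ref().take(len as usize).sum();
        sums.push(total);
        (rest, sums)
    })
}
```

The point of handing the remainder to the folding function is that it can consume a variable number of further elements before giving the stream back, which a plain fold cannot do.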

I suspect there are better ways to do this one. However, the best I managed with “standard” stream adapters is the code I deleted in Ekleog/kannader@b6e0aff …, which I removed because it overflowed the stack.

Here is what I have written. If you think it would be useful, I can open a PR. :)

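// Assuming futures 0.1, whose `poll` signature this code uses.
use std::mem;

use futures::{Async, Future, Stream};

enum NextStep<S: Stream, F: Future, Acc> {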
    Stream(S, Acc),
    Future(F),
    Completed,
}

pub struct FoldWithStream<S, Acc, Fun, Ret>
where
    S: Stream,
    Fun: FnMut(Acc, S::Item, S) -> Ret,
    Ret: Future<Item = (S, Acc), Error = S::Error>,
{
    next: NextStep<S, Ret, Acc>,
    f: Fun,
}

impl<S, Acc, Fun, Ret> Future for FoldWithStream<S, Acc, Fun, Ret>
where
    S: Stream,
    Fun: FnMut(Acc, S::Item, S) -> Ret,
    Ret: Future<Item = (S, Acc), Error = S::Error>,
{
    type Item = Acc;
    type Error = S::Error;

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        loop {
            match mem::replace(&mut self.next, NextStep::Completed) {
                NextStep::Stream(mut s, acc) => {
                    match s.poll() {
                        Ok(Async::Ready(Some(i))) => {
                            self.next = NextStep::Future((self.f)(acc, i, s));
                        }
                        Ok(Async::Ready(None)) => return Ok(Async::Ready(acc)),
                        Ok(Async::NotReady) => {
                            self.next = NextStep::Stream(s, acc);
                            return Ok(Async::NotReady);
                        }
                        Err(e) => return Err(e),
                    }
                }
                NextStep::Future(mut f) => {
                    match f.poll() {
                        Ok(Async::Ready((s, acc))) => {
                            self.next = NextStep::Stream(s, acc);
                        }
                        Ok(Async::NotReady) => {
                            self.next = NextStep::Future(f);
                            return Ok(Async::NotReady);
                        }
                        Err(e) => return Err(e),
                    }
                }
                NextStep::Completed => panic!("attempted to poll FoldWithStream after completion"),
            }
        }
    }
}

pub trait StreamExt: Stream {
    fn fold_with_stream<Fun, Acc, Ret>(self, init: Acc, f: Fun)
        -> FoldWithStream<Self, Acc, Fun, Ret>
    where
        Self: Sized,
        Fun: FnMut(Acc, Self::Item, Self) -> Ret,
        Ret: Future<Item = (Self, Acc), Error = Self::Error>,
    {
        FoldWithStream {
            next: NextStep::Stream(self, init),
            f
        }
    }
}
@tikue
Contributor

tikue commented May 19, 2018

I, too, have run into a case where I needed this exact combinator. I think it was when trying to thread a BiLock through a stream.

My gut feeling is that, if a stream becomes complex enough to require this behavior, it might be time to implement Stream with a bespoke struct. I also think that the core stream/future combinators should be limited to those that have widely applicable use. I think there's room for an external crate to experiment with things like this -- streamtools/futuretools, dual to itertools.

@taiki-e taiki-e added the A-stream Area: futures::stream label Dec 17, 2020
4 participants