Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zip_latest Stream adapter #891

Open
stephaneyfx opened this issue Mar 21, 2018 · 11 comments
Open

zip_latest Stream adapter #891

stephaneyfx opened this issue Mar 21, 2018 · 11 comments
Labels
A-stream Area: futures::stream C-feature-request

Comments

@stephaneyfx
Copy link
Contributor

I would like your opinion on adding a zip_latest adapter to StreamExt (a.k.a. combineLatest in some reactive frameworks, but I think that "combine" does not give any indication of how items are combined).

This adapter proves useful when zipping two streams that don't necessarily produce items at the same time and re-using the last produced item is desirable. For example, let's assume a system composed of two components A and B changing over time, but not necessarily simultaneously. The states of A and B can be represented by two streams SA and SB whose items are the new states of each component. The usual zip adapter produces a new item only when both SA and SB do, so it is not suitable to represent the state of the system as a whole. When SA produces a new item, if SB doesn't, the item freshly produced by SA should be paired with the latest item produced by SB (and vice versa). Such a stream can be used to represent the state of the system. With a diagram, this gives (self and other are Stream):

---a------b------c------> self
------0---1---2---------> other
------a0--b1--b2-c2-----> self.zip_latest(other)

Please let me know if this is something worth integrating to futures-util, in which case I can submit a PR.
Thank you for such a great crate!

@cramertj
Copy link
Member

I don't have any particular opposition to this, but I haven't seen it commonly requested. The presence of it in other frameworks makes it seem like others find it useful-- what is it typically used for?

@qm3ster
Copy link

qm3ster commented Mar 25, 2020

  1. This would require the items to be Clone, right? Or could they be provided downstream as borrows, the combinator forever keeping ownership?
  2. I currently solved my need for such an operator with this, based on async_stream::stream! and tokio::select!. It does look kinda verbose, but it gives you a lot of other opportunities, and doesn't impose any limits on the Item.
    I'm not sure that I would prefer a separate adapter that gives me a tuple, even when I'm working of signals of primitive types. (In RxJS I have at first had quite the number of problems with one of the inputs not having a value for a long time, blocking the entire stream. I don't think startWith on inputs or outputs is an improvement over setting default values or yielding something in the post-select! section when certain values are still not ready.

@taiki-e taiki-e added the A-stream Area: futures::stream label Dec 17, 2020
@CyberTianzun
Copy link

I don't have any particular opposition to this, but I haven't seen it commonly requested. The presence of it in other frameworks makes it seem like others find it useful-- what is it typically used for?

When I use data streams that arrive at different times, it always need this zip_latest to combine those.
i think it's commonly requested. (a.k.a. combineLatest in some reactive frameworks)

@qm3ster
Copy link

qm3ster commented Sep 15, 2021

@cnzx219 have you considered a declarative macro that expands to something like this:

async_stream::stream! {
    let mut it_a = <expression_one>;
    let mut it_b = <expression_two>;
    let mut a = None;
    let mut b = None;
    loop {
        tokio::select! {
            Some(next) = it_a.next() => { a = Some(next); }
            Some(next) = it_b.next() => { b = Some(next); }
            else => break,
        }
        if let (Some(a), Some(b)) = (a, b) {
            yield (a.clone(), b.clone())
        }
    }
}

@CyberTianzun
Copy link

@cnzx219 have you considered a declarative macro that expands to something like this:

async_stream::stream! {
    let mut a = None;
    let mut b = None;
    loop {
        tokio::select! {
            Some(next_a) = identifier_one.next() => { a = next_a.into(); }
            Some(next_b) = identifier_two.next() => { b = next_b.into(); }
            else => break,
        }
        if let (Some(a), Some(b)) = (a, b) {
            yield (a, b)
        }
    }
}

Thanks for your advise. But the zip_latest is totally different from that select!. select! focuses on the stream data that is ready first. zip_latest focuses on combine the latest value into a tuple.

@qm3ster
Copy link

qm3ster commented Sep 22, 2021

@cnzx219 no no, this is what zip_latest!(identifier_one, identifier_two) could expand to in your project - a piece of code that contains async_stream::stream! and tokio::select!.

The produced impl Stream has the following behavior:

  1. until there is a value from all inputs, it doesn't produce items
  2. once there has been a value from all inputs, it produces a full tuple whenever any of the inputs yields an item
  3. if all of the input streams close, it closes

which is the behavior of combineLatest

The problem is with lifetimes. Stream, like Iterator, doesn't allow Item to borrow from Self
You could emulate JS behavior when a subset of your items isn't Clone by using a reference counting pointer type:

use std::rc::Rc;
let zipped = zip_latest!(clone_stream, unclone_stream.map(Rc));

@CyberTianzun
Copy link

@cnzx219 no no, this is what zip_latest!(identifier_one, identifier_two) could expand to in your project - a piece of code that contains async_stream::stream! and tokio::select!.

The produced impl Stream has the following behavior:

  1. until there is a value from all inputs, it doesn't produce items
  2. once there has been a value from all inputs, it produces a full tuple whenever any of the inputs yields an item
  3. if all of the input streams close, it closes

which is the behavior of combineLatest

The problem is with lifetimes. Stream, like Iterator, doesn't allow Item to borrow from Self
You could emulate JS behavior when a subset of your items isn't Clone by using a reference counting pointer type:

use std::rc::Rc;
let zipped = zip_latest!(clone_stream, unclone_stream.map(Rc));

Thanks for your explanation. And I also know that there are multiple ways to implement this behavior. But I prefer the first one, it can work too. As in my first comments, I just supports that it's commonly requested.

@extremeandy
Copy link

Has anyone implemented this yet? I'd also be keen for a similar combinator which works on Vec<impl Stream<Item = T>>, as for combineLatest in Rx frameworks)

@stephaneyfx
Copy link
Contributor Author

@extremeandy zip_latest
I'm working on a zip_latest_all that should cover the Vec<impl Stream> case.

@qm3ster
Copy link

qm3ster commented Oct 12, 2022

@stephaneyfx if you are taking on this undertaking (:v), please consider including zip_with and zip_all_with, that take a mapping function from &A, &B(or &(A, B)) and &[T] respectively to the returned Item, with no Clone bound.

@stephaneyfx
Copy link
Contributor Author

zip_latest_all, zip_latest_with_all and zip_latest_with were added.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-stream Area: futures::stream C-feature-request
Projects
None yet
Development

No branches or pull requests

6 participants