Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Publisher.zip #2690

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open

Add Publisher.zip #2690

wants to merge 1 commit into from

Conversation

Scottmitch
Copy link
Member

Motivation:
The zip operator helps combine results from multiple Publishers into a single Publisher.

Motivation:
The zip operator helps combine results from multiple Publishers
into a single Publisher.
Copy link
Contributor

@bryce-anderson bryce-anderson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally it looks good other than the behavior question for streams of dissimilar size.

/**
* Create a new {@link Publisher} that emits the results of a specified zipper {@link BiFunction} to items emitted
* by {@code this} and {@code other}. If any of the {@link Publisher}s terminate with an error, the returned
* {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* {@link Publisher} will wait for termination till all the other {@link Publisher}s have been subscribed and
* {@link Publisher} will wait for termination until all the other {@link Publisher}s have been subscribed and

same with the others

Comment on lines +153 to +155
for (int i = 63; i >= zipperArity; --i) {
nonEmptyQueueIndexes |= (1L << i);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, you could clear the bits from a starting ALL_NON_EMPTY_MASK as you do when polling elements from queues, since the arity will almost always be low single digits.

Comment on lines +77 to +79
if (maxOutstandingDemand <= 0) {
throw new IllegalArgumentException("maxOutstandingDemand: " + maxOutstandingDemand + " (expected>0)");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my minds eye we should return an error if the concurrency exceeds the MAX_CONCURRENCY so it doesn't happen lazily.

Comment on lines +198 to +218
public void onComplete() {
List<Integer> nonEmptyIndexes = new ArrayList<>();
for (int i = 0; i < array.length; ++i) {
if ((nonEmptyQueueIndexes & (1L << i)) != 0) {
nonEmptyIndexes.add(i);
}
}
if (nonEmptyIndexes.isEmpty()) {
subscriber.onComplete();
} else {
StringBuilder sb = new StringBuilder(20 + 68 + nonEmptyIndexes.size() * 4);
sb.append("Publisher indexes: [");
Iterator<Integer> itr = nonEmptyIndexes.iterator();
sb.append(itr.next()); // safe to call next(), already checked is not empty.
while (itr.hasNext()) {
sb.append(", ").append(itr.next());
}
sb.append("] had onNext signals queued when onComplete terminal signal received");
subscriber.onError(new IllegalStateException(sb.toString()));
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the same behavior described in https://reactivex.io/documentation/operators/zip.html. It suggests that its okay to have extra signals emitted and they are simply dropped which is the behavior of most collection zip operators that I've seen. At a minimum, the behavior should be documented so users know they should have publishers of all the same length, which I think could be non-trivial in some cases.

Along these same lines, I don't think this implementation will ever terminate if one of the suppliers is an infinite stream. It seems pretty reasonable to have one source that is an infinite stream an the join it with another finite stream and expect a finite result.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants