Skip to content

Commit

Permalink
Make all async sources subscribable (#3192)
Browse files Browse the repository at this point in the history
Motivation:

If all sources also implement their corresponding `*Source` interface,
they can be converted using `SourceAdapters` without allocating a
wrapper.

Modifications:

- Introduce internal `SubscribableSources` in `concurrent-api`.
- Use `SubscribableSources` as a base class for all operators.
- Use `SubscribableSources` equivalent from `concurrent-api-internal` in
all other modules where we missed `*Source` interface.

Result:

`SourceAdapters.toSource(...)` don't require allocations for our async
primitives.
  • Loading branch information
idelpivnitskiy authored Feb 19, 2025
1 parent 2a09aa2 commit e0d6fc6
Show file tree
Hide file tree
Showing 33 changed files with 140 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.servicetalk.concurrent.api.internal;

import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.TerminalNotification;
Expand Down Expand Up @@ -273,7 +272,7 @@ private Subscriber<? super T> waitForSubscriberSlowPath() throws IOException {
}
}

private static final class ConnectedPublisher<T> extends Publisher<T> {
private static final class ConnectedPublisher<T> extends SubscribablePublisher<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectedPublisher.class);
private final ConnectablePayloadWriter<T> outer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,19 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableCompletable;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;

/**
* A {@link Completable} that does not expect to receive a call to {@link #handleSubscribe(Subscriber)} since it
* overrides {@link Completable#handleSubscribe(Subscriber, CapturedContext, AsyncContextProvider)}.
*/
abstract class AbstractNoHandleSubscribeCompletable extends Completable implements CompletableSource {
abstract class AbstractNoHandleSubscribeCompletable extends SubscribableCompletable {

@Override
protected final void handleSubscribe(final Subscriber subscriber) {
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
}

@Override
public final void subscribe(final Subscriber subscriber) {
subscribeInternal(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribablePublisher;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;

Expand All @@ -25,16 +25,11 @@
*
* @param <T> Type of items emitted.
*/
abstract class AbstractNoHandleSubscribePublisher<T> extends Publisher<T> implements PublisherSource<T> {
abstract class AbstractNoHandleSubscribePublisher<T> extends SubscribablePublisher<T> {

@Override
protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
}

@Override
public final void subscribe(final Subscriber<? super T> subscriber) {
subscribeInternal(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableSingle;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;

Expand All @@ -25,16 +25,11 @@
*
* @param <T> Type of the result of the single.
*/
abstract class AbstractNoHandleSubscribeSingle<T> extends Single<T> implements SingleSource<T> {
abstract class AbstractNoHandleSubscribeSingle<T> extends SubscribableSingle<T> {

@Override
protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
deliverErrorFromSource(subscriber, new UnsupportedOperationException("Subscribe with no " +
CapturedContext.class.getSimpleName() + " is not supported for " + getClass()));
}

@Override
public final void subscribe(final Subscriber<? super T> subscriber) {
subscribeInternal(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;

Expand Down Expand Up @@ -211,8 +210,7 @@ int outstandingDemandLimit() {
}
}

private static final class DefaultGroupedPublisher<Key, T> extends GroupedPublisher<Key, T>
implements PublisherSource<T> {
private static final class DefaultGroupedPublisher<Key, T> extends GroupedPublisher<Key, T> {
private final GroupMulticastSubscriber<Key, T> groupSink;
private final CapturedContext capturedContext;
private final AsyncContextProvider contextProvider;
Expand All @@ -225,11 +223,6 @@ private static final class DefaultGroupedPublisher<Key, T> extends GroupedPublis
this.contextProvider = contextProvider;
}

@Override
public void subscribe(final Subscriber<? super T> subscriber) {
subscribeInternal(subscriber);
}

@Override
protected void handleSubscribe(Subscriber<? super T> sub) {
groupSink.subscriber(sub, true, capturedContext, contextProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableCompletable;
import io.servicetalk.concurrent.internal.DelayedCancellable;

import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static java.util.Objects.requireNonNull;

abstract class AbstractSubmitCompletable extends Completable implements CompletableSource {
abstract class AbstractSubmitCompletable extends SubscribableCompletable {
private final Executor runExecutor;

AbstractSubmitCompletable(final Executor runExecutor) {
Expand Down Expand Up @@ -59,9 +60,4 @@ protected final void handleSubscribe(final CompletableSource.Subscriber subscrib
}
cancellable.delayedCancellable(eCancellable);
}

@Override
public final void subscribe(final Subscriber subscriber) {
subscribeInternal(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableSingle;
import io.servicetalk.concurrent.internal.DelayedCancellable;

import java.util.concurrent.Callable;

import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static java.util.Objects.requireNonNull;

abstract class AbstractSubmitSingle<T> extends Single<T> implements SingleSource<T> {
abstract class AbstractSubmitSingle<T> extends SubscribableSingle<T> {
private final Executor runExecutor;

AbstractSubmitSingle(final Executor runExecutor) {
Expand Down Expand Up @@ -62,9 +62,4 @@ protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
}
cancellable.delayedCancellable(eCancellable);
}

@Override
public final void subscribe(final Subscriber<? super T> subscriber) {
subscribeInternal(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource.Subscriber;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableSingle;
import io.servicetalk.concurrent.internal.DelayedCancellable;

import java.util.ArrayList;
Expand All @@ -28,7 +28,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;

final class AmbSingles<T> extends Single<T> {
final class AmbSingles<T> extends SubscribableSingle<T> {
private final Single<? extends T>[] singles;

@SafeVarargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableCompletable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -240,12 +240,4 @@ public Completable onClosing() {
return onClosing;
}
}

private abstract static class SubscribableCompletable extends Completable implements CompletableSource {

@Override
public final void subscribe(final Subscriber subscriber) {
subscribeInternal(subscriber);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.concurrent.CompletableSource.Subscriber;
import io.servicetalk.concurrent.api.BufferStrategy.Accumulator;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableCompletable;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -143,7 +144,7 @@ public Publisher<Accumulator<T, B>> boundaries() {
CountingAccumulator<T, B> firstAccum =
new CountingAccumulator<>(state, accumulatorSupplier.get(), count);
state.beforeNewAccumulatorEmitted(firstAccum);
return Single.succeeded(firstAccum).concat(new Completable() {
return Single.succeeded(firstAccum).concat(new SubscribableCompletable() {
@Override
protected void handleSubscribe(final Subscriber subscriber) {
// We ignore cancel and expect ambWith to ignore if we delay emit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableCompletable;

import java.util.function.Supplier;

Expand All @@ -25,7 +25,7 @@
/**
* As returned by {@link Completable#defer(Supplier)}.
*/
final class CompletableDefer extends Completable implements CompletableSource {
final class CompletableDefer extends SubscribableCompletable {
private final Supplier<? extends Completable> completableFactory;

CompletableDefer(Supplier<? extends Completable> completableFactory) {
Expand All @@ -47,9 +47,4 @@ protected void handleSubscribe(Subscriber subscriber) {
// and also use the configured Executor for subscribing to the Completable returned from completableFactory.
completable.subscribeInternal(subscriber);
}

@Override
public void subscribe(final Subscriber subscriber) {
subscribeInternal(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource.Processor;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableCompletable;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.TerminalNotification;

Expand All @@ -27,12 +28,12 @@
* A {@link Completable} which is also a {@link Subscriber}. State of this {@link Completable} can be modified by using
* the {@link Subscriber} methods which is forwarded to all existing or subsequent {@link Subscriber}s.
*/
final class CompletableProcessor extends Completable implements Processor {
final class CompletableProcessor extends SubscribableCompletable implements Processor {
private final ClosableConcurrentStack<Subscriber> stack = new ClosableConcurrentStack<>();

@Override
protected void handleSubscribe(Subscriber subscriber) {
// We must subscribe before adding subscriber the the queue. Otherwise it is possible that this
// We must subscribe before adding subscriber the queue. Otherwise, it is possible that this
// Completable has been terminated and the subscriber may be notified before onSubscribe is called.
// We used a DelayedCancellable to avoid the case where the Subscriber will synchronously cancel and then
// we would add the subscriber to the queue and possibly never (until termination) dereference the subscriber.
Expand Down Expand Up @@ -70,9 +71,4 @@ public void onError(Throwable t) {
private void terminate(TerminalNotification terminalSignal) {
stack.close(terminalSignal::terminate);
}

@Override
public void subscribe(final Subscriber subscriber) {
subscribeInternal(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribableSingle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,7 +29,7 @@
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static java.util.Objects.requireNonNull;

final class CompletionStageToSingle<T> extends Single<T> implements SingleSource<T> {
final class CompletionStageToSingle<T> extends SubscribableSingle<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(CompletionStageToSingle.class);
private final CompletionStage<? extends T> stage;

Expand Down Expand Up @@ -87,9 +87,4 @@ private CompletableFuture<? extends T> toCompletableFuture() {
return null;
}
}

@Override
public void subscribe(final Subscriber<? super T> subscriber) {
subscribeInternal(subscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.SingleSource.Subscriber;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribablePublisher;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;

Expand Down Expand Up @@ -219,9 +220,9 @@ public void onNext(@Nullable T next) {

@Nonnull
private Publisher<T> newTailPublisher() {
return new AbstractSynchronousPublisher<T>() {
return new SubscribablePublisher<T>() {
@Override
protected void doSubscribe(PublisherSource.Subscriber<? super T> newSubscriber) {
protected void handleSubscribe(PublisherSource.Subscriber<? super T> newSubscriber) {
final DelayedSubscription delayedSubscription = new DelayedSubscription();
// newSubscriber.onSubscribe MUST be called before making newSubscriber visible below with the CAS
// on maybeTailSubUpdater. Otherwise there is a potential for concurrent invocation on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.SubscribableSources.SubscribablePublisher;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;

import org.slf4j.Logger;
Expand Down Expand Up @@ -45,7 +46,7 @@
*
* @param <T> Type of items emitted to the {@link PublisherSource.Subscriber}.
*/
final class FromInputStreamPublisher<T> extends Publisher<T> implements PublisherSource<T> {
final class FromInputStreamPublisher<T> extends SubscribablePublisher<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(FromInputStreamPublisher.class);
// While sun.nio.ch.FileChannelImpl and java.io.InputStream.transferTo(...) use 8Kb chunks,
// we use 16Kb-32B because 16Kb is:
Expand Down Expand Up @@ -89,11 +90,6 @@ final class FromInputStreamPublisher<T> extends Publisher<T> implements Publishe
this.mapper = requireNonNull(mapper);
}

@Override
public void subscribe(final Subscriber<? super T> subscriber) {
subscribeInternal(subscriber);
}

@Override
protected void handleSubscribe(final Subscriber<? super T> subscriber) {
if (subscribedUpdater.compareAndSet(this, 0, 1)) {
Expand Down
Loading

0 comments on commit e0d6fc6

Please sign in to comment.