Skip to content

Commit

Permalink
ConnectablePayloadWriter: avoid static exception that may leak memo…
Browse files Browse the repository at this point in the history
…ry (#3047)

Motivation:

`ConnectablePayloadWriter` uses a static `IOException` instance as a
state and can propagate it to `Subscriber`. This opens a possibility for
a memory leak if `Subscriber` or intermediate operators attach a
suppressed exception to this static instance.

Modifications:
- Create `StacklessCancelledIOException` and always use a new instance
in case of cancellation;
- Keep `cancel()` noop if `outer.closed != null`;

Result:

`ConnectablePayloadWriter` can not leak memory via static exception
instance.
  • Loading branch information
idelpivnitskiy authored Aug 21, 2024
1 parent 8074930 commit ed0265c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.oio.api.PayloadWriter;

import org.slf4j.Logger;
Expand All @@ -40,7 +41,6 @@
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.concurrent.internal.TerminalNotification.error;
import static io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;

Expand Down Expand Up @@ -193,7 +193,7 @@ private void processClosed() throws IOException {
private void processClosed(TerminalNotification currClosed) throws IOException {
Object currState = stateUpdater.getAndSet(this, State.TERMINATED);
if (currState instanceof Subscriber &&
!ConnectedPublisher.CONNECTED_PUBLISHER_CANCELLED.equals(currClosed.cause())) {
!(currClosed.cause() instanceof StacklessCancelledIOException)) {
currClosed.terminate((Subscriber<?>) currState);
}
throw newAlreadyClosed(currClosed.cause());
Expand Down Expand Up @@ -275,8 +275,6 @@ private Subscriber<? super T> waitForSubscriberSlowPath() throws IOException {

private static final class ConnectedPublisher<T> extends Publisher<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectedPublisher.class);
private static final IOException CONNECTED_PUBLISHER_CANCELLED = unknownStackTrace(
new IOException("Connected Publisher cancel()"), ConnectablePayloadWriter.class, "cancel()");
private final ConnectablePayloadWriter<T> outer;

ConnectedPublisher(final ConnectablePayloadWriter<T> outer) {
Expand Down Expand Up @@ -329,7 +327,11 @@ public void request(final long n) {

@Override
public void cancel() {
if (closedUpdater.compareAndSet(outer, null, error(CONNECTED_PUBLISHER_CANCELLED))) {
if (outer.closed != null) {
return;
}
if (closedUpdater.compareAndSet(outer, null,
error(StacklessCancelledIOException.newCancelledException()))) {
terminateRequestN();
}
}
Expand Down Expand Up @@ -378,4 +380,23 @@ private void tryWakeupWriterThread() {
private enum State {
DISCONNECTED, CONNECTING, WAITING_FOR_CONNECTED, CONNECTED, TERMINATING, TERMINATED
}

private static final class StacklessCancelledIOException extends IOException {
private static final long serialVersionUID = 8114924638812910795L;

private StacklessCancelledIOException(final String message) {
super(message);
}

@Override
public Throwable fillInStackTrace() {
// Don't fill in the stacktrace to reduce performance overhead
return this;
}

static StacklessCancelledIOException newCancelledException() {
return ThrowableUtils.unknownStackTrace(new StacklessCancelledIOException("Connected Publisher cancel()"),
ConnectedPublisher.class, "cancel()");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void terminate(PublisherSource.Subscriber<?> subscriber) {
if (this == COMPLETE) {
subscriber.onComplete();
} else {
assert cause != null;
subscriber.onError(cause);
}
}
Expand Down

0 comments on commit ed0265c

Please sign in to comment.