Skip to content

Commit

Permalink
PublisherAsBlockingIterable LinkedBlockingQueue -> LinkedTransferQueue (
Browse files Browse the repository at this point in the history
#2386)

Motivation:
LinkedBlockingQueue goes through LockSupport park and unpark methods
which can incur relatively expensive context switching if the EventLoop
thread has to unpark an application thread. This has been shown to be
a bottleneck as throughput increases.

Modifications:
- Use LinkedTransferQueue which does a `Thread.yield()` before parking
  on the consumer thread. This may use more CPU on the consumer thread
  but the assumption is there will be many more application threads than
  EventLoop threads and we want to minimize producer costs.

Results:

LinkedTransferQueue
```
Running 30s test @ http://localhost:8080/medium, using 'ServiceTalkGrpcBlockingClientStrAgg' client
  1024 threads and 1024 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency       -          -       -           -
    Req/Sec     0.01k        -     0.01k         -
  290977 requests in 30s
Requests/sec: 9699.23
Transfer/sec: -
OK: 290977
KO: 0
```

LinkedBlockingQueue
```
Running 30s test @ http://localhost:8080/medium, using 'ServiceTalkGrpcBlockingClientStrAgg' client
  1024 threads and 1024 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency       -          -       -           -
    Req/Sec     0.01k        -     0.01k         -
  256778 requests in 30s
Requests/sec: 8559.27
Transfer/sec: -
OK: 256778
KO: 0
```
  • Loading branch information
Scottmitch authored Oct 4, 2022
1 parent abc0d75 commit 1c463da
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
Expand All @@ -39,7 +39,7 @@
import static io.servicetalk.concurrent.api.SubscriberApiUtils.wrapNull;
import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.concurrent.internal.TerminalNotification.error;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
import static java.lang.Math.min;
import static java.lang.Thread.currentThread;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -101,7 +101,7 @@ private static final class SubscriberAndIterator<T> implements Subscriber<T>, Bl

SubscriberAndIterator(int queueCapacity) {
requestN = queueCapacity;
data = new LinkedBlockingQueue<>();
data = new LinkedTransferQueue<>();
}

@Override
Expand Down

0 comments on commit 1c463da

Please sign in to comment.