Skip to content

Commit

Permalink
clarify variable/function names
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Oct 6, 2022
1 parent 30f1470 commit 6272a95
Showing 1 changed file with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,17 +287,17 @@ private static final class SpscBlockingQueue<T> implements BlockingQueue<T> {
getLong("io.servicetalk.concurrent.internal.blockingIterableYieldNs", 1024);

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<SpscBlockingQueue> pcIndexUpdater =
AtomicLongFieldUpdater.newUpdater(SpscBlockingQueue.class, "pcIndex");
private static final AtomicLongFieldUpdater<SpscBlockingQueue> producerConsumerIndexUpdater =
AtomicLongFieldUpdater.newUpdater(SpscBlockingQueue.class, "producerConsumerIndex");
private final Queue<T> spscQueue;
@Nullable
private Thread consumerThread;
/**
* high 32 bits == producer index (see {@link #pIndex(long)})
* low 32 bits == consumer index (see {@link #cIndex(long)}}
* @see #computePCIndex(int, int)
* high 32 bits == producer index (see {@link #producerIndex(long)})
* low 32 bits == consumer index (see {@link #consumerIndex(long)}}
* @see #combineIndexes(int, int)
*/
private volatile long pcIndex;
private volatile long producerConsumerIndex;

SpscBlockingQueue(Queue<T> spscQueue) {
this.spscQueue = requireNonNull(spscQueue);
Expand Down Expand Up @@ -513,10 +513,11 @@ public String toString() {

private void producerSignalAdded() {
for (;;) {
final long currIndex = pcIndex;
final int producer = pIndex(currIndex);
final int consumer = cIndex(currIndex);
if (pcIndexUpdater.compareAndSet(this, currIndex, computePCIndex(producer + 1, consumer))) {
final long currIndex = producerConsumerIndex;
final int producer = producerIndex(currIndex);
final int consumer = consumerIndex(currIndex);
if (producerConsumerIndexUpdater.compareAndSet(this, currIndex,
combineIndexes(producer + 1, consumer))) {
if (producer - consumer <= 0 && consumerThread != null) {
final Thread wakeThread = consumerThread;
consumerThread = null;
Expand All @@ -530,20 +531,22 @@ private void producerSignalAdded() {
private T take0(BiLongFunction<TimeUnit, T> taker, long timeout, TimeUnit unit) throws InterruptedException {
final Thread currentThread = Thread.currentThread();
for (;;) {
long currIndex = pcIndex;
final int producer = pIndex(currIndex);
final int consumer = cIndex(currIndex);
long currIndex = producerConsumerIndex;
final int producer = producerIndex(currIndex);
final int consumer = consumerIndex(currIndex);
if (producer == consumer) {
// Set consumerThread before pcIndex, to establish happens-before with producer thread.
consumerThread = currentThread;
if (pcIndexUpdater.compareAndSet(this, currIndex, computePCIndex(producer, consumer + 1))) {
if (producerConsumerIndexUpdater.compareAndSet(this, currIndex,
combineIndexes(producer, consumer + 1))) {
return taker.apply(timeout, unit);
}
} else {
final T item = spscQueue.poll();
if (item != null) {
while (!pcIndexUpdater.compareAndSet(this, currIndex, computePCIndex(producer, consumer + 1))) {
currIndex = pcIndex;
while (!producerConsumerIndexUpdater.compareAndSet(this, currIndex,
combineIndexes(producer, consumer + 1))) {
currIndex = producerConsumerIndex;
}
return item;
}
Expand All @@ -555,10 +558,11 @@ private T take0(BiLongFunction<TimeUnit, T> taker, long timeout, TimeUnit unit)

private void consumerSignalRemoved(final int i) {
for (;;) {
final long currIndex = pcIndex;
final int producer = pIndex(currIndex);
final int consumer = cIndex(currIndex);
if (pcIndexUpdater.compareAndSet(this, currIndex, computePCIndex(producer, consumer + i))) {
final long currIndex = producerConsumerIndex;
final int producer = producerIndex(currIndex);
final int consumer = consumerIndex(currIndex);
if (producerConsumerIndexUpdater.compareAndSet(this, currIndex,
combineIndexes(producer, consumer + i))) {
break;
}
}
Expand Down Expand Up @@ -616,16 +620,16 @@ private static void checkInterrupted() throws InterruptedException {
}
}

private static long computePCIndex(int producer, int consumer) {
private static long combineIndexes(int producer, int consumer) {
return ((long) producer << 32) | consumer;
}

private static int cIndex(long pcIndex) {
return (int) pcIndex;
private static int consumerIndex(long producerConsumerIndex) {
return (int) producerConsumerIndex;
}

private static int pIndex(long pcIndex) {
return (int) (pcIndex >>> 32);
private static int producerIndex(long producerConsumerIndex) {
return (int) (producerConsumerIndex >>> 32);
}

private interface BiLongFunction<T, R> {
Expand Down

0 comments on commit 6272a95

Please sign in to comment.