Skip to content

Commit

Permalink
make system property opt-in, protect against future package name changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Jul 1, 2022
1 parent e8aa5e7 commit 864c189
Showing 1 changed file with 48 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
*/
package io.servicetalk.utils.internal;

import org.jctools.queues.MpscChunkedArrayQueue;
import org.jctools.queues.MpscLinkedQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
import org.jctools.queues.SpscChunkedArrayQueue;
import org.jctools.queues.SpscUnboundedArrayQueue;
import org.jctools.queues.atomic.MpscGrowableAtomicArrayQueue;
import org.jctools.queues.atomic.MpscLinkedAtomicQueue;
import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
Expand All @@ -51,6 +56,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import static java.lang.Boolean.getBoolean;
import static java.lang.Math.max;
import static java.lang.Math.min;

Expand Down Expand Up @@ -280,6 +286,26 @@ public static <T> Queue<T> newUnboundedSpscQueue(final int initialCapacity) {

private static final class Queues {
private static final boolean USE_UNSAFE_QUEUES;
private static final boolean USE_UNPADDED_QUEUES;

static {
// Internal property, subject to removal after validation completed.
final String useUnpaddedQueuesName = "io.servicetalk.internal.queues.useUnpadded";
boolean useUnpaddedQueues = getBoolean(useUnpaddedQueuesName);
if (useUnpaddedQueues) {
Queue<Integer> queue = null;
try {
// org.jctools.queues.ea.unpadded classes may be moved to another package name in the future.
// Don't use them if they are not available due to failure to link/initialize the classes.
queue = new MpscLinkedUnpaddedQueue<>();
} catch (Throwable ignored) {
useUnpaddedQueues = false;
}
LOGGER.debug("jctools unpadded: {}available.", queue == null ? "un" : "");
}
LOGGER.debug("{}: {}", useUnpaddedQueuesName, useUnpaddedQueues);
USE_UNPADDED_QUEUES = useUnpaddedQueues;
}

private Queues() {
}
Expand Down Expand Up @@ -311,19 +337,27 @@ static <T> Queue<T> newMpscQueue(final int initialCapacity, final int maxCapacit
// up to the next power of two and so will overflow otherwise.
final int initialCap = max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity);
final int capacity = max(min(maxCapacity, MAX_ALLOWED_QUEUE_CAPACITY), MIN_MAX_MPSC_CAPACITY);
return USE_UNSAFE_QUEUES ? new MpscChunkedUnpaddedArrayQueue<>(initialCap, capacity)
: new MpscGrowableAtomicArrayQueue<>(initialCap, capacity);
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new MpscChunkedUnpaddedArrayQueue<>(initialCap, capacity) :
new MpscChunkedArrayQueue<>(initialCap, capacity)
: new MpscGrowableAtomicArrayQueue<>(initialCap, capacity);
}

static <T> Queue<T> newUnboundedMpscQueue(final int initialCapacity) {
return USE_UNSAFE_QUEUES ?
new MpscUnboundedUnpaddedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity)) :
new MpscUnboundedAtomicArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity));
USE_UNPADDED_QUEUES ?
new MpscUnboundedUnpaddedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity)) :
new MpscUnboundedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity))
: new MpscUnboundedAtomicArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity));
}

static <T> Queue<T> newUnboundedLinkedMpscQueue() {
return USE_UNSAFE_QUEUES ? new MpscLinkedUnpaddedQueue<>()
: new MpscLinkedAtomicQueue<>();
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new MpscLinkedUnpaddedQueue<>() :
new MpscLinkedQueue<>()
: new MpscLinkedAtomicQueue<>();
}

static <T> Queue<T> newSpscQueue(final int initialCapacity, final int maxCapacity) {
Expand All @@ -332,12 +366,18 @@ static <T> Queue<T> newSpscQueue(final int initialCapacity, final int maxCapacit
// up to the next power of two and so will overflow otherwise.
final int initialCap = max(MIN_ALLOWED_SPSC_CHUNK_SIZE, initialCapacity);
final int capacity = max(min(maxCapacity, MAX_ALLOWED_QUEUE_CAPACITY), MIN_MAX_SPSC_CAPACITY);
return USE_UNSAFE_QUEUES ? new SpscChunkedUnpaddedArrayQueue<>(initialCap, capacity)
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new SpscChunkedUnpaddedArrayQueue<>(initialCap, capacity) :
new SpscChunkedArrayQueue<>(initialCap, capacity)
: new SpscGrowableAtomicArrayQueue<>(initialCap, capacity);
}

static <T> Queue<T> newUnboundedSpscQueue(final int initialCapacity) {
return USE_UNSAFE_QUEUES ? new SpscUnboundedUnpaddedArrayQueue<>(initialCapacity)
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new SpscUnboundedUnpaddedArrayQueue<>(initialCapacity) :
new SpscUnboundedArrayQueue<>(initialCapacity)
: new SpscUnboundedAtomicArrayQueue<>(initialCapacity);
}
}
Expand Down

0 comments on commit 864c189

Please sign in to comment.