Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update JCTools 3.3.0 -> 3.3.1-ea, use unpadded queues #2257

Merged
merged 4 commits into from
Jul 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jaxRsVersion=2.1.6
jerseyVersion=2.35

reactiveStreamsVersion=1.0.3
jcToolsVersion=3.3.0
jcToolsVersion=3.3.1-ea
jacksonVersion=2.13.2.2
# backward compatible with jackson 2.9+, we do not depend on any new features from later versions.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
import org.jctools.queues.atomic.SpscGrowableAtomicArrayQueue;
import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue;
import org.jctools.queues.ea.unpadded.MpscChunkedUnpaddedArrayQueue;
import org.jctools.queues.ea.unpadded.MpscLinkedUnpaddedQueue;
import org.jctools.queues.ea.unpadded.MpscUnboundedUnpaddedArrayQueue;
import org.jctools.queues.ea.unpadded.SpscChunkedUnpaddedArrayQueue;
import org.jctools.queues.ea.unpadded.SpscUnboundedUnpaddedArrayQueue;
import org.jctools.util.Pow2;
import org.jctools.util.UnsafeAccess;
import org.slf4j.Logger;
Expand All @@ -51,6 +56,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import static java.lang.Boolean.getBoolean;
import static java.lang.Math.max;
import static java.lang.Math.min;

Expand Down Expand Up @@ -280,6 +286,26 @@ public static <T> Queue<T> newUnboundedSpscQueue(final int initialCapacity) {

private static final class Queues {
private static final boolean USE_UNSAFE_QUEUES;
private static final boolean USE_UNPADDED_QUEUES;

static {
// Internal property, subject to removal after validation completed.
final String useUnpaddedQueuesName = "io.servicetalk.internal.queues.useUnpadded";
boolean useUnpaddedQueues = getBoolean(useUnpaddedQueuesName);
if (useUnpaddedQueues) {
Queue<Integer> queue = null;
try {
// org.jctools.queues.ea.unpadded classes may be moved to another package name in the future.
// Don't use them if they are not available due to failure to link/initialize the classes.
queue = new MpscLinkedUnpaddedQueue<>();
} catch (Throwable ignored) {
useUnpaddedQueues = false;
}
LOGGER.debug("jctools unpadded: {}available.", queue == null ? "un" : "");
}
LOGGER.debug("{}: {}", useUnpaddedQueuesName, useUnpaddedQueues);
USE_UNPADDED_QUEUES = useUnpaddedQueues;
}

private Queues() {
}
Expand Down Expand Up @@ -311,19 +337,27 @@ static <T> Queue<T> newMpscQueue(final int initialCapacity, final int maxCapacit
// up to the next power of two and so will overflow otherwise.
final int initialCap = max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity);
final int capacity = max(min(maxCapacity, MAX_ALLOWED_QUEUE_CAPACITY), MIN_MAX_MPSC_CAPACITY);
return USE_UNSAFE_QUEUES ? new MpscChunkedArrayQueue<>(initialCap, capacity)
: new MpscGrowableAtomicArrayQueue<>(initialCap, capacity);
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new MpscChunkedUnpaddedArrayQueue<>(initialCap, capacity) :
new MpscChunkedArrayQueue<>(initialCap, capacity)
: new MpscGrowableAtomicArrayQueue<>(initialCap, capacity);
}

static <T> Queue<T> newUnboundedMpscQueue(final int initialCapacity) {
return USE_UNSAFE_QUEUES ? new MpscUnboundedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity))
: new MpscUnboundedAtomicArrayQueue<>(
max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity));
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new MpscUnboundedUnpaddedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity)) :
new MpscUnboundedArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity))
: new MpscUnboundedAtomicArrayQueue<>(max(MIN_ALLOWED_MPSC_CHUNK_SIZE, initialCapacity));
}

static <T> Queue<T> newUnboundedLinkedMpscQueue() {
return USE_UNSAFE_QUEUES ? new MpscLinkedQueue<>()
: new MpscLinkedAtomicQueue<>();
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new MpscLinkedUnpaddedQueue<>() :
new MpscLinkedQueue<>()
: new MpscLinkedAtomicQueue<>();
}

static <T> Queue<T> newSpscQueue(final int initialCapacity, final int maxCapacity) {
Expand All @@ -332,12 +366,18 @@ static <T> Queue<T> newSpscQueue(final int initialCapacity, final int maxCapacit
// up to the next power of two and so will overflow otherwise.
final int initialCap = max(MIN_ALLOWED_SPSC_CHUNK_SIZE, initialCapacity);
final int capacity = max(min(maxCapacity, MAX_ALLOWED_QUEUE_CAPACITY), MIN_MAX_SPSC_CAPACITY);
return USE_UNSAFE_QUEUES ? new SpscChunkedArrayQueue<>(initialCap, capacity)
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new SpscChunkedUnpaddedArrayQueue<>(initialCap, capacity) :
new SpscChunkedArrayQueue<>(initialCap, capacity)
: new SpscGrowableAtomicArrayQueue<>(initialCap, capacity);
}

static <T> Queue<T> newUnboundedSpscQueue(final int initialCapacity) {
return USE_UNSAFE_QUEUES ? new SpscUnboundedArrayQueue<>(initialCapacity)
return USE_UNSAFE_QUEUES ?
USE_UNPADDED_QUEUES ?
new SpscUnboundedUnpaddedArrayQueue<>(initialCapacity) :
new SpscUnboundedArrayQueue<>(initialCapacity)
: new SpscUnboundedAtomicArrayQueue<>(initialCapacity);
}
}
Expand Down