Skip to content

Commit

Permalink
ARTEMIS-3163 Experimental support for Netty IO_URING incubator
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Mar 7, 2021
1 parent 57e85bb commit 4128da6
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void unableToCheckEpollAvailability(@Cause Throwable e);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 212080, value = "Unable to check IoUring availability ",
format = Message.Format.MESSAGE_FORMAT)
void unableToCheckIoUringAvailability(@Cause Throwable e);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 212072, value = "Failed to change channel state to ReadyForWriting ",
format = Message.Format.MESSAGE_FORMAT)
Expand All @@ -420,6 +425,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void unableToCheckEpollAvailabilitynoClass();

@LogMessage(level = Logger.Level.WARN)
@Message(id = 212079, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning",
format = Message.Format.MESSAGE_FORMAT)
void unableToCheckIoUringAvailabilitynoClass();

@LogMessage(level = Logger.Level.WARN)
@Message(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. Retry {0} of {1}",
format = Message.Format.MESSAGE_FORMAT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,17 @@ public static final boolean isKQueueAvailable() {
return false;
}
}

public static boolean isIoUringAvailable() {
try {
return Env.isLinuxOs() && (boolean) (Class.forName("io.netty.incubator.channel.uring.IOUring")
.getMethod("isAvailable").invoke(null));
} catch (NoClassDefFoundError noClassDefFoundError) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass();
return false;
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class TransportConstants {

public static final String USE_KQUEUE_PROP_NAME = "useKQueue";

public static final String USE_IOURING_PROP_NAME = "useIoUring";

@Deprecated
/**
* @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME
Expand Down Expand Up @@ -202,6 +204,8 @@ public class TransportConstants {

public static final boolean DEFAULT_USE_KQUEUE = true;

public static final boolean DEFAULT_USE_IOURING = false;

public static final boolean DEFAULT_USE_INVM = false;

public static final boolean DEFAULT_USE_SERVLET = false;
Expand Down Expand Up @@ -374,6 +378,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
//noinspection deprecation
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.PROTOCOLS_PROP_NAME);
Expand Down Expand Up @@ -443,6 +448,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -112,6 +113,7 @@ public class NettyAcceptor extends AbstractAcceptor {
public static final String NIO_ACCEPTOR_TYPE = "NIO";
public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL";
public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
public static final String IOURING_ACCEPTOR_TYPE = "IO_URING";

static {
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use
Expand Down Expand Up @@ -148,6 +150,8 @@ public class NettyAcceptor extends AbstractAcceptor {

private final boolean useKQueue;

private final boolean useIoUring;

private final ProtocolHandler protocolHandler;

private final String host;
Expand Down Expand Up @@ -268,6 +272,7 @@ public NettyAcceptor(final String name,

useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);

backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
Expand Down Expand Up @@ -401,6 +406,21 @@ public ActiveMQThreadFactory run() {
acceptorType = KQUEUE_ACCEPTOR_TYPE;

logger.debug("Acceptor using native kqueue");
} else if (useIoUring && CheckDependencies.isIoUringAvailable()) {
channelClazz = (Class<? extends ServerChannel>) Class.forName("io.netty.incubator.channel.uring.IOUringServerSocketChannel",
true, ClientSessionFactoryImpl.class.getClassLoader());
eventLoopGroup = (EventLoopGroup) Class.forName("io.netty.incubator.channel.uring.IOUringEventLoopGroup",
true, ClientSessionFactoryImpl.class.getClassLoader())
.getConstructor(int.class, ThreadFactory.class)
.newInstance(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
@Override
public ActiveMQThreadFactory run() {
return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
}
}));
acceptorType = IOURING_ACCEPTOR_TYPE;

logger.debug("Acceptor using native io_uring");
} else {
channelClazz = NioServerSocketChannel.class;
eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
Expand Down

0 comments on commit 4128da6

Please sign in to comment.