-
Notifications
You must be signed in to change notification settings - Fork 928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARTEMIS-3163 Experimental support for Netty IO_URING incubator #3479
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -395,6 +395,11 @@ public interface ActiveMQClientLogger extends BasicLogger { | |
format = Message.Format.MESSAGE_FORMAT) | ||
void unableToCheckEpollAvailability(@Cause Throwable e); | ||
|
||
@LogMessage(level = Logger.Level.WARN) | ||
@Message(id = 212080, value = "Unable to check IoUring availability ", | ||
format = Message.Format.MESSAGE_FORMAT) | ||
void unableToCheckIoUringAvailability(@Cause Throwable e); | ||
|
||
@LogMessage(level = Logger.Level.WARN) | ||
@Message(id = 212072, value = "Failed to change channel state to ReadyForWriting ", | ||
format = Message.Format.MESSAGE_FORMAT) | ||
|
@@ -420,6 +425,11 @@ public interface ActiveMQClientLogger extends BasicLogger { | |
format = Message.Format.MESSAGE_FORMAT) | ||
void unableToCheckEpollAvailabilitynoClass(); | ||
|
||
@LogMessage(level = Logger.Level.WARN) | ||
@Message(id = 212079, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please put in correct place in order or the code. So people know what the high watermark for next code is without needing to hunt. All others are in asc order, please follow |
||
format = Message.Format.MESSAGE_FORMAT) | ||
void unableToCheckIoUringAvailabilitynoClass(); | ||
|
||
@LogMessage(level = Logger.Level.WARN) | ||
@Message(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. Retry {0} of {1}", | ||
format = Message.Format.MESSAGE_FORMAT) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,4 +54,17 @@ public static final boolean isKQueueAvailable() { | |
return false; | ||
} | ||
} | ||
|
||
public static boolean isIoUringAvailable() { | ||
try { | ||
return Env.isLinuxOs() && (boolean) (Class.forName("io.netty.incubator.channel.uring.IOUring") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should just call IOUring.isAvailable() like we do with kqueue and epoll, dont do this via reflection There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not available via maven afaik so it's up to the users to compile it from source and use it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its available. These is actually in the docs on the netty github There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wow ok, will remove the reflection then, thanks to have checked! |
||
.getMethod("isAvailable").invoke(null)); | ||
} catch (NoClassDefFoundError noClassDefFoundError) { | ||
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass(); | ||
return false; | ||
} catch (Throwable e) { | ||
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e); | ||
return false; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,8 @@ public class TransportConstants { | |
|
||
public static final String USE_KQUEUE_PROP_NAME = "useKQueue"; | ||
|
||
public static final String USE_IOURING_PROP_NAME = "useIoUring"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your recent comments Franz noted flagging everything with caution that its experimental...this might be a simple spot to trivially bang the point home to a user, making the option name reflect it...e.g "useIoUringExperimental" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I disagree, i think what franz has here is fine/better, and that should just be clear in the docs, else we end up with mangled config , it will be off by default anyhow so someone will have to have read the docs to know its there of which it will be mentioned very clearly this is an incubator feature, and explicitly turned it on. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having experimental bits using explicit non-final options or even APIs prior to completion to signal their status isnt mangled config IMO. Its fairly common, e.g the JDK has many instances of this. The explicit incubator GAV for the netty bit is another example. |
||
|
||
@Deprecated | ||
/** | ||
* @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME | ||
|
@@ -202,6 +204,8 @@ public class TransportConstants { | |
|
||
public static final boolean DEFAULT_USE_KQUEUE = true; | ||
|
||
public static final boolean DEFAULT_USE_IOURING = false; | ||
|
||
public static final boolean DEFAULT_USE_INVM = false; | ||
|
||
public static final boolean DEFAULT_USE_SERVLET = false; | ||
|
@@ -374,6 +378,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { | |
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); | ||
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); | ||
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); | ||
allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME); | ||
//noinspection deprecation | ||
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); | ||
allowableAcceptorKeys.add(TransportConstants.PROTOCOLS_PROP_NAME); | ||
|
@@ -443,6 +448,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { | |
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); | ||
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); | ||
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); | ||
allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME); | ||
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME); | ||
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); | ||
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,7 @@ | |
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
|
@@ -112,6 +113,7 @@ public class NettyAcceptor extends AbstractAcceptor { | |
public static final String NIO_ACCEPTOR_TYPE = "NIO"; | ||
public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL"; | ||
public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE"; | ||
public static final String IOURING_ACCEPTOR_TYPE = "IO_URING"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not just IOURING we dont add under score seperator for KQueue or EPoll. Just also avoids any escaping issues on urls or other things in config urls or admin console There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps Franz wanted to avoid a ring of IOUs...but more probably its just that io_uring actually has the underscore in its name whilst I dont think the others do. Reasonable point about URL based config...though I thought it was just a boolean there? |
||
|
||
static { | ||
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use | ||
|
@@ -148,6 +150,8 @@ public class NettyAcceptor extends AbstractAcceptor { | |
|
||
private final boolean useKQueue; | ||
|
||
private final boolean useIoUring; | ||
|
||
private final ProtocolHandler protocolHandler; | ||
|
||
private final String host; | ||
|
@@ -268,6 +272,7 @@ public NettyAcceptor(final String name, | |
|
||
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); | ||
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); | ||
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); | ||
|
||
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); | ||
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); | ||
|
@@ -401,6 +406,21 @@ public ActiveMQThreadFactory run() { | |
acceptorType = KQUEUE_ACCEPTOR_TYPE; | ||
|
||
logger.debug("Acceptor using native kqueue"); | ||
} else if (useIoUring && CheckDependencies.isIoUringAvailable()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect io uring to be primary option later with fallback to epoll and then nio, as such maybe this should be first in order of preference. Its off by default but this way going forwards correct preference order would exist. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes :) correct There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Damn karaf and its feature checks :O it works for reflection too? :O There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can probably add some metadata to artemis-server-osgi let it know the io.netty.incubator.channel.uring package usages are optional. |
||
channelClazz = (Class<? extends ServerChannel>) Class.forName("io.netty.incubator.channel.uring.IOUringServerSocketChannel", | ||
true, ClientSessionFactoryImpl.class.getClassLoader()); | ||
eventLoopGroup = (EventLoopGroup) Class.forName("io.netty.incubator.channel.uring.IOUringEventLoopGroup", | ||
true, ClientSessionFactoryImpl.class.getClassLoader()) | ||
.getConstructor(int.class, ThreadFactory.class) | ||
.newInstance(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() { | ||
@Override | ||
public ActiveMQThreadFactory run() { | ||
return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()); | ||
} | ||
})); | ||
acceptorType = IOURING_ACCEPTOR_TYPE; | ||
|
||
logger.debug("Acceptor using native io_uring"); | ||
} else { | ||
channelClazz = NioServerSocketChannel.class; | ||
eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please put in correct order place in file based on id. See other comment for why