Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@ public enum IOMode {
/**
* Native KQUEUE via JNI, MacOS/BSD only
*/
KQUEUE
KQUEUE,
/**
* Prefer to use native EPOLL on Linux (or KQUEUE on MacOS) if available. Then, fallback to NIO.
*/
AUTO
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueIoHandler;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.kqueue.KQueueSocketChannel;
Expand All @@ -35,7 +37,7 @@

/**
* Utilities for creating various Netty constructs based on whether we're using NIO, EPOLL,
* or KQUEUE.
* , KQUEUE, or AUTO.
*/
public class NettyUtils {

Expand Down Expand Up @@ -71,6 +73,15 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String
case NIO -> NioIoHandler.newFactory();
case EPOLL -> EpollIoHandler.newFactory();
case KQUEUE -> KQueueIoHandler.newFactory();
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollIoHandler.newFactory();
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueIoHandler.newFactory();
} else {
yield NioIoHandler.newFactory();
}
}
};
return new MultiThreadIoEventLoopGroup(numThreads, threadFactory, handlerFactory);
}
Expand All @@ -81,6 +92,15 @@ public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
case NIO -> NioSocketChannel.class;
case EPOLL -> EpollSocketChannel.class;
case KQUEUE -> KQueueSocketChannel.class;
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollSocketChannel.class;
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueSocketChannel.class;
} else {
yield NioSocketChannel.class;
}
}
};
}

Expand All @@ -90,6 +110,15 @@ public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode)
case NIO -> NioServerSocketChannel.class;
case EPOLL -> EpollServerSocketChannel.class;
case KQUEUE -> KQueueServerSocketChannel.class;
case AUTO -> {
if (JavaUtils.isLinux && Epoll.isAvailable()) {
yield EpollServerSocketChannel.class;
} else if (JavaUtils.isMac && KQueue.isAvailable()) {
yield KQueueServerSocketChannel.class;
} else {
yield NioServerSocketChannel.class;
}
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public String getModuleName() {
return module;
}

/** IO mode: NIO, EPOLL, or KQUEUE */
/** IO mode: NIO, EPOLL, KQUEUE, or AUTO */
public String ioMode() {
String defaultIOMode = conf.get(SPARK_NETWORK_DEFAULT_IO_MODE_KEY, "NIO");
return conf.get(SPARK_NETWORK_IO_MODE_KEY, defaultIOMode).toUpperCase(Locale.ROOT);
Expand Down
4 changes: 4 additions & 0 deletions core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,7 @@ class ShuffleNettyKQueueSuite extends ShuffleNettySuite {
override def shouldRunTests: Boolean = Utils.isMac
override def ioMode: IOMode = IOMode.KQUEUE
}

class ShuffleNettyAutoSuite extends ShuffleNettySuite {
override def ioMode: IOMode = IOMode.AUTO
}