Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
1cd3d23
Initial wip using builder pattern
mpilquist Apr 8, 2025
5e5b39e
Generalize NeedAddress
mpilquist Apr 8, 2025
5e27b71
Dump builder pattern
mpilquist Apr 8, 2025
a86b18d
wip
mpilquist Apr 8, 2025
c06e969
wip
mpilquist Apr 9, 2025
eff4b58
wip
mpilquist Apr 9, 2025
89362b0
wip
mpilquist Apr 10, 2025
fcd9233
wip
mpilquist Apr 10, 2025
5e912ab
Drop JVM socket group implementations
mpilquist Apr 10, 2025
7081b7a
Unix socket tests
mpilquist Apr 10, 2025
dedda55
Unix socket tests
mpilquist Apr 10, 2025
17ade5c
Cleanup Bind
mpilquist Apr 10, 2025
e2fafad
s/Bind/ServerSocket/
mpilquist Apr 10, 2025
fad0e04
Push old localAddress back down to Socket
mpilquist Apr 10, 2025
598d3ed
Native
mpilquist Apr 11, 2025
b649d00
JS tests passing
mpilquist Apr 16, 2025
4a00566
JS duplication reduction
mpilquist Apr 16, 2025
e22041b
Scalafmt
mpilquist Apr 16, 2025
906d8d5
Fix compilation errors
mpilquist Apr 17, 2025
058e616
Implement getLocalAddressGen on JVM
mpilquist Apr 17, 2025
7648b71
Cleanup
mpilquist Apr 18, 2025
01322d5
Progress on fixing addresses
mpilquist Apr 18, 2025
42773ce
Fixed addresses
mpilquist Apr 18, 2025
6de7fbe
Scalafmt
mpilquist Apr 18, 2025
4399a1a
Test unix socket addresses
mpilquist Apr 18, 2025
18eb5d2
IP address tests
mpilquist Apr 18, 2025
1e9703e
Socket option cleanup
mpilquist Apr 18, 2025
f27f191
Socket option cleanup
mpilquist Apr 18, 2025
5285acb
Socket option cleanup
mpilquist Apr 18, 2025
78bc8cc
Deprecate old socket group methods
mpilquist Apr 18, 2025
a0ab96b
Address cleanup
mpilquist Apr 19, 2025
8c28c69
Add address and peerAddress, deprecate localAddress and remoteAddress
mpilquist Apr 20, 2025
6533318
Deprecate Socket#isOpen
mpilquist Apr 21, 2025
e7ca920
Remove some unnecesssary changes from net facade
mpilquist Apr 21, 2025
f4bc9c8
Cleanup in selecting ip sockets provider
mpilquist Apr 21, 2025
896738a
Unify Network implementations
mpilquist Apr 21, 2025
e47817c
Fix 2.12 compilation
mpilquist Apr 21, 2025
f3c5f70
Update to ip4s 3.7.0
mpilquist Apr 21, 2025
0ee5c57
Fix JS 2.12 compilation
mpilquist Apr 21, 2025
0807afb
Mima fixes
mpilquist Apr 21, 2025
91c25f6
Fix native 2.12 warnings
mpilquist Apr 21, 2025
861f7ea
Fix selecting socket address NPE
mpilquist Apr 21, 2025
7395a77
Fix JVM unix sockets test
mpilquist Apr 21, 2025
8a0df13
Fix site docs
mpilquist Apr 21, 2025
47815b2
Deprecate old UnixSockets
mpilquist Apr 21, 2025
5af859e
Set client socket options on JVM Unix
mpilquist Apr 22, 2025
e09f425
Remove explicit DNS lookups from JS IP socket connect & bind
mpilquist Apr 23, 2025
c063029
Scalafmt
mpilquist Apr 23, 2025
6c7f330
Make SO_REUSEPORT lazy loaded
mpilquist May 5, 2025
21af914
Merge branch 'main' into topic/net2
mpilquist May 9, 2025
3df2f3d
Revamped datagram support
mpilquist Jun 4, 2025
9ef0cf7
Remove accidentally added PeerCredentials
mpilquist Jun 4, 2025
72b1197
Fix test compilation
mpilquist Jun 4, 2025
7a83700
Fix mima warnings
mpilquist Jun 4, 2025
4248d68
Scalafmt
mpilquist Jun 4, 2025
e6aeb4d
Fix warnings
mpilquist Jun 4, 2025
e0d78b7
Fix warnings
mpilquist Jun 4, 2025
8542919
Add temp debug to UnixDatagramSuite
mpilquist Jun 5, 2025
dac60c1
Scalafmt
mpilquist Jun 5, 2025
d963953
Debug
mpilquist Jun 5, 2025
73f640d
Debug
mpilquist Jun 5, 2025
e02d9e5
Exclude UnixDatagramSuite from Linux due to bug in jnr-unixsocket
mpilquist Jun 5, 2025
8488321
Bridge deprecated datagram soccket options
mpilquist Jun 6, 2025
645f93f
Docs
mpilquist Jun 9, 2025
4d2924c
Scalafmt
mpilquist Jun 9, 2025
855070e
Scalafmt
mpilquist Jun 9, 2025
c4c9ee3
Add support for ip4s NetworkInterface
mpilquist Jun 22, 2025
562cbc5
Mima
mpilquist Jun 22, 2025
e8721b0
Bump to ip4s 3.8.0-RC1
mpilquist Jun 26, 2025
05e5e99
Bump to ip4s 3.8.0-RC1
mpilquist Jun 26, 2025
e6a8728
Merge branch 'main' into topic/net2
mpilquist Jun 26, 2025
426ceaa
Merge branch 'main' into topic/net2
mpilquist Jul 11, 2025
80642f9
Merge branch 'main' into topic/net2
mpilquist Aug 4, 2025
0bf4faf
Merge branch 'main' into topic/net2
mpilquist Aug 25, 2025
1153afa
Merge branch 'main' into topic/net2
mpilquist Aug 25, 2025
6e49f13
Scalafmt
mpilquist Aug 25, 2025
02d73e5
Downgrade GHA runner for macos to fix multicast tests
mpilquist Aug 25, 2025
b05bd18
Merge branch 'main' into topic/net2
mpilquist Aug 26, 2025
de6e955
Address deprecation warnings
mpilquist Aug 26, 2025
cf74c2d
Merge branch 'main' into topic/net2
mpilquist Sep 1, 2025
fe722ce
Update fromKeyStoreFile to take a Path and improve error message from…
mpilquist Sep 2, 2025
17f6852
Fix 2.12 compilation
mpilquist Sep 2, 2025
01f139d
Fix spinloop bug in TLSEngine
mpilquist Sep 3, 2025
57ab186
Drop .only tag
mpilquist Sep 3, 2025
ea03a6e
Change byte limit in test
mpilquist Sep 3, 2025
1d91556
Change explicit intercept to attempt to handle behavior differences o…
mpilquist Sep 3, 2025
fad230e
Rewrote test to be an example of a client that only sends partial han…
mpilquist Sep 4, 2025
39f1318
Merge pull request #3599 from typelevel/topic/fix-spinloop-in-tlsengine
mpilquist Sep 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ jobs:
name: Test I/O on macOS
strategy:
matrix:
os: [macos-latest]
os: [macos-14]
java: [temurin@17]
project: [ioJS, ioJVM, ioNative]
runs-on: ${{ matrix.os }}
Expand Down
91 changes: 90 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ ThisBuild / githubWorkflowAddedJobs +=
scalas = Nil,
sbtStepPreamble = Nil,
javas = List(githubWorkflowJavaVersions.value.head),
oses = List("macos-latest"),
oses = List(
"macos-14"
), // FIXME: macos-15 breaks sending multicast to local network - https://github.com/actions/runner-images/issues/10924
matrixAdds = Map("project" -> List("ioJS", "ioJVM", "ioNative")),
steps = githubWorkflowJobSetup.value.toList ++ List(
WorkflowStep.Run(List("brew install s2n"), cond = Some("matrix.project == 'ioNative'")),
Expand Down Expand Up @@ -272,6 +274,93 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[MissingTypesProblem](
"fs2.interop.flow.StreamSubscriber$State$WaitingOnUpstream$"
),
// Network refactor: #3563
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.connect"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.bind"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.bindAndAccept"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Socket.address"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Socket.peerAddress"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.address"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.supportedOptions"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.getOption"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.setOption"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.io.net.SocketCompanionPlatform#AsyncSocket.this"
),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SocketGroup$AbstractAsyncSocketGroup"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SocketGroupCompanionPlatform"),
ProblemFilters.exclude[MissingClassProblem](
"fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup"
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.address"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.net.tls.TLSSocket.supportedOptions"
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.getOption"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.setOption"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSockets"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSockets$"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSocketsImpl"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSockets"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSockets$"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSocketsImpl"),
ProblemFilters.exclude[MissingClassProblem](
"fs2.io.net.unixsocket.UnixSocketsCompanionPlatform$AsyncSocket"
),
ProblemFilters.exclude[MissingClassProblem](
"fs2.io.net.unixsocket.UnixSocketsCompanionPlatform$AsyncUnixSockets"
),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.net.SelectingSocket.apply"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SelectingSocketGroup"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.net.Socket.forAsync"),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.net.SocketOptionCompanionPlatform#Key.get"
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.net.Network.openDatagramSocket"
),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.net.FdPollingSocket.apply"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.FdPollingSocketGroup"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.FdPollingUnixSockets"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"fs2.io.net.AsynchronousDatagramSocketGroup#WriterDatagram.remote"
),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"fs2.io.net.AsynchronousDatagramSocketGroup#WriterDatagram.this"
),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.address"),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.net.DatagramSocket.supportedOptions"
),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.getOption"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.setOption"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.readGen"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.connect"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.disconnect"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.write"),
ProblemFilters.exclude[MissingClassProblem](
"fs2.io.net.DatagramSocketGroupCompanionPlatform$AsyncDatagramSocketGroup"
),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.bindDatagramSocket"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SocketGroup$"),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.net.SocketOptionCompanionPlatform#Key.fs2$io$net$SocketOptionCompanionPlatform$Key$$$outer"
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.net.DatagramSocketOption#Key.toSocketOption"
),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.join"),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"fs2.io.net.DatagramSocketOption.multicastInterface"
),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.dns"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.interfaces"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.net.tls.TLSContext#Builder.fromKeyStoreFile"
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.net.tls.TLSContext#Builder.fs2$io$net$tls$TLSContextCompanionPlatform$BuilderPlatform$$$outer"
)
)

Expand Down
65 changes: 0 additions & 65 deletions io/js-jvm/src/main/scala/fs2/io/net/Network.scala

This file was deleted.

129 changes: 83 additions & 46 deletions io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,29 @@ import com.comcast.ip4s._

import scala.concurrent.duration._

class UdpSuite extends Fs2Suite with UdpSuitePlatform {
def sendAndReceive(socket: DatagramSocket[IO], toSend: Datagram): IO[Datagram] =
class UdpSuite extends Fs2Suite {
private def sendAndReceive(socket: DatagramSocket[IO], toSend: Datagram): IO[Datagram] =
socket
.write(toSend) >> socket.read.timeoutTo(1.second, IO.defer(sendAndReceive(socket, toSend)))

private def sendAndReceiveBytes(socket: DatagramSocket[IO], bytes: Chunk[Byte]): IO[Datagram] =
socket
.write(bytes) >> socket.read.timeoutTo(1.second, IO.defer(sendAndReceiveBytes(socket, bytes)))

group("udp") {
test("echo one") {
val msg = Chunk.array("Hello, world!".getBytes)
Stream
.resource(Network[IO].openDatagramSocket())
.resource(Network[IO].bindDatagramSocket(SocketAddress.Wildcard))
.flatMap { serverSocket =>
Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
val serverAddress = SocketAddress(ip"127.0.0.1", serverPort)
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client = Stream.resource(Network[IO].openDatagramSocket()).evalMap { clientSocket =>
sendAndReceive(clientSocket, Datagram(serverAddress, msg))
val serverAddress = SocketAddress(ip"127.0.0.1", serverSocket.address.asIpUnsafe.port)
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client =
Stream.resource(Network[IO].bindDatagramSocket(SocketAddress.Wildcard)).evalMap {
clientSocket =>
sendAndReceive(clientSocket, Datagram(serverAddress, msg))
}
client.concurrently(server)
}
client.concurrently(server)
}
.compile
.lastOrError
Expand All @@ -67,55 +71,88 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
.sorted

Stream
.resource(Network[IO].openDatagramSocket())
.resource(Network[IO].bindDatagramSocket())
.flatMap { serverSocket =>
Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
val serverAddress = SocketAddress(ip"127.0.0.1", serverPort)
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket =>
Stream
.emits(msgs.map(msg => Datagram(serverAddress, msg)))
.evalMap(msg => sendAndReceive(clientSocket, msg))
}
val clients = Stream
.constant(client)
.take(numClients.toLong)
.parJoin(numParallelClients)
clients.concurrently(server)
val serverAddress = SocketAddress(ip"127.0.0.1", serverSocket.address.asIpUnsafe.port)
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client = Stream.resource(Network[IO].bindDatagramSocket()).flatMap { clientSocket =>
Stream
.emits(msgs.map(msg => Datagram(serverAddress, msg)))
.evalMap(msg => sendAndReceive(clientSocket, msg))
}
val clients = Stream
.constant(client)
.take(numClients.toLong)
.parJoin(numParallelClients)
clients.concurrently(server)
}
.compile
.toVector
.map(_.map(p => new String(p.bytes.toArray)).sorted)
.assertEquals(expected)
}

test("multicast".ignore) {
// Fails often based on routing table of host machine
val group = mip"232.10.10.10"
val groupJoin = MulticastJoin.asm(group)
test("echo connected") {
val msg = Chunk.array("Hello, world!".getBytes)
Stream
.resource(
Network[IO].openDatagramSocket(
options = List(DatagramSocketOption.multicastTtl(1)),
protocolFamily = Some(v4ProtocolFamily)
)
)
.resource(Network[IO].bindDatagramSocket())
.flatMap { serverSocket =>
Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
val server = Stream
.exec(
v4Interfaces.traverse_(interface => serverSocket.join(groupJoin, interface))
) ++
serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket =>
Stream(Datagram(SocketAddress(group.address, serverPort), msg))
.through(clientSocket.writes)
.drain ++ Stream.eval(clientSocket.read)
}
client.concurrently(server)
val serverAddress = serverSocket.address.asIpUnsafe
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client = Stream.resource(Network[IO].bindDatagramSocket()).evalMap { clientSocket =>
clientSocket.connect(serverAddress) >> sendAndReceiveBytes(clientSocket, msg)
}
client.concurrently(server)
}
.compile
.lastOrError
.map(_.bytes)
.assertEquals(msg)
}

test("multicast") {
val group = mip"239.10.10.10"
val groupJoin = MulticastJoin.asm(group)
val msg = Chunk.array("Hello, world!".getBytes)
val outgoingInterface =
// Get first non-loopback interface with an IPv4 address
Network[IO].interfaces.getAll.map { interfaces =>
interfaces.values
.filterNot(_.isLoopback)
.flatMap(iface =>
iface.addresses.filter(_.address.fold(_ => true, _ => false)).as(iface)
)
.head
}
Stream
.eval(outgoingInterface)
.flatMap { out =>
Stream
.resource(
Network[IO]
.bindDatagramSocket(
options = List(SocketOption.multicastTtl(1), SocketOption.multicastInterface(out))
)
.evalMap { serverSocket =>
Network[IO].interfaces.getAll.flatMap { interfaces =>
interfaces.values.toList
.filter(iface =>
iface.addresses.exists(_.address.fold(_ => true, _ => false))
)
.traverse_(iface => serverSocket.join(groupJoin, iface))
.as(serverSocket)
}
}
)
.flatMap { serverSocket =>
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
val client =
Stream.resource(Network[IO].bindDatagramSocket()).flatMap { clientSocket =>
val to = SocketAddress(group.address, serverSocket.address.asIpUnsafe.port)
Stream.eval(clientSocket.write(msg, to) >> clientSocket.read)
}
client.concurrently(server)
}
}
.compile
.lastOrError
Expand Down
16 changes: 14 additions & 2 deletions io/js/src/main/scala/fs2/io/internal/facade/dgram.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ private[io] object dgram {

def bind(options: BindOptions, cb: js.Function0[Unit]): Unit = js.native

def connect(port: Int, address: String, cb: js.Function1[js.UndefOr[js.Error], Unit]): Unit =
js.native

def disconnect(): Unit = js.native

def addMembership(multicastAddress: String, multicastInterface: String): Unit = js.native

def dropMembership(multicastAddress: String, multicastInterface: String): Unit = js.native
Expand All @@ -63,6 +68,9 @@ private[io] object dgram {

def close(cb: js.Function0[Unit]): Unit = js.native

def send(msg: Uint8Array, cb: js.Function1[js.Error, Unit]): Unit =
js.native

def send(msg: Uint8Array, port: Int, address: String, cb: js.Function1[js.Error, Unit]): Unit =
js.native

Expand All @@ -74,8 +82,12 @@ private[io] object dgram {

def setMulticastTTL(ttl: Int): Unit = js.native

def getRecvBufferSize: Int = js.native

def setRecvBufferSize(size: Int): Unit = js.native

def getSendBufferSize: Int = js.native

def setSendBufferSize(size: Int): Unit = js.native

def setTTL(ttl: Int): Unit = js.native
Expand All @@ -85,7 +97,7 @@ private[io] object dgram {
@js.native
trait AddressInfo extends js.Object {
def address: String = js.native
def family: Int = js.native
def family: String = js.native
def port: Int = js.native
}

Expand All @@ -97,7 +109,7 @@ private[io] object dgram {
@js.native
trait RemoteInfo extends js.Object {
def address: String = js.native
def family: Int = js.native
def family: String = js.native
def port: Int = js.native
def size: Int = js.native
}
Expand Down
Loading