Skip to content

Commit 27947cd

Browse files
authored
VirtualMachineInstance: Remove stopListen (#412)
1 parent 0e8a779 commit 27947cd

File tree

5 files changed

+36
-34
lines changed

5 files changed

+36
-34
lines changed

Sources/Containerization/LinuxProcess.swift

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,17 @@ public final class LinuxProcess: Sendable {
124124
}
125125

126126
extension LinuxProcess {
127-
func setupIO(streams: [VsockConnectionStream?]) async throws -> [FileHandle?] {
127+
func setupIO(listeners: [VsockListener?]) async throws -> [FileHandle?] {
128128
let handles = try await Timeout.run(seconds: 3) {
129129
try await withThrowingTaskGroup(of: (Int, FileHandle?).self) { group in
130130
var results = [FileHandle?](repeating: nil, count: 3)
131131

132-
for (index, stream) in streams.enumerated() {
133-
guard let stream = stream else { continue }
132+
for (index, listener) in listeners.enumerated() {
133+
guard let listener else { continue }
134134

135135
group.addTask {
136-
let first = await stream.first(where: { _ in true })
137-
stream.finish()
138-
try self.vm.stopListen(stream.port)
136+
let first = await listener.first(where: { _ in true })
137+
try listener.finish()
139138
return (index, first)
140139
}
141140
}
@@ -236,12 +235,12 @@ extension LinuxProcess {
236235
public func start() async throws {
237236
do {
238237
let spec = self.state.withLock { $0.spec }
239-
var streams = [VsockConnectionStream?](repeating: nil, count: 3)
238+
var listeners = [VsockListener?](repeating: nil, count: 3)
240239
if let stdin = self.ioSetup.stdin {
241-
streams[0] = try self.vm.listen(stdin.port)
240+
listeners[0] = try self.vm.listen(stdin.port)
242241
}
243242
if let stdout = self.ioSetup.stdout {
244-
streams[1] = try self.vm.listen(stdout.port)
243+
listeners[1] = try self.vm.listen(stdout.port)
245244
}
246245
if let stderr = self.ioSetup.stderr {
247246
if spec.process!.terminal {
@@ -250,11 +249,11 @@ extension LinuxProcess {
250249
message: "stderr should not be configured with terminal=true"
251250
)
252251
}
253-
streams[2] = try self.vm.listen(stderr.port)
252+
listeners[2] = try self.vm.listen(stderr.port)
254253
}
255254

256255
let t = Task {
257-
try await self.setupIO(streams: streams)
256+
try await self.setupIO(listeners: listeners)
258257
}
259258

260259
try await agent.createProcess(

Sources/Containerization/UnixSocketRelay.swift

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ package final class SocketRelay: Sendable {
8888
private struct State {
8989
var relaySources: [String: ConnectionSources] = [:]
9090
var t: Task<(), Never>? = nil
91+
var listener: VsockListener? = nil
9192
}
9293

9394
// `DispatchSourceRead` is thread-safe.
@@ -137,15 +138,15 @@ extension SocketRelay {
137138
t.cancel()
138139
$0.t = nil
139140
$0.relaySources.removeAll()
140-
}
141141

142-
switch configuration.direction {
143-
case .outOf:
144-
// If we created the host conn, lets unlink it also. It's possible it was
145-
// already unlinked if the relay failed earlier.
146-
try? FileManager.default.removeItem(at: self.configuration.destination)
147-
case .into:
148-
try self.vm.stopListen(self.port)
142+
switch configuration.direction {
143+
case .outOf:
144+
// If we created the host conn, lets unlink it also. It's possible it was
145+
// already unlinked if the relay failed earlier.
146+
try? FileManager.default.removeItem(at: self.configuration.destination)
147+
case .into:
148+
try $0.listener?.finish()
149+
}
149150
}
150151
}
151152

@@ -190,18 +191,20 @@ extension SocketRelay {
190191
let port = self.port
191192
let log = self.log
192193

193-
let connectionStream = try self.vm.listen(self.port)
194+
let listener = try self.vm.listen(self.port)
194195
log?.info(
195196
"listening on guest vsock",
196197
metadata: [
197198
"path": "\(hostPath)",
198199
"vport": "\(port)",
199200
])
201+
200202
self.state.withLock {
203+
$0.listener = listener
201204
$0.t = Task {
202205
do {
203-
defer { connectionStream.finish() }
204-
for await connection in connectionStream {
206+
defer { try? listener.finish() }
207+
for await connection in listener {
205208
try await self.handleGuestVsockConn(
206209
vsockConn: connection,
207210
hostConnectionPath: hostPath,

Sources/Containerization/VZVirtualMachineInstance.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,8 @@ extension VZVirtualMachineInstance: VirtualMachineInstance {
226226
}
227227
}
228228

229-
func listen(_ port: UInt32) throws -> VsockConnectionStream {
230-
let stream = VsockConnectionStream(port: port)
229+
func listen(_ port: UInt32) throws -> VsockListener {
230+
let stream = VsockListener(port: port, stopListen: self.stopListen)
231231
let listener = VZVirtioSocketListener()
232232
listener.delegate = stream
233233

@@ -239,7 +239,7 @@ extension VZVirtualMachineInstance: VirtualMachineInstance {
239239
return stream
240240
}
241241

242-
func stopListen(_ port: UInt32) throws {
242+
private func stopListen(_ port: UInt32) throws {
243243
try self.vm.removeListener(
244244
queue: queue,
245245
port: port

Sources/Containerization/VirtualMachineInstance.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ public protocol VirtualMachineInstance: Sendable {
4040
/// Dial a vsock port in the guest.
4141
func dial(_ port: UInt32) async throws -> FileHandle
4242
/// Listen on a host vsock port.
43-
func listen(_ port: UInt32) throws -> VsockConnectionStream
44-
/// Stop listening on a vsock port.
45-
func stopListen(_ port: UInt32) throws
43+
func listen(_ port: UInt32) throws -> VsockListener
4644
/// Start the virtual machine.
4745
func start() async throws
4846
/// Stop the virtual machine.

Sources/Containerization/VsockConnectionStream.swift renamed to Sources/Containerization/VsockListener.swift

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,27 @@ import Virtualization
2121
#endif
2222

2323
/// A stream of vsock connections.
24-
public final class VsockConnectionStream: NSObject, Sendable, AsyncSequence {
24+
public final class VsockListener: NSObject, Sendable, AsyncSequence {
2525
public typealias Element = FileHandle
2626

27-
/// A stream of connections dialed from the remote.
28-
private let connections: AsyncStream<FileHandle>
2927
/// The port the connections are for.
3028
public let port: UInt32
3129

30+
private let connections: AsyncStream<FileHandle>
3231
private let cont: AsyncStream<FileHandle>.Continuation
32+
private let stopListening: @Sendable (_ port: UInt32) throws -> Void
3333

34-
public init(port: UInt32) {
34+
package init(port: UInt32, stopListen: @Sendable @escaping (_ port: UInt32) throws -> Void) {
3535
self.port = port
3636
let (stream, continuation) = AsyncStream.makeStream(of: FileHandle.self)
3737
self.connections = stream
3838
self.cont = continuation
39+
self.stopListening = stopListen
3940
}
4041

41-
public func finish() {
42+
public func finish() throws {
4243
self.cont.finish()
44+
try self.stopListening(self.port)
4345
}
4446

4547
public func makeAsyncIterator() -> AsyncStream<FileHandle>.AsyncIterator {
@@ -49,7 +51,7 @@ public final class VsockConnectionStream: NSObject, Sendable, AsyncSequence {
4951

5052
#if os(macOS)
5153

52-
extension VsockConnectionStream: VZVirtioSocketListenerDelegate {
54+
extension VsockListener: VZVirtioSocketListenerDelegate {
5355
public func listener(
5456
_: VZVirtioSocketListener, shouldAcceptNewConnection conn: VZVirtioSocketConnection,
5557
from _: VZVirtioSocketDevice

0 commit comments

Comments
 (0)