Skip to content

Commit 211a22d

Browse files
authored
Simplify stream transport states (#630)
* `ReadClosed`/`WriteClosed` -> `Closed` (they're always set together) * `atEof` doesn't need to check transport state * remove redundant while looping in `readStreamLoop` (it's done by `handleEintr`) * move error handling to `resumeRead` / `resumeWrite` and make usre it matches loop version * clear reader future on read cancellation (no need to hold on to memory) * document closeFd
1 parent d3a5959 commit 211a22d

4 files changed

Lines changed: 133 additions & 182 deletions

File tree

chronos/internal/asyncengine.nim

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,6 @@ proc raiseOsDefect*(error: OSErrorCode, msg = "") {.noreturn, noinline.} =
163163
raise (ref Defect)(msg: msg & "\n[" & $int(error) & "] " & osErrorMsg(error) &
164164
"\n" & getStackTrace())
165165

166-
func toPointer(error: OSErrorCode): pointer =
167-
when sizeof(int) == 8:
168-
cast[pointer](uint64(uint32(error)))
169-
else:
170-
cast[pointer](uint32(error))
171-
172166
func toException*(v: OSErrorCode): ref OSError = newOSError(v)
173167
# This helper will allow to use `tryGet()` and raise OSError for
174168
# Result[T, OSErrorCode] values.
@@ -672,30 +666,25 @@ elif defined(windows):
672666
## Closes a socket and ensures that it is unregistered.
673667
let loop = getThreadDispatcher()
674668
loop.handles.excl(fd)
675-
let
676-
param = toPointer(
677-
if closeFd(SocketHandle(fd)) == 0:
678-
OSErrorCode(0)
679-
else:
680-
osLastError()
681-
)
669+
# Because we don't set SO_LINGER, `closeFd` should always succeed.
670+
# Future API might be added to recover linger errors, but this is currently
671+
# not supported so we discard the return value.
672+
discard closeFd(SocketHandle(fd))
682673
if not(isNil(aftercb)):
683-
loop.callbacks.addLast(AsyncCallback(function: aftercb, udata: param))
674+
loop.callbacks.addLast(AsyncCallback(function: aftercb))
684675

685676
proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
686677
## Closes a (pipe/file) handle and ensures that it is unregistered.
687678
let loop = getThreadDispatcher()
688679
loop.handles.excl(fd)
689-
let
690-
param = toPointer(
691-
if closeFd(HANDLE(fd)) == 0:
692-
OSErrorCode(0)
693-
else:
694-
osLastError()
695-
)
680+
# Closing the handle should always succeed since async failures are reported
681+
# via IOCP (pending operations cancelled on closed) and the documentation
682+
# for CloseHandle does not specify and other specific conditions where it
683+
# might fail.
684+
discard closeFd(HANDLE(fd))
696685

697686
if not(isNil(aftercb)):
698-
loop.callbacks.addLast(AsyncCallback(function: aftercb, udata: param))
687+
loop.callbacks.addLast(AsyncCallback(function: aftercb))
699688

700689
proc unregisterAndCloseFd*(fd: AsyncFD): Result[void, OSErrorCode] =
701690
## Unregister from system queue and close asynchronous socket.
@@ -890,22 +879,20 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
890879
let loop = getThreadDispatcher()
891880

892881
proc continuation(udata: pointer) =
893-
let
894-
param = toPointer(
895-
if SocketHandle(fd) in loop.selector:
896-
let ures = unregister2(fd)
897-
if ures.isErr():
898-
discard closeFd(cint(fd))
899-
ures.error()
900-
else:
901-
if closeFd(cint(fd)) != 0:
902-
osLastError()
903-
else:
904-
OSErrorCode(0)
905-
else:
906-
osdefs.EBADF
907-
)
908-
if not(isNil(aftercb)): aftercb(param)
882+
if SocketHandle(fd) in loop.selector:
883+
discard unregister2(fd)
884+
# `closeFd` might fail if an I/O error occurs during an async I/O
885+
# operation, but on *most* posix systems this still results in the file
886+
# descriptor being closed regardless of the error.
887+
#
888+
# For sockets in parituclar, we don't set SO_LINGER meaning that close
889+
# happens immediately and does not wait for the socket to go through
890+
# its graceful shutdown sequence.
891+
#
892+
# We currently don't have an API for returning this error to the caller
893+
# so we discard it here - future work might expose it.
894+
discard closeFd(cint(fd))
895+
if not(isNil(aftercb)): aftercb(nil)
909896

910897
withData(loop.selector, cint(fd), adata) do:
911898
# We are scheduling reader and writer callbacks to be called

chronos/transports/common.nim

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,19 +121,17 @@ type
121121
TransportAbortedError* = object of TransportError
122122
## Remote client disconnected before server accepts connection
123123

124-
TransportState* = enum
124+
TransportState* {.pure.} = enum
125125
## Transport's state
126126
ReadPending, # Read operation pending (Windows)
127127
ReadPaused, # Read operations paused
128-
ReadClosed, # Read operations closed
129128
ReadEof, # Read at EOF
130129
ReadError, # Read error
131130
WritePending, # Writer operation pending (Windows)
132131
WritePaused, # Writer operations paused
133-
WriteClosed, # Writer operations closed
134132
WriteEof, # Remote peer disconnected
135133
WriteError # Write error
136-
134+
Closed # Transport was closed
137135
var
138136
AnyAddress* = TransportAddress(family: AddressFamily.IPv4, port: Port(0))
139137
## Default INADDR_ANY address for IPv4
@@ -568,11 +566,11 @@ proc anyAddressFix*(a: TransportAddress): TransportAddress =
568566
a
569567

570568
template checkClosed*(t: untyped) =
571-
if (ReadClosed in (t).state) or (WriteClosed in (t).state):
569+
if TransportState.Closed in (t).state:
572570
raise newException(TransportUseClosedError, "Transport is already closed!")
573571

574572
template checkClosed*(t: untyped, future: untyped) =
575-
if (ReadClosed in (t).state) or (WriteClosed in (t).state):
573+
if TransportState.Closed in (t).state:
576574
future.fail(newException(TransportUseClosedError,
577575
"Transport is already closed!"))
578576
return future

chronos/transports/datagram.nim

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,10 @@ when defined(windows):
229229
of ERROR_OPERATION_ABORTED:
230230
# CancelIO() interrupt or closeSocket() call.
231231
transp.state.incl(ReadPaused)
232-
if ReadClosed in transp.state and not(transp.future.finished()):
232+
if TransportState.Closed in transp.state and not(transp.future.finished()):
233233
# Stop tracking transport
234234
untrackCounter(DgramTransportTrackerName)
235-
# If `ReadClosed` present, then close(transport) was called.
235+
# If `Closed` present, then close(transport) was called.
236236
transp.future.complete()
237237
GC_unref(transp)
238238
break
@@ -243,7 +243,7 @@ when defined(windows):
243243
asyncSpawn transp.function(transp, remoteAddress)
244244
else:
245245
## Initiation
246-
if transp.state * {ReadEof, ReadClosed, ReadError} == {}:
246+
if transp.state * {ReadEof, ReadError, TransportState.Closed} == {}:
247247
transp.state.incl(ReadPending)
248248
let fd = SocketHandle(transp.fd)
249249
transp.rflag = 0
@@ -275,7 +275,7 @@ when defined(windows):
275275
else:
276276
# Transport closure happens in callback, and we not started new
277277
# WSARecvFrom session.
278-
if ReadClosed in transp.state and not(transp.future.finished()):
278+
if TransportState.Closed in transp.state and not(transp.future.finished()):
279279
# Stop tracking transport
280280
untrackCounter(DgramTransportTrackerName)
281281
transp.future.complete()
@@ -446,7 +446,7 @@ else:
446446
## This situation can be happen, when there events present
447447
## after transport was closed.
448448
return
449-
if ReadClosed in transp.state:
449+
if TransportState.Closed in transp.state:
450450
transp.state.incl({ReadPaused})
451451
else:
452452
while true:
@@ -479,7 +479,7 @@ else:
479479
## This situation can be happen, when there events present
480480
## after transport was closed.
481481
return
482-
if WriteClosed in transp.state:
482+
if TransportState.Closed in transp.state:
483483
transp.state.incl({WritePaused})
484484
else:
485485
if len(transp.queue) > 0:
@@ -655,9 +655,10 @@ proc close*(transp: DatagramTransport) =
655655
transp.future.complete()
656656
GC_unref(transp)
657657

658-
when defined(windows):
659-
if {ReadClosed, WriteClosed} * transp.state == {}:
660-
transp.state.incl({WriteClosed, ReadClosed})
658+
if TransportState.Closed notin transp.state:
659+
transp.state.incl(TransportState.Closed)
660+
661+
when defined(windows):
661662
if ReadPaused in transp.state:
662663
# If readDatagramLoop() is not running we need to finish in
663664
# continuation step.
@@ -666,10 +667,8 @@ proc close*(transp: DatagramTransport) =
666667
# If readDatagramLoop() is running, it will be properly finished inside
667668
# of readDatagramLoop().
668669
closeSocket(transp.fd)
669-
else:
670-
if {ReadClosed, WriteClosed} * transp.state == {}:
671-
transp.state.incl({WriteClosed, ReadClosed})
672-
closeSocket(transp.fd, continuation)
670+
else:
671+
closeSocket(transp.fd, continuation)
673672

674673
proc getTransportAddresses(
675674
local, remote: Opt[IpAddress],
@@ -1001,7 +1000,7 @@ proc join*(transp: DatagramTransport): Future[void] {.
10011000

10021001
proc closed*(transp: DatagramTransport): bool {.inline.} =
10031002
## Returns ``true`` if transport in closed state.
1004-
{ReadClosed, WriteClosed} * transp.state != {}
1003+
TransportState.Closed in transp.state
10051004

10061005
proc closeWait*(transp: DatagramTransport): Future[void] {.
10071006
async: (raises: []).} =

0 commit comments

Comments
 (0)