Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide the remaining buffer when the NIOAsyncSequenceProducer termin… #3111

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ public protocol NIOAsyncSequenceProducerDelegate: Sendable {
///
/// - Note: This is guaranteed to be called _exactly_ once.
func didTerminate()

/// This method is called once the ``NIOAsyncSequenceProducer`` is terminated.
///
/// Termination happens if:
/// - The ``NIOAsyncSequenceProducer/AsyncIterator`` is deinited.
/// - The ``NIOAsyncSequenceProducer`` deinited and no iterator is alive.
/// - The consuming `Task` is cancelled (e.g. `for await let element in`).
/// - The source finished and all remaining buffered elements have been consumed.
///
/// - Note: This is guaranteed to be called _exactly_ once.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's update the note on the other didTerminate to indicate that it is no longer called, but if no implementation of didTerminate(remainingBuffer:) is provided this will be called from there in the default implementation.

func didTerminate(remainingBuffer: some Collection<Any>)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to box the elements? Why isn't this just some Collection?

}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOAsyncSequenceProducerDelegate {
public func didTerminate(remainingBuffer: some Collection<Any>) { self.didTerminate() }
}

/// This is an `AsyncSequence` that supports a unicast `AsyncIterator`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,21 +448,21 @@ extension NIOThrowingAsyncSequenceProducer {

@inlinable
internal func sequenceDeinitialized() {
let delegate: Delegate? = self._state.withLockedValue {
let (delegate, buffer): (Delegate?, Deque<Element>) = self._state.withLockedValue {
let action = $0.stateMachine.sequenceDeinitialized()

switch action {
case .callDidTerminate:
case .callDidTerminate(let buffer):
let delegate = $0.delegate
$0.delegate = nil
return delegate
return (delegate, buffer)

case .none:
return nil
return (nil, .init())
}
}

delegate?.didTerminate()
delegate?.didTerminate(remainingBuffer: buffer.map { $0 })
}

@inlinable
Expand All @@ -474,22 +474,22 @@ extension NIOThrowingAsyncSequenceProducer {

@inlinable
internal func iteratorDeinitialized() {
let delegate: Delegate? = self._state.withLockedValue {
let (delegate, buffer): (Delegate?, Deque<Element>) = self._state.withLockedValue {
let action = $0.stateMachine.iteratorDeinitialized()

switch action {
case .callDidTerminate:
case .callDidTerminate(let buffer):
let delegate = $0.delegate
$0.delegate = nil

return delegate
return (delegate, buffer)

case .none:
return nil
return (nil, .init())
}
}

delegate?.didTerminate()
delegate?.didTerminate(remainingBuffer: buffer.map { $0 } )
}

@inlinable
Expand Down Expand Up @@ -557,7 +557,8 @@ extension NIOThrowingAsyncSequenceProducer {
break
}

delegate?.didTerminate()
// We don't have a buffer in this case
delegate?.didTerminate(remainingBuffer: [])
}

@inlinable
Expand Down Expand Up @@ -585,15 +586,15 @@ extension NIOThrowingAsyncSequenceProducer {

return element

case .returnFailureAndCallDidTerminate(let failure):
case .returnFailureAndCallDidTerminate(let failure, let buffer):
let delegate = unsafe.withValueAssumingLockIsAcquired {
let delegate = $0.delegate
$0.delegate = nil
return delegate
}
unsafe.unlock()

delegate?.didTerminate()
delegate?.didTerminate(remainingBuffer: buffer.map { $0 })

switch failure {
case .some(let error):
Expand Down Expand Up @@ -679,10 +680,10 @@ extension NIOThrowingAsyncSequenceProducer {
}

switch action {
case .callDidTerminate:
break
case .callDidTerminate(let buffer):
delegate?.didTerminate(remainingBuffer: buffer.map { $0 })

case .resumeContinuationWithCancellationErrorAndCallDidTerminate(let continuation):
case .resumeContinuationWithCancellationErrorAndCallDidTerminate(let continuation, let buffer):
// We have deprecated the generic Failure type in the public API and Failure should
// now be `Swift.Error`. However, if users have not migrated to the new API they could
// still use a custom generic Error type and this cast might fail.
Expand All @@ -696,12 +697,12 @@ extension NIOThrowingAsyncSequenceProducer {
} else {
continuation.resume(returning: nil)
}

delegate?.didTerminate(remainingBuffer: buffer.map { $0 })

case .none:
break
delegate?.didTerminate()
}

delegate?.didTerminate()
}
}
}
Expand Down Expand Up @@ -781,7 +782,7 @@ extension NIOThrowingAsyncSequenceProducer {
@usableFromInline
enum SequenceDeinitializedAction {
/// Indicates that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called.
case callDidTerminate
case callDidTerminate(buffer: Deque<Element>)
/// Indicates that nothing should be done.
case none
}
Expand All @@ -790,13 +791,14 @@ extension NIOThrowingAsyncSequenceProducer {
mutating func sequenceDeinitialized() -> SequenceDeinitializedAction {
switch self._state {
case .initial(_, iteratorInitialized: false),
.streaming(_, _, _, _, iteratorInitialized: false),
.sourceFinished(_, iteratorInitialized: false, _),
.cancelled(iteratorInitialized: false):
// No iterator was created so we can transition to finished right away.
self._state = .finished(iteratorInitialized: false)

return .callDidTerminate
return .callDidTerminate(buffer: .init())

case .streaming(_, let buffer, _, _, iteratorInitialized: false),
.sourceFinished(let buffer, iteratorInitialized: false, _):
return .callDidTerminate(buffer: buffer)

case .initial(_, iteratorInitialized: true),
.streaming(_, _, _, _, iteratorInitialized: true),
Expand Down Expand Up @@ -871,7 +873,7 @@ extension NIOThrowingAsyncSequenceProducer {
@usableFromInline
enum IteratorDeinitializedAction {
/// Indicates that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called.
case callDidTerminate
case callDidTerminate(buffer: Deque<Element>)
/// Indicates that nothing should be done.
case none
}
Expand All @@ -887,14 +889,17 @@ extension NIOThrowingAsyncSequenceProducer {
preconditionFailure("Internal inconsistency")

case .initial(_, iteratorInitialized: true),
.streaming(_, _, _, _, iteratorInitialized: true),
.sourceFinished(_, iteratorInitialized: true, _),
.cancelled(iteratorInitialized: true):
// An iterator was created and deinited. Since we only support
// a single iterator we can now transition to finish and inform the delegate.
self._state = .finished(iteratorInitialized: true)

return .callDidTerminate
return .callDidTerminate(buffer: .init())

case .streaming(_, let buffer, _, _, iteratorInitialized: true),
.sourceFinished(let buffer, iteratorInitialized: true, _):

return .callDidTerminate(buffer: buffer)

case .finished:
// We are already finished so there is nothing left to clean up.
Expand Down Expand Up @@ -1091,10 +1096,10 @@ extension NIOThrowingAsyncSequenceProducer {
@usableFromInline
enum CancelledAction {
/// Indicates that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called.
case callDidTerminate
case callDidTerminate(buffer: Deque<Element>)
/// Indicates that the continuation should be resumed with a `CancellationError` and
/// that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called.
case resumeContinuationWithCancellationErrorAndCallDidTerminate(CheckedContinuation<Element?, Error>)
case resumeContinuationWithCancellationErrorAndCallDidTerminate(CheckedContinuation<Element?, Error>, buffer: Deque<Element>)
/// Indicates that nothing should be done.
case none
}
Expand Down Expand Up @@ -1125,20 +1130,20 @@ extension NIOThrowingAsyncSequenceProducer {

return .none

case .streaming(_, _, .some(let continuation), _, let iteratorInitialized):
case .streaming(_, let buffer, .some(let continuation), _, let iteratorInitialized):
// We have an outstanding continuation that needs to resumed
// and we can transition to finished here and inform the delegate
self._state = .finished(iteratorInitialized: iteratorInitialized)

return .resumeContinuationWithCancellationErrorAndCallDidTerminate(continuation)
return .resumeContinuationWithCancellationErrorAndCallDidTerminate(continuation, buffer: buffer)

case .streaming(_, _, continuation: .none, _, let iteratorInitialized):
case .streaming(_, let buffer, continuation: .none, _, let iteratorInitialized):
// We may have elements in the buffer, which is why we have no continuation
// waiting. We must store the cancellation error to hand it out on the next
// next() call.
self._state = .cancelled(iteratorInitialized: iteratorInitialized)

return .callDidTerminate
return .callDidTerminate(buffer: buffer)

case .cancelled, .sourceFinished, .finished:
// If the source has finished, finishing again has no effect.
Expand All @@ -1159,7 +1164,7 @@ extension NIOThrowingAsyncSequenceProducer {
case returnElementAndCallProduceMore(Element)
/// Indicates that the `Failure` should be returned to the caller and
/// that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called.
case returnFailureAndCallDidTerminate(Failure?)
case returnFailureAndCallDidTerminate(Failure?, buffer: Deque<Element>)
/// Indicates that the next call to AsyncSequence got cancelled
case returnCancellationError
/// Indicates that the `nil` should be returned to the caller.
Expand Down Expand Up @@ -1248,7 +1253,7 @@ extension NIOThrowingAsyncSequenceProducer {
// We are returning the queued failure now and can transition to finished
self._state = .finished(iteratorInitialized: iteratorInitialized)

return .returnFailureAndCallDidTerminate(failure)
return .returnFailureAndCallDidTerminate(failure, buffer: buffer)
}

case .cancelled(let iteratorInitialized):
Expand Down
Loading
Loading