diff --git a/Sources/Common/DemandBuffer.swift b/Sources/Common/DemandBuffer.swift index 5df9743..5204262 100644 --- a/Sources/Common/DemandBuffer.swift +++ b/Sources/Common/DemandBuffer.swift @@ -27,6 +27,11 @@ class DemandBuffer { private var completion: Subscribers.Completion? private var demandState = Demand() + /// The remaining demand, i.e. the number of values the subscriber still expects + var remainingDemand: Subscribers.Demand { + demandState.requested - demandState.sent + } + /// Initialize a new demand buffer for a provided downstream subscriber /// /// - parameter subscriber: The downstream subscriber demanding events @@ -70,7 +75,7 @@ class DemandBuffer { /// Signal to the buffer that the downstream requested new demand /// - /// - note: The buffer will attempt to flush as many events rqeuested + /// - note: The buffer will attempt to flush as many events requested /// by the downstream at this point func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand { flush(adding: demand) @@ -110,7 +115,7 @@ class DemandBuffer { return .none } - let sentDemand = demandState.requested - demandState.sent + let sentDemand = remainingDemand demandState.sent += sentDemand return sentDemand } diff --git a/Sources/Operators/ConcatMap.swift b/Sources/Operators/ConcatMap.swift new file mode 100644 index 0000000..43bec21 --- /dev/null +++ b/Sources/Operators/ConcatMap.swift @@ -0,0 +1,237 @@ +// +// ConcatMap.swift +// CombineExt +// +// Created by Daniel Peter on 22/11/2020. +// Copyright © 2020 Combine Community. All rights reserved. +// + +#if canImport(Combine) +import Combine + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publisher { + /// Transforms an output value into a new publisher, and flattens the stream of events from these multiple upstream publishers to appear as if they were coming from a single stream of events. + /// + /// Mapping to a new publisher will keep the subscription to the previous one alive until it completes and only then subscribe to the new one. This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed. + /// + /// - parameter transform: A transform to apply to each emitted value, from which you can return a new Publisher + /// + /// - returns: A publisher emitting the values of all emitted publishers in order. + func concatMap( + _ transform: @escaping (Self.Output) -> P + ) -> Publishers.ConcatMap where T == P.Output, P: Publisher, Self.Failure == P.Failure { + return Publishers.ConcatMap(upstream: self, transform: transform) + } +} + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publishers { + struct ConcatMap: Publisher where NewPublisher: Publisher, Upstream: Publisher, NewPublisher.Failure == Upstream.Failure { + public typealias Transform = (Upstream.Output) -> NewPublisher + public typealias Output = NewPublisher.Output + public typealias Failure = Upstream.Failure + + public let upstream: Upstream + public let transform: Transform + + public init( + upstream: Upstream, + transform: @escaping Transform + ) { + self.upstream = upstream + self.transform = transform + } + + public func receive(subscriber: S) + where Output == S.Input, Failure == S.Failure { + subscriber.receive( + subscription: Subscription( + upstream: upstream, + downstream: subscriber, + transform: transform + ) + ) + } + } +} + +// MARK: - Subscription +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.ConcatMap { + final class Subscription: Combine.Subscription where + Downstream.Input == NewPublisher.Output, + Downstream.Failure == Failure + { + private var sink: OuterSink? + + init( + upstream: Upstream, + downstream: Downstream, + transform: @escaping Transform + ) { + self.sink = OuterSink( + upstream: upstream, + downstream: downstream, + transform: transform + ) + } + + func request(_ demand: Subscribers.Demand) { + sink?.demand(demand) + } + + func cancel() { + sink = nil + } + } +} + +// MARK: - Sink +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.ConcatMap { + final class OuterSink: Subscriber where + Downstream.Input == NewPublisher.Output, + Downstream.Failure == Upstream.Failure + { + private let lock = NSRecursiveLock() + + private let downstream: Downstream + private let transform: Transform + + private var upstreamSubscription: Combine.Subscription? + private var innerSink: InnerSink? + + private var bufferedDemand: Subscribers.Demand = .none + + init( + upstream: Upstream, + downstream: Downstream, + transform: @escaping Transform + ) { + self.downstream = downstream + self.transform = transform + + upstream.subscribe(self) + } + + func demand(_ demand: Subscribers.Demand) { + lock.lock(); defer { lock.unlock() } + if let innerSink = innerSink { + innerSink.demand(demand) + } else { + bufferedDemand = demand + } + + upstreamSubscription?.requestIfNeeded(.unlimited) + } + + func receive(_ input: Upstream.Output) -> Subscribers.Demand { + lock.lock(); defer { lock.unlock() } + let transformedPublisher = transform(input) + + if let innerSink = innerSink { + innerSink.enqueue(publisher: transformedPublisher) + } else { + innerSink = InnerSink( + outerSink: self, + upstream: transformedPublisher, + downstream: downstream + ) + + innerSink?.demand(bufferedDemand) + } + + return .unlimited + } + + func receive(subscription: Combine.Subscription) { + lock.lock(); defer { lock.unlock() } + upstreamSubscription = subscription + } + + func receive(completion: Subscribers.Completion) { + lock.lock(); defer { lock.unlock() } + innerSink = nil + downstream.receive(completion: completion) + cancelUpstream() + } + + func cancelUpstream() { + lock.lock(); defer { lock.unlock() } + upstreamSubscription.kill() + } + + deinit { cancelUpstream() } + } + + final class InnerSink: CombineExt.Sink where + Downstream.Input == NewPublisher.Output, + Downstream.Failure == Upstream.Failure + { + private weak var outerSink: OuterSink? + private let lock: NSRecursiveLock = NSRecursiveLock() + + private var hasActiveSubscription: Bool + private var publisherQueue: [NewPublisher] + + init( + outerSink: OuterSink, + upstream: NewPublisher, + downstream: Downstream + ) { + self.outerSink = outerSink + self.hasActiveSubscription = false + self.publisherQueue = [] + + super.init( + upstream: upstream, + downstream: downstream + ) + } + + func enqueue(publisher: NewPublisher) { + lock.lock(); defer { lock.unlock() } + if hasActiveSubscription { + publisherQueue.append(publisher) + } else { + publisher.subscribe(self) + } + } + + override func receive(_ input: NewPublisher.Output) -> Subscribers.Demand { + buffer.buffer(value: input) + } + + override func receive(subscription: Combine.Subscription) { + lock.lock(); defer { lock.unlock() } + hasActiveSubscription = true + + super.receive(subscription: subscription) + subscription.requestIfNeeded(buffer.remainingDemand) + } + + override func receive(completion: Subscribers.Completion) { + lock.lock(); defer { lock.unlock() } + hasActiveSubscription = false + + switch completion { + case .finished: + if !publisherQueue.isEmpty { + publisherQueue.removeFirst().subscribe(self) + } + case let .failure(error): + buffer.complete(completion: .failure(error)) + outerSink?.receive(completion: completion) + } + } + } +} + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension Publishers.ConcatMap.Subscription: CustomStringConvertible { + var description: String { + "ConcatMap.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>" + } +} +#endif diff --git a/Tests/ConcatMapTests.swift b/Tests/ConcatMapTests.swift new file mode 100644 index 0000000..6c7bef3 --- /dev/null +++ b/Tests/ConcatMapTests.swift @@ -0,0 +1,182 @@ +// +// ConcatMapTests.swift +// CombineExtTests +// +// Created by Daniel Peter on 22/11/2020. +// Copyright © 2020 Combine Community. All rights reserved. +// + +#if !os(watchOS) +import XCTest +import Combine +import CombineExt + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +final class ConcatMapTests: XCTestCase { + private enum TestError: Swift.Error { + case failure + } + private typealias P = PassthroughSubject + private var cancellables: Set! + + override func setUp() { + super.setUp() + cancellables = [] + } + + func test_publishes_values_in_order() { + var receivedValues = [Int]() + let expectedValues = [1, 2, 3] + + let firstPublisher = P() + let secondPublisher = P() + let thirdPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { _ in }, + receiveValue: { value in receivedValues.append(value) } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) + + firstPublisher.send(1) + firstPublisher.send(completion: .finished) + + secondPublisher.send(2) + sut.send(thirdPublisher) + secondPublisher.send(completion: .finished) + + thirdPublisher.send(3) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_ignores_values_of_subsequent_while_previous_hasNot_completed() { + var receivedValues = [Int]() + let expectedValues = [1, 3] + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { _ in }, + receiveValue: { value in receivedValues.append(value) } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) + + firstPublisher.send(1) + secondPublisher.send(2) + firstPublisher.send(completion: .finished) + + secondPublisher.send(3) + secondPublisher.send(completion: .finished) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_publishes_values_of_subsequent_publisher_after_emptying_publisher_queue() { + var receivedValues = [Int]() + let expectedValues = [1, 2] + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { _ in }, + receiveValue: { value in receivedValues.append(value) } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + firstPublisher.send(1) + firstPublisher.send(completion: .finished) + + sut.send(secondPublisher) + secondPublisher.send(2) + secondPublisher.send(completion: .finished) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_synchronous_completion() { + var receivedValues = [Int]() + let expectedValues = [1, 2] + let firstPublisher = Just(1) + let secondPublisher = Just(2) + + let sut = PassthroughSubject, Never>() + + sut.concatMap { $0 } + .sink { value in receivedValues.append(value) } + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) + + XCTAssertEqual(expectedValues, receivedValues) + } + + func test_completes_when_upstream_completes() { + var receivedCompletion: Subscribers.Completion? + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { receivedCompletion = $0 }, + receiveValue: { _ in } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) + firstPublisher.send(completion: .finished) + XCTAssertNil(receivedCompletion) + secondPublisher.send(completion: .finished) + XCTAssertNil(receivedCompletion) + sut.send(completion: .finished) + XCTAssertNotNil(receivedCompletion) + } + + func test_completes_with_failure_if_publisher_fails() { + let expectedCompletion = Subscribers.Completion.failure(.failure) + var receivedCompletion: Subscribers.Completion? + + let firstPublisher = P() + let secondPublisher = P() + + let sut = PassthroughSubject() + + sut.concatMap { $0 } + .sink( + receiveCompletion: { receivedCompletion = $0 }, + receiveValue: { _ in } + ) + .store(in: &cancellables) + + sut.send(firstPublisher) + sut.send(secondPublisher) + firstPublisher.send(completion: .failure(.failure)) + XCTAssertEqual(receivedCompletion, expectedCompletion) + secondPublisher.send(completion: .finished) + XCTAssertEqual(receivedCompletion, expectedCompletion) + } +} +#endif