Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ConcatMap operator #68

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Sources/Common/DemandBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ class DemandBuffer<S: Subscriber> {
private var completion: Subscribers.Completion<S.Failure>?
private var demandState = Demand()

/// The remaining demand, i.e. the number of values the subscriber still expects
var remainingDemand: Subscribers.Demand {
demandState.requested - demandState.sent
}

/// Initialize a new demand buffer for a provided downstream subscriber
///
/// - parameter subscriber: The downstream subscriber demanding events
Expand Down Expand Up @@ -70,7 +75,7 @@ class DemandBuffer<S: Subscriber> {

/// Signal to the buffer that the downstream requested new demand
///
/// - note: The buffer will attempt to flush as many events rqeuested
/// - note: The buffer will attempt to flush as many events requested
/// by the downstream at this point
func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
flush(adding: demand)
Expand Down Expand Up @@ -110,7 +115,7 @@ class DemandBuffer<S: Subscriber> {
return .none
}

let sentDemand = demandState.requested - demandState.sent
let sentDemand = remainingDemand
demandState.sent += sentDemand
return sentDemand
}
Expand Down
237 changes: 237 additions & 0 deletions Sources/Operators/ConcatMap.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
//
// ConcatMap.swift
// CombineExt
//
// Created by Daniel Peter on 22/11/2020.
// Copyright © 2020 Combine Community. All rights reserved.
//

#if canImport(Combine)
import Combine

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {
/// Transforms an output value into a new publisher, and flattens the stream of events from these multiple upstream publishers to appear as if they were coming from a single stream of events.
///
/// Mapping to a new publisher will keep the subscription to the previous one alive until it completes and only then subscribe to the new one. This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Mapping to a new publisher will keep the subscription to the previous one alive until it completes and only then subscribe to the new one. This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed.
/// Mapping to a new publisher will keep the subscription to the previous one alive until it completes and only then subscribe to the new one. This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed.

This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed.

Not sure about this - why would it emit at all if it's only subscribed to after the completion of the previous one?

Copy link
Author

@ohitsdaniel ohitsdaniel Apr 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new, inner publisher could potentially emit before being subscribed to (Like a non-deferred future, i.e. hot observable)?

Example:

let passthrough = PassthroughSubject<Int, Never>()

let mapped = passthrough.map { value in
    Future<Int, Never> { completion in
        completion(.success(value))
    }
    .delay(for: .seconds(1), scheduler: DispatchQueue.main)
  }
  .compactMap { $0 }

  let cancellable = mapped.sink(receiveValue: { value in
    print(value)
  })

  passthrough.send(1)
  passthrough.send(2)     

In this case, concatMap does not buffer 2 as the Publisher 1 hadn't completed when 2 was put onto the stream.

See test_ignores_values_of_subsequent_while_previous_hasNot_completed in CompactMapTests.swift

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Rx, the concatMap assumes that inner Observables are cold and that the closure creates the Observable that is being subscribed to. I think it's correct to make that assumption here as well.

///
/// - parameter transform: A transform to apply to each emitted value, from which you can return a new Publisher
///
/// - returns: A publisher emitting the values of all emitted publishers in order.
func concatMap<T, P>(
_ transform: @escaping (Self.Output) -> P
) -> Publishers.ConcatMap<P, Self> where T == P.Output, P: Publisher, Self.Failure == P.Failure {
return Publishers.ConcatMap(upstream: self, transform: transform)
ohitsdaniel marked this conversation as resolved.
Show resolved Hide resolved
}
}

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
struct ConcatMap<NewPublisher, Upstream>: Publisher where NewPublisher: Publisher, Upstream: Publisher, NewPublisher.Failure == Upstream.Failure {
public typealias Transform = (Upstream.Output) -> NewPublisher
public typealias Output = NewPublisher.Output
public typealias Failure = Upstream.Failure

public let upstream: Upstream
public let transform: Transform

public init(
upstream: Upstream,
transform: @escaping Transform
) {
self.upstream = upstream
self.transform = transform
}

public func receive<S: Subscriber>(subscriber: S)
where Output == S.Input, Failure == S.Failure {
subscriber.receive(
subscription: Subscription(
upstream: upstream,
downstream: subscriber,
transform: transform
)
)
}
}
}

// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension Publishers.ConcatMap {
final class Subscription<Downstream: Subscriber>: Combine.Subscription where
Downstream.Input == NewPublisher.Output,
Downstream.Failure == Failure
{
private var sink: OuterSink<Downstream>?

init(
upstream: Upstream,
downstream: Downstream,
transform: @escaping Transform
) {
self.sink = OuterSink(
upstream: upstream,
downstream: downstream,
transform: transform
)
}

func request(_ demand: Subscribers.Demand) {
sink?.demand(demand)
}

func cancel() {
sink = nil
}
}
}

// MARK: - Sink
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension Publishers.ConcatMap {
final class OuterSink<Downstream: Subscriber>: Subscriber where
Downstream.Input == NewPublisher.Output,
Downstream.Failure == Upstream.Failure
{
private let lock = NSRecursiveLock()

private let downstream: Downstream
private let transform: Transform

private var upstreamSubscription: Combine.Subscription?
private var innerSink: InnerSink<Downstream>?

private var bufferedDemand: Subscribers.Demand = .none

init(
upstream: Upstream,
downstream: Downstream,
transform: @escaping Transform
) {
self.downstream = downstream
self.transform = transform

upstream.subscribe(self)
}

func demand(_ demand: Subscribers.Demand) {
lock.lock(); defer { lock.unlock() }
if let innerSink = innerSink {
innerSink.demand(demand)
} else {
bufferedDemand = demand
}

upstreamSubscription?.requestIfNeeded(.unlimited)
}

func receive(_ input: Upstream.Output) -> Subscribers.Demand {
lock.lock(); defer { lock.unlock() }
let transformedPublisher = transform(input)

if let innerSink = innerSink {
innerSink.enqueue(publisher: transformedPublisher)
} else {
innerSink = InnerSink(
outerSink: self,
upstream: transformedPublisher,
downstream: downstream
)

innerSink?.demand(bufferedDemand)
}

return .unlimited
}

func receive(subscription: Combine.Subscription) {
lock.lock(); defer { lock.unlock() }
upstreamSubscription = subscription
}

func receive(completion: Subscribers.Completion<Upstream.Failure>) {
lock.lock(); defer { lock.unlock() }
innerSink = nil
downstream.receive(completion: completion)
cancelUpstream()
}

func cancelUpstream() {
lock.lock(); defer { lock.unlock() }
upstreamSubscription.kill()
}

deinit { cancelUpstream() }
}

final class InnerSink<Downstream: Subscriber>: CombineExt.Sink<NewPublisher, Downstream> where
Downstream.Input == NewPublisher.Output,
Downstream.Failure == Upstream.Failure
{
private weak var outerSink: OuterSink<Downstream>?
private let lock: NSRecursiveLock = NSRecursiveLock()

private var hasActiveSubscription: Bool
private var publisherQueue: [NewPublisher]

init(
outerSink: OuterSink<Downstream>,
upstream: NewPublisher,
downstream: Downstream
) {
self.outerSink = outerSink
self.hasActiveSubscription = false
self.publisherQueue = []

super.init(
upstream: upstream,
downstream: downstream
)
}

func enqueue(publisher: NewPublisher) {
lock.lock(); defer { lock.unlock() }
if hasActiveSubscription {
publisherQueue.append(publisher)
} else {
publisher.subscribe(self)
}
}

override func receive(_ input: NewPublisher.Output) -> Subscribers.Demand {
buffer.buffer(value: input)
}

override func receive(subscription: Combine.Subscription) {
lock.lock(); defer { lock.unlock() }
hasActiveSubscription = true

super.receive(subscription: subscription)
subscription.requestIfNeeded(buffer.remainingDemand)
}

override func receive(completion: Subscribers.Completion<Upstream.Failure>) {
lock.lock(); defer { lock.unlock() }
hasActiveSubscription = false

switch completion {
case .finished:
if !publisherQueue.isEmpty {
publisherQueue.removeFirst().subscribe(self)
}
case let .failure(error):
buffer.complete(completion: .failure(error))
outerSink?.receive(completion: completion)
}
}
}
}

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.ConcatMap.Subscription: CustomStringConvertible {
var description: String {
"ConcatMap.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>"
}
}
#endif
Loading