Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🆕 Add a 'collect(until:)' operator #92

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions Sources/Operators/CollectUntilTrigger.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//
// CollectUntilTrigger.swift
// CombineExt
//
// Created by ferologics on 09/06/2021.
// Copyright © 2021 Combine Community. All rights reserved.
//

#if canImport(Combine)
import Combine

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {
func collect<Trigger:Publisher>(
until trigger: Trigger
) -> Publishers.CollectUntilTrigger<Self, Trigger> where
Trigger.Output == Void,
Trigger.Failure == Never
{
Publishers.CollectUntilTrigger(
upstream: self,
trigger: trigger
)
}
}

// MARK: - Publisher

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
struct CollectUntilTrigger<
Upstream: Publisher,
Trigger: Publisher
>: Publisher where
Trigger.Output == Void,
Trigger.Failure == Never
{
public typealias Output = [Upstream.Output]
public typealias Failure = Upstream.Failure

private let upstream: Upstream
private let trigger: Trigger

init(upstream: Upstream, trigger: Trigger) {
self.upstream = upstream
self.trigger = trigger
}

public func receive<S: Subscriber>(subscriber: S)
where Failure == S.Failure, Output == S.Input
{
subscriber.receive(
subscription: Subscription(
upstream: upstream,
downstream: subscriber,
trigger: trigger
)
)
}
}
}

// MARK: - Subscription

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension Publishers.CollectUntilTrigger {
final class Subscription<
Downstream: Subscriber
>: Combine.Subscription where
Downstream.Input == [Upstream.Output],
Downstream.Failure == Upstream.Failure
{
private var sink: Sink<Downstream>?
private var cancellable: Cancellable?

init(
upstream: Upstream,
downstream: Downstream,
trigger: Trigger
) {
self.sink = Sink(
upstream: upstream,
downstream: downstream
)

cancellable = trigger.sink { [self] in
_ = sink?.buffer.buffer(value: sink?.elements ?? [])
_ = sink?.buffer.demand(.max(1))
sink?.flush()
}
}

func request(_ demand: Subscribers.Demand) {
sink?.demand(demand)
}

func cancel() {
sink = nil
cancellable?.cancel()
cancellable = nil
}

var description: String {
return "CollectUntilTrigger.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>"
}
}
}

// MARK: - Sink

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension Publishers.CollectUntilTrigger {
final class Sink<
Downstream: Subscriber
>: CombineExt.Sink<Upstream, Downstream> where
Downstream.Input == [Upstream.Output],
Downstream.Failure == Upstream.Failure
{
private let lock = NSRecursiveLock()
var elements: [Upstream.Output] = []

override func receive(_ input: Upstream.Output) -> Subscribers.Demand {
lock.lock()
defer { lock.unlock() }
elements.append(input)
return .none
}

func flush() {
lock.lock()
defer { lock.unlock() }
elements = []
}
}
}

#endif

2 changes: 1 addition & 1 deletion Sources/Operators/Dematerialize.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public extension Publishers {
}
}

// MARK: - Subscrription
// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension Publishers.Dematerialize {
class Subscription<Downstream: Subscriber>: Combine.Subscription
Expand Down
42 changes: 42 additions & 0 deletions Tests/CollectUntilTriggerTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// CollectUntilTriggerTests.swift
// CombineExtTests
//
// Created by ferologics on 09/06/2021.
// Copyright © 2020 Combine Community. All rights reserved.
//

#if !os(watchOS)
import XCTest
import Combine

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
class CollectUntilTriggerTests: XCTestCase {
var subscription: AnyCancellable!

func test() {
// Given
let elements = [1,2,3,4,5]
var receivedElements = [Int]()
let elementsPublisher = PassthroughSubject<Int, Never>()
let trigger = PassthroughSubject<Void, Never>()

// When
subscription = elementsPublisher
.collect(until: trigger)
.sink { receivedElements = $0 }

for x in elements {
elementsPublisher.send(x)
}

// Then
XCTAssertTrue(receivedElements.isEmpty)
trigger.send(())
XCTAssertEqual(elements.count, receivedElements.count)
for (a, b) in zip(elements, receivedElements) {
XCTAssertEqual(a, b)
}
}
}
#endif