Skip to content

Commit b5880d8

Browse files
committed
Merge pull request #27 from Laimiux/lt/switchOnNext
Added extension method for static Observable.switchOnNext method
2 parents 99bb9fe + 5334bbf commit b5880d8

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

src/main/kotlin/rx/lang/kotlin/observables.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,5 @@ public inline fun <T> Observable<T>.subscribeWith( body : FunctionSubscriberModi
106106
modifier.body()
107107
return subscribe(modifier.subscriber)
108108
}
109+
110+
public fun <T> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)

src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@
1616

1717
package rx.lang.kotlin
1818

19-
import rx.Observable
2019
import org.junit.Test
2120
import org.mockito.Mockito.*
2221
import org.mockito.Matchers.*
2322
import org.junit.Assert.*
24-
import rx.Notification
2523
import kotlin.concurrent.thread
26-
import rx.Subscriber
2724
import org.funktionale.partials.*
25+
import rx.*
26+
import rx.schedulers.TestScheduler
27+
import java.util.concurrent.TimeUnit
2828

2929
/**
3030
* This class contains tests using the extension functions provided by the language adaptor.
@@ -236,6 +236,41 @@ public class ExtensionTests : KotlinTests() {
236236
assertEquals(listOf(3, 6, 9), values[2])
237237
}
238238

239+
@Test
240+
public fun testSwitchOnNext() {
241+
val testScheduler = TestScheduler()
242+
val worker = testScheduler.createWorker()
243+
244+
val observable = observable<Observable<Long>> { s ->
245+
fun at(delay: Long, func : () -> Unit){
246+
worker.schedule({
247+
func()
248+
}, delay, TimeUnit.MILLISECONDS)
249+
}
250+
251+
val first = Observable.interval(5, TimeUnit.MILLISECONDS, testScheduler).take(3)
252+
at(0, { s.onNext(first) })
253+
254+
val second = Observable.interval(5, TimeUnit.MILLISECONDS, testScheduler).take(3)
255+
at(11, { s.onNext(second) })
256+
257+
at(40, { s.onCompleted() })
258+
}
259+
260+
observable.switchOnNext().subscribe(received)
261+
262+
val inOrder = inOrder(a)
263+
testScheduler.advanceTimeTo(10, TimeUnit.MILLISECONDS)
264+
inOrder.verify(a, times(1)).received(0L)
265+
inOrder.verify(a, times(1)).received(1L)
266+
267+
testScheduler.advanceTimeTo(40, TimeUnit.MILLISECONDS)
268+
inOrder.verify(a, times(1)).received(0L)
269+
inOrder.verify(a, times(1)).received(1L)
270+
inOrder.verify(a, times(1)).received(2L)
271+
inOrder.verifyNoMoreInteractions()
272+
}
273+
239274
val funOnSubscribe: (Int, Subscriber<in String>) -> Unit = { counter, subscriber ->
240275
subscriber.onNext("hello_$counter")
241276
subscriber.onCompleted()

0 commit comments

Comments
 (0)