Skip to content

Commit 3e005e6

Browse files
authored
RxKotlin 2.0 Release Candidate (#95)
* Dependencies updated * Initial implementation of rxKotlin for rxJava2 * Code cleaning. Code review related fixes. * Whitespaces removed. * gitignore update * subscribers replaced by named args extension * subscribeBy non-null params * empty* methods removed flowable extensions added minor fixes * dependencies updated * subscribeBy method for Flowable * tests updated * minor tests refactoring * * JoinToString method and tests ported from rxKotlin 1.0 * Subjects removed * Operators added * Minor formatting fix * More formatting fixes * Refactor "subscription" references to "disposable" * refactor `flowable.kt` and `observable.kt` to match 1.x changes * add `Flowable` support for operators * update `single` to match 1.x changes * rid Subject functions * update readme to reflect `onComplete` * Fix 2.x extension tests * add Array<T>.toFlowable() * update and fix compile errors for tests in 2.x * move swtichOnNext() * git commit -m 'refactor package domain to io.reactivex.rxkotlin' * optimize imports
1 parent 97f0555 commit 3e005e6

File tree

23 files changed

+245
-178
lines changed

23 files changed

+245
-178
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ fun main(args: Array<String>) {
1919
.subscribeBy( // named arguments for lambda Subscribers
2020
onNext = { println(it) },
2121
onError = { it.printStackTrace() },
22-
onCompleted = { println("Done!") }
22+
onComplete = { println("Done!") }
2323
)
2424

2525
}

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
buildscript {
2-
ext.kotlin_version = '1.0.6'
2+
ext.kotlin_version = '1.1.0'
33
repositories { jcenter() }
44
dependencies {
55
classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+',
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
#Sun Feb 12 16:33:43 SGT 2017
1+
#Sun Mar 05 07:47:21 SGT 2017
22
distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip
6+
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-all.zip

src/examples/kotlin/rx/lang/kotlin/examples/examples.kt renamed to src/examples/kotlin/io/reactivex/rxkotlin/examples/examples.kt

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
package rx.lang.kotlin.examples
1+
package io.reactivex.rxkotlin.examples
22

33
import io.reactivex.Observable
44
import io.reactivex.disposables.CompositeDisposable
5-
import rx.lang.kotlin.addTo
6-
import rx.lang.kotlin.combineLatest
7-
import rx.lang.kotlin.observable
8-
import rx.lang.kotlin.plusAssign
9-
import rx.lang.kotlin.subscribeBy
10-
import rx.lang.kotlin.toObservable
11-
import rx.lang.kotlin.zip
5+
import io.reactivex.rxkotlin.*
6+
import io.reactivex.rxkotlin.combineLatest
7+
import io.reactivex.rxkotlin.subscribeBy
8+
import io.reactivex.rxkotlin.toObservable
9+
import io.reactivex.rxkotlin.zip
1210
import java.net.URL
1311
import java.util.Scanner
1412
import java.util.concurrent.TimeUnit
@@ -50,37 +48,37 @@ fun main(args: Array<String>) {
5048
addToCompositeSubscription()
5149
}
5250

53-
private fun URL.toScannerObservable() = observable<String> { s ->
51+
private fun URL.toScannerObservable() = Observable.create<String> { s ->
5452
this.openStream().use { stream ->
5553
Scanner(stream).useDelimiter("\\A")
5654
.toObservable()
5755
.subscribe { s.onNext(it) }
5856
}
5957
}
6058

61-
fun syncObservable(): Observable<String> = observable { subscriber ->
59+
fun syncObservable(): Observable<String> = Observable.create { subscriber ->
6260
(0..75).toObservable()
6361
.map { "Sync value_$it" }
6462
.subscribe { subscriber.onNext(it) }
6563
}
6664

67-
fun asyncObservable(): Observable<String> = observable { subscriber ->
65+
fun asyncObservable(): Observable<String> = Observable.create { subscriber ->
6866
thread {
6967
(0..75).toObservable()
7068
.map { "Async value_$it" }
7169
.subscribe { subscriber.onNext(it) }
7270
}
7371
}
7472

75-
fun asyncWiki(vararg articleNames: String): Observable<String> = observable { subscriber ->
73+
fun asyncWiki(vararg articleNames: String): Observable<String> = Observable.create { subscriber ->
7674
thread {
7775
articleNames.toObservable()
7876
.flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() }
7977
.subscribe { subscriber.onNext(it) }
8078
}
8179
}
8280

83-
fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable<String> = observable { subscriber ->
81+
fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable<String> = Observable.create { subscriber ->
8482
thread {
8583
articleNames.toObservable()
8684
.flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() }

src/examples/kotlin/rx/lang/kotlin/examples/retrofit/retrofit.kt renamed to src/examples/kotlin/io/reactivex/rxkotlin/examples/retrofit/retrofit.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package rx.lang.kotlin.examples.retrofit
1+
package io.reactivex.rxkotlin.examples.retrofit
22

33
import io.reactivex.Observable
44
import retrofit.RestAdapter

src/main/kotlin/rx/lang/kotlin/completable.kt renamed to src/main/kotlin/io/reactivex/rxkotlin/completable.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package rx.lang.kotlin
1+
package io.reactivex.rxkotlin
22

33
import io.reactivex.Completable
44
import io.reactivex.functions.Action
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.reactivex.rxkotlin
2+
3+
import io.reactivex.disposables.CompositeDisposable
4+
import io.reactivex.disposables.Disposable
5+
6+
/**
7+
* disposable += observable.subscribe()
8+
*/
9+
operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
10+
add(disposable)
11+
}
12+
13+
/**
14+
* Add the subscription to a CompositeSubscription.
15+
* @param compositeDisposable CompositeDisposable to add this subscription to
16+
* @return this instance
17+
*/
18+
fun Disposable.addTo(compositeDisposable: CompositeDisposable): Disposable
19+
= apply { compositeDisposable.add(this) }
Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,16 @@
1-
package rx.lang.kotlin
1+
package io.reactivex.rxkotlin
22

3-
import io.reactivex.BackpressureStrategy
43
import io.reactivex.Flowable
5-
import io.reactivex.FlowableEmitter
6-
import io.reactivex.Single
74
import io.reactivex.functions.BiFunction
85

9-
fun <T : Any> flowable(
10-
strategy: BackpressureStrategy = BackpressureStrategy.BUFFER,
11-
body: (FlowableEmitter<in T>) -> Unit
12-
): Flowable<T> = Flowable.create(body, strategy)
136

14-
private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
15-
override fun iterator(): Iterator<T> = this@toIterable
16-
}
17-
18-
fun BooleanArray.toFlowable(): Flowable<Boolean> = Flowable.fromArray(*this.toTypedArray())
19-
fun ByteArray.toFlowable(): Flowable<Byte> = Flowable.fromArray(*this.toTypedArray())
20-
fun ShortArray.toFlowable(): Flowable<Short> = Flowable.fromArray(*this.toTypedArray())
21-
fun IntArray.toFlowable(): Flowable<Int> = Flowable.fromArray(*this.toTypedArray())
22-
fun LongArray.toFlowable(): Flowable<Long> = Flowable.fromArray(*this.toTypedArray())
23-
fun FloatArray.toFlowable(): Flowable<Float> = Flowable.fromArray(*this.toTypedArray())
24-
fun DoubleArray.toFlowable(): Flowable<Double> = Flowable.fromArray(*this.toTypedArray())
7+
fun BooleanArray.toFlowable(): Flowable<Boolean> = this.asIterable().toFlowable()
8+
fun ByteArray.toFlowable(): Flowable<Byte> = this.asIterable().toFlowable()
9+
fun ShortArray.toFlowable(): Flowable<Short> = this.asIterable().toFlowable()
10+
fun IntArray.toFlowable(): Flowable<Int> = this.asIterable().toFlowable()
11+
fun LongArray.toFlowable(): Flowable<Long> = this.asIterable().toFlowable()
12+
fun FloatArray.toFlowable(): Flowable<Float> = this.asIterable().toFlowable()
13+
fun DoubleArray.toFlowable(): Flowable<Double> = this.asIterable().toFlowable()
2514
fun <T : Any> Array<T>.toFlowable(): Flowable<T> = Flowable.fromArray(*this)
2615

2716
fun IntProgression.toFlowable(): Flowable<Int> =
@@ -30,14 +19,11 @@ fun IntProgression.toFlowable(): Flowable<Int> =
3019

3120
fun <T : Any> Iterator<T>.toFlowable(): Flowable<T> = toIterable().toFlowable()
3221
fun <T : Any> Iterable<T>.toFlowable(): Flowable<T> = Flowable.fromIterable(this)
33-
fun <T : Any> Sequence<T>.toFlowable(): Flowable<T> = Flowable.fromIterable(iterator().toIterable())
22+
fun <T : Any> Sequence<T>.toFlowable(): Flowable<T> = asIterable().toFlowable()
3423

3524
fun <T : Any> Iterable<Flowable<out T>>.merge(): Flowable<T> = Flowable.merge(this.toFlowable())
3625
fun <T : Any> Iterable<Flowable<out T>>.mergeDelayError(): Flowable<T> = Flowable.mergeDelayError(this.toFlowable())
3726

38-
inline fun <T : Any, R : Any> Flowable<T>.fold(initial: R, crossinline body: (R, T) -> R): Single<R>
39-
= reduce(initial) { a, e -> body(a, e) }
40-
4127
/**
4228
* Returns Flowable that wrap all values into [IndexedValue] and populates corresponding index value.
4329
* Works similar to [kotlin.withIndex]
@@ -56,20 +42,19 @@ fun <T : Any> Flowable<T>.withIndex(): Flowable<IndexedValue<T>>
5642
inline fun <T : Any, R : Any> Flowable<T>.flatMapSequence(crossinline body: (T) -> Sequence<R>): Flowable<R>
5743
= flatMap { body(it).toFlowable() }
5844

59-
fun <T : Any> Flowable<Flowable<T>>.switchOnNext(): Flowable<T> = Flowable.switchOnNext(this)
6045

6146
/**
6247
* Flowable.combineLatest(List<? extends Flowable<? extends T>> sources, FuncN<? extends R> combineFunction)
6348
*/
6449
@Suppress("UNCHECKED_CAST")
65-
inline fun <T, R> List<Flowable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Flowable<R>
50+
inline fun <T, R> Iterable<Flowable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Flowable<R>
6651
= Flowable.combineLatest(this) { combineFunction(it.asList().map { it as T }) }
6752

6853
/**
6954
* Flowable.zip(List<? extends Flowable<? extends T>> sources, FuncN<? extends R> combineFunction)
7055
*/
7156
@Suppress("UNCHECKED_CAST")
72-
inline fun <T, R> List<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Flowable<R>
57+
inline fun <T, R> Iterable<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Flowable<R>
7358
= Flowable.zip(this) { zipFunction(it.asList().map { it as T }) }
7459

7560
/**
@@ -78,6 +63,10 @@ inline fun <T, R> List<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>)
7863
inline fun <reified R : Any> Flowable<*>.cast(): Flowable<R> = cast(R::class.java)
7964

8065
/**
81-
* Filters the items emitted by an Observable, only emitting those of the specified type.
66+
* Filters the items emitted by an Flowable, only emitting those of the specified type.
8267
*/
83-
inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class.java)
68+
inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class.java)
69+
70+
private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
71+
override fun iterator(): Iterator<T> = this@toIterable
72+
}

src/main/kotlin/rx/lang/kotlin/maybe.kt renamed to src/main/kotlin/io/reactivex/rxkotlin/maybe.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package rx.lang.kotlin
1+
package io.reactivex.rxkotlin
22

33
import io.reactivex.Maybe
44
import java.util.concurrent.Callable
Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,16 @@
1-
package rx.lang.kotlin
1+
package io.reactivex.rxkotlin
22

33
import io.reactivex.Observable
4-
import io.reactivex.ObservableEmitter
5-
import io.reactivex.Single
64
import io.reactivex.functions.BiFunction
75

8-
fun <T : Any> observable(body: (ObservableEmitter<in T>) -> Unit): Observable<T> = Observable.create(body)
96

10-
private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
11-
override fun iterator(): Iterator<T> = this@toIterable
12-
}
13-
14-
fun BooleanArray.toObservable(): Observable<Boolean> = Observable.fromArray(*this.toTypedArray())
15-
fun ByteArray.toObservable(): Observable<Byte> = Observable.fromArray(*this.toTypedArray())
16-
fun ShortArray.toObservable(): Observable<Short> = Observable.fromArray(*this.toTypedArray())
17-
fun IntArray.toObservable(): Observable<Int> = Observable.fromArray(*this.toTypedArray())
18-
fun LongArray.toObservable(): Observable<Long> = Observable.fromArray(*this.toTypedArray())
19-
fun FloatArray.toObservable(): Observable<Float> = Observable.fromArray(*this.toTypedArray())
20-
fun DoubleArray.toObservable(): Observable<Double> = Observable.fromArray(*this.toTypedArray())
7+
fun BooleanArray.toObservable(): Observable<Boolean> = this.asIterable().toObservable()
8+
fun ByteArray.toObservable(): Observable<Byte> = this.asIterable().toObservable()
9+
fun ShortArray.toObservable(): Observable<Short> = this.asIterable().toObservable()
10+
fun IntArray.toObservable(): Observable<Int> = this.asIterable().toObservable()
11+
fun LongArray.toObservable(): Observable<Long> = this.asIterable().toObservable()
12+
fun FloatArray.toObservable(): Observable<Float> = this.asIterable().toObservable()
13+
fun DoubleArray.toObservable(): Observable<Double> = this.asIterable().toObservable()
2114
fun <T : Any> Array<T>.toObservable(): Observable<T> = Observable.fromArray(*this)
2215

2316
fun IntProgression.toObservable(): Observable<Int> =
@@ -26,14 +19,11 @@ fun IntProgression.toObservable(): Observable<Int> =
2619

2720
fun <T : Any> Iterator<T>.toObservable(): Observable<T> = toIterable().toObservable()
2821
fun <T : Any> Iterable<T>.toObservable(): Observable<T> = Observable.fromIterable(this)
29-
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = Observable.fromIterable(iterator().toIterable())
22+
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = asIterable().toObservable()
3023

3124
fun <T : Any> Iterable<Observable<out T>>.merge(): Observable<T> = Observable.merge(this.toObservable())
3225
fun <T : Any> Iterable<Observable<out T>>.mergeDelayError(): Observable<T> = Observable.mergeDelayError(this.toObservable())
3326

34-
inline fun <T : Any, R : Any> Observable<T>.fold(initial: R, crossinline body: (R, T) -> R): Single<R>
35-
= reduce(initial) { a, e -> body(a, e) }
36-
3727
/**
3828
* Returns Observable that wrap all values into [IndexedValue] and populates corresponding index value.
3929
* Works similar to [kotlin.withIndex]
@@ -52,20 +42,19 @@ fun <T : Any> Observable<T>.withIndex(): Observable<IndexedValue<T>>
5242
inline fun <T : Any, R : Any> Observable<T>.flatMapSequence(crossinline body: (T) -> Sequence<R>): Observable<R>
5343
= flatMap { body(it).toObservable() }
5444

55-
fun <T : Any> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)
5645

5746
/**
5847
* Observable.combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
5948
*/
6049
@Suppress("UNCHECKED_CAST")
61-
inline fun <T, R> List<Observable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Observable<R>
50+
inline fun <T, R> Iterable<Observable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Observable<R>
6251
= Observable.combineLatest(this) { combineFunction(it.asList().map { it as T }) }
6352

6453
/**
6554
* Observable.zip(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
6655
*/
6756
@Suppress("UNCHECKED_CAST")
68-
inline fun <T, R> List<Observable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Observable<R>
57+
inline fun <T, R> Iterable<Observable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Observable<R>
6958
= Observable.zip(this) { zipFunction(it.asList().map { it as T }) }
7059

7160
/**
@@ -77,3 +66,7 @@ inline fun <reified R : Any> Observable<*>.cast(): Observable<R> = cast(R::class
7766
* Filters the items emitted by an Observable, only emitting those of the specified type.
7867
*/
7968
inline fun <reified R : Any> Observable<*>.ofType(): Observable<R> = ofType(R::class.java)
69+
70+
private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
71+
override fun iterator(): Iterator<T> = this@toIterable
72+
}

0 commit comments

Comments
 (0)