Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/main/java/kms/chapter02/Point.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kms.chapter02;

import java.util.concurrent.atomic.AtomicInteger;

public class Point {
private final AtomicInteger x = new AtomicInteger(0);
private final AtomicInteger y = new AtomicInteger(0);

void rightUp() {
x.incrementAndGet();
y.incrementAndGet();
}

int getX() {
return x.get();
}

int getY() {
return y.get();
}

@Override
public String toString() {
return String.format("%d, %d", getX(), getY());
}
}
31 changes: 31 additions & 0 deletions src/main/java/kms/chapter02/PointSample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package kms.chapter02;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PointSample {

public static void main(String[] args) throws Exception {
final Point pos = new Point();

Runnable task = () -> {
for(int i = 0; i < 10000 ; i++) {
pos.rightUp();
}
};

ExecutorService executorService = Executors.newCachedThreadPool();

Future<Boolean> future1 = executorService.submit(task, true);
Future<Boolean> future2 = executorService.submit(task, true);

if(future1.get() && future2.get()) {
System.out.println(pos);
} else {
System.out.println("failed");
}

executorService.shutdown();
}
}
10 changes: 0 additions & 10 deletions src/main/kotlin/kms/Main.kt

This file was deleted.

76 changes: 76 additions & 0 deletions src/main/kotlin/kms/chapter01/L11_FlowableSample.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package kms.chapter01

import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.FlowableEmitter
import io.reactivex.FlowableOnSubscribe
import io.reactivex.schedulers.Schedulers
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

private fun main() {
val flowable = Flowable.create(
object : FlowableOnSubscribe<String> {

// FlowableEmitter 가 Subscriber 에게 데이터를 통지한다
// create 의 구현을 따라가면
// 에러가 발생하면 catch 해서 onError 로 전달하는 부분이 존재함
// 단, 치명적인 에러라면 다시 Throw 를 던짐
// https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
override fun subscribe(emitter: FlowableEmitter<String>) {
val strArr = arrayOf("Hello, World!", "안녕, RxJava!")
for (str in strArr) {
// 구동 해지 상태에서 종료하지 않고 onNext 를 진행해도 데이터를 통지하지 않는다
// 단, Rx 에서 해주는 것은 통지하지 않는 것이지 계속 진행은 하므로 직접 처리하는게 좋다
if (emitter.isCancelled) {
return
}
// 데이터를 통지한다
// 만약 null 을 전달하면 NullPointException 이 발생한다
emitter.onNext(str)
}
// onComplete 를 통지하면 그 이후엔 아무것도 통지하면 안된다
emitter.onComplete()
}
},
// BackpressureStrategy 에 따라 다른 Emitter 를 생성
BackpressureStrategy.BUFFER,
)
flowable
// 데이터를 받는 측의 쓰레드를 변경할 때 사용
.observeOn(Schedulers.computation())
// Flowable 는 Publisher 인터페이스를 구현했기 때문에 Subscriber 와의 상호작용을 외부에서 영향을 받지 않는다
.subscribe(object : Subscriber<String> {

// Subscriber 가 받을 데이터의 개수를 요청 및 구독 해지할 수 있는 인터페이스
// onNext에서 직접 배압을 처리하기 위해서 subscription 을 멤버 변수로 저장
private var subscription: Subscription? = null

override fun onSubscribe(s: Subscription?) {
subscription = s
// 요청 데이터의 개수를 MAX 로 처리하면 onNext 에서 더 이상 요청하지 않아도 됨
// onSubscribe 에서 request를 호출하지 않으면 데이터르 받을 수 없다
// request는 onSubscribe 의 가장 마지막에서 호출 해야함
subscription?.request(1L)
}

// Flowable 에서 데이터를 받으면 호출 되는 메서드
override fun onNext(data: String?) {
println("${Thread.currentThread().name}: $data")
subscription?.request(1L)
}

// 에러가 발생했거나 에러를 통지할 때 실행되는 메서드
// onError 이후에는 onNext 나 onComplete 가 실행되지 않는다
override fun onError(error: Throwable?) {
error?.printStackTrace()
}

// 모든 데이터의 통지를 끝내고 처리가 완료됐을 때 실행되는 메서드
override fun onComplete() {
println("${Thread.currentThread().name}: 완료")
}
})

Thread.sleep(500L)
}
46 changes: 46 additions & 0 deletions src/main/kotlin/kms/chapter01/L18_ObservableSample.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package kms.chapter01

import io.reactivex.Observable
import io.reactivex.ObservableOnSubscribe
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers

private fun main() {
val observable = Observable.create(
ObservableOnSubscribe { emitter ->
val strArr = arrayOf("Hello, World!", "안녕, RxJava!")

for (str in strArr) {
// 구독이 해제됐는지를 확인합니다.
if (emitter.isDisposed) {
return@ObservableOnSubscribe
}
emitter.onNext(str)
}
emitter.onComplete()
},
)

observable
.observeOn(Schedulers.computation())
.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
}

override fun onNext(item: String) {
// 배압 기능이 없기 때문에 데이터를 요청하지 않는다
println("${Thread.currentThread().name}: $item")
}

override fun onError(e: Throwable) {
e.printStackTrace()
}

override fun onComplete() {
println("${Thread.currentThread().name}: 완료")
}
})

Thread.sleep(500L)
}
29 changes: 29 additions & 0 deletions src/main/kotlin/kms/chapter01/L29_CompositeDisposableSample.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kms.chapter01

import io.reactivex.Flowable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.schedulers.Schedulers

private fun main() {
val disposable = CompositeDisposable()
disposable.add(
Flowable.range(1, 3)
.doOnCancel { println("No.1 canceled") }
.observeOn(Schedulers.computation())
.subscribe {
Thread.sleep(100L)
println("No.1: $it")
},
)
disposable.add(
Flowable.range(1, 3)
.doOnCancel { println("No.2 canceled") }
.observeOn(Schedulers.computation())
.subscribe {
Thread.sleep(100L)
println("No.2: $it")
},
)
Thread.sleep(150L)
disposable.dispose()
}
24 changes: 24 additions & 0 deletions src/main/kotlin/kms/chapter01/L30_SingleSampel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package kms.chapter01

import io.reactivex.Single
import io.reactivex.SingleObserver
import io.reactivex.disposables.Disposable
import java.time.DayOfWeek
import java.time.LocalDate

private fun main() {
Single.create { emitter ->
emitter.onSuccess(LocalDate.now().dayOfWeek)
}.subscribe(object : SingleObserver<DayOfWeek> {
override fun onSubscribe(d: Disposable) {
}

override fun onSuccess(dayOfWeek: DayOfWeek) {
println(dayOfWeek)
}

override fun onError(e: Throwable) {
e.printStackTrace()
}
})
}
30 changes: 30 additions & 0 deletions src/main/kotlin/kms/chapter01/L31_MaybeSample.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package kms.chapter01

import io.reactivex.Maybe
import io.reactivex.MaybeObserver
import io.reactivex.disposables.Disposable
import java.time.DayOfWeek

private fun main() {
Maybe.create<DayOfWeek> { emitter ->
// 데이터를 통지할 때는 onSuccess 만 통지
// emitter.onSuccess(LocalDate.now().dayOfWeek)
// 데이터를 통지 하지 않을 때는 onComplete 만 통지
emitter.onComplete()
}.subscribe(object : MaybeObserver<DayOfWeek> {
override fun onSubscribe(d: Disposable) {
}

override fun onSuccess(dayOfWeek: DayOfWeek) {
println(dayOfWeek)
}

override fun onError(e: Throwable) {
e.printStackTrace()
}

override fun onComplete() {
println("완료")
}
})
}
26 changes: 26 additions & 0 deletions src/main/kotlin/kms/chapter01/L32_CompletableSample.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kms.chapter01

import io.reactivex.Completable
import io.reactivex.CompletableObserver
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers

private fun main() {
Completable.create { emitter ->
emitter.onComplete()
}.subscribeOn(Schedulers.computation())
.subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) {
}

override fun onComplete() {
println("완료")
}

override fun onError(e: Throwable) {
e.printStackTrace()
}
})

Thread.sleep(100L)
}
Loading