Skip to content

Commit bc330b6

Browse files
authored
Merge pull request #543 from AVSystem/more-monix-utils
Add Monix task utilities
2 parents 850be97 + 0d69f66 commit bc330b6

File tree

5 files changed

+176
-0
lines changed

5 files changed

+176
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.avsystem.commons
2+
package concurrent
3+
4+
import monix.eval.Task
5+
import monix.execution.Scheduler
6+
import org.scalatest.concurrent.ScalaFutures
7+
import org.scalatest.funsuite.AnyFunSuite
8+
import org.scalatest.matchers.should.Matchers
9+
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
10+
11+
import scala.concurrent.TimeoutException
12+
import scala.concurrent.duration._
13+
14+
class JvmTaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures {
15+
16+
import com.avsystem.commons.concurrent.TaskExtensions._
17+
18+
private implicit val scheduler: Scheduler = Scheduler.global
19+
20+
// This test does not work in SJS runtime (but the method itself does)
21+
test("lazyTimeout") {
22+
val result = Task.never.lazyTimeout(50.millis, "Lazy timeout").runToFuture.failed.futureValue
23+
result shouldBe a[TimeoutException]
24+
result.getMessage shouldBe "Lazy timeout"
25+
}
26+
}

core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala

+15
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ object ObservableExtensions extends ObservableExtensions {
2222
*/
2323
def headOptL: Task[Opt[T]] = obs.headOptionL.map(_.toOpt)
2424

25+
/**
26+
* Returns a [[monix.eval.Task Task]] which emits the first <b>non-null</b> item for which the predicate holds.
27+
*/
28+
def findOptL(p: T => Boolean): Task[Opt[T]] = obs.findL(e => e != null && p(e)).map(_.toOpt)
29+
2530
/** Suppress the duplicate elements emitted by the source Observable.
2631
*
2732
* WARNING: this requires unbounded buffering.
@@ -79,5 +84,15 @@ object ObservableExtensions extends ObservableExtensions {
7984
obs
8085
.foldLeftL(factory.newBuilder)(_ += _)
8186
.map(_.result())
87+
88+
/** Returns a [[monix.eval.Task Task]] that upon evaluation
89+
* will collect all items from the source into a [[Map]] instance
90+
* using provided functions to compute keys and values.
91+
*
92+
* WARNING: for infinite streams the process will eventually blow up
93+
* with an out of memory error.
94+
*/
95+
def mkMapL[K, V](keyFun: T => K, valueFun: T => V): Task[Map[K, V]] =
96+
obs.map(v => (keyFun(v), valueFun(v))).toL(Map)
8297
}
8398
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.avsystem.commons
2+
package concurrent
3+
4+
import com.avsystem.commons.concurrent.TaskExtensions.{TaskCompanionOps, TaskOps}
5+
import com.avsystem.commons.misc.Timestamp
6+
import monix.eval.Task
7+
import monix.reactive.Observable
8+
9+
import java.util.concurrent.TimeUnit
10+
import scala.concurrent.TimeoutException
11+
import scala.concurrent.duration.FiniteDuration
12+
13+
trait TaskExtensions {
14+
implicit def taskOps[T](task: Task[T]): TaskOps[T] = new TaskOps(task)
15+
16+
implicit def taskCompanionOps(task: Task.type): TaskCompanionOps.type = TaskCompanionOps
17+
}
18+
19+
object TaskExtensions extends TaskExtensions {
20+
final class TaskOps[T](private val task: Task[T]) extends AnyVal {
21+
/**
22+
* Similar to [[Task.timeoutWith]] but exception instance is created lazily (for performance)
23+
*/
24+
def lazyTimeout(after: FiniteDuration, msg: => String): Task[T] =
25+
task.timeoutTo(after, Task.defer(Task.raiseError(new TimeoutException(msg))))
26+
27+
/**
28+
* Similar to [[Task.tapEval]], accepts simple consumer function as an argument
29+
*/
30+
def tapL(f: T => Unit): Task[T] =
31+
task.map(_.setup(f))
32+
33+
/**
34+
* Similar to [[Task.tapError]], accepts [[PartialFunction]] as an argument
35+
*/
36+
def tapErrorL[B](f: PartialFunction[Throwable, B]): Task[T] =
37+
task.tapError(t => Task(f.applyOpt(t)))
38+
}
39+
40+
object TaskCompanionOps {
41+
import com.avsystem.commons.concurrent.ObservableExtensions.observableOps
42+
43+
/** A [[Task]] of [[Opt.Empty]] */
44+
def optEmpty[A]: Task[Opt[A]] = Task.pure(Opt.Empty)
45+
46+
def traverseOpt[A, B](opt: Opt[A])(f: A => Task[B]): Task[Opt[B]] =
47+
opt.fold(Task.optEmpty[B])(a => f(a).map(_.opt))
48+
49+
def fromOpt[A](maybeTask: Opt[Task[A]]): Task[Opt[A]] = maybeTask match {
50+
case Opt(task) => task.map(_.opt)
51+
case Opt.Empty => Task.optEmpty
52+
}
53+
54+
def traverseMap[K, V, A, B](map: Map[K, V])(f: (K, V) => Task[(A, B)]): Task[Map[A, B]] =
55+
Observable.fromIterable(map).mapEval({ case (key, value) => f(key, value) }).toL(Map)
56+
57+
def traverseMapValues[K, A, B](map: Map[K, A])(f: (K, A) => Task[B]): Task[Map[K, B]] =
58+
traverseMap(map)({ case (key, value) => f(key, value).map(key -> _) })
59+
60+
def currentTimestamp: Task[Timestamp] =
61+
Task.clock.realTime(TimeUnit.MILLISECONDS).map(Timestamp(_))
62+
63+
def usingNow[T](useNow: Timestamp => Task[T]): Task[T] =
64+
currentTimestamp.flatMap(useNow)
65+
}
66+
}

core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala

+26
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,29 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
2020
Observable.fromIterable(ints).headOptL.runToFuture.futureValue shouldBe ints.headOpt
2121
}
2222
}
23+
24+
test("headOptL - null handling") {
25+
Observable.fromIterable(Seq(null, "abc", "xyz")) .headOptL.runToFuture.futureValue shouldBe Opt.Empty
26+
}
27+
28+
test("findOptL") {
29+
forAll { ints: List[Int] =>
30+
Observable.fromIterable(ints).findOptL(_ > 1).runToFuture.futureValue shouldBe ints.findOpt(_ > 1)
31+
}
32+
}
33+
34+
test("findOptL - null handling") {
35+
Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.some("abc")
36+
Observable.fromIterable(Seq(null, null)).findOptL(_ => true).runToFuture.futureValue shouldBe Opt.Empty
37+
Observable.fromIterable(Seq(null, "abc", "xyz")).findOptL(_.startsWith("x")).runToFuture.futureValue shouldBe Opt.some("xyz")
38+
}
39+
2340
test("distinct") {
2441
forAll { ints: List[Int] =>
2542
Observable.fromIterable(ints).distinct.toListL.runToFuture.futureValue shouldBe ints.distinct
2643
}
2744
}
45+
2846
test("distinctBy") {
2947
forAll { ints: List[Int] =>
3048
val f: Int => Int = _ % 256
@@ -33,17 +51,20 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
3351
ints.foldLeft(MLinkedHashMap.empty[Int, Int])((map, v) => f(v) |> (key => map.applyIf(!_.contains(key))(_ += key -> v))).valuesIterator.toList
3452
}
3553
}
54+
3655
test("sortedL") {
3756
forAll { ints: List[Int] =>
3857
Observable.fromIterable(ints).sortedL.runToFuture.futureValue shouldBe ints.sorted
3958
}
4059
}
60+
4161
test("sortedByL") {
4262
forAll { ints: List[Int] =>
4363
val f: Int => Int = _ % 256
4464
Observable.fromIterable(ints).sortedByL(f).runToFuture.futureValue shouldBe ints.sortBy(f)
4565
}
4666
}
67+
4768
test("toL") {
4869
forAll { ints: List[(Int, Int)] =>
4970
def testFactory[T](factory: Factory[(Int, Int), T])(implicit position: Position) =
@@ -78,4 +99,9 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
7899
}
79100
}
80101

102+
test("mkMapL") {
103+
forAll { ints: List[Int] =>
104+
Observable.fromIterable(ints).mkMapL(_ % 3, _ + 2).runToFuture.futureValue shouldBe ints.mkMap(_ % 3, _ + 2)
105+
}
106+
}
81107
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.avsystem.commons
2+
package concurrent
3+
4+
import monix.eval.Task
5+
import monix.execution.Scheduler
6+
import org.scalatest.concurrent.ScalaFutures
7+
import org.scalatest.funsuite.AnyFunSuite
8+
import org.scalatest.matchers.should.Matchers
9+
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
10+
11+
class TaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures {
12+
import com.avsystem.commons.concurrent.TaskExtensions._
13+
14+
private implicit val scheduler: Scheduler = Scheduler.global
15+
16+
test("traverseOpt") {
17+
Task.traverseOpt(Opt.empty[Int])(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.Empty
18+
Task.traverseOpt(Opt.some(123))(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.some(123)
19+
}
20+
21+
test("fromOpt") {
22+
Task.fromOpt(Opt.empty[Task[Int]]).runToFuture.futureValue shouldBe Opt.Empty
23+
Task.fromOpt(Opt.some(Task.now(123))).runToFuture.futureValue shouldBe Opt.some(123)
24+
}
25+
26+
test("traverseMap") {
27+
forAll { data: List[(String, Int)] =>
28+
val map = data.toMap
29+
val expected = map.view.map({ case (key, value) => (key + key, value + 2) }).toMap
30+
val result = Task.traverseMap(map)({ case (key, value) => Task((key + key, value + 2)) }).runToFuture.futureValue
31+
result shouldBe expected
32+
}
33+
}
34+
35+
test("traverseMapValues") {
36+
forAll { data: List[(String, Int)] =>
37+
val map = data.toMap
38+
val expected = map.view.mapValues(value => value + 2).toMap
39+
val result = Task.traverseMapValues(map)({ case (key, value) => Task(value + 2) }).runToFuture.futureValue
40+
result shouldBe expected
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)