Skip to content

Commit e518eed

Browse files
6 - RPC Client Streaming
Update README.md take n (#5) - Checking the last steps Update README.md (#6) - small adjustments Adds travis file and minor explanation about RFM (#7)
1 parent 4294dda commit e518eed

File tree

8 files changed

+386
-37
lines changed

8 files changed

+386
-37
lines changed

.travis.yml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
language: scala
2+
3+
scala:
4+
- 2.12.4
5+
6+
jdk:
7+
- oraclejdk8
8+
9+
before_cache:
10+
- du -h -d 1 $HOME/.ivy2/
11+
- du -h -d 2 $HOME/.sbt/
12+
- find $HOME/.sbt -name "*.lock" -type f -delete
13+
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -type f -delete
14+
15+
cache:
16+
directories:
17+
- $HOME/.sbt/cache
18+
- $HOME/.sbt/1.0
19+
- $HOME/.sbt/boot/
20+
- $HOME/.sbt/boot/scala*
21+
- $HOME/.sbt/launchers
22+
- $HOME/.ivy2/cache
23+
- $HOME/.ivy2
24+
25+
script:
26+
- sbt ++$TRAVIS_SCALA_VERSION "app/clean" "app/compile"
27+
- sbt ++$TRAVIS_SCALA_VERSION "server/clean" "server/compile"

README.md

+287-28
Large diffs are not rendered by default.

app/src/main/scala/AppRFMClient.scala

+28-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@ object AppRFMClient extends Implicits {
1818
implicit val rfmClient: RFMAnalysisService.Client[IO] =
1919
RFMAnalysisService.client[IO](channel)
2020

21-
val (segments: IO[SegmentList], stream: Observable[UserEvent]) =
22-
(rfmClient.segments(Empty), rfmClient.userEvents(Empty))
21+
val (segments: IO[SegmentList], stream: Observable[UserEvent], ack: IO[Ack]) =
22+
(
23+
rfmClient.segments(Empty),
24+
rfmClient.userEvents(Empty),
25+
rfmClient.orderStream(ordersStreamObs)
26+
)
2327

2428
println(s"Segments: \n${segments.unsafeRunSync().list.mkString("\n")}\n")
29+
println(s"Client Streaming: \n${ack.unsafeRunSync()}\n")
2530
Await.ready(
2631
stream
2732
.map { u =>
@@ -33,4 +38,25 @@ object AppRFMClient extends Implicits {
3338
Duration.Inf)
3439
}
3540

41+
private[this] def ordersStreamObs: Observable[Order] = {
42+
val orderList: List[Order] = (1 to 1000).map { customerId =>
43+
import com.fortysevendeg.scalacheck.datetime.GenDateTime
44+
import org.joda.time.{DateTime, Period}
45+
import org.scalacheck._
46+
import com.fortysevendeg.scalacheck.datetime.instances.joda.jodaForPeriod
47+
48+
(for {
49+
date <- GenDateTime.genDateTimeWithinRange(DateTime.parse("2017-12-01"), Period.days(22))
50+
orderId <- Gen.uuid
51+
total <- Gen.choose[Int](5, 200)
52+
} yield
53+
Order(
54+
customerId,
55+
CustomerData(date.toString, orderId.toString, total)
56+
)).sample.get
57+
}.toList
58+
59+
Observable.fromIterable(orderList)
60+
}
61+
3662
}

build.sbt

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ lazy val commonSettings: Seq[Def.Setting[_]] = Seq(
55
addCompilerPlugin("org.scalameta" % "paradise" % "3.0.0-M10" cross CrossVersion.full),
66
libraryDependencies ++= Seq(
77
"io.frees" %% "frees-core" % freesV,
8-
"io.frees" %% "frees-async-cats-effect" % freesV,
8+
"io.frees" %% "frees-async-cats-effect" % freesV,
99
"io.frees" %% "frees-rpc" % "0.4.1",
10-
"org.scalameta" %% "scalameta" % "1.8.0"),
10+
"org.scalameta" %% "scalameta" % "1.8.0",
11+
"joda-time" % "joda-time" % "2.9.9",
12+
"com.github.tototoshi" %% "scala-csv" % "1.3.5"),
1113
scalacOptions ++= Seq("-Xplugin-require:macroparadise", "-Ywarn-unused-import"),
1214
scalacOptions in(Compile, console) ~= (_ filterNot (_ contains "paradise")) // macroparadise plugin doesn't work in repl yet.
1315
)
@@ -29,7 +31,6 @@ lazy val `data-generator` =
2931
.dependsOn(`functional-microservices`)
3032
.settings(
3133
libraryDependencies ++= Seq(
32-
"joda-time" % "joda-time" % "2.9.9",
3334
"io.monix" %% "monix" % "3.0.0-M2",
3435
"org.scalacheck" %% "scalacheck" % "1.13.4",
3536
"com.47deg" %% "scalacheck-toolbox-datetime" % "0.2.3"

server/src/main/scala/implicits.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import cats.{~>, Applicative}
55
import freestyle.rpc.server.handlers.GrpcServerHandler
66
import freestyle.rpc.server._
77
import freestyle.rpc.server.implicits._
8-
import scalaexchange.services.protocol.RFMAnalysisService
8+
import monix.eval.Task
99

10+
import scalaexchange.services.protocol.RFMAnalysisService
1011
import scalaexchange.services.runtime.RFMAnalysisServiceHandler
1112

1213
trait Implicits extends scalaexchange.CommonImplicits {
1314

14-
implicit def rfmAnalisysServiceHandler[F[_]: Applicative]: RFMAnalysisServiceHandler[F] =
15+
implicit def rfmAnalisysServiceHandler[F[_]: Applicative](
16+
implicit T2F: Task ~> F): RFMAnalysisServiceHandler[F] =
1517
new RFMAnalysisServiceHandler[F]
1618

1719
val grpcConfigs: List[GrpcConfig] = List(

services/src/main/scala/protocol.scala

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ object protocol {
1818

1919
final case class SegmentList(list: List[Segment])
2020

21+
final case class Ack(result: String)
22+
2123
@service
2224
trait RFMAnalysisService[F[_]] {
2325

@@ -27,6 +29,10 @@ object protocol {
2729
@stream[ResponseStreaming.type]
2830
def userEvents(empty: Empty.type): F[Observable[UserEvent]]
2931

32+
@rpc(Avro)
33+
@stream[RequestStreaming.type]
34+
def orderStream(orders: Observable[Order]): F[Ack]
35+
3036
}
3137

3238
}

services/src/main/scala/runtime/RFMAnalysisServiceHandler.scala

+26-2
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@ package scalaexchange
22
package services
33
package runtime
44

5-
import cats.Applicative
5+
import java.io.File
6+
7+
import cats.{~>, Applicative}
8+
import com.github.tototoshi.csv._
69
import freestyle.rpc.protocol._
10+
import monix.eval.Task
11+
import monix.execution.Scheduler
712
import monix.reactive.Observable
813

914
import scalaexchange.datagenerator.StreamingService
1015
import scalaexchange.services.protocol._
1116

12-
class RFMAnalysisServiceHandler[F[_]: Applicative] extends RFMAnalysisService[F] {
17+
class RFMAnalysisServiceHandler[F[_]: Applicative](implicit S: Scheduler, T2F: Task ~> F)
18+
extends RFMAnalysisService[F] {
1319

1420
private[this] val segmentList: List[Segment] = List(
1521
Segment("Champions", 4, 5, 4, 5, 4, 5),
@@ -26,9 +32,27 @@ class RFMAnalysisServiceHandler[F[_]: Applicative] extends RFMAnalysisService[F]
2632

2733
private[this] val streamingService = new StreamingService
2834

35+
private[this] val outPath: String = "orders.csv"
36+
2937
override def segments(empty: Empty.type): F[protocol.SegmentList] =
3038
Applicative[F].pure(SegmentList(segmentList))
3139

3240
override def userEvents(empty: Empty.type): F[Observable[UserEvent]] =
3341
Applicative[F].pure(streamingService.userEventsStream)
42+
43+
override def orderStream(orders: Observable[Order]): F[Ack] = T2F {
44+
val f: File = new File(outPath)
45+
val writer: CSVWriter = CSVWriter.open(f)
46+
47+
orders
48+
.foreachL { order =>
49+
writer.writeRow(
50+
List(order.data.date, order.data.orderId, order.customerId, order.data.total)
51+
)
52+
}
53+
.map { _ =>
54+
writer.close()
55+
Ack(" 👍 ")
56+
}
57+
}
3458
}

src/main/scala/models.scala

+4
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,7 @@ case object UnprocessedCheckout extends EventType
66
case object Login extends EventType
77

88
case class UserEvent(userId: Int, eventType: EventType, date: String)
9+
10+
case class CustomerData(date: String, orderId: String, total: Int)
11+
12+
case class Order(customerId: Int, data: CustomerData)

0 commit comments

Comments
 (0)