
Commit ad08350

Added the test cases
1 parent 7059618 commit ad08350

File tree

6 files changed, +179 -59 lines changed


.gitignore

-1
@@ -18,4 +18,3 @@ project/plugins/project/
 setVariables.sh
 .idea/
 project/project/
-src/test/

build.sbt

+3
@@ -12,5 +12,8 @@ libraryDependencies ++= Seq(
   "com.typesafe" % "config" % "1.3.1",
   "org.json4s" %% "json4s-native" % "3.5.3",
   "com.lightbend" %% "kafka-streams-scala" % kafkaStreamsScalaVersion,
+  "com.madewithtea" %% "mockedstreams" % "1.5.0" % Test,
+  "org.scalatest" %% "scalatest" % "3.0.4" % Test,
+  "net.manub" %% "scalatest-embedded-kafka-streams" % "1.0.0" % Test
 )
 
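The three Test-scoped additions are what the new suite builds on: mockedstreams drives a Kafka Streams topology entirely in-memory, scalatest is the test framework, and scalatest-embedded-kafka-streams provides the embedded Kafka started in the spec. As a rough preview of the MockedStreams pattern used in StreamExampleSpec further down (a sketch only; the topic names and config values here are placeholders, not part of this commit):

import java.util.Properties

import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig}

object MockedStreamsPreview extends App {
  // Placeholder config, mirroring what StreamExampleSpec sets up below.
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mockedstreams-preview")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

  // Wire a trivial pass-through topology from "in" to "out" (placeholder topics),
  // push one record in and read one record back without touching a real broker.
  val out = MockedStreams()
    .topology { builder: StreamsBuilder => builder.stream[String, String]("in").to("out") }
    .config(props)
    .input[String, String]("in", Serdes.String(), Serdes.String(), Seq(("key", "value")))
    .output[String, String]("out", Serdes.String(), Serdes.String(), 1)

  println(out) // expected: Seq(("key", "value"))
}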

run.sh

+12
@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+
+if hash sbt 2>/dev/null; then
+    echo "Sbt is already installed on the system"
+else
+    echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
+    sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
+    sudo apt-get update
+    sudo apt-get install sbt
+fi
+
+sbt clean compile run

src/main/scala/in/internity/Boot.scala

+8 -58
@@ -5,20 +5,16 @@ import java.util.concurrent.TimeUnit
 
 import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
 import org.apache.kafka.common.serialization.Serdes
-import org.apache.kafka.streams.kstream.{JoinWindows, Produced}
 import org.apache.kafka.streams.{StreamsConfig, _}
-import org.json4s.DefaultFormats
-import org.json4s.native.JsonMethods.parse
-import org.json4s.native.Serialization.write
-
-import scala.util.Try
 
 /**
   * @author Shivansh <[email protected]>
   * @since 8/1/18
   */
-object Boot extends App {
-  implicit val formats: DefaultFormats.type = DefaultFormats
+
+
+object Boot extends App with StreamExample {
+
   val config: Properties = {
     val p = new Properties()
     p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
@@ -29,60 +25,14 @@ object Boot extends App {
   }
 
   val builder = new StreamsBuilderS()
-  val produced = Produced.`with`(Serdes.String(), Serdes.String())
-
-  /**
-    * This function consumes a topic and count the words in that stream
-    * @param intopic
-    * @return
-    */
-  private def wordCount(intopic: String) = {
-    val textLines: KStreamS[String, String] = builder.stream(intopic)
-    textLines
-      .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
-      .map((a, b) => (b, b))
-      .groupByKey()
-      .count("CountStore").toStream.mapValues(a => a.toString)
-  }
-
-  /**
-    * This function consumes a topic and makes convert the Json to Case class
-    * and remould it into Some other type and again converts it into Json
-    * @param intopic
-    * @return
-    */
-  def readAndWriteJson(intopic: String) = {
-    val textLines: KStreamS[String, String] = builder.stream(intopic)
-    textLines.mapValues(value => {
-      val person = Try(parse(value).extract[Person]).toOption
-      println("1::", person)
-      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
-      println("2::", personNameAndEmail)
-      write(personNameAndEmail)
-    })
-  }
-
-  /**
-    * This function joins two streams and gives the resultant joined stream
-    *
-    * @param left
-    * @param right
-    * @return
-    */
-  def joinTwoStreams(left: KStreamS[String, String], right: KStreamS[String, String]) = {
-    left.join(right,
-      (value1: String, value2: String) => s"""{"display":$value2,"click":$value1}""",
-      JoinWindows.of(TimeUnit.MINUTES.toMillis(5)))
-  }
-
 
-  val stream1 = wordCount("lines")
+  val stream1 = wordCount(builder, "lines")
 
-  val newStream =stream1.branch((a, b)=>a.equals(""))
+  val newStream = stream1.branch((a, b) => a.equals(""))
 
   stream1.to("wordCount", produced)
-  val stream2: KStreamS[String, String] = readAndWriteJson("person")
-  stream2.to("wordCount", produced)
+  val stream2: KStreamS[String, String] = readAndWriteJson(builder, "person")
+  stream2.to("personMinimal", produced)
   val joinedStream = joinTwoStreams(stream1, stream2)
   joinedStream.to("combined", produced)
 

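The hunks above end where the topology is wired up; the part of Boot.scala that actually starts the streams lies outside this diff. For orientation only, starting it typically looks something like the following sketch (standard Kafka Streams API, not the file's actual tail; `builder` and `config` are the values defined above, and `inner` is the underlying Java StreamsBuilder that kafka-streams-scala exposes, as the spec below also relies on):

import org.apache.kafka.streams.KafkaStreams

// Sketch: build the Topology from the wrapped Java StreamsBuilder and start it
// with the Properties held in `config` above.
val streams = new KafkaStreams(builder.inner.build(), config)
streams.start()

// Close the streams on JVM shutdown so local state is released cleanly.
sys.addShutdownHook(streams.close())
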
src/main/scala/in/internity/StreamExample.scala

+66
@@ -0,0 +1,66 @@
+package in.internity
+
+import java.util.concurrent.TimeUnit
+
+import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.kstream.{JoinWindows, Produced}
+import org.json4s.DefaultFormats
+import org.json4s.native.JsonMethods.parse
+import org.json4s.native.Serialization.write
+
+import scala.util.Try
+
+/**
+  * @author Shivansh <[email protected]>
+  * @since 11/1/18
+  */
+trait StreamExample {
+  implicit val formats: DefaultFormats.type = DefaultFormats
+  val produced = Produced.`with`(Serdes.String(), Serdes.String())
+
+  /**
+    * This function consumes a topic and count the words in that stream
+    *
+    * @param intopic
+    * @return
+    */
+  def wordCount(builder: StreamsBuilderS, intopic: String): KStreamS[String, String] = {
+    val textLines: KStreamS[String, String] = builder.stream(intopic)
+    textLines
+      .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
+      .map((a, b) => (b, b))
+      .groupByKey()
+      .count("CountStore").toStream.mapValues(a => a.toString)
+  }
+
+  /**
+    * This function consumes a topic and makes convert the Json to Case class
+    * and remould it into Some other type and again converts it into Json
+    *
+    * @param intopic
+    * @return
+    */
+  def readAndWriteJson(builder: StreamsBuilderS, intopic: String) = {
+    val textLines: KStreamS[String, String] = builder.stream(intopic)
+    textLines.mapValues(value => {
+      val person = Try(parse(value).extract[Person]).toOption
+      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
+      write(personNameAndEmail)
+    })
+  }
+
+  /**
+    * This function joins two streams and gives the resultant joined stream
+    *
+    * @param left
+    * @param right
+    * @return
+    */
+  def joinTwoStreams(left: KStreamS[String, String], right: KStreamS[String, String]) = {
+    left.join(right,
+      (value1: String, value2: String) => s"""{"value2":$value2,"value1":$value1}""",
+      JoinWindows.of(TimeUnit.MINUTES.toMillis(5)))
+  }
+
+}
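
The trait references `Person` and `PersonNameAndEmail`, which are not touched by this commit. From their usage here and in the spec below (`Person("Shivansh", 23, "[email protected]")`, `PersonNameAndEmail(a.name, a.email)`), they are presumably simple case classes along these lines; the name `age` for the middle Int field is a guess:

package in.internity

// Sketch of the model types assumed by readAndWriteJson; not part of this commit.
// Field order and types follow their call sites; `age` is an assumed name for the Int field.
case class Person(name: String, age: Int, email: String)

case class PersonNameAndEmail(name: String, email: String)
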
src/test/scala/in/internity/StreamExampleSpec.scala

+90
@@ -0,0 +1,90 @@
+package in.internity
+
+
+import java.util.Properties
+
+import com.lightbend.kafka.scala.streams.StreamsBuilderS
+import com.madewithtea.mockedstreams.MockedStreams
+import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.apache.kafka.common.serialization.{Serdes, StringSerializer}
+import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig}
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers, WordSpec}
+
+/**
+  * @author Shivansh <[email protected]>
+  * @since 11/1/18
+  */
+class StreamExampleSpec extends WordSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with StreamExample {
+
+  implicit val configs = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 7001)
+
+  implicit val keySerializer = new StringSerializer
+
+  override def beforeAll() = {
+    EmbeddedKafka.start()
+  }
+
+  override def afterAll(): Unit = {
+    EmbeddedKafka.stop()
+  }
+
+  val kafkaConf: Properties = {
+    val p = new Properties()
+    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-scala-streams-example")
+    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9002")
+    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
+    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
+    p
+  }
+
+
+  "StreamExample" must {
+    "take words and produce Wordcount" in {
+      val inputTopic = "word"
+      val inputString = "Hi this is Shivansh and my twitter handle is @shiv4nsh"
+      val res = MockedStreams().topology { builder: StreamsBuilder =>
+        wordCount(new StreamSTest(builder), inputTopic).to("Output1")
+      }.config(kafkaConf)
+        .input[String, String](inputTopic, Serdes.String(), Serdes.String(), Seq(("", inputString)))
+        .output[String, String]("Output1", Serdes.String(), Serdes.String(), 9)
+      res.toList.size shouldBe inputString.split(" ").distinct.size
+    }
+
+    "take one type of Json and return other" in {
+      val inputTopic = "person"
+      val inputString = Person("Shivansh", 23, "[email protected]")
+      val res = MockedStreams().topology { builder: StreamsBuilder =>
+        readAndWriteJson(new StreamSTest(builder), inputTopic).to("Output2")
+      }.config(kafkaConf)
+        .input[String, String](inputTopic, Serdes.String(), Serdes.String(), Seq(("", write(inputString))))
+        .output[String, String]("Output2", Serdes.String(), Serdes.String(), 1)
+      parse(res.toList.head._2).extract[PersonNameAndEmail] shouldBe PersonNameAndEmail("Shivansh", "[email protected]")
+    }
+
+    //FixMe: Fix this test
+    "join two streams" in {
+      val inputTopic1 = "wordjoin"
+      val inputTopic2 = "personjoin"
+      val inputString = "Hi this is Shivansh and my twitter handle is @shiv4nsh"
+      val inputPerson = Person("Shivansh", 23, "[email protected]")
+      val res = MockedStreams().topology { builder: StreamsBuilder =>
+        val streamSBuilder = new StreamSTest(builder)
+        val stream1 = wordCount( new StreamSTest(builder), inputTopic1)
+        val stream2 = readAndWriteJson( new StreamSTest(builder), inputTopic2)
+        joinTwoStreams(stream1, stream2).to("joinedstreams")
+      }.config(kafkaConf)
+        .input[String, String](inputTopic1, Serdes.String(), Serdes.String(), Seq(("key", inputString)))
+        .input[String, String](inputTopic2, Serdes.String(), Serdes.String(), Seq(("key", write(inputPerson))))
+        .output[String, String]("joinedstreams", Serdes.String(), Serdes.String(), 10)
+      val result = res.toList
+      println("\n\n\n\n\n\n", result)
+      res.toList.size shouldBe 10
+    }
+  }
+}
+
+class StreamSTest(streamBuilder: StreamsBuilder) extends StreamsBuilderS {
+  override val inner = streamBuilder
+}
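
A note on the expected count in the first test: the assertion compares the number of records read from Output1 with the number of distinct whitespace-separated tokens in the input sentence, and since "is" is the only repeated token both sides come to 9. A standalone snippet to sanity-check that arithmetic (not part of the spec):

// The sentence used in the test; "is" occurs twice, every other token once.
val inputString = "Hi this is Shivansh and my twitter handle is @shiv4nsh"

val tokens   = inputString.split(" ")   // 10 tokens
val expected = tokens.distinct.size     // 9 distinct tokens

// The word-count topology lower-cases and splits on \W+, which here also yields
// 9 distinct words ("@shiv4nsh" becomes "shiv4nsh"), in line with the 9 records
// the test asks MockedStreams to read from "Output1".
val distinctWords = inputString.toLowerCase.split("\\W+").distinct.size

println((tokens.length, expected, distinctWords)) // (10, 9, 9)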
