Skip to content

Commit a6bd8ef

Browse files
committed
updates for new post on machine learning with kafka streams.
1 parent 1428cfd commit a6bd8ef

20 files changed

+964
-10
lines changed

README.md

+20-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#kafka-streams
22
This is the repository for the examples of using Kafka streams covered in the blog posts:
33

4-
* [Introducing The Kafka Processor API](codingjunkie.net/kafka-processor-part1/)
4+
* [Kafka Streams - The Processor API](http://codingjunkie.net/kafka-processor-part1/)
5+
* [Kafka Streams - The KStreams API](http://codingjunkie.net/kafka-streams-part2/)
6+
* [Machine Learning with Kafka Streams](http://codingjunkie.net/kafka-streams-machine-learning/)
7+
58

69
## Requirements to build this project
710

@@ -10,20 +13,20 @@ This is the repository for the examples of using Kafka streams covered in the bl
1013

1114
## Requirements to run the examples
1215

13-
1. [kafka](https://github.com/apache/kafka) version kafka_2.11-0.10.0.0-SNAPSHOT see the section marked "Running a task on a particular version of Scala"
16+
1. [kafka](https://github.com/apache/kafka) version kafka_2.11-0.10.1.0-SNAPSHOT (see the section marked "Running a task on a particular version of Scala")
1417
2. The [json-data-generator](https://github.com/acesinc/json-data-generator) from [ACES,Inc](http://acesinc.net/)
1518

1619

1720
## Dependencies included in repository
1821
As the Kafka Streams/Processor API is a work in progress, this repo includes the following jar files known to work with the examples
1922

20-
1. kafka-streams-0.10.0.0-SNAPSHOT.jar
21-
2. kafka-clients-0.10.0.0-SNAPSHOT.jar
23+
1. kafka-streams-0.10.1.0-SNAPSHOT.jar
24+
2. kafka-clients-0.10.1.0-SNAPSHOT.jar
2225

2326
## Setup Instructions
2427

25-
#### Extact the kafka_2.11-0.10.0.0-SNAPSHOT.tgz file ####
26-
tar -xvzf kafka_2.11-0.10.0.0-SNAPSHOT.tgz
28+
#### Extract the kafka_2.11-0.10.1.0-SNAPSHOT.tgz file ####
29+
tar -xvzf kafka_2.11-0.10.1.0-SNAPSHOT.tgz
2730

2831
#### Install the Json-Data-Generator
2932
Download the latest [json-data-generator release](https://github.com/acesinc/json-data-generator/releases) and follow the install instructions [here](http://acesinc.net/introducing-a-streaming-json-data-generator/)
@@ -64,6 +67,13 @@ Start zookeeper and kafka
6467
java -jar json-data-generator-1.2.0 stock-transactions-config.json
6568
cd kafka-streams
6669
./gradlew [runStockProcessor|runStockStreams]
70+
71+
### Running the Twitter KStreams Language Classification Example ###
72+
rename src/main/resources/twitter-app.properties.template to twitter-app.properties
73+
fill out the properties file with all the required values
74+
75+
cd kafka-streams
76+
./gradlew runTwitterKstreamNLP
6777

6878
### Viewing the results of the purchase streaming examples ###
6979
cd kafka_install-dir/bin
@@ -72,4 +82,8 @@ Start zookeeper and kafka
7282
### Viewing the results of the stock-trading streaming examples ###
7383
cd kafka_install-dir/bin
7484
./kafka-console-consumer --topic [stocks-out|transaction-summary] --zookeeper localhost:2181
85+
86+
### Viewing the results of the Twitter KStreams Language Classification Example ###
87+
cd kafka_install-dir/bin
88+
./kafka-console-consumer --topic [english|french|spanish] --zookeeper localhost:2181
7589

bin/create-topics.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ KAFKA_HOME=$1
55
ZK_HOST=$2
66
ZK_PORT=$3
77

8-
topics="src-topic patterns rewards purchases stocks stocks-out transaction-summary"
8+
topics="src-topic patterns rewards purchases stocks stocks-out transaction-summary twitterData english french spanish"
99

1010
for topic in ${topics}; do
1111
echo "attempting to create topic ${topic}"

build.gradle

+9-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ task runStockStreams(type: JavaExec){
3232
main = "bbejeck.streams.stocks.StocksKafkaStreamsDriver"
3333
}
3434

35+
// Launches the Twitter KStreams language-classification example driver
// using the main source set's runtime classpath.
task runTwitterKstreamNLP(type: JavaExec) {
    main = "bbejeck.streams.twitter.TwitterKStreamNLPDriver"
    classpath sourceSets.main.runtimeClasspath
}
39+
3540

3641

3742

@@ -51,6 +56,9 @@ dependencies {
5156
compile group: 'org.apache.commons', name: 'commons-lang3', version:'3.1'
5257
compile group: 'com.101tec', name:'zkclient', version:'0.7'
5358
compile group: 'com.yammer.metrics', name:'metrics-core', version:'2.2.0'
54-
compile files('libs/kafka-streams-0.10.0.0-SNAPSHOT.jar','libs/kafka-clients-0.10.0.0-SNAPSHOT.jar','libs/rocksdbjni-4.1.0.jar')
59+
compile group: 'com.twitter', name:'hbc-core', version:'2.2.0'
60+
compile group: 'de.julielab', name: 'aliasi-lingpipe', version:'4.1.0'
61+
compile group: 'org.apache.commons', name:'commons-lang3', version:'3.4'
62+
compile files('libs/kafka-streams-0.10.1.0-SNAPSHOT.jar','libs/kafka-clients-0.10.1.0-SNAPSHOT.jar','libs/rocksdbjni-4.1.0.jar')
5563
testCompile group: 'junit', name: 'junit', version:'4.11'
5664
}
-334 KB
Binary file not shown.
348 KB
Binary file not shown.

src/main/java/bbejeck/model/StockTransaction.java

+10
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,14 @@ public void setTimeStamp(Date timeStamp) {
7272
this.timeStamp = timeStamp;
7373
}
7474

75+
@Override
76+
public String toString() {
77+
return "StockTransaction{" +
78+
"symbol='" + symbol + '\'' +
79+
", type='" + type + '\'' +
80+
", shares=" + shares +
81+
", amount=" + amount +
82+
", timeStamp=" + timeStamp +
83+
'}';
84+
}
7585
}

src/main/java/bbejeck/model/StockTransactionCollector.java

+9
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,13 @@ public StockTransactionCollector add(StockTransaction transaction){
4242
return this;
4343
}
4444

45+
@Override
46+
public String toString() {
47+
return "StockTransactionCollector{" +
48+
"amount=" + amount +
49+
", tickerSymbol='" + tickerSymbol + '\'' +
50+
", sharesPurchased=" + sharesPurchased +
51+
", sharesSold=" + sharesSold +
52+
'}';
53+
}
4554
}
+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2016 Bill Bejeck
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package bbejeck.model;
18+
19+
/**
20+
* User: Bill Bejeck
21+
* Date: 4/21/16
22+
* Time: 8:38 PM
23+
*/
24+
public class Tweet {

    // Simple mutable bean holding a tweet's id, its text and a language label.

    private String id;
    private String text;
    private String language;

    /** @return the tweet id, or {@code null} if none has been set */
    public String getId() {
        return this.id;
    }

    /** @param id the tweet id to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the tweet text, or {@code null} if none has been set */
    public String getText() {
        return this.text;
    }

    /** @param text the tweet text to store */
    public void setText(String text) {
        this.text = text;
    }

    /** @return the language label, or {@code null} if none has been set */
    public String getLanguage() {
        return this.language;
    }

    /** @param language the language label to store */
    public void setLanguage(String language) {
        this.language = language;
    }

    /**
     * Renders the tweet as {@code Tweet{id='..', text='..', language='..'}};
     * unset fields print as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder rendered = new StringBuilder("Tweet{");
        rendered.append("id='").append(id).append('\'');
        rendered.append(", text='").append(text).append('\'');
        rendered.append(", language='").append(language).append('\'');
        rendered.append('}');
        return rendered.toString();
    }
}
+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2016 Bill Bejeck
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package bbejeck.nlp;
18+
19+
import com.aliasi.classify.Classification;
20+
import com.aliasi.classify.Classified;
21+
import com.aliasi.classify.DynamicLMClassifier;
22+
import com.aliasi.lm.NGramBoundaryLM;
23+
24+
import java.io.BufferedReader;
25+
import java.io.File;
26+
import java.io.FileInputStream;
27+
import java.io.IOException;
28+
import java.io.InputStreamReader;
29+
import java.util.ArrayList;
30+
import java.util.HashSet;
31+
import java.util.List;
32+
import java.util.Set;
33+
34+
/**
35+
* User: Bill Bejeck
36+
* Date: 4/20/16
37+
* Time: 10:01 PM
38+
*/
39+
public class Classifier {
40+
41+
private DynamicLMClassifier<NGramBoundaryLM> classifier;
42+
private int maxCharNGram = 3;
43+
private String trainingDataDelimiter;
44+
45+
public Classifier(String trainingDataDelimiter) {
46+
this.trainingDataDelimiter = trainingDataDelimiter;
47+
}
48+
49+
public Classifier(){
50+
this("#");
51+
}
52+
53+
public void train(File trainingData) {
54+
Set<String> categorySet = new HashSet<>();
55+
List<String[]> annotatedData = new ArrayList<>();
56+
fillCategoriesAndAnnotatedData(trainingData, categorySet, annotatedData);
57+
trainClassifier(categorySet, annotatedData);
58+
}
59+
60+
private void trainClassifier(Set<String> categorySet, List<String[]> annotatedData){
61+
String[] categories = categorySet.toArray(new String[0]);
62+
classifier = DynamicLMClassifier.createNGramBoundary(categories,maxCharNGram);
63+
for (String[] row: annotatedData) {
64+
String actualClassification = row[0];
65+
String text = row[1];
66+
Classification classification = new Classification(actualClassification);
67+
Classified<CharSequence> classified = new Classified<>(text,classification);
68+
classifier.handle(classified);
69+
}
70+
}
71+
72+
73+
private void fillCategoriesAndAnnotatedData(File trainingData,
74+
Set<String> categorySet,
75+
List<String[]> annotatedData) {
76+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(trainingData)))) {
77+
78+
String line = reader.readLine();
79+
while (line != null) {
80+
String[] data = line.split(trainingDataDelimiter);
81+
categorySet.add(data[0]);
82+
annotatedData.add(data);
83+
line = reader.readLine();
84+
}
85+
86+
} catch (IOException e){
87+
throw new RuntimeException(e);
88+
}
89+
}
90+
91+
92+
public String classify(String text){
93+
return classifier.classify(text.trim()).bestCategory().toLowerCase();
94+
}
95+
96+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2016 Bill Bejeck
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package bbejeck.nlp;
18+
19+
import com.aliasi.classify.Classification;
20+
import com.aliasi.classify.Classified;
21+
import com.aliasi.classify.DynamicLMClassifier;
22+
import com.aliasi.lm.NGramBoundaryLM;
23+
24+
import java.io.BufferedReader;
25+
import java.io.File;
26+
import java.io.FileInputStream;
27+
import java.io.InputStreamReader;
28+
import java.util.ArrayList;
29+
import java.util.HashSet;
30+
import java.util.List;
31+
import java.util.Set;
32+
33+
/**
34+
* User: Bill Bejeck
35+
* Date: 4/9/16
36+
* Time: 6:24 PM
37+
*/
38+
public class LingPipeTester {
39+
40+
public static void main(String[] args) throws Exception {
41+
File trainingData = new File("src/main/resources/kafkaStreamsTwitterTrainingData_clean.csv");
42+
int maxCharNGram = 3;
43+
44+
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(trainingData)));
45+
Set<String> categorySet = new HashSet<>();
46+
List<String[]> annotatedData = new ArrayList<>();
47+
String line = reader.readLine();
48+
while (line !=null){
49+
String[] data = line.split("#");
50+
categorySet.add(data[0]);
51+
annotatedData.add(data);
52+
line = reader.readLine();
53+
}
54+
System.out.println("read in all data");
55+
reader.close();
56+
String[] categories = categorySet.toArray(new String[0]);
57+
58+
DynamicLMClassifier<NGramBoundaryLM> classifier
59+
= DynamicLMClassifier.createNGramBoundary(categories,maxCharNGram);
60+
for (String[] row: annotatedData) {
61+
String truth = row[0];
62+
String text = row[1];
63+
Classification classification = new Classification(truth);
64+
Classified<CharSequence> classified = new Classified<>(text,classification);
65+
classifier.handle(classified);
66+
}
67+
System.out.println("training complete");
68+
69+
reader = new BufferedReader(new InputStreamReader(System.in));
70+
71+
System.out.println("enter text, I'll tell you the language");
72+
String text;
73+
while (!(text = reader.readLine()).equalsIgnoreCase("quit")) {
74+
Classification classification = classifier.classify(text);
75+
System.out.println("Entered -> " + text);
76+
System.out.println("lang -> " + classification.bestCategory());
77+
}
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2016 Bill Bejeck
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package bbejeck.processor.twitter;
18+
19+
import bbejeck.model.Tweet;
20+
import org.apache.kafka.streams.processor.AbstractProcessor;
21+
22+
/**
23+
* User: Bill Bejeck
24+
* Date: 4/21/16
25+
* Time: 9:10 PM
26+
*/
27+
public class TwitterClassificationProcessor extends AbstractProcessor<String, Tweet> {
28+
29+
30+
31+
@Override
32+
public void process(String s, Tweet tweet) {
33+
34+
}
35+
}

0 commit comments

Comments
 (0)