Skip to content

Commit c2d06ed

Browse files
committed
Update Lesson 2
1 parent 58855b0 commit c2d06ed

File tree

131 files changed

+793
-72
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

131 files changed

+793
-72
lines changed

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
- **5** [Топики](lesson-05)
88
- **6** [Мониторинг](lesson-06)
99
- **7** [Безопасность](lesson-07)
10-
- **8** [Producer API](lesson-08)
11-
- **9** [Consumer API](lesson-09)
12-
- **10** [Admin API](lesson-10)
10+
- **8** [Admin API](lesson-10)
11+
- **9** [Producer API](lesson-08)
12+
- **10** [Consumer API](lesson-09)
1313
- **11** [Транзакци](lesson-11)
1414
- **12** [Kafka Streams](lesson-12)
1515
- **18** [Kafka Connect](lesson-18)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Consumer
2+
Base Kafka Consumer on Clojure
3+
4+
## Usage
5+
$ java -jar consumer.jar
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
(defproject ru.vzaigrin.examples.kafka.base.clj.consumer "1.0"
2+
:description "Base Kafka Consumer on Clojure"
3+
:url "https://gitflic.ru/project/vzaigrin/examples/Kafka/Base/Clojure/consumer"
4+
:license {:name "Apache License, Version 2.0"
5+
:url "http://www.apache.org/licenses/LICENSE-2.0"}
6+
:dependencies [[org.clojure/clojure "1.11.1"],
7+
[org.apache.kafka/kafka-clients "3.4.0"]
8+
[ch.qos.logback/logback-classic "1.4.7"]
9+
[org.slf4j/slf4j-api "2.0.7"]]
10+
:main ^:skip-aot ru.vzaigrin.examples.kafka.base.clj.consumer
11+
:target-path "target/%s"
12+
:profiles {:uberjar {:aot :all
13+
:jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
(ns ru.vzaigrin.examples.kafka.base.clj.consumer
2+
(:gen-class)
3+
(:import
4+
(java.time Duration)
5+
(java.util Properties)
6+
(org.apache.kafka.clients.consumer ConsumerConfig KafkaConsumer)))
7+
8+
(defn -main
9+
"Base Kafka Consumer"
10+
[]
11+
(let [topic "test"
12+
props (doto (Properties.)
13+
(.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:29092")
14+
(.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.LongDeserializer")
15+
(.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer")
16+
(.put ConsumerConfig/GROUP_ID_CONFIG "g1")
17+
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest")
18+
(.put ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG "false"))]
19+
(with-open [consumer (KafkaConsumer. props)]
20+
(.subscribe consumer [topic])
21+
(while true
22+
(doseq [msg (.poll consumer (Duration/ofSeconds 1))]
23+
(printf "%d\t%d\t%d\t%s\n"
24+
(.partition msg)
25+
(.offset msg)
26+
(.key msg)
27+
(.value msg)))))))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Producer
2+
Base Kafka Producer on Clojure
3+
4+
## Installation
5+
Download from https://gitflic.ru/project/vzaigrin/examples/Kafka/Base/Clojure/producer
6+
7+
## Usage
8+
$ java -jar producer-1.0-standalone.jar
9+
10+
## License
11+
Copyright © 2023 Vadim Zaigrin
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
(defproject ru.vzaigrin.examples.kafka.base.clj.producer "1.0"
2+
:description "Base Kafka Producer on Clojure"
3+
:url "https://gitflic.ru/project/vzaigrin/examples/Kafka/Base/Clojure/producer"
4+
:license {:name "Apache License, Version 2.0"
5+
:url "http://www.apache.org/licenses/LICENSE-2.0"}
6+
:dependencies [[org.clojure/clojure "1.11.1"],
7+
[org.apache.kafka/kafka-clients "3.4.0"]
8+
[ch.qos.logback/logback-classic "1.4.7"]
9+
[org.slf4j/slf4j-api "2.0.7"]]
10+
:main ^:skip-aot ru.vzaigrin.examples.kafka.base.clj.producer
11+
:target-path "target/%s"
12+
:profiles {:uberjar {:aot :all
13+
:jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
(ns ru.vzaigrin.examples.kafka.base.clj.producer
2+
(:gen-class)
3+
(:import
4+
(java.util Properties)
5+
(org.apache.kafka.clients.producer KafkaProducer ProducerConfig ProducerRecord)))
6+
7+
(defn -main
8+
"Base Kafka Producer"
9+
[]
10+
(let [topic "test"
11+
props (doto (Properties.)
12+
(.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092")
13+
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.InetgerSerializer")
14+
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))]
15+
(with-open [producer (KafkaProducer. props)]
16+
(doseq [i (range 1 1001)] (.send producer (ProducerRecord. topic i (str "Message " i))))
17+
(.flush producer))))
13.8 MB
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>ru.example.kafka</groupId>
8+
<artifactId>Consumer</artifactId>
9+
<version>1.0</version>
10+
11+
<properties>
12+
<maven.compiler.source>17</maven.compiler.source>
13+
<maven.compiler.target>17</maven.compiler.target>
14+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15+
</properties>
16+
17+
<dependencies>
18+
<dependency>
19+
<groupId>org.apache.kafka</groupId>
20+
<artifactId>kafka-clients</artifactId>
21+
<version>3.4.0</version>
22+
</dependency>
23+
<dependency>
24+
<groupId>ch.qos.logback</groupId>
25+
<artifactId>logback-core</artifactId>
26+
<version>1.4.7</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>ch.qos.logback</groupId>
30+
<artifactId>logback-classic</artifactId>
31+
<version>1.4.7</version>
32+
</dependency>
33+
</dependencies>
34+
35+
<build>
36+
<plugins>
37+
<plugin>
38+
<artifactId>maven-compiler-plugin</artifactId>
39+
<version>3.11.0</version>
40+
</plugin>
41+
</plugins>
42+
</build>
43+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package ru.example.kafka;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.apache.kafka.clients.consumer.ConsumerRecords;
6+
import org.apache.kafka.clients.consumer.KafkaConsumer;
7+
import org.apache.kafka.common.errors.WakeupException;
8+
import org.apache.kafka.common.serialization.IntegerDeserializer;
9+
import org.apache.kafka.common.serialization.StringDeserializer;
10+
import java.time.Duration;
11+
import java.util.Collections;
12+
import java.util.Properties;
13+
14+
public class Consumer {
15+
public static void main(String[] args) {
16+
final String BOOTSTRAP_SERVERS = "localhost:9092";
17+
final String topic = "test";
18+
final String GROUP_ID = "g1";
19+
final String OFFSET_RESET = "earliest";
20+
21+
Properties props = new Properties();
22+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
23+
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
24+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
25+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
26+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
27+
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
28+
29+
try {
30+
consumer.subscribe(Collections.singleton(topic));
31+
while (true) {
32+
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
33+
for (ConsumerRecord<Integer, String> data : records) {
34+
String message = String.format("%d\t%d\t%d\t%s", data.partition(), data.offset(), data.key(), data.value());
35+
System.out.println(message);
36+
}
37+
}
38+
} catch (WakeupException e) {
39+
System.out.println(e.getLocalizedMessage());
40+
} finally {
41+
consumer.close();
42+
}
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Manifest-Version: 1.0
2+
Main-Class: ru.example.kafka.Consumer
3+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<configuration>
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<!-- encoders are assigned the type
5+
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
6+
<encoder>
7+
<pattern>%d{dd.MM.yyyy HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n</pattern>
8+
</encoder>
9+
</appender>
10+
11+
<root level="info">
12+
<appender-ref ref="STDOUT" />
13+
</root>
14+
</configuration>
13.9 MB
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>ru.example.kafka</groupId>
8+
<artifactId>Producer</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>17</maven.compiler.source>
13+
<maven.compiler.target>17</maven.compiler.target>
14+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15+
</properties>
16+
17+
<dependencies>
18+
<dependency>
19+
<groupId>org.apache.kafka</groupId>
20+
<artifactId>kafka-clients</artifactId>
21+
<version>3.4.0</version>
22+
</dependency>
23+
<dependency>
24+
<groupId>ch.qos.logback</groupId>
25+
<artifactId>logback-core</artifactId>
26+
<version>1.4.7</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>ch.qos.logback</groupId>
30+
<artifactId>logback-classic</artifactId>
31+
<version>1.4.7</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>org.slf4j</groupId>
35+
<artifactId>slf4j-simple</artifactId>
36+
<version>2.0.5</version>
37+
</dependency>
38+
</dependencies>
39+
40+
<build>
41+
<plugins>
42+
<plugin>
43+
<artifactId>maven-compiler-plugin</artifactId>
44+
<version>3.11.0</version>
45+
</plugin>
46+
</plugins>
47+
</build>
48+
49+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package ru.example.kafka;
2+
3+
import org.apache.kafka.clients.producer.*;
4+
import org.apache.kafka.common.serialization.IntegerSerializer;
5+
import org.apache.kafka.common.serialization.StringSerializer;
6+
import java.util.Properties;
7+
import java.util.concurrent.TimeUnit;
8+
9+
public class Producer {
10+
public static void main(String[] args) throws InterruptedException {
11+
final String BOOTSTRAP_SERVERS = "localhost:9092";
12+
String topic = "test";
13+
14+
Properties props = new Properties();
15+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
16+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
17+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
18+
org.apache.kafka.clients.producer.Producer<Integer, String> producer = new KafkaProducer<>(props);
19+
20+
try {
21+
for (int i = 0; i < 1000; i++) {
22+
ProducerRecord<Integer, String> data = new ProducerRecord<>(topic, i, "Message " + i);
23+
producer.send(data);
24+
System.out.println("Send: Message " + i);
25+
TimeUnit.SECONDS.sleep(2);
26+
}
27+
} finally {
28+
producer.flush();
29+
producer.close();
30+
}
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Manifest-Version: 1.0
2+
Main-Class: ru.example.kafka.Producer
3+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<configuration>
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<!-- encoders are assigned the type
5+
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
6+
<encoder>
7+
<pattern>%d{dd.MM.yyyy HH:mm:ss.SSS} [%thread] %-5level %logger{20} - %msg%n</pattern>
8+
</encoder>
9+
</appender>
10+
11+
<root level="info">
12+
<appender-ref ref="STDOUT" />
13+
</root>
14+
</configuration>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
ThisBuild / organization := "ru.example"
2+
ThisBuild / version := "1.0"
3+
ThisBuild / scalaVersion := "2.13.10"
4+
5+
ThisBuild / libraryDependencies ++= Seq(
6+
"org.apache.kafka" % "kafka-clients" % "3.4.0",
7+
"ch.qos.logback" % "logback-classic" % "1.4.7"
8+
)
9+
10+
lazy val root = (project in file("."))
11+
.settings(name := "Consumer")
12+
.settings(assembly / mainClass := Some("ru.example.kafka.Consumer"))
13+
.settings(assembly / assemblyJarName := "consumer.jar")
14+
.settings(assembly / assemblyMergeStrategy := {
15+
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
16+
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
17+
case "module-info.class" => MergeStrategy.first
18+
case "version.conf" => MergeStrategy.discard
19+
case "reference.conf" => MergeStrategy.concat
20+
case x: String if x.contains("UnusedStubClass.class") => MergeStrategy.first
21+
case _ => MergeStrategy.first
22+
})
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?xml version = "1.0" encoding="UTF-8"?>
2+
<configuration>
3+
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
4+
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
5+
<pattern>%-6relative %-5level %logger {35} - %msg \n</pattern>
6+
</encoder>
7+
</appender>
8+
9+
<logger name="fr.ps.eng" level="${APP_LOG_LVL:-INFO}" additivity="false">
10+
<appender-ref ref="console"/>
11+
</logger>
12+
13+
<logger name="org.apache.kafka" level="WARN" additivity="false">
14+
<appender-ref ref="console"/>
15+
</logger>
16+
17+
<logger name="io.confluent" level="WARN" additivity="false">
18+
<appender-ref ref="console"/>
19+
</logger>
20+
21+
<root level="${ROOT_LOG_LVL:-DEBUG}">
22+
<appender-ref ref="console"/>
23+
</root>
24+
25+
</configuration>

0 commit comments

Comments
 (0)