Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions external-schemas/java/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Overview

This directory includes examples of how Pulsar clients work with external schemas.

# Prerequisites

- Java 1.8 or higher version
- Kafka client 8.0.0 or higher version
- Maven
142 changes: 142 additions & 0 deletions external-schemas/java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>java</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<pulsar.version>4.1.0</pulsar.version>
<kafka.version>8.0.0</kafka.version>
<kafka.client.version>8.0.0-ccs</kafka.client.version>
<lombok.version>1.18.38</lombok.version>
<log4j2.version>2.20.0</log4j2.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-serializer</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.1.Final</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j2.version}</version>
</dependency>

<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>json-sKema</artifactId>
<version>0.23.0</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jsonSchema</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>io.streamnative.schemas.external</groupId>
<artifactId>kafka-json-schema</artifactId>
<version>1.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.examples.exschema.json;

import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;

import java.util.HashMap;
import java.util.Map;

public class Configurations {

protected static final String TOKEN = "<YOUR-TOKEN>";
protected static final String PULSAR_SERVICE_URL = "<PULSAR-SERVICE-URL>";
private static final String SCHEMA_REGISTRY_URL = "<SCHEMA-REGISTRY-URL>";

public static Map<String, Object> getSchemaRegistryConfigs() {
var map = new HashMap<String, Object>();
map.put(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
map.put(KafkaJsonSchemaSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
map.put(
KafkaJsonSchemaSerializerConfig.USER_INFO_CONFIG,
String.format("%s:%s", "public", TOKEN));
return map;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.examples.exschema.json;

import io.streamnative.schemas.external.KafkaSchemaFactory;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

import static io.streamnative.examples.exschema.json.Configurations.PULSAR_SERVICE_URL;
import static io.streamnative.examples.exschema.json.Configurations.TOKEN;
import static io.streamnative.examples.exschema.json.Configurations.getSchemaRegistryConfigs;

@Slf4j
public class ExternalJsonConsumer {

public void consume() throws Exception {
String topic = "testExternalJsonSchema";

@Cleanup
PulsarClient client =
PulsarClient.builder()
.serviceUrl(PULSAR_SERVICE_URL)
.authentication(AuthenticationFactory.token(TOKEN))
.build();

KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(getSchemaRegistryConfigs());
Schema<User> schema = kafkaSchemaFactory.json(User.class);

@Cleanup
Consumer<User> consumer =
client.newConsumer(schema)
.topic(topic)
.subscriptionName("sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

for (int i = 0; i < 10; i++) {
Message<User> message = consumer.receive();
consumer.acknowledge(message);
log.debug(
"receive msg {} {}",
message.getValue().getClass().getName(),
message.getValue());
}
}

public static void main(String[] args) {
try {
new ExternalJsonConsumer().consume();
} catch (Exception e) {
log.error("Failed to consume messages", e);
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.examples.exschema.json;

import io.streamnative.schemas.external.KafkaSchemaFactory;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

import static io.streamnative.examples.exschema.json.Configurations.PULSAR_SERVICE_URL;
import static io.streamnative.examples.exschema.json.Configurations.TOKEN;
import static io.streamnative.examples.exschema.json.Configurations.getSchemaRegistryConfigs;

@Slf4j
public class ExternalJsonProducer {

public void produce() throws Exception {
String topic = "testExternalJsonSchema";

KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(getSchemaRegistryConfigs());
Schema<User> schema = kafkaSchemaFactory.json(User.class);

@Cleanup
PulsarClient client =
PulsarClient.builder()
.serviceUrl(PULSAR_SERVICE_URL)
.authentication(AuthenticationFactory.token(TOKEN))
.build();

@Cleanup Producer<User> producer = client.newProducer(schema).topic(topic).create();

for (int i = 0; i < 10; i++) {
producer.send(new User("name-" + i, 10 + i));
}
}

public static void main(String[] args) {
try {
new ExternalJsonProducer().produce();
} catch (Exception e) {
log.error("Failed to produce messages", e);
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.examples.exschema.json;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

private String name;
private Integer age;
}
18 changes: 18 additions & 0 deletions external-schemas/java/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t:%C@%L] %-5level - %msg%n" />
</Console>
</Appenders>
<Loggers>
<Root level="warn">
<AppenderRef ref="Console" />
</Root>
<Logger name="org.eclipse.jetty" level="info"/>
<Logger name="io.streamnative" level="info"/>
<Logger name="org.apache.pulsar" level="info"/>
<Logger name="org.apache.bookkeeper" level="info"/>
<Logger name="org.apache.kafka" level="info"/>
</Loggers>
</Configuration>