This is a sample that implements a custom Transform module used in the Kafka connector.
You can provision data from Source to Target through various pipeline channels of the Kafka streaming service platform. In particular, using Debezium as an open source for CDC (Change Data Capture), you can easily implement a streaming integration platform for various use cases.
git clone https://github.com/simplydemo/simplydemo-kafka-fullname-transform.git
mvn clean package -DskipTests=true
Below is the configuration of FullNameTransform, which merges the first_name and last_name column values of <target_table_name> and maps them to the full_name column value through io.debezium.connector.jdbc.JdbcSinkConnector
, one of the Kafka connectors, for messages received as my-topic.
{
"name": "sink-connector-mysql",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "my-topic",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.username": "<username>",
"connection.password": "<password>",
"table.name.format": "<target_table_name>",
"insert.mode": "upsert",
"schema.evolution": "basic",
"primary.key.mode": "record_key",
"primary.key.fields": "id",
"transforms": "fullName",
"transforms.fullName.type": "io.github.simplydemo.kafka.transforms.FullNameTransform"
}
}
- Copy the JAR file to the Kafka Connect plugin directory:
cp target/simplydemo-kafka-fullname-transform-0.0.1-SNAPSHOT-uber.jar /kafka/connect/plugins/
- Restart Kafka Connect: Restart the Kafka Connect worker to load the plugin
systemctl restart kafka-connect
- Verify plugin loaded: Check Kafka Connect logs to see if
FullNameTransform
is loaded
tail -f connect.log
For further reference, please consider the following sections: