-
Notifications
You must be signed in to change notification settings - Fork 9
Add Debezium CDC examples for Postgres & SQL Server & MySQL #93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
bobbyiliev
wants to merge
9
commits into
main
Choose a base branch
from
postgres-debezium-cdc-demo
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 3 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
b1fa445
Add Debezium + Postgres CDC example
bobbyiliev 5d5a0cd
Add SQL Server + Debezium v2.x example
bobbyiliev bebd32b
Add MySQL + Debezium v2 demo
bobbyiliev 56bb346
Minor changes
bobbyiliev 9175af5
Update integrations/debezium/mysql/README.md
bobbyiliev db51ea1
Update integrations/debezium/postgres/README.md
bobbyiliev e4f7ddd
Update integrations/debezium/sqlserver/README.md
bobbyiliev 522e808
WIP MongoDB guide
bobbyiliev 2d6153e
Merge branch 'postgres-debezium-cdc-demo' of https://github.com/Mater…
bobbyiliev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| # Materialize + Debezium Examples | ||
|
|
||
| ## Overview | ||
|
|
||
| This is a collection of demos that show how to use the [Debezium](https://materialize.com/docs/ingest-data/debezium/) connector with Materialize. | ||
|
|
||
| ## Demos | ||
|
|
||
| | Demo | Description | Materialize Docs | | ||
| | ---------------------------------- | --------------------------------------------------------------------------- | ------------------------------------------------------------------------------------ | | ||
| | [Postgres](postgres) | Connect to a Postgres database and stream changes to Kafka/Redpanda | [Postgres](https://materialize.com/docs/ingest-data/cdc-postgres-kafka-debezium/) | | ||
| | [MySQL](mysql) | Connect to a MySQL database and stream changes to Kafka/Redpanda | [MySQL](https://materialize.com/docs/ingest-data/cdc-mysql/) | | ||
| | [SQL server](sqlserver) | Connect to a SQL server database and stream changes to Kafka/Redpanda | TODO | | ||
| <!-- | TODO: [MongoDB](mongodb) | Connect to a MongoDB database and stream changes to Kafka/Redpanda | TODO | --> | ||
|
|
||
| ## Prerequisites | ||
|
|
||
| - [Docker](https://docs.docker.com/get-docker/) | ||
| - [Docker Compose](https://docs.docker.com/compose/install/) | ||
| - [Materialize](https://console.materialize.com/) account | ||
|
|
||
| ## Running the demos | ||
|
|
||
| For each demo, follow the instructions in the demo's README. All demos assume that you have `psql`, `docker` and a publicly accessible Linux environment. | ||
|
|
||
| ## Notes | ||
|
|
||
| Beginning with Debezium 2.0.0, Confluent Schema Registry support is not included in the Debezium containers. To enable the Confluent Schema Registry for a Debezium container, install the following Confluent Avro converter JAR files into the Connect plugin directory: | ||
|
|
||
| - `kafka-connect-avro-converter` | ||
|
|
||
| - `kafka-connect-avro-data` | ||
|
|
||
| - `kafka-avro-serializer` | ||
|
|
||
| - `kafka-schema-serializer` | ||
|
|
||
| - `kafka-schema-registry-client` | ||
|
|
||
| - `common-config` | ||
|
|
||
| - `common-utils` |
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,179 @@ | ||
| # Debezium + MySQL + Materialize | ||
|
|
||
| Before trying this out, you will need the following: | ||
|
|
||
| - [Materialize account](https://materialize.com/register/). | ||
| - A publicly accessible Linux server with [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed. | ||
|
|
||
| ## Running the demo | ||
|
|
||
| If you want to try it right now, follow these steps: | ||
|
|
||
| 1. Clone the project on your Linux server and run: | ||
|
|
||
| ```shell session | ||
| git clone https://github.com/MaterializeInc/demos.git | ||
| cd demos/integrations/debezium/mysql | ||
| ``` | ||
|
|
||
| 1. After cloning the project, you will need to set the `EXTERNAL_IP` environment variable to the IP address of your Linux server. For example: | ||
|
|
||
| ```shell session | ||
| export EXTERNAL_IP=$(hostname -I | awk '{print $1}') | ||
|
|
||
| # Check the value of EXTERNAL_IP | ||
| echo $EXTERNAL_IP | ||
| ``` | ||
|
|
||
| 1. Bring up the Docker Compose containers in the background. | ||
|
|
||
| ```shell session | ||
| docker compose up -d --build | ||
| ``` | ||
|
|
||
| **This may take several minutes to complete the first time you run it.** If all goes well, you'll have everything running in their own containers, with Debezium configured to ship changes from MySQL into Redpanda. | ||
|
|
||
| 1. Confirm that everything is running as expected: | ||
|
|
||
| ```shell session | ||
| docker compose ps | ||
| ``` | ||
|
|
||
| 1. Exec in to the redpanda container to look around using redpanda's amazing [rpk](https://docs.redpanda.com/docs/reference/rpk/) CLI. | ||
|
|
||
| ```shell session | ||
| docker compose exec redpanda /bin/bash | ||
|
|
||
| rpk debug info | ||
|
|
||
| rpk topic list | ||
| ``` | ||
| 1. Connect to Materialize | ||
|
|
||
| If you already have `psql` installed on your machine, use the provided connection string to connect: | ||
|
|
||
| Example: | ||
|
|
||
| ```shell session | ||
| psql "postgres://user%40domain.com@materialize_host:6875/materialize" | ||
| ``` | ||
|
|
||
| Otherwise, you can find the steps to install and use your CLI of choice under [Supported tools](https://materialize.com/docs/integrations/sql-clients/#supported-tools). | ||
|
|
||
| 1. Now that you're in the Materialize CLI, define the connection to the Redpanda broker and the schema registry: | ||
bobbyiliev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| ```sql | ||
| -- Create Redpanda connection | ||
| CREATE CONNECTION redpanda_connection | ||
| TO KAFKA ( | ||
| BROKER '<your_server_ip:9092>'); | ||
|
|
||
| -- Create Registry connection | ||
| CREATE CONNECTION schema_registry | ||
| TO CONFLUENT SCHEMA REGISTRY ( | ||
| URL 'http://<your_server_ip:8081>'); | ||
| ``` | ||
|
|
||
| 1. Next, define all of the tables in `demo` as sources: | ||
|
|
||
| ```sql | ||
| CREATE SOURCE users | ||
| FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.users') | ||
| FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry | ||
| ENVELOPE DEBEZIUM | ||
| WITH (SIZE = '3xsmall'); | ||
|
|
||
| CREATE SOURCE roles | ||
| FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.roles') | ||
| FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry | ||
| ENVELOPE DEBEZIUM | ||
| WITH (SIZE = '3xsmall'); | ||
|
|
||
| CREATE SOURCE reviews | ||
| FROM KAFKA CONNECTION redpanda_connection (TOPIC 'mysql_repl.demo.reviews') | ||
| FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry | ||
| ENVELOPE DEBEZIUM | ||
| WITH (SIZE = '3xsmall'); | ||
| ``` | ||
|
|
||
| Because the three sources are pulling message schema data from the registry, materialize knows the column types to use for each attribute. | ||
|
|
||
| 1. Create a materialized view that has only the VIP users: | ||
|
|
||
| ```sql | ||
| CREATE MATERIALIZED VIEW vip_users AS | ||
| SELECT | ||
| users.id, | ||
| users.name, | ||
| users.email, | ||
| users.role_id, | ||
| roles.name AS role_name | ||
| FROM users | ||
| JOIN roles ON users.role_id = roles.id | ||
| WHERE users.role_id = 4; | ||
| ``` | ||
|
|
||
| 1. Create a materialized view that has only the bad reviews: | ||
|
|
||
| ```sql | ||
| CREATE MATERIALIZED VIEW bad_reviews AS | ||
| SELECT | ||
| reviews.user_id, | ||
| reviews.comment, | ||
| reviews.rating, | ||
| reviews.created_at, | ||
| reviews.updated_at | ||
| FROM reviews | ||
| WHERE reviews.rating < 4; | ||
| ``` | ||
|
|
||
| 1. Create a materialized view that filters all VIP users with bad reviews: | ||
|
|
||
| ```sql | ||
| CREATE MATERIALIZED VIEW vip_users_with_bad_reviews AS | ||
| SELECT | ||
| vip_users.name, | ||
| vip_users.email, | ||
| vip_users.role_name, | ||
| bad_reviews.rating, | ||
| bad_reviews.comment | ||
| FROM vip_users | ||
| JOIN bad_reviews ON vip_users.id = bad_reviews.user_id; | ||
| ``` | ||
|
|
||
| 1. Query the materialized view: | ||
|
|
||
| ```sql | ||
| SELECT * FROM vip_users_with_bad_reviews; | ||
| ``` | ||
|
|
||
| Or use the `SUBSCRIBE` command to stream the results: | ||
|
|
||
| ```sql | ||
| COPY (SUBSCRIBE vip_users_with_bad_reviews) TO STDOUT; | ||
| ``` | ||
|
|
||
| ## Cleanup | ||
|
|
||
| To stop the services and remove the containers, run: | ||
|
|
||
| ```shell session | ||
| docker compose down | ||
| ``` | ||
|
|
||
| In Materialize, run: | ||
|
|
||
| ```sql | ||
| DROP CONNECTION redpanda_connection CASCADE; | ||
| DROP CONNECTION schema_registry CASCADE; | ||
| ``` | ||
|
|
||
| ## Helpful resources: | ||
|
|
||
| * [`CREATE CONNECTION`](https://materialize.com/docs/sql/create-connection/) | ||
| * [`MySQL CDC using Kafka and Debezium`](https://materialize.com/docs/ingest-data/cdc-mysql/) | ||
| * [`CREATE MATERIALIZED VIEW`](https://materialize.com/docs/sql/create-materialized-view) | ||
|
|
||
| ## Community | ||
|
|
||
| If you have any questions or comments, please join the [Materialize Slack Community](https://materialize.com/s/chat)! | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| FROM debezium/connect-base:2.4 | ||
|
|
||
| # | ||
| # Set up the plugins directory ... | ||
| # | ||
| ENV CONFLUENT_VERSION=7.0.1 \ | ||
| AVRO_VERSION=1.10.1 \ | ||
| GUAVA_VERSION=31.0.1-jre | ||
|
|
||
| RUN docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" fd03a1436f29d39e1807e2fb6f8e415a && \ | ||
| docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" d27f30e9eca4ef1129289c626e9ce1f1 && \ | ||
| docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" c72420603422ef54d61f493ca338187c && \ | ||
| docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" 9c510db58119ef66d692ae172d5b1204 && \ | ||
| docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" 7449df1f5c9a51c3e82e776eb7814bf1 && \ | ||
| docker-maven-download confluent common-config "$CONFLUENT_VERSION" aab5670de446af5b6f10710e2eb86894 && \ | ||
| docker-maven-download confluent common-utils "$CONFLUENT_VERSION" 74bf5cc6de2748148f5770bccd83a37c && \ | ||
| docker-maven-download central org/apache/avro avro "$AVRO_VERSION" 35469fee6d74ecbadce4773bfe3a204c && \ | ||
| docker-maven-download central com/google/guava guava "$GUAVA_VERSION" bb811ca86cba6506cca5d415cd5559a7 | ||
|
|
||
| # https://github.com/debezium/container-images/blob/main/connect/2.4/Dockerfile | ||
| LABEL maintainer="Debezium Community" | ||
|
|
||
| ENV DEBEZIUM_VERSION="2.4.0.Final" \ | ||
| MAVEN_REPO_CENTRAL="" \ | ||
| MAVEN_REPOS_ADDITIONAL="" \ | ||
| MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_PLUGINS_DIR \ | ||
| MONGODB_MD5=a22784387e0ec8a6abb1606c2c365cb2 \ | ||
| MYSQL_MD5=4bff262afc9678f5cbc3be6315b8e71e \ | ||
| POSTGRES_MD5=b42c9e208410f39ad1ad09778b1e3f03 \ | ||
| SQLSERVER_MD5=9b8bf3c62a7c22c465a32fa27b3cffb5 \ | ||
| ORACLE_MD5=21699814400860457dc2334b165882e6 \ | ||
| DB2_MD5=0727d7f2d1deeacef39e230acac835a8 \ | ||
| SPANNER_MD5=186b07595e914e9139941889fd675044 \ | ||
| VITESS_MD5=3b4d24c8c9898df060c408a13fd3429f \ | ||
| JDBC_MD5=77c5cb9adf932ab17c041544f4ade357 \ | ||
| KCRESTEXT_MD5=25c0353f5a7304b3c4780a20f0f5d0af \ | ||
| SCRIPTING_MD5=53a3661e7a9877744f4a30d6483d7957 | ||
|
|
||
| RUN docker-maven-download debezium mongodb "$DEBEZIUM_VERSION" "$MONGODB_MD5" && \ | ||
| docker-maven-download debezium mysql "$DEBEZIUM_VERSION" "$MYSQL_MD5" && \ | ||
| docker-maven-download debezium postgres "$DEBEZIUM_VERSION" "$POSTGRES_MD5" && \ | ||
| docker-maven-download debezium sqlserver "$DEBEZIUM_VERSION" "$SQLSERVER_MD5" && \ | ||
| docker-maven-download debezium oracle "$DEBEZIUM_VERSION" "$ORACLE_MD5" && \ | ||
| docker-maven-download debezium-additional db2 db2 "$DEBEZIUM_VERSION" "$DB2_MD5" && \ | ||
| docker-maven-download debezium-additional jdbc jdbc "$DEBEZIUM_VERSION" "$JDBC_MD5" && \ | ||
| docker-maven-download debezium-additional spanner spanner "$DEBEZIUM_VERSION" "$SPANNER_MD5" && \ | ||
| docker-maven-download debezium-additional vitess vitess "$DEBEZIUM_VERSION" "$VITESS_MD5" && \ | ||
| docker-maven-download debezium-optional connect-rest-extension "$DEBEZIUM_VERSION" "$KCRESTEXT_MD5" && \ | ||
| docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| FROM ubuntu:latest | ||
|
|
||
| RUN apt-get update && apt-get -qy install curl mysql-client | ||
|
|
||
| COPY . /deploy | ||
|
|
||
| COPY docker-entrypoint.sh /usr/local/bin | ||
| RUN chmod 777 /usr/local/bin/docker-entrypoint.sh | ||
|
|
||
| ENTRYPOINT ["docker-entrypoint.sh"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| #!/bin/bash | ||
|
|
||
| set -euo pipefail | ||
|
|
||
| cd /deploy | ||
|
|
||
| bash mysql_dbz.sh |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| #!/bin/bash | ||
|
|
||
| #Initialize Debezium (Kafka Connect Component) | ||
|
|
||
| while true; do | ||
| echo "Waiting for Debezium to be ready" | ||
| sleep 0.1 | ||
| curl -s -o /dev/null -w "%{http_code}" http://debezium:8083/connectors/ | grep 200 | ||
| if [ $? -eq 0 ]; then | ||
| echo "Debezium is ready" | ||
| break | ||
| fi | ||
| done | ||
|
|
||
| # Read the JSON file and register the connector and change the ${EXTERNAL_IP} with the external IP environment variable | ||
| sed -i "s/EXTERNAL_IP/${EXTERNAL_IP}/g" /deploy/register-mysql.json | ||
|
|
||
| curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium:8083/connectors/ -d @/deploy/register-mysql.json | ||
|
|
||
| if [ $? -eq 0 ]; then | ||
| echo "Debezium connector registered" | ||
| else | ||
| echo "Debezium connector registration failed" | ||
| exit 1 | ||
| fi | ||
|
|
||
| ## | ||
| # Reviews generation mock script | ||
| # Table details: | ||
| # - name: reviews | ||
| # - columns: | ||
| # - id | ||
| # - user_id | ||
| # - review_text | ||
| # - review_rating | ||
| # - created_at | ||
| # - updated_at | ||
| ## | ||
|
|
||
| # Start generating reviews | ||
| # Start generating reviews | ||
bobbyiliev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # Start generating reviews | ||
| echo "Generating reviews..." | ||
| id=1 | ||
| while [[ true ]] ; do | ||
|
|
||
| # Define variables | ||
| user_role=$(seq 1 4 | sort -R | head -n1) | ||
| review_rating=$(seq 1 10 | sort -R | head -n1) | ||
| review_text="Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." | ||
|
|
||
| echo "Generating review for user_id: ${id}" | ||
|
|
||
| # Generate users | ||
| # Assuming you have added name and role_id columns to your demo.users table | ||
| mysql -h mysql -u mysqluser -pmysqlpwd -e "INSERT INTO demo.users (id, name, email, role_id) VALUES ( ${id}, 'user${id}', 'user${id}@demo.com', ${user_role} );" 2> /dev/null | ||
|
|
||
| # Generate reviews | ||
| # Assuming you have updated column names in your demo.reviews table | ||
| mysql -h mysql -u mysqluser -pmysqlpwd -e "INSERT INTO demo.reviews (user_id, comment, rating, created_at, updated_at) VALUES ( ${id}, '${review_text}', ${review_rating}, NOW(), NOW() );" 2> /dev/null | ||
|
|
||
| # Increment id | ||
| ((id=id+1)) | ||
|
|
||
| # Sleep for 1 second | ||
| sleep 1 | ||
|
|
||
| done | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| { | ||
| "name": "mysql-connector", | ||
| "config": { | ||
| "connector.class": "io.debezium.connector.mysql.MySqlConnector", | ||
| "tasks.max": "1", | ||
| "database.hostname": "mysql", | ||
| "database.port": "3306", | ||
| "database.user": "debezium", | ||
| "database.password": "mysqlpwd", | ||
| "database.server.id":"223344", | ||
| "topic.prefix": "mysql_repl", | ||
| "database.include.list": "demo", | ||
| "database.history.kafka.topic":"mysql_repl.history", | ||
| "database.history.kafka.bootstrap.servers":"EXTERNAL_IP:9092", | ||
| "schema.history.internal.kafka.bootstrap.servers": "EXTERNAL_IP:9092", | ||
| "schema.history.internal.kafka.topic": "mysql_repl.internal.history", | ||
| "key.converter": "io.confluent.connect.avro.AvroConverter", | ||
| "value.converter": "io.confluent.connect.avro.AvroConverter", | ||
| "key.converter.schema.registry.url": "http://redpanda:8081", | ||
| "value.converter.schema.registry.url": "http://redpanda:8081", | ||
| "include.schema.changes": false | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.