Skip to content

Commit

Permalink
feat: Schema Registry Support
Browse files Browse the repository at this point in the history
  • Loading branch information
1ambda committed Sep 16, 2023
1 parent 09ca127 commit 2ecdccc
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 84 deletions.
25 changes: 24 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ trino.shell:
airflow.shell:
docker exec -it airflow-scheduler /bin/bash

.PHONY: mysql.shell
mysql.shell:
mycli -u root -p admin

.PHONY: debezium.register
debezium.register: debezium.register.customers debezium.register.products

.PHONY: debezium.register.customers
debezium.register.customers:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @docker/debezium/register.inventory_customers.json

.PHONY: debezium.register.products
debezium.register.products:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8084/connectors/ -d @docker/debezium/register.inventory_products.json

.PHONY: compose.trino
compose.trino:
COMPOSE_PROFILES=trino docker-compose up
Expand All @@ -36,14 +53,20 @@ compose.dbt:

.PHONY: compose.cdc
compose.cdc:
COMPOSE_PROFILES=flink,kafka docker-compose -f docker-compose.yml -f docker-compose-cdc.yml up
COMPOSE_PROFILES=kafka docker-compose -f docker-compose-cdc.yml up

.PHONY: compose.stream
compose.stream:
COMPOSE_PROFILES=flink,kafka docker-compose -f docker-compose.yml -f docker-compose-cdc.yml up

.PHONY: compose.clean
compose.clean:
@ echo ""
@ echo ""
@ echo "[$(TAG)] ($(shell date '+%H:%M:%S')) - Cleaning container volumes ('docker/volume')"
@ rm -rf docker/volume
@ docker container prune -f
@ docker volume prune -f
@ echo ""
@ echo ""

Expand Down
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Supported Data Pipeline Components
| [Hudi](https://hudi.apache.org/) | 0.13.1+ | Table Format (Lakehouse) |
| [Airflow](https://airflow.apache.org/) | 2.7+ | Scheduler |
| [Jupyterlab](https://jupyter.org/) | 3+ | Notebook |
| [Kafka](https://kafka.apache.org/) | 3.5+ | Messaging Broker |
| [Kafka](https://kafka.apache.org/) | 3.4+ | Messaging Broker |
| [Debezium](https://debezium.io/) | 2.3+ | CDC Connector |

<br/>
Expand All @@ -34,11 +34,16 @@ COMPOSE_PROFILES=airflow docker-compose up;
COMPOSE_PROFILES=trino,spark docker-compose up;

# for CDC environment (Kafka, ZK, Debezium)
COMPOSE_PROFILES=kafka docker-compose up;
make compose.clean compose.cdc

# for Stream environment (Kafka, ZK, Debezium + Flink)
make compose.clean compose.stream
```

Then access the lakehouse services.

- Kafka UI: http://localhost:8088
- Kafka Connect UI: http://localhost:8089
- Trino: http://localhost:8889
- Airflow (`airflow` / `airflow`) : http://localhost:8080
- Local S3 Minio (`minio` / `minio123`): http://localhost:9000
Expand All @@ -59,8 +64,10 @@ make compose.cdc;

# Register debezium mysql connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ \
-d @docker/debezium/register-cdc-inventory.json
http://localhost:8083/connectors/ -d @docker/debezium/register-cdc-inventory-plain.json

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @docker/debezium/register-avro-inventory_products.json
```

## DBT Starter kit
Expand Down
182 changes: 119 additions & 63 deletions docker-compose-cdc.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
x-kafka-common:
&kafka-common
# profiles: [ "kafka" ]
image: confluentinc/cp-kafka:7.5.0 # OSS Kafka 3.5.X
image: confluentinc/cp-kafka:7.4.1
environment:
&kafka-common-env
KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
Expand All @@ -17,9 +17,52 @@ x-kafka-common:
zk:
condition: service_healthy

x-connect-common:
&connect-common
# profiles: [ "kafka" ]
build:
dockerfile: ./docker/debezium/Dockerfile
args:
DEBEZIUM_VERSION: 2.3.3.Final
CONFLUENT_VERSION: 7.4.1
image: 1ambda/lakehouse:debezium-connect-2.3.3.Final
environment:
&connect-common-env
# https://hub.docker.com/r/debezium/connect-base
BOOTSTRAP_SERVERS: kafka1:29092
GROUP_ID: cdc.inventory
CONFIG_STORAGE_TOPIC: connect-cluster.inventory.config
OFFSET_STORAGE_TOPIC: connect-cluster.inventory.offset
STATUS_STORAGE_TOPIC: connect-cluster.inventory.status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_PARTITIONS: 1
CONNECT_CONFIG_STORAGE_PARTITIONS: 1
CONNECT_OFFSET_STORAGE_PARTITIONS: 1
OFFSET_FLUSH_INTERVAL_MS: 10000
OFFSET_FLUSH_TIMEOUT_MS: 5000
SHUTDOWN_TIMEOUT: 30
LOG_LEVEL: INFO
CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: All
CONNECT_ACCESS_CONTROL_ALLOW_ORIGIN: "*"
CONNECT_ACCESS_CONTROL_ALLOW_METHODS: GET,POST,PUT,DELETE,OPTIONS
CONNECT_ACCESS_CONTROL_ALLOW_HEADERS: origin,content-type,accept,authorization
CONNECT_LISTENERS: http://0.0.0.0:8083
CONNECT_REST_PORT: 8083
# - HEAP_OPTS=-Xms2G -Xmx2G
# - CONNECT_PRODUCER_BUFFER_MEMORY=45554432
# - CONNECT_MAX_REQUEST_SIZE=10485760
depends_on:
&connect-common-depends
mysql:
condition: service_healthy
kafka1:
condition: service_healthy

x-zk-common:
&zk-common
image: confluentinc/cp-zookeeper:7.5.0
image: confluentinc/cp-zookeeper:7.4.1
# profiles: [ "kafka" ]
environment:
&zk-common-env
Expand All @@ -46,9 +89,30 @@ services:
timeout: 10s
retries: 3
restart: always
volumes:
- ./docker/volume/zookeeper/data:/var/lib/zookeeper/data
- ./docker/volume/zookeeper/log:/var/lib/zookeeper/log
# volumes:
# - ./docker/volume/zookeeper/data:/var/lib/zookeeper/data
# - ./docker/volume/zookeeper/log:/var/lib/zookeeper/log

kafka-schema-registry:
image: confluentinc/cp-schema-registry:7.4.1
hostname: kafka-schema-registry
container_name: kafka-schema-registry
ports:
- "8085:8085"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
# SCHEMA_REGISTRY_LISTENERS: http://kafka-schema-registry:8085
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
depends_on:
kafka1:
condition: service_healthy

kafka1:
<<: *kafka-common
Expand All @@ -57,10 +121,13 @@ services:
ports:
- "9092:9092"
- "29092:29092"
- "9997:9997"
environment:
<<: *kafka-common-env
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
healthcheck:
test: nc -z localhost 9092 || exit -1
start_period: 15s
Expand All @@ -70,60 +137,31 @@ services:
restart: always
depends_on:
*kafka-common-depends
volumes:
- ./docker/volume/kafka/kafka1-data:/var/lib/kafka/data
# volumes:
# - ./docker/volume/kafka/kafka1-data:/var/lib/kafka/data

kafka-ui:
# profiles: [ "kafka" ]
image: provectuslabs/kafka-ui
hostname: kafka-ui
container_name: kafka-ui
ports:
- "8085:8080"
- "8088:8080"
# Environment variable configuration
environment:
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:29092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zk:2181
- KAFKA_CLUSTERS_0_METRICS_PORT=9997
- KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://kafka-schema-registry:8085
- KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=connect-cdc
- KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=http://kafka-connect-1:8083,http://kafka-connect-2:8083
restart: always
depends_on:
zk:
condition: service_healthy
kafka1:
condition: service_healthy

# kafka2:
# <<: *kafka-common
# hostname: kafka2
# container_name: kafka2
# ports:
# - "9093:9093"
# - "29093:29093"
# environment:
# <<: *kafka-common-env
# KAFKA_BROKER_ID: 2
# KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
# depends_on:
# <<: *kafka-common-depends
# volumes:
# - ./docker/volume/kafka/kafka2-data:/var/lib/kafka/data
#
# kafka3:
# <<: *kafka-common
# hostname: kafka3
# container_name: kafka3
# ports:
# - "9094:9094"
# - "29094:29094"
# environment:
# <<: *kafka-common-env
# KAFKA_BROKER_ID: 3
# KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
# depends_on:
# <<: *kafka-common-depends
# volumes:
# - ./docker/volume/kafka/kafka3-data:/var/lib/kafka/data


####################################################################################################
# MySQL
Expand All @@ -145,41 +183,59 @@ services:
MYSQL_PASSWORD: "mysqlpw"
healthcheck:
test: [ "CMD-SHELL", "mysqladmin -u $$MYSQL_USER -p$$MYSQL_PASSWORD ping -h localhost || exit 1" ]
start_period: 15s
interval: 10s
timeout: 10s
start_period: 30s
interval: 15s
timeout: 15s
retries: 3
restart: always
volumes:
- ./docker/mysql/my.cnf:/etc/mysql/conf.d/my.cnf
- ./docker/mysql/entrypoint.sql:/docker-entrypoint-initdb.d/entrypoint.sql
- ./docker/volume/mysql/data:/var/lib/mysql
- ./docker/volume/mysql/logs/:/var/log/mysql
# - ./docker/volume/mysql/data:/var/lib/mysql
# - ./docker/volume/mysql/logs/:/var/log/mysql


####################################################################################################
# Kafka Producer: Debezium MySQL Connector
# Kafka Producer: Debezium MySQL Connector (Connect Distributed Mode)
####################################################################################################
kafka-producer:
# profiles: [ "kafka" ]
image: debezium/connect:2.3.3.Final
container_name: kafka-producer
hostname: kafka-producer

kafka-connect-1:
<<: *connect-common
container_name: kafka-connect-1
hostname: kafka-connect-1
ports:
- "8083:8083"
environment:
# https://hub.docker.com/r/debezium/connect-base
- BOOTSTRAP_SERVERS=kafka1:29092
- GROUP_ID=cdc.inventory
- CONFIG_STORAGE_TOPIC=cdc.inventory.connect.configs
- OFFSET_STORAGE_TOPIC=cdc.inventory.connect.offsets
- STATUS_STORAGE_TOPIC=cdc.inventory.connect.statuses
<<: *connect-common-env
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect-1
# KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8085
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8085
restart: always
depends_on:
kafka1:
condition: service_healthy
mysql:
condition: service_healthy
*connect-common-depends

kafka-connect-2:
<<: *connect-common
container_name: kafka-connect-2
hostname: kafka-connect-2
ports:
- "8084:8083"
environment:
<<: *connect-common-env
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect-2
# KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
# VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
# INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8085
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8085
restart: always
depends_on:
*connect-common-depends

# Configure Network
networks:
Expand Down
29 changes: 29 additions & 0 deletions docker/debezium/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
ARG DEBEZIUM_VERSION=2.3.3.Final
FROM debezium/connect:${DEBEZIUM_VERSION}

ARG CONFLUENT_VERSION=7.4.1
ARG DEBEZIUM_VERSION=2.3.3.Final

ENV AVRO_VERSION=1.11.0 \
GUAVA_VERSION=30.1.1-jre \
GUAVA_FAILURE_VERSION=1.0.1

USER root
COPY ./docker/debezium/maven-downloader.sh /usr/local/bin/docker-maven-download
RUN chmod +x /usr/local/bin/docker-maven-download

USER kafka
ENV MAVEN_DEP_DESTINATION=/kafka/libs
RUN docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" && \
docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" && \
docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" && \
docker-maven-download confluent kafka-schema-converter "$CONFLUENT_VERSION" && \
docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" && \
docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" && \
docker-maven-download confluent common-config "$CONFLUENT_VERSION" && \
docker-maven-download confluent common-utils "$CONFLUENT_VERSION" && \
docker-maven-download central org/apache/avro avro "$AVRO_VERSION" && \
docker-maven-download central com/google/guava guava "$GUAVA_VERSION" && \
docker-maven-download central com/google/guava failureaccess "$GUAVA_FAILURE_VERSION"

RUN ls -al /kafka/libs;
Loading

0 comments on commit 2ecdccc

Please sign in to comment.