---
title: Debezium
description: Ingesting from Postgres tables with Debezium
---
Debezium is a CDC (change data capture) tool that is widely used to dynamically respond to changes in databases like Postgres and MySQL. In this tutorial we'll show how to use Debezium as a source for Arroyo.
We will read off of the `checkpoints` table in the Arroyo Postgres database, which stores information on checkpoint progress and history for Arroyo pipelines.
For this tutorial, we will run Arroyo locally. To install Arroyo, see the getting started instructions.
By default, Arroyo uses a SQLite database when run locally. We'll instead have it use Postgres. Ensure that Postgres is installed and running, then set up the Arroyo database by running:
```shell
$ psql postgres -c "CREATE USER arroyo WITH PASSWORD 'arroyo' SUPERUSER;"
$ createdb arroyo
$ arroyo migrate
```
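As an optional sanity check, you can confirm that the migration created the `checkpoints` table we'll be reading from:

```shell
# Should print the table definition created by `arroyo migrate`
$ psql -d arroyo -c '\d checkpoints'
```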
Next, we'll need to configure Postgres in order to enable Debezium to read the changes. First, we'll enable logical decoding by adding the following to `postgresql.conf`:

```
wal_level = logical
```
The location of this file varies depending on operating system and method of installation. On macOS with Homebrew, it will be something like `/opt/homebrew/var/postgresql@15/postgresql.conf`, while on Ubuntu it will be something like `/etc/postgresql/14/main/postgresql.conf`.
Then restart Postgres to apply the changes:

```shell Ubuntu
$ sudo systemctl restart postgresql
```

```shell macOS
$ brew services restart postgresql
```
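After the restart, you can verify that logical decoding is enabled (another optional check):

```shell
# Should print "logical"
$ psql -d arroyo -c "SHOW wal_level;"
```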
Next, for the `checkpoints` table, we need to set the `REPLICA IDENTITY` to `FULL`:
```shell
$ psql -d arroyo \
    -c "ALTER TABLE checkpoints REPLICA IDENTITY FULL;"
```
For this demo we can use the same `arroyo` user that we created for the Arroyo services, but in production it's recommended to create a separate user for Debezium.
Debezium requires Zookeeper, Kafka, and some way to run Kafka Connect. There are many ways to set this up, but for simplicity we recommend a single docker-compose file:
```yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - 9094:9094
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:9092,INTERNAL://kafka:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  debezium:
    image: debezium/connect:2.3
    depends_on:
      - kafka
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: kafka:9094
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_connect_configs
      OFFSET_STORAGE_TOPIC: debezium_connect_offsets
      STATUS_STORAGE_TOPIC: debezium_connect_statuses
```
Write that to a file (e.g. `docker-compose.yml`) and run it with `docker-compose up`.
Confirm that the three containers are running via `docker ps` and by tailing their logs.
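For example (the exact container names will depend on the directory the compose file lives in):

```shell
# List the running containers and their status
$ docker ps --format '{{.Names}}\t{{.Status}}'

# Follow the Kafka Connect / Debezium logs
$ docker-compose logs -f debezium
```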
We can use `curl` to create a Debezium connector using the Debezium REST API. First, create a file `connector-config.json` with the following contents:
```json
{
  "name": "arroyo-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "host.docker.internal",
    "database.port": "5432",
    "database.user": "arroyo",
    "database.password": "arroyo",
    "database.dbname": "arroyo",
    "database.server.name": "arroyo",
    "tombstones.on.delete": "false",
    "table.include.list": "public.checkpoints",
    "topic.prefix": "arroyo",
    "plugin.name": "pgoutput",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
```
Then, submit it to Debezium via
```shell
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    localhost:8083/connectors -d @connector-config.json
```
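You can optionally check that the connector was registered and is running via the Kafka Connect status endpoint:

```shell
# The response should show "state": "RUNNING" for the connector and its task
$ curl -s localhost:8083/connectors/arroyo-connector/status
```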
This should now emit Debezium messages for each change to the `checkpoints` table to a topic named `arroyo.public.checkpoints`.
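If you'd like to watch the raw change events, one way (a sketch, assuming the compose file above) is to run the console consumer inside the Kafka container; messages will appear once checkpoints start changing:

```shell
$ docker-compose exec kafka kafka-console-consumer \
    --bootstrap-server kafka:9094 \
    --topic arroyo.public.checkpoints \
    --from-beginning
```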
This tutorial involves two Arroyo pipelines: one to generate checkpoint data, and one that consumes the database records generated by the first pipeline's checkpoint activity.
First, we need to run Arroyo configured to use Postgres as its config store:

```shell
$ ARROYO__DATABASE__TYPE=postgres arroyo cluster
```
We'll start by creating the pipeline that will produce checkpoints, which will form the input for our Debezium data.
Open the Arroyo Pipelines UI at http://localhost:5115/pipelines/new, and create a new pipeline with the following SQL:
```sql
CREATE TABLE impulse (
    counter BIGINT UNSIGNED NOT NULL,
    subtask_index BIGINT UNSIGNED NOT NULL
)
WITH (
    connector = 'impulse',
    event_rate = '100'
);

SELECT count(*)
FROM impulse
GROUP BY hop(interval '2 seconds', interval '10 seconds');
```
This creates an impulse source that generates 100 events per second, and then counts them over a 10-second sliding window, although the details of the query are not relevant to this tutorial.
Click "Launch," give it a name, and click "Start."
On the job detail page, there's a "Checkpoints" tab; clicking into that will show the data that we'll be reading from the database.
Next we'll create a second pipeline that consumes the checkpointing data via Debezium.
```sql
CREATE TABLE checkpoints (
    id BIGINT PRIMARY KEY,
    organization_id TEXT,
    job_id TEXT,
    state_backend TEXT,
    epoch INT,
    min_epoch INT,
    start_time TIMESTAMP NOT NULL,
    finish_time TIMESTAMP,
    state TEXT,
    operators TEXT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    type = 'source',
    topic = 'arroyo.public.checkpoints',
    format = 'debezium_json',
    'json.timestamp_format' = 'RFC3339'
);

SELECT * FROM checkpoints;
```
This will merely dump out the data as three columns, `before`, `after`, and `op`, in line with the Debezium schema. However, we can also write more complex queries over the source. For instance, consider the following query.
```sql
SELECT id, epoch, min_epoch, state
FROM checkpoints
WHERE finish_time IS NULL;
```
This will produce an updating dataset that shows all non-finished checkpoints as the state evolves.
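Aggregations over the source work the same way; for example, this sketch (an illustrative query, not from the connector docs) counts checkpoints by state as the underlying table changes:

```sql
-- Produces an updating count per checkpoint state as rows are
-- inserted, updated, and deleted in the underlying Postgres table
SELECT state, count(*) AS checkpoint_count
FROM checkpoints
GROUP BY state;
```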
In addition to reading Debezium streams as update tables, Arroyo also supports writing Debezium messages to Kafka as a sink. See the connector docs for more details.
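As a rough sketch (the table and topic names here are illustrative), a Debezium-formatted sink looks much like the source definition, but with `type = 'sink'`:

```sql
-- Hypothetical sink table: writes updating results back to Kafka
-- as Debezium-formatted JSON messages
CREATE TABLE checkpoint_counts (
    state TEXT,
    checkpoint_count BIGINT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    type = 'sink',
    topic = 'checkpoint_counts',
    format = 'debezium_json'
);

INSERT INTO checkpoint_counts
SELECT state, count(*)
FROM checkpoints
GROUP BY state;
```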
Have more questions? File an issue on GitHub or join our Discord!