
8. Change data capture

Enes Hoxha edited this page Jan 18, 2025 · 3 revisions

8. Change Data Capture (CDC)

Change Data Capture (CDC) is a key pattern in modern data streaming systems: it lets applications react in near real time to changes (inserts, updates, deletes) in a data source. In the Cortex streaming framework, CDC is implemented as specialized source operators that connect to a database, detect changes, and emit them as stream events into a processing pipeline.

Cortex provides CDC source operators for Microsoft SQL Server, PostgreSQL, and MongoDB.

8.1. What is Change Data Capture?

Change Data Capture is a design pattern for capturing and tracking changes in a data store:

  • CDC captures inserts, updates, and deletes on tables.
  • It emits change events that can be processed by downstream applications in near real-time.
  • It supports use cases like real-time analytics, data replication, synchronization between heterogeneous systems, and event-driven architectures.

8.2. CDC in Cortex: General Overview

Cortex implements CDC as part of its streaming framework through specialized source operators. These operators:

  • Connect to specific data sources.
  • Detect and capture changes as they occur.
  • Emit change events into the Cortex stream for further processing, such as filtering, mapping, aggregation, or routing to sinks.

8.2.1. Key Features of Cortex CDC Operators

  • Real-Time Ingestion: Captures and streams data changes as they occur in the source system.
  • Optional Initial Load: Optionally performs a one-time full read of the target data before streaming incremental changes.
  • Stateful Processing: Maintains checkpoints and operator state to support exactly-once or at-least-once processing semantics.
  • Duplicate Handling: Implements mechanisms such as record hashing to filter out duplicate events.
  • Telemetry Integration: Provides observability through integrated metrics and tracing.
  • Modular Design: The architecture can be extended to support systems beyond those covered on this page, such as Oracle.

8.3. Using CDC in Cortex: Microsoft SQL Server

The Microsoft SQL Server CDC Source Operator streams data changes from a SQL Server database into a Cortex processing pipeline in real time.

8.3.1. Overview

The SqlServerCDCSourceOperator integrates with Microsoft SQL Server to capture row-level data changes. Under the hood, it leverages SQL Server’s built-in Change Data Capture (CDC) to track inserts, updates, and deletes.

Key Features

  • Optional Automatic CDC Enablement: When ConfigureCDCInServer = true, the operator attempts to enable database/table-level CDC if not already enabled.
  • Initial Load: If DoInitialLoad = true, the operator performs a single pass over the table before streaming incremental changes.
  • Polling Interval: Configurable via PullInterval, controlling how frequently changes are polled from the CDC tables.
  • Checkpointing: Stores the last processed LSN (Log Sequence Number) plus a hash of the last emitted record to prevent duplicate emissions.
  • Error Handling: Catches exceptions and retries with exponential back-off, so that repeated failures do not overload the system.
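Exponential back-off can be pictured with the minimal sketch below. It is not Cortex's implementation: the `BackOff` class, the "double the delay up to a cap" policy, and the injectable `sleep` delegate are assumptions made for illustration. (The MongoDB operator's `MaxBackOffSeconds` setting suggests a similar cap, but the SQL Server operator's exact policy is not documented here.)

```csharp
using System;
using System.Threading;

public static class BackOff
{
    // Delay before retry number `attempt` (0-based): baseDelay * 2^attempt, capped at maxDelay.
    public static TimeSpan ComputeDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 0) throw new ArgumentOutOfRangeException(nameof(attempt));

        // Clamp the exponent so the multiplication stays in a sane range.
        double ticks = baseDelay.Ticks * Math.Pow(2, Math.Min(attempt, 30));
        return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
    }

    // Runs `action`, retrying after failures with exponential back-off.
    // When the last attempt fails, its exception propagates to the caller.
    // `sleep` is injectable so tests do not have to wait.
    public static T Execute<T>(
        Func<T> action,
        int maxAttempts,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        Action<TimeSpan>? sleep = null)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        Action<TimeSpan> wait = sleep ?? (delay => Thread.Sleep(delay));

        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return action();
            }
            catch (Exception) when (attempt + 1 < maxAttempts)
            {
                wait(ComputeDelay(attempt, baseDelay, maxDelay));
            }
        }
    }
}
```

With a 1-second base and a 60-second cap, the waits grow 1 s, 2 s, 4 s, ... until they level off at 60 s. A long-running operator would more likely await `Task.Delay` than block a thread with `Thread.Sleep`; the synchronous form just keeps the sketch short.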

How It Works

  1. CDC Enablement:
    • Checks if CDC is enabled on the target database and table.
    • If ConfigureCDCInServer is set, it can automatically enable CDC.
  2. Initial Load (Optional):
    • If DoInitialLoad is true and not previously completed, the operator reads all existing rows from the table.
    • Emits each row as a stream event before moving to change tracking.
  3. Continuous Polling for Changes:
    • Periodically polls the SQL Server CDC tables using functions like cdc.fn_cdc_get_all_changes_<capture_instance>.
    • Uses a configurable PullInterval to define the delay between polls.
  4. Duplicate Filtering:
    • Computes an MD5 hash of each record's data.
    • Compares it with the hash of the last emitted record to avoid duplicates.
  5. Checkpointing:
    • Maintains and updates a checkpoint (LSN and record hash) in a state store.
    • Ensures that upon restart, streaming resumes from the last processed change without data loss or duplication.
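The duplicate-filtering and checkpointing steps can be sketched as follows. This is a hypothetical illustration, not the operator's source: the `CdcCheckpoint` record, the `DuplicateFilter` class, and the canonical `column=value` serialization are assumptions. Sorting columns before hashing keeps the hash stable regardless of dictionary order, and restoring `LastHash` from the stored checkpoint lets a restarted operator skip the last change if it is read again.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical checkpoint shape: the last processed LSN plus the hash of the
// last emitted record. The format Cortex actually stores may differ.
public sealed record CdcCheckpoint(string Lsn, string LastRecordHash);

public sealed class DuplicateFilter
{
    // Hash of the last emitted record; restored from the checkpoint on restart.
    public string? LastHash { get; private set; }

    public DuplicateFilter(string? lastHash = null) => LastHash = lastHash;

    // MD5 over a canonical "column=value" string. Sorting by column name makes the
    // hash independent of dictionary order; invariant culture keeps numbers and
    // dates stable across machines. (Delimiters are not escaped in this sketch.)
    public static string ComputeHash(IReadOnlyDictionary<string, object?> data)
    {
        string canonical = string.Join("|",
            data.OrderBy(kv => kv.Key, StringComparer.Ordinal)
                .Select(kv => kv.Key + "=" + Convert.ToString(kv.Value, CultureInfo.InvariantCulture)));

        using MD5 md5 = MD5.Create();
        byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(canonical));
        return Convert.ToHexString(hash);
    }

    // Returns true and remembers the hash when the record differs from the last
    // emitted one; returns false for a consecutive duplicate.
    public bool ShouldEmit(IReadOnlyDictionary<string, object?> data)
    {
        string hash = ComputeHash(data);
        if (hash == LastHash)
        {
            return false;
        }

        LastHash = hash;
        return true;
    }
}
```

Keeping only the last hash catches consecutive repeats, such as the last change being read again after a restart; it does not detect duplicates separated by other changes.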

8.3.2. Server Configuration Prerequisites for SQL Server

  1. Enable SQL Server Agent

SQL Server Agent must be running, because CDC relies on its capture and cleanup jobs to read changes from the transaction log in the background.

  2. Permissions

    • Enabling CDC on the database (sys.sp_cdc_enable_db) requires membership in the sysadmin fixed server role; enabling it on a table (sys.sp_cdc_enable_table) requires membership in the db_owner fixed database role.
    • If ConfigureCDCInServer is true, the Cortex CDC operator will attempt:
        EXEC sys.sp_cdc_enable_db;
        EXEC sys.sp_cdc_enable_table ...
    • If ConfigureCDCInServer is false, enable CDC on the database and table manually before starting the operator.
  3. Database Log Growth

    • CDC can increase transaction log usage, because log records cannot be truncated until the capture job has processed them. Provision enough log space and monitor log growth.
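When ConfigureCDCInServer = false, CDC has to be enabled by hand. The hypothetical helper below only builds an idempotent T-SQL script from the documented system procedures sys.sp_cdc_enable_db and sys.sp_cdc_enable_table and the catalog flags sys.databases.is_cdc_enabled and sys.tables.is_tracked_by_cdc; it is an illustration and does not reproduce the exact statements Cortex runs. Passing @role_name = NULL means no database role gates access to the change data.

```csharp
using System;
using System.Text;

public static class SqlServerCdcScript
{
    // Wraps a value in an N'...' literal, doubling embedded single quotes.
    private static string Literal(string value) => "N'" + value.Replace("'", "''") + "'";

    // Builds an idempotent script that enables CDC on the current database and on
    // one table. Run it while connected to the target (non-system) database.
    public static string Build(string schemaName, string tableName)
    {
        string schema = Literal(schemaName);
        string table = Literal(tableName);
        var sql = new StringBuilder();

        // Database level (requires sysadmin).
        sql.AppendLine("IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = DB_NAME() AND is_cdc_enabled = 1)");
        sql.AppendLine("    EXEC sys.sp_cdc_enable_db;");

        // Table level (requires db_owner). @role_name = NULL: no gating role.
        sql.AppendLine("IF NOT EXISTS (SELECT 1 FROM sys.tables AS t JOIN sys.schemas AS s ON t.schema_id = s.schema_id");
        sql.AppendLine($"               WHERE s.name = {schema} AND t.name = {table} AND t.is_tracked_by_cdc = 1)");
        sql.AppendLine($"    EXEC sys.sp_cdc_enable_table @source_schema = {schema}, @source_name = {table}, @role_name = NULL;");

        return sql.ToString();
    }
}
```

For example, SqlServerCdcScript.Build("dbo", "Orders") yields a script you can run with sqlcmd or SQL Server Management Studio against the database named in your connection string.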

8.3.3. CDC Support by SQL Server Version

  • Introduced in SQL Server 2008 (Enterprise, Developer, and Evaluation editions).
  • SQL Server 2016 SP1 and later also support CDC in Standard Edition.
  • Azure SQL Database also has CDC support in certain service tiers.
  • If you are using an older or unsupported SQL Server edition, you may not have native CDC functionality.

8.3.4. Example docker-compose for SQL Server

Below is a minimal example that runs SQL Server 2019 in a Docker container. CDC is available from SQL Server 2008 onward; 2019 is used here as a common choice. MSSQL_AGENT_ENABLED starts SQL Server Agent, which CDC requires:

version: '3.8'
services:
  sqlserver:
    image: mcr.microsoft.com/mssql/server:2019-latest
    container_name: sqlserver_cdc
    environment:
      - ACCEPT_EULA=Y
      - MSSQL_SA_PASSWORD=YourStrong@Passw0rd
      - MSSQL_PID=Developer
      - MSSQL_AGENT_ENABLED=true   # SQL Server Agent must run for CDC capture jobs
    ports:
      - "1433:1433"
    healthcheck:
      # Newer image builds ship sqlcmd under /opt/mssql-tools18/bin and need -C to trust
      # the self-signed certificate; adjust the path to match your image.
      test: ["CMD", "/opt/mssql-tools/bin/sqlcmd", "-U", "sa", "-P", "YourStrong@Passw0rd", "-Q", "SELECT 1"]
      interval: 10s
      timeout: 5s
      retries: 5

  • Developer Edition includes CDC functionality.
  • After starting, you can connect to this SQL Server from Cortex or any client at localhost:1433.
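A connection string for this container might be built as below. This is a sketch under assumptions: it uses the BCL's System.Data.Common.DbConnectionStringBuilder, and it assumes Cortex connects through Microsoft.Data.SqlClient, which encrypts connections by default from version 4.0, so the container's self-signed certificate has to be trusted explicitly. CDC cannot be enabled on system databases such as master, so create a user database (here "myDB") first. The `LocalSqlServer` class is hypothetical.

```csharp
using System;
using System.Data.Common;

public static class LocalSqlServer
{
    // Builds a connection string for the docker-compose container above.
    // TrustServerCertificate=True accepts the container's self-signed certificate;
    // use it only for local development.
    public static string BuildConnectionString(string database, string password)
    {
        var builder = new DbConnectionStringBuilder
        {
            ["Server"] = "localhost,1433",
            ["Database"] = database,
            ["User Id"] = "sa",
            ["Password"] = password,
            ["TrustServerCertificate"] = "True"
        };
        return builder.ConnectionString;
    }
}
```

The resulting string can be passed as the connectionString argument in the SQL Server examples that follow.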

8.4. Microsoft SQL Server CDC: Usage Examples

The following examples show how to configure and use the SQL Server CDC Source Operator in common scenarios.

8.4.1. Prerequisites

  • Cortex Setup: Ensure that Cortex is installed and properly configured in your environment.
  • SQL Server CDC Enabled: CDC must be enabled on the target SQL Server database and on each table you intend to monitor. Cortex can do this for you when ConfigureCDCInServer = true.

8.4.2. Example 1: Basic CDC Stream with Default Settings

Scenario: Set up a CDC stream that captures all change events from a SQL Server table using default settings, including an initial data load.

Configuration Highlights:

  • Initial Load: Enabled (DoInitialLoad = true) to read existing table data before streaming changes.
  • Polling Interval: Default interval of 3 seconds (PullInterval = TimeSpan.FromSeconds(3)).
  • CDC Configuration: Automatic CDC enablement is disabled (ConfigureCDCInServer = false).

Usage:

using System;
using Cortex.Streams;
using Cortex.Streams.MSSqlServer;

// Define connection and table details.
string connectionString = "Server=myServer;Database=myDB;User Id=myUser;Password=myPass;";
string schemaName = "dbo";
string tableName = "Orders";

// Configure CDC settings with default options.
var sqlServerSettings = new SqlServerSettings
{
    DoInitialLoad = true,                // Perform initial full load.
    PullInterval = TimeSpan.FromSeconds(3),  // Poll every 3 seconds.
    ConfigureCDCInServer = false          // Do not automatically enable CDC.
};

// Create a CDC source operator.
var cdcSourceOperator = new SqlServerCDCSourceOperator(
    connectionString,
    schemaName,
    tableName,
    sqlServerSettings
);

// Build a stream using the CDC source.
var stream = StreamBuilder<SqlServerRecord, SqlServerRecord>
    .CreateNewStream("SQL Server CDC Stream")
    .Stream(cdcSourceOperator)             // Use CDC source as the stream origin.
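    // record.Data is a column/value map; join its entries so values, not the type name, are printed.
    .Sink(record => Console.WriteLine($"Change Detected: {record.Operation} - {string.Join(", ", record.Data)}"))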
    .Build();

// Start the stream to begin processing CDC events.
stream.Start();

Explanation:

  • Initial Load: The operator reads existing records from the Orders table and emits them as SqlServerRecord events.
  • Continuous Polling: After the initial load, the operator polls for new changes every 3 seconds.
  • Sink: Detected changes are printed to the console.

8.4.3. Example 2: CDC Stream with Automatic CDC Configuration

Scenario: Allow Cortex to automatically enable CDC on the SQL Server database and target table if it's not already enabled.

Configuration Highlights:

  • Automatic CDC Configuration: Enabled (ConfigureCDCInServer = true).
  • Use Case: Simplifies setup by letting Cortex manage CDC enablement, ensuring that CDC is active without manual intervention.

Usage:

using System;
using Cortex.Streams;
using Cortex.Streams.MSSqlServer;

// Define connection and table details.
string connectionString = "Server=myServer;Database=myDB;User Id=myUser;Password=myPass;";
string schemaName = "hr";
string tableName = "Employees";

// Configure CDC settings with automatic CDC enablement.
var sqlServerSettings = new SqlServerSettings
{
    DoInitialLoad = true,                 // Perform initial data load.
    PullInterval = TimeSpan.FromSeconds(4),  // Poll every 4 seconds.
    ConfigureCDCInServer = true            // Automatically enable CDC if not enabled.
};

// Create a CDC source operator.
var cdcSourceOperator = new SqlServerCDCSourceOperator(
    connectionString,
    schemaName,
    tableName,
    sqlServerSettings
);

// Build a stream using the CDC source.
var stream = StreamBuilder<SqlServerRecord, SqlServerRecord>
    .CreateNewStream("SQL Server CDC Stream - Auto CDC")
    .Stream(cdcSourceOperator)             
    .Sink(record => Console.WriteLine($"Employee Change: {record.Operation} - {string.Join(", ", record.Data)}"))
    .Build();

// Start the stream to begin processing CDC events.
stream.Start();

Explanation:

  • Automatic CDC Configuration: Enabled, allowing Cortex to activate CDC on the Employees table if it isn't already enabled.
  • Initial Load and Polling: Configured to perform an initial data load and poll every 4 seconds.
  • Sink: Outputs all change operations related to employees.

8.4.4. Example 3: CDC Stream with Persistent State Store

Scenario: Utilize a persistent state store to ensure checkpoint data persists across application restarts, providing fault tolerance and data integrity.

Configuration Highlights:

  • State Store: Custom persistent IDataStore implementation is provided.
  • Use Case: Essential for production environments where stream processing must resume accurately after failures.

Usage:

using System;
using Cortex.Streams;
using Cortex.Streams.MSSqlServer;
using Cortex.States;
using Cortex.States.Operators;

// Define connection and table details.
string connectionString = "Server=myServer;Database=myDB;User Id=myUser;Password=myPass;";
string schemaName = "inventory";
string tableName = "Products";

// Use a persistent IDataStore implementation; here, a RocksDB-backed store.
IDataStore<string, byte[]> checkpointStateStore = new RocksDbStateStore<string, byte[]>("ExampleStateStore", "./data");

// Configure CDC settings with persistent state store.
var sqlServerSettings = new SqlServerSettings
{
    DoInitialLoad = true,                     // Perform initial data load.
    PullInterval = TimeSpan.FromSeconds(6),      // Poll every 6 seconds.
    ConfigureCDCInServer = true,               // Automatically enable CDC if necessary.
};

// Create a CDC source operator with the persistent state store.
var cdcSourceOperator = new SqlServerCDCSourceOperator(
    connectionString,
    schemaName,
    tableName,
    sqlServerSettings,
    checkpointStateStore
);

// Build a stream using the CDC source.
var stream = StreamBuilder<SqlServerRecord, SqlServerRecord>
    .CreateNewStream("SQL Server CDC Stream - Persistent State")
    .Stream(cdcSourceOperator)             
    .Sink(record => Console.WriteLine($"Inventory Update: {record.Operation} - {string.Join(", ", record.Data)}"))
    .Build();

// Start the stream to begin processing CDC events.
stream.Start();

Explanation:

  • Persistent State Store: RocksDbStateStore persists checkpoint information to RocksDB.
  • Fault Tolerance: Ensures that upon restarting the application, the CDC operator resumes from the last processed Log Sequence Number (LSN), preventing data loss or duplication.
  • Sink: Outputs all inventory-related change events.

8.4.5. Example 4: CDC Stream with Complex Transformation and Routing

Scenario: Apply complex data transformations and route change events to different sinks based on specific criteria.

Configuration Highlights:

  • Map Operator: Transforms SqlServerRecord into a custom data structure.
  • Branching: Routes transformed records to different sinks based on operation type.
  • Use Case: Enables sophisticated processing pipelines where different types of changes trigger different actions.

Usage:

using System;
using Cortex.Streams;
using Cortex.Streams.MSSqlServer;

// Define connection and table details.
string connectionString = "Server=myServer;Database=myDB;User Id=myUser;Password=myPass;";
string schemaName = "operations";
string tableName = "Logs";

// Configure CDC settings.
var sqlServerSettings = new SqlServerSettings
{
    DoInitialLoad = true,                      // Perform initial data load.
    PullInterval = TimeSpan.FromSeconds(2),       // Poll every 2 seconds.
    ConfigureCDCInServer = true                 // Automatically enable CDC if necessary.
};

// Create a CDC source operator.
var cdcSourceOperator = new SqlServerCDCSourceOperator(
    connectionString,
    schemaName,
    tableName,
    sqlServerSettings
);

// Build a stream with complex transformations and routing.
var stream = StreamBuilder<SqlServerRecord, SqlServerRecord>
    .CreateNewStream("SQL Server CDC Stream - Complex Routing")
    .Stream(cdcSourceOperator)             
    .Map(record => new 
    { 
        Operation = record.Operation, 
        Message = record.Data["Message"]?.ToString(), 
        Severity = record.Data["Severity"]?.ToString(),
        Timestamp = record.ChangeTime 
    })  // Transform records to a custom anonymous type.
    .AddBranch("ErrorLogs", branch => 
    {
        branch.Filter(r => r.Severity == "Error")
               .Sink(r => Console.WriteLine($"Error Log: {r.Message} at {r.Timestamp}"));
    })
    .AddBranch("InfoLogs", branch => 
    {
        branch.Filter(r => r.Severity == "Info")
               .Sink(r => Console.WriteLine($"Info Log: {r.Message} at {r.Timestamp}"));
    })
    .Build();

// Start the stream to begin processing CDC events.
stream.Start();

Explanation:

  • Map Operator: Transforms each SqlServerRecord into an anonymous type containing the operation type, message, severity, and timestamp.
  • Branching:
    • ErrorLogs Branch: Filters for records with Severity equal to "Error" and outputs them to the console.
    • InfoLogs Branch: Filters for records with Severity equal to "Info" and outputs them separately.
  • Benefits: Allows different handling of various log severities, enabling specialized processing or alerting mechanisms for critical events.

8.5. Best Practices for Using CDC in Cortex

  1. Enable CDC Properly:
    • Ensure CDC is enabled on both the database and target tables.
    • Use ConfigureCDCInServer = true during initial setup to automate CDC enablement if appropriate.
  2. Choose Appropriate Polling Intervals:
    • Balance between data freshness and system performance.
    • Longer intervals reduce database load but increase data latency.
  3. Use Persistent State Stores:
    • For production environments, always use a persistent IDataStore to maintain checkpoint and hash data, ensuring reliability across restarts.
  4. Implement Robust Filtering:
    • Use Cortex's filter operators to process only relevant change events, optimizing resource usage.
  5. Leverage Telemetry:
    • Integrate telemetry providers to monitor operator performance, track metrics, and facilitate troubleshooting.
  6. Handle Duplicates Carefully:
    • Ensure that duplicate filtering mechanisms are correctly configured and that the state store reliably maintains hash data.
  7. Test Thoroughly:
    • Validate CDC streams under various scenarios, including high data volumes, operator restarts, and failure conditions to ensure resilience.
  8. Secure Connections:
    • Protect database connection strings and credentials.
    • Follow security best practices to safeguard data integrity and privacy.

8.6. Using CDC in Cortex: PostgreSQL

8.6.1. Overview

The PostgresSourceOperator uses logical replication and the wal2json plugin to capture changes from PostgreSQL in real-time. It optionally sets up or verifies a publication and logical replication slot if configured to do so.

Key Features

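  • Optional Automatic Setup: When ConfigureCDCInServer = true, attempts to create the publication and logical replication slot passed to the operator (for example, my_publication and my_slot) if they do not exist.
  • Replica Identity: Can set the table's REPLICA IDENTITY to DEFAULT or FULL via the ReplicaIdentity setting (a ReplicaIdentityMode value). With DEFAULT, UPDATE and DELETE events identify the old row only by its primary key; FULL includes all old column values.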
  • Initial Load: If DoInitialLoad = true, performs a single SELECT * FROM schema.table before streaming from the WAL.
  • Checkpointing: Maintains the last processed LSN in the state store to resume from the same position upon restart.

8.6.2. Server Configuration Prerequisites for PostgreSQL

  1. wal_level = logical
    In your postgresql.conf, ensure:

    wal_level = logical

    Then restart PostgreSQL for changes to take effect.

  2. Role Privileges
    The connecting user must be a superuser or a role with the REPLICATION attribute in order to create and read from logical replication slots.

  3. Plugin Installation

    • The operator uses the wal2json output plugin, which is not part of core PostgreSQL. Some managed services (for example, Amazon RDS for PostgreSQL) already provide it; otherwise install it from your package manager or build it from source. For example, on Debian/Ubuntu with PostgreSQL 14:

    sudo apt-get install postgresql-14-wal2json
  4. Publication / Slot

    • If ConfigureCDCInServer = false, you must manually create a publication and replication slot. Example:
    CREATE PUBLICATION my_publication FOR TABLE my_schema.my_table;
    SELECT * FROM pg_create_logical_replication_slot('my_slot', 'wal2json');
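To see what the operator receives from the plugin, the sketch below parses one wal2json message in its default format version 1: a `change` array whose entries carry `kind`, `schema`, `table`, parallel `columnnames`/`columnvalues` arrays, and, for updates and deletes, an `oldkeys` object. The `Wal2JsonChange` and `Wal2JsonParser` types are hypothetical; Cortex's mapping to PostgresRecord may differ, and column values are kept as raw text rather than converted to .NET types.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical change type for illustration; not a Cortex type.
public sealed record Wal2JsonChange(
    string Kind, string Schema, string Table,
    Dictionary<string, string?> Columns, Dictionary<string, string?> OldKeys);

public static class Wal2JsonParser
{
    // Parses one wal2json format-version 1 message: {"change":[{...}, ...]}.
    public static List<Wal2JsonChange> Parse(string json)
    {
        var result = new List<Wal2JsonChange>();
        using JsonDocument doc = JsonDocument.Parse(json);

        foreach (JsonElement change in doc.RootElement.GetProperty("change").EnumerateArray())
        {
            result.Add(new Wal2JsonChange(
                change.GetProperty("kind").GetString()!,
                change.GetProperty("schema").GetString()!,
                change.GetProperty("table").GetString()!,
                Zip(change, "columnnames", "columnvalues"),
                change.TryGetProperty("oldkeys", out JsonElement oldKeys)
                    ? Zip(oldKeys, "keynames", "keyvalues")
                    : new Dictionary<string, string?>()));
        }
        return result;
    }

    // Pairs a names array with a values array. Strings lose their quotes, JSON null
    // becomes null, and other values (numbers, booleans) keep their raw JSON text.
    private static Dictionary<string, string?> Zip(JsonElement parent, string namesProp, string valuesProp)
    {
        var map = new Dictionary<string, string?>();
        if (!parent.TryGetProperty(namesProp, out JsonElement names) ||
            !parent.TryGetProperty(valuesProp, out JsonElement values))
        {
            return map; // DELETE entries carry no columnnames/columnvalues.
        }

        int i = 0;
        foreach (JsonElement value in values.EnumerateArray())
        {
            string name = names[i++].GetString()!;
            map[name] = value.ValueKind switch
            {
                JsonValueKind.Null => null,
                JsonValueKind.String => value.GetString(),
                _ => value.GetRawText()
            };
        }
        return map;
    }
}
```

With REPLICA IDENTITY FULL, the `oldkeys` object of an update or delete lists every column rather than only the primary key, which is why the operator exposes that setting.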

8.6.3. Basic Usage Example

using System;
using Cortex.Streams;
using Cortex.Streams.PostgreSQL;
using Cortex.States;

// 1. Setup connection & table info
string connectionString = "Host=myHost;Database=myDB;Username=myUser;Password=myPass;";
string schemaName = "public";
string tableName = "Customers";

// 2. Configure Postgres settings
var postgresSettings = new PostgresSettings
{
    DoInitialLoad = true,
    PullInterval = TimeSpan.FromSeconds(3),
    ConfigureCDCInServer = true,
    ReplicaIdentity = ReplicaIdentityMode.Full  // Ensure full row data for DELETEs
};

// 3. Create the Postgres CDC source
var pgCdcOperator = new PostgresSourceOperator(
    connectionString,
    schemaName,
    tableName,
    slotName: "my_slot",
    publicationName: "my_publication",
    postgresSettings
);

// 4. Build and start the stream
var stream = StreamBuilder<PostgresRecord, PostgresRecord>
    .CreateNewStream("Postgres CDC Stream")
    .Stream(pgCdcOperator)
    .Sink(record =>
    {
        Console.WriteLine($"[PostgresCDC] {record.Operation} => {record.Data.Count} columns changed.");
    })
    .Build();

stream.Start();

8.6.4. Handling Complex Scenarios

  • Multiple Tables: Create multiple operators (or a single publication with multiple tables) and configure them accordingly.
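  • Clustering / HA: By default, logical replication slots are not synchronized to physical standbys, so include recreating the slot in your failover plan. An inactive slot also retains WAL indefinitely; drop slots you no longer use.
  • Performance: Tuning wal2json options, such as disabling pretty-print or enabling write-in-chunks for large transactions, can improve throughput.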

8.7. Using CDC in Cortex: MongoDB

8.7.1. Overview

The MongoDbCDCSourceOperator uses MongoDB Change Streams to capture changes (inserts, updates, replacements, deletes) on a collection in real time. It can optionally perform an initial full scan of the collection first.

Key Features

  • Change Stream: Reliably captures changes from a replica set or sharded cluster without manual polling.
  • Optional Initial Load: If DoInitialLoad = true, the entire collection is read once.
  • Checkpointing: Stores a resume token from the change stream plus a record hash to skip duplicates.
  • Error Handling: Retries on errors with a back-off approach; gracefully handles operator stop signals.

8.7.2. Server Configuration Prerequisites for MongoDB

  1. Replica Set or Sharded Cluster
    MongoDB Change Streams only work on a replica set or a sharded cluster.

    • For a single-node development instance, start mongod with a replica set name (for example, mongod --replSet rs0) and initialize the replica set once:
    // In the MongoDB shell (mongosh):
    rs.initiate()
  2. Database User Permissions
    The user needs the changeStream and find privileges on the watched collection or database; the built-in read role includes both.

  3. MongoDB Version
    Change Streams are supported in MongoDB 3.6+, with enhancements in later versions. Ensure you are running a compatible version.

8.7.3. Basic Usage Example

using System;
using Cortex.Streams;
using Cortex.Streams.MongoDb;
using MongoDB.Driver;
using Cortex.States;

// 1. Setup MongoDB client & collection details
var client = new MongoClient("mongodb://localhost:27017");
var database = client.GetDatabase("myDb");
string collectionName = "Products";

// 2. Configure MongoDB CDC settings
var mongoCdcSettings = new MongoDbCDCSettings
{
    DoInitialLoad = true,          // Read entire collection first
    Delay = TimeSpan.FromSeconds(3),
    MaxBackOffSeconds = 60
};

// 3. Create the operator
var cdcOperator = new MongoDbCDCSourceOperator(
    database,
    collectionName,
    mongoCdcSettings
);

// 4. Build a stream
var stream = StreamBuilder<MongoDbRecord, MongoDbRecord>
    .CreateNewStream("MongoDB CDC Stream")
    .Stream(cdcOperator)
    .Sink(record =>
    {
        Console.WriteLine($"[MongoCDC] Operation: {record.Operation}, Document: {record.Data}");
    })
    .Build();

// 5. Start streaming
stream.Start();

8.7.4. Additional Considerations

  • Single vs. Multiple Collections: Each MongoDbCDCSourceOperator targets one collection. For multiple collections, instantiate multiple operators or watch the entire database if needed (using $changeStream at the DB level).
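  • Oplog Size: Size your replica set's oplog for your write volume. If the operator is stopped for longer than the oplog window, its stored resume token may no longer be in the oplog, and the change stream cannot resume from it.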
  • Filtering: You can filter on specific operation types (insert, update, delete) using stream operators within Cortex if needed.

8.8. General Best Practices for CDC Operators

  1. Persistent State Stores
    • In production, always use a reliable persistent IDataStore (e.g., RocksDB or another external store) so checkpoints survive application restarts.
  2. Sensible Polling or Streaming Intervals
    • Database load and latency requirements vary. Tune PullInterval or the equivalent delay/timeout for your environment.
  3. Deduplication Logic
    • The built-in hash-based mechanism prevents accidentally re-emitting the same record. Ensure unique or stable column sets for hashing.
  4. Security & Permissions
    • Each database user must have privileges to enable or read CDC. Secure your credentials and follow least-privilege guidelines.
  5. Monitoring & Alerting
    • Monitor for lag or errors. In high-throughput systems, ensure operators keep up with the volume of changes and that logs are properly captured.
  6. Transaction Log / WAL Management
    • For SQL Server and PostgreSQL, keep an eye on transaction log or WAL usage. For MongoDB, monitor oplog retention and size.
  7. Testing & Validation
    • Thoroughly test with simulated or real workloads before production. Validate that initial loads and incremental changes match your data expectations.