- Batch size is inherited from the Kafka Consumer properties.
- When using KeeperMap for exactly-once and the offset is changed or re-wound, you need to delete the content from KeeperMap for that specific topic. (See troubleshooting guide below for more details)

### Performance tuning and throughput optimization {#tuning-performance}

This section covers performance tuning strategies for the ClickHouse Kafka Connect Sink. Performance tuning is essential when dealing with high-throughput use cases or when you need to optimize resource utilization and minimize lag.

#### When is performance tuning needed? {#when-is-performance-tuning-needed}

Performance tuning is typically required in the following scenarios:

- **High-throughput workloads**: When processing millions of events per second from Kafka topics
- **Consumer lag**: When your connector can't keep up with the rate of data production, causing increasing lag
- **Resource constraints**: When you need to optimize CPU, memory, or network usage
- **Multiple topics**: When consuming from multiple high-volume topics simultaneously
- **Small message sizes**: When dealing with many small messages that would benefit from server-side batching

Performance tuning is **NOT typically needed** when:

- You're processing low to moderate volumes (< 10,000 messages/second)
- Consumer lag is stable and acceptable for your use case
- Default connector settings already meet your throughput requirements
- Your ClickHouse cluster can easily handle the incoming load

#### Understanding the data flow {#understanding-the-data-flow}

Before tuning, it's important to understand how data flows through the connector:

1. **Kafka Connect Framework** fetches messages from Kafka topics in the background
2. **Connector polls** for messages from the framework's internal buffer
3. **Connector batches** messages based on poll size
4. **ClickHouse receives** the batched insert via HTTP/S
5. **ClickHouse processes** the insert (synchronously or asynchronously)

Performance can be optimized at each of these stages.

#### Kafka Connect batch size tuning {#connect-fetch-vs-connector-poll}

The first level of optimization is controlling how much data the connector receives per batch from Kafka.

##### Fetch settings {#fetch-settings}

Kafka Connect (the framework) fetches messages from Kafka topics in the background, independently of the connector:

- **`fetch.min.bytes`**: Minimum amount of data before the framework passes values to the connector (default: 1 byte)
- **`fetch.max.bytes`**: Maximum amount of data to fetch in a single request (default: 52428800 / 50 MB)
- **`fetch.max.wait.ms`**: Maximum time to wait before returning data if `fetch.min.bytes` is not met (default: 500 ms)

##### Poll settings {#poll-settings}

The connector polls for messages from the framework's buffer:

- **`max.poll.records`**: Maximum number of records returned in a single poll (default: 500)
- **`max.partition.fetch.bytes`**: Maximum amount of data per partition (default: 1048576 / 1 MB)

##### Recommended settings for high throughput {#recommended-batch-settings}

For optimal performance with ClickHouse, aim for larger batches:

```properties
# Increase the number of records per poll
consumer.max.poll.records=5000

# Increase the partition fetch size (5 MB)
consumer.max.partition.fetch.bytes=5242880

# Optional: Increase minimum fetch size to wait for more data (1 MB)
consumer.fetch.min.bytes=1048576

# Optional: Reduce wait time if latency is critical
consumer.fetch.max.wait.ms=300
```

**Important**: Kafka Connect fetch settings represent compressed data, while ClickHouse receives uncompressed data. Balance these settings based on your compression ratio.
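For example, assuming a 5:1 compression ratio, a `fetch.min.bytes` of 1 MB corresponds to roughly 5 MB of uncompressed data per batch on the ClickHouse side.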

**Trade-offs**:
- **Larger batches** = Better ClickHouse ingestion performance, fewer parts, lower overhead
- **Larger batches** = Higher memory usage, potential increased end-to-end latency
- **Too large batches** = Risk of timeouts, OutOfMemory errors, or exceeding `max.poll.interval.ms`

More details: [Confluent documentation](https://docs.confluent.io/platform/current/connect/references/allconfigs.html#override-the-worker-configuration) | [Kafka documentation](https://kafka.apache.org/documentation/#consumerconfigs)

#### Asynchronous inserts {#asynchronous-inserts}

Asynchronous inserts are a powerful feature when the connector sends relatively small batches or when you want to further optimize ingestion by shifting batching responsibility to ClickHouse.

##### When to use async inserts {#when-to-use-async-inserts}

Consider enabling async inserts when:

- **Many small batches**: Your connector sends frequent small batches (< 1000 rows per batch)
- **High concurrency**: Multiple connector tasks are writing to the same table
- **Distributed deployment**: Running many connector instances across different hosts
- **Part creation overhead**: You're experiencing "too many parts" errors
- **Mixed workload**: Combining real-time ingestion with query workloads

Do **NOT** use async inserts when:

- You're already sending large batches (> 10,000 rows per batch) with controlled frequency
- You require immediate data visibility (queries must see data instantly)
- You need exactly-once semantics and would rely on `wait_for_async_insert=0`, which conflicts with that guarantee (see below)
- Your use case can benefit from client-side batching improvements instead

##### How async inserts work {#how-async-inserts-work}

With asynchronous inserts enabled, ClickHouse:

1. Receives the insert query from the connector
2. Writes data to an in-memory buffer (instead of immediately to disk)
3. Returns success to the connector (if `wait_for_async_insert=0`)
4. Flushes the buffer to disk when one of these conditions is met:
- Buffer reaches `async_insert_max_data_size` (default: 10 MB)
- `async_insert_busy_timeout_ms` milliseconds elapsed since first insert (default: 1000 ms)
- Maximum number of queries accumulated (`async_insert_max_query_number`, default: 100)

This significantly reduces the number of parts created and improves overall throughput.
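
You can observe the same mechanism outside the connector with a manual insert. A minimal sketch, assuming a hypothetical table `my_table(id UInt32, message String)`:

```sql
-- The server buffers this insert; with wait_for_async_insert = 1 the
-- statement returns only after the buffer is flushed to a part
INSERT INTO my_table
SETTINGS async_insert = 1, wait_for_async_insert = 1
VALUES (1, 'example');
```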

##### Enabling async inserts {#enabling-async-inserts}

Add async insert settings to the `clickhouseSettings` configuration parameter:

```json
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}
```

**Key settings**:

- **`async_insert=1`**: Enable asynchronous inserts
- **`wait_for_async_insert=1`** (recommended): Connector waits for data to be flushed to ClickHouse storage before acknowledging. Provides delivery guarantees.
- **`wait_for_async_insert=0`**: Connector acknowledges immediately after buffering. Better performance but data may be lost on server crash before flush.

##### Tuning async insert behavior {#tuning-async-inserts}

You can fine-tune the async insert flush behavior:

```json
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=10485760,async_insert_busy_timeout_ms=1000"
```

Common tuning parameters:

- **`async_insert_max_data_size`** (default: 10485760 / 10 MB): Maximum buffer size before flush
- **`async_insert_busy_timeout_ms`** (default: 1000): Maximum time (ms) before flush
- **`async_insert_stale_timeout_ms`** (default: 0): Time (ms) since last insert before flush
- **`async_insert_max_query_number`** (default: 100): Maximum queries before flush

**Trade-offs**:

- **Benefits**: Fewer parts, better merge performance, lower CPU overhead, improved throughput under high concurrency
- **Considerations**: Data not immediately queryable, slightly increased end-to-end latency
- **Risks**: Data loss on server crash if `wait_for_async_insert=0`, potential memory pressure with large buffers

##### Async inserts with exactly-once semantics {#async-inserts-with-exactly-once}

When using `exactlyOnce=true` with async inserts:

```json
{
  "config": {
    "exactlyOnce": "true",
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}
```

**Important**: Always use `wait_for_async_insert=1` with exactly-once to ensure offset commits happen only after data is persisted.

For more information about async inserts, see the [ClickHouse async inserts documentation](/best-practices/selecting-an-insert-strategy#asynchronous-inserts).

#### Connector parallelism {#connector-parallelism}

Increase parallelism to improve throughput:

##### Tasks per connector {#tasks-per-connector}

```json
"tasks.max": "4"
```

Each task processes a subset of topic partitions. More tasks = more parallelism, but:

- Maximum effective tasks = number of topic partitions
- Each task maintains its own connection to ClickHouse
- More tasks = higher overhead and potential resource contention

**Recommendation**: Start with `tasks.max` equal to the number of topic partitions, then adjust based on CPU and throughput metrics.
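
To size `tasks.max`, check the topic's partition count. A sketch with the standard Kafka CLI, assuming a broker at `localhost:9092` and a hypothetical topic `my_topic`:

```bash
# PartitionCount in the output is the ceiling for effective tasks.max
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my_topic
```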

##### Ignoring partitions when batching {#ignoring-partitions}

By default, the connector batches messages per partition. For higher throughput, you can batch across partitions:

```json
"ignorePartitionsWhenBatching": "true"
```

**Warning**: Only use when `exactlyOnce=false`. This setting can improve throughput by creating larger batches but loses per-partition ordering guarantees.

#### Multiple high-throughput topics {#multiple-high-throughput-topics}

If your connector subscribes to multiple topics, uses `topic2TableMap` to map topics to tables, and hits an insertion bottleneck that results in consumer lag, consider creating one connector per topic instead.

This happens because batches are currently inserted into each table [serially](https://github.com/ClickHouse/clickhouse-kafka-connect/blob/578ac07e8be1a920aaa3b26e49183595c3edd04b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java#L95-L100), so a slow insert into one table delays all the others.

**Recommendation**: For multiple high-volume topics, deploy one connector instance per topic to maximize parallel insert throughput.
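
As a sketch, assuming two hypothetical high-volume topics `orders` and `clicks`, the split looks like this (abbreviated config, submitted as separate connectors):

```json
{
  "name": "clickhouse-sink-orders",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "topics": "orders",
    "topic2TableMap": "orders=orders_table",
    ...
  }
}
```

A second connector (e.g. `clickhouse-sink-clicks` with `"topics": "clicks"`) then inserts independently rather than waiting for the `orders` batch to finish.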

#### ClickHouse table engine considerations {#table-engine-considerations}

Choose the appropriate ClickHouse table engine for your use case:

- **`MergeTree`**: Best for most use cases, balances query and insert performance
- **`ReplicatedMergeTree`**: Required for high availability, adds replication overhead
- **`*MergeTree` with proper `ORDER BY`**: Optimize for your query patterns

**Settings to consider**:

```sql
-- Choose an ORDER BY that matches your most common query filters
CREATE TABLE my_table (...)
ENGINE = MergeTree()
ORDER BY (timestamp, id)
```

Note that `max_insert_threads` (parallel part writing) and `insert_quorum` (write acknowledgement from multiple replicas, relevant for `ReplicatedMergeTree`) are query-level settings, not table settings, so they cannot appear in the `CREATE TABLE` statement. Set them at the connector level instead:

```json
"clickhouseSettings": "max_insert_threads=4,insert_quorum=2,insert_quorum_timeout=60000"
```

#### Connection pooling and timeouts {#connection-pooling}

The connector maintains HTTP connections to ClickHouse. Adjust timeouts for high-latency networks:

```json
"clickhouseSettings": "socket_timeout=300000,connection_timeout=30000"
```

- **`socket_timeout`** (default: 30000 ms): Maximum time for read operations
- **`connection_timeout`** (default: 10000 ms): Maximum time to establish connection

Increase these values if you experience timeout errors with large batches.

#### Monitoring and troubleshooting performance {#monitoring-performance}

Monitor these key metrics:

1. **Consumer lag**: Use Kafka monitoring tools to track lag per partition
2. **Connector metrics**: Monitor `receivedRecords`, `recordProcessingTime`, `taskProcessingTime` via JMX (see [Monitoring](#monitoring))
3. **ClickHouse metrics** (see the example queries after this list):
- `system.asynchronous_inserts`: Monitor async insert buffer usage
- `system.parts`: Monitor part count to detect merge issues
- `system.merges`: Monitor active merges
- `system.events`: Track `InsertedRows`, `InsertedBytes`, `FailedInsertQuery`
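
A minimal sketch of such spot checks in SQL (using the `system.parts` and `system.asynchronous_inserts` tables; adjust column names to your ClickHouse version):

```sql
-- Active part count per table; steady growth suggests inserts are outpacing merges
SELECT database, table, count() AS active_parts
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY active_parts DESC;

-- Pending async insert buffers (populated only while async_insert = 1)
SELECT database, table, sum(total_bytes) AS buffered_bytes
FROM system.asynchronous_inserts
GROUP BY database, table;
```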

**Common performance issues**:

| Symptom | Possible Cause | Solution |
|---------|----------------|----------|
| High consumer lag | Batches too small | Increase `max.poll.records`, enable async inserts |
| "Too many parts" errors | Small frequent inserts | Enable async inserts, increase batch size |
| Timeout errors | Large batch size, slow network | Reduce batch size, increase `socket_timeout`, check network |
| High CPU usage | Too many small parts | Enable async inserts, increase merge settings |
| OutOfMemory errors | Batch size too large | Reduce `max.poll.records`, `max.partition.fetch.bytes` |
| Uneven task load | Uneven partition distribution | Rebalance partitions or adjust `tasks.max` |

#### Best practices summary {#performance-best-practices}

1. **Start with defaults**, then measure and tune based on actual performance
2. **Prefer larger batches**: Aim for 10,000-100,000 rows per insert when possible
3. **Use async inserts** when sending many small batches or under high concurrency
4. **Always use `wait_for_async_insert=1`** with exactly-once semantics
5. **Scale horizontally**: Increase `tasks.max` up to the number of partitions
6. **One connector per high-volume topic** for maximum throughput
7. **Monitor continuously**: Track consumer lag, part count, and merge activity
8. **Test thoroughly**: Always test configuration changes under realistic load before production deployment

#### Example: High-throughput configuration {#example-high-throughput}

Here's a complete example optimized for high throughput:

```json
{
  "name": "clickhouse-high-throughput",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "8",

    "topics": "high_volume_topic",
    "hostname": "my-clickhouse-host.cloud",
    "port": "8443",
    "database": "default",
    "username": "default",
    "password": "<PASSWORD>",
    "ssl": "true",

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "exactlyOnce": "false",
    "ignorePartitionsWhenBatching": "true",

    "consumer.max.poll.records": "10000",
    "consumer.max.partition.fetch.bytes": "5242880",
    "consumer.fetch.min.bytes": "1048576",
    "consumer.fetch.max.wait.ms": "500",

    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=16777216,async_insert_busy_timeout_ms=1000,socket_timeout=300000"
  }
}
```

**This configuration**:
- Processes up to 10,000 records per poll
- Batches across partitions for larger inserts
- Uses async inserts with 16 MB buffer
- Runs 8 parallel tasks (match your partition count)
- Optimized for throughput over strict ordering

### Troubleshooting {#troubleshooting}
