kafka table engine, modernize table creation, add tip on limitations,… #3666

Open · wants to merge 5 commits into main

27 changes: 24 additions & 3 deletions docs/integrations/data-ingestion/kafka/kafka-table-engine.md
@@ -35,6 +35,13 @@ To persist this data from a read of the table engine, we need a means of capturing…

<Image img={kafka_01} size="lg" alt="Kafka table engine architecture diagram" style={{width: '80%'}} />

:::tip
The Kafka engine has some limitations:
- [Kafka message headers](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) are not currently supported, so a schema registry cannot be used.
- Protobuf and ProtobufSingle function correctly, while [ProtobufList](https://github.com/ClickHouse/ClickHouse/issues/78746) currently does not work when the Kafka queue has many messages.
- [Protobuf](https://github.com/ClickHouse/ClickHouse/blob/master/tests/integration/test_storage_kafka/test_produce_http_interface.py#L172) has integration tests for the Kafka engine, while ProtobufSingle and ProtobufList do not.
:::

#### Steps {#steps}


@@ -179,10 +186,19 @@ CREATE TABLE github_queue
review_comments UInt32,
member_login LowCardinality(String)
)
-ENGINE = Kafka('kafka_host:9092', 'github', 'clickhouse',
-  'JSONEachRow') settings kafka_thread_per_consumer = 0, kafka_num_consumers = 1;
+ENGINE = Kafka SETTINGS
+  kafka_broker_list = 'kafka_host:9092',
+  kafka_topic_list = 'github',
+  kafka_group_name = 'clickhouse',
+  kafka_format = 'JSONEachRow';
```

:::tip
See the [full Kafka engine table creation options](https://clickhouse.com/docs/engines/table-engines/integrations/kafka#creating-a-table).

Use caution with INSERTs, because [kafka_format must be the last setting](https://github.com/ClickHouse/ClickHouse/issues/37895); see the sketch below.
:::
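
A minimal sketch of producing to a topic through the engine (the `events_out` table and `events` topic are hypothetical; note that `kafka_format` is the last setting):

```sql
-- Hypothetical producer table. Per issue #37895, an INSERT can fail
-- if kafka_format is not the last setting in the list.
CREATE TABLE events_out (id UInt64, message String)
ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka_host:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse',
    kafka_format = 'JSONEachRow';

-- Each inserted row is published as a message on the 'events' topic.
INSERT INTO events_out VALUES (1, 'hello'), (2, 'world');
```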

We discuss engine settings and performance tuning below. At this point, a simple SELECT on the table `github_queue` should read some rows. Note that this will move the consumer offsets forward, preventing these rows from being re-read without a [reset](#common-operations). Note the LIMIT and the required setting `stream_like_engine_allow_direct_select` in the sketch below.
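
A minimal sketch of such a direct read (assuming the `github_queue` table above):

```sql
-- Direct SELECTs from streaming engines are disabled by default;
-- enable per-query, and limit rows since this advances consumer offsets.
SELECT * FROM github_queue LIMIT 5
SETTINGS stream_like_engine_allow_direct_select = 1;
```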

@@ -305,6 +321,12 @@ Errors such as authentication issues are not reported in responses to Kafka engine…
</kafka>
```

Another useful source of information is the `system.kafka_consumers` table:

```sql
SELECT * FROM system.kafka_consumers FORMAT Vertical;
```
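
The `assignments` and `exceptions` columns are particularly useful when debugging offsets and errors. A hedged sketch of a narrower query (the database and table names are placeholders):

```sql
-- Inspect a single table's consumers: partition assignments,
-- current offsets, and recent exception text.
SELECT database, table, assignments.topic, assignments.current_offset, exceptions.text
FROM system.kafka_consumers
WHERE database = 'default' AND table = 'github_queue'
FORMAT Vertical;
```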

##### Handling malformed messages {#handling-malformed-messages}

Kafka is often used as a "dumping ground" for data, which leads to topics containing mixed message formats and inconsistent field names. Avoid this, and use Kafka features such as Kafka Streams or ksqlDB to ensure messages are well-formed and consistent before insertion into Kafka. If these options are not possible, ClickHouse has some features that can help; one is sketched below.
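
One such feature, as a hedged sketch (the `events_in` table is hypothetical; `kafka_skip_broken_messages` is a Kafka engine setting):

```sql
-- Hypothetical consumer table: skip up to 10 messages per block
-- that cannot be parsed as JSONEachRow, instead of halting consumption.
CREATE TABLE events_in (id UInt64, message String)
ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka_host:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse',
    kafka_skip_broken_messages = 10,
    kafka_format = 'JSONEachRow';
```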
@@ -474,7 +496,6 @@ Multiple ClickHouse instances can all be configured to read from a topic using the…

Consider the following when looking to increase Kafka Engine table throughput performance:

* The performance will vary depending on the message size, format, and target table types. 100k rows/sec on a single table engine should be considered obtainable. By default, messages are read in blocks, controlled by the parameter `kafka_max_block_size`. By default, this is set to the [max_insert_block_size](/operations/settings/settings#max_insert_block_size), defaulting to 1,048,576. Unless messages are extremely large, this should nearly always be increased. Values between 500k and 1M are not uncommon. Test and evaluate the effect on throughput performance.
* The number of consumers for a table engine can be increased using `kafka_num_consumers`. However, by default, inserts will be linearized in a single thread unless `kafka_thread_per_consumer` is changed from its default value of 0. Set this to 1 to ensure flushes are performed in parallel. Note that creating a Kafka engine table with N consumers (and `kafka_thread_per_consumer=1`) is logically equivalent to creating N Kafka engines, each with a materialized view and `kafka_thread_per_consumer=0`.
* Increasing consumers is not a free operation. Each consumer maintains its own buffers and threads, increasing the overhead on the server. Be conscious of the overhead of consumers and scale linearly across your cluster first, if possible. A combined sketch of these settings follows this list.
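
A minimal sketch combining these tuning settings (the values are illustrative starting points, not recommendations; a single String column stands in for the full `github_queue` schema):

```sql
-- Hypothetical tuned table: larger read blocks, four consumers,
-- and one flush thread per consumer so flushes run in parallel.
CREATE TABLE github_queue_tuned (message String)
ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka_host:9092',
    kafka_topic_list = 'github',
    kafka_group_name = 'clickhouse',
    kafka_num_consumers = 4,
    kafka_thread_per_consumer = 1,
    kafka_max_block_size = 524288,
    kafka_format = 'JSONEachRow'; -- keep kafka_format last (issue #37895)
```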