From 813baab1f1917c5be18da9212343c632a98019b0 Mon Sep 17 00:00:00 2001
From: "randomizedcoder dave.seddon.ca@gmail.com"
Date: Sun, 13 Apr 2025 09:16:48 -0700
Subject: [PATCH 1/5] kafka table engine, modernize table creation, add tip on limitations, extra debug

---
 .../kafka/kafka-table-engine.md | 25 ++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
index a627de3d09b..530ee12f78e 100644
--- a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
+++ b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
@@ -35,6 +35,12 @@ To persist this data from a read of the table engine, we need a means of capturi

 Kafka table engine architecture diagram

+:::tip
+The Kafka Engine does have some limitations:
+- [Kafka message header](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) is not currently supported, so schema registry cannot be used
+- Protobuf, ProtobufSingle are supported, while [ProtobufList](https://github.com/ClickHouse/ClickHouse/issues/78746) is not
+:::
+
 #### Steps {#steps}

@@ -179,10 +185,19 @@ CREATE TABLE github_queue
     review_comments UInt32,
     member_login LowCardinality(String)
 )
-   ENGINE = Kafka('kafka_host:9092', 'github', 'clickhouse',
-                  'JSONEachRow') settings kafka_thread_per_consumer = 0, kafka_num_consumers = 1;
+ENGINE = Kafka SETTINGS
+    kafka_broker_list = 'kafka_host:9092',
+    kafka_topic_list = 'github',
+    kafka_group_name = 'clickhouse',
+    kafka_format = 'JSONEachRow';
 ```

+:::tip
+Notes
+[Full Kafka Engine Table creation options](https://clickhouse.com/docs/engines/table-engines/integrations/kafka#creating-a-table)
+
+[kafka_format must be the last setting](https://github.com/ClickHouse/ClickHouse/issues/37895)
+:::
 We discuss engine settings and performance tuning below. At this point, a simple select on the table `github_queue` should read some rows. Note that this will move the consumer offsets forward, preventing these rows from being re-read without a [reset](#common-operations). Note the limit and required parameter `stream_like_engine_allow_direct_select`.

@@ -305,6 +320,11 @@ Errors such as authentication issues are not reported in responses to Kafka engi
 ```

+Another useful source of information is the system.kafka_consumers table:
+```
+SELECT * FROM system.kafka_consumers FORMAT Vertical;
+```
+
 ##### Handling malformed messages {#handling-malformed-messages}

 Kafka is often used as a "dumping ground" for data. This leads to topics containing mixed message formats and inconsistent field names. Avoid this and utilize Kafka features such as Kafka Streams or ksqlDB to ensure messages are well-formed and consistent before insertion into Kafka. If these options are not possible, ClickHouse has some features that can help.

@@ -474,7 +494,6 @@ Multiple ClickHouse instances can all be configured to read from a topic using t

 Consider the following when looking to increase Kafka Engine table throughput performance:

-
 * The performance will vary depending on the message size, format, and target table types. 100k rows/sec on a single table engine should be considered obtainable. By default, messages are read in blocks, controlled by the parameter kafka_max_block_size. By default, this is set to the [max_insert_block_size](/operations/settings/settings#max_insert_block_size), defaulting to 1,048,576. Unless messages are extremely large, this should nearly always be increased. Values between 500k and 1M are not uncommon. Test and evaluate the effect on throughput performance.
 * The number of consumers for a table engine can be increased using kafka_num_consumers. However, by default, inserts will be linearized in a single thread unless kafka_thread_per_consumer is changed from the default value of 0. Set this to 1 to ensure flushes are performed in parallel. Note that creating a Kafka engine table with N consumers (and kafka_thread_per_consumer=1) is logically equivalent to creating N Kafka engines, each with a materialized view and kafka_thread_per_consumer=0.
 * Increasing consumers is not a free operation. Each consumer maintains its own buffers and threads, increasing the overhead on the server. Be conscious of the overhead of consumers and scale linearly across your cluster first, if possible.
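The tuning bullets above name three settings: kafka_max_block_size, kafka_num_consumers, and kafka_thread_per_consumer. A minimal sketch of a table that applies them together, where the broker, topic, and specific values are illustrative starting points rather than recommendations:

```sql
CREATE TABLE github_queue_tuned
(
    review_comments UInt32,
    member_login LowCardinality(String)
)
ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka_host:9092',
    kafka_topic_list = 'github',
    kafka_group_name = 'clickhouse',
    kafka_num_consumers = 4,          -- scale toward the topic's partition count
    kafka_thread_per_consumer = 1,    -- flush each consumer's block in parallel
    kafka_max_block_size = 1048576,   -- rows read per block before flushing
    kafka_format = 'JSONEachRow';     -- kept last, per the tip in the patch above
```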
From 3004bfe3d74ae502acd507e34a5138c3608875e0 Mon Sep 17 00:00:00 2001
From: Shaun Struwig <41984034+Blargian@users.noreply.github.com>
Date: Mon, 14 Apr 2025 11:57:31 +0200
Subject: [PATCH 2/5] Specify codeblock language

---
 docs/integrations/data-ingestion/kafka/kafka-table-engine.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
index 530ee12f78e..bb3888d6926 100644
--- a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
+++ b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
@@ -321,7 +321,8 @@ Errors such as authentication issues are not reported in responses to Kafka engi
 ```

 Another useful source of information is the system.kafka_consumers table:
-```
+
+```sql
 SELECT * FROM system.kafka_consumers FORMAT Vertical;
 ```

From 85a85db751df3add41e1aac23bb8d49f61ca8d7c Mon Sep 17 00:00:00 2001
From: "randomizedcoder dave.seddon.ca@gmail.com"
Date: Wed, 23 Apr 2025 13:58:39 -0700
Subject: [PATCH 3/5] clarify kafka engine docs enhancements

---
 docs/integrations/data-ingestion/kafka/kafka-table-engine.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
index bb3888d6926..7ce05ba84df 100644
--- a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
+++ b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
@@ -37,8 +37,8 @@ To persist this data from a read of the table engine, we need a means of capturi

 :::tip
 The Kafka Engine does have some limitations:
-- [Kafka message header](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) is not currently supported, so schema registry cannot be used
-- Protobuf, ProtobufSingle are supported, while [ProtobufList](https://github.com/ClickHouse/ClickHouse/issues/78746) is not
+- [Kafka message header](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) is not currently supported, so schema registry cannot be used.
+- Protobuf and ProtobufSingle function correctly, while [ProtobufList](https://github.com/ClickHouse/ClickHouse/issues/78746) currently does not work when the Kafka quque has many messages.
 :::

 #### Steps {#steps}

@@ -196,7 +196,7 @@ ENGINE = Kafka SETTINGS
 Notes
 [Full Kafka Engine Table creation options](https://clickhouse.com/docs/engines/table-engines/integrations/kafka#creating-a-table)

-[kafka_format must be the last setting](https://github.com/ClickHouse/ClickHouse/issues/37895)
+Please use caution with INSERTs, because [kafka_format must be the last setting](https://github.com/ClickHouse/ClickHouse/issues/37895)
 :::
 We discuss engine settings and performance tuning below. At this point, a simple select on the table `github_queue` should read some rows. Note that this will move the consumer offsets forward, preventing these rows from being re-read without a [reset](#common-operations). Note the limit and required parameter `stream_like_engine_allow_direct_select`.

From 6b9a46aaa00655568aaf8ef6c0973b6c1032d652 Mon Sep 17 00:00:00 2001
From: "randomizedcoder dave.seddon.ca@gmail.com"
Date: Wed, 23 Apr 2025 13:59:58 -0700
Subject: [PATCH 4/5] Kafka table engine

---
 docs/integrations/data-ingestion/kafka/kafka-table-engine.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
index 7ce05ba84df..6967fad8318 100644
--- a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
+++ b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
@@ -38,7 +38,7 @@ To persist this data from a read of the table engine, we need a means of capturi
 :::tip
 The Kafka Engine does have some limitations:
 - [Kafka message header](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) is not currently supported, so schema registry cannot be used.
-- Protobuf and ProtobufSingle function correctly, while [ProtobufList](https://github.com/ClickHouse/ClickHouse/issues/78746) currently does not work when the Kafka quque has many messages.
+- Protobuf and ProtobufSingle function correctly, while [ProtobufList](https://github.com/ClickHouse/ClickHouse/issues/78746) currently does not work when the Kafka queue has many messages.
 :::

 #### Steps {#steps}
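Given the partial Protobuf support called out in the tip, a hedged sketch of a Protobuf-format Kafka engine table may help. The topic, schema file, and message type below are hypothetical, and kafka_schema assumes the .proto file has been placed in the server's format_schemas directory:

```sql
CREATE TABLE events_queue
(
    id UInt64,
    payload String
)
ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka_host:9092',
    kafka_topic_list = 'events_proto',     -- hypothetical topic
    kafka_group_name = 'clickhouse',
    kafka_schema = 'event.proto:Event',    -- hypothetical schema file and message type
    kafka_format = 'Protobuf';             -- kafka_format stays last
```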
From 010beaaeb9e2dbb4b6ab1a126864829fabaab50a Mon Sep 17 00:00:00 2001
From: "randomizedcoder dave.seddon.ca@gmail.com"
Date: Fri, 25 Apr 2025 07:48:03 -0700
Subject: [PATCH 5/5] Improve notes on the Kafka engine integration tests

---
 docs/integrations/data-ingestion/kafka/kafka-table-engine.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
index 6967fad8318..4d77408e53c 100644
--- a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
+++ b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
@@ -39,6 +39,7 @@ To persist this data from a read of the table engine, we need a means of capturi
 The Kafka Engine does have some limitations:
 - [Kafka message header](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) is not currently supported, so schema registry cannot be used.
 - Protobuf and ProtobufSingle function correctly, while [ProtobufList](https://github.com/ClickHouse/ClickHouse/issues/78746) currently does not work when the Kafka queue has many messages.
+- [Protobuf](https://github.com/ClickHouse/ClickHouse/blob/master/tests/integration/test_storage_kafka/test_produce_http_interface.py#L172) for the Kafka engine has integration tests, while ProtobufSingle and ProtobufList do not.
 :::

 #### Steps {#steps}
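As the hunk context repeats throughout the series, persisting data read from the table engine requires a means of capturing it; the docs page does this with a materialized view that forwards rows from the Kafka engine table into a regular table. A minimal sketch, where the github target table, the github_mv view name, and the two-column schema are illustrative:

```sql
-- Target table that stores the rows (illustrative schema)
CREATE TABLE github
(
    review_comments UInt32,
    member_login LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY member_login;

-- Materialized view that continuously forwards rows from the Kafka
-- engine table into the target table
CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT *
FROM github_queue;
```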