diff --git a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
index a627de3d09b..4d77408e53c 100644
--- a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
+++ b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md
@@ -35,6 +35,13 @@ To persist this data from a read of the table engine, we need a means of capturi
 
 Kafka table engine architecture diagram
 
+:::tip
+The Kafka engine has some limitations:
+- The [Kafka message header](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format) is not currently supported, so the schema registry cannot be used.
+- Protobuf and ProtobufSingle function correctly, while [ProtobufList](https://github.com/ClickHouse/ClickHouse/issues/78746) currently does not work when the Kafka queue has many messages.
+- The [Protobuf](https://github.com/ClickHouse/ClickHouse/blob/master/tests/integration/test_storage_kafka/test_produce_http_interface.py#L172) format has integration tests for the Kafka engine, while ProtobufSingle and ProtobufList do not.
+:::
+
 #### Steps {#steps}
 
@@ -179,10 +186,19 @@ CREATE TABLE github_queue
     review_comments UInt32,
     member_login LowCardinality(String)
 )
-   ENGINE = Kafka('kafka_host:9092', 'github', 'clickhouse',
-              'JSONEachRow') settings kafka_thread_per_consumer = 0, kafka_num_consumers = 1;
+ENGINE = Kafka SETTINGS
+    kafka_broker_list = 'kafka_host:9092',
+    kafka_topic_list = 'github',
+    kafka_group_name = 'clickhouse',
+    kafka_format = 'JSONEachRow';
 ```
 
+:::tip
+Notes:
+See the [full Kafka engine table creation options](https://clickhouse.com/docs/engines/table-engines/integrations/kafka#creating-a-table).
+
+Use caution with INSERTs, because [kafka_format must be the last setting](https://github.com/ClickHouse/ClickHouse/issues/37895).
+:::
 We discuss engine settings and performance tuning below. At this point, a simple select on the table `github_queue` should read some rows. Note that this will move the consumer offsets forward, preventing these rows from being re-read without a [reset](#common-operations). Note the limit and required parameter `stream_like_engine_allow_direct_select`.
 
@@ -305,6 +321,12 @@ Errors such as authentication issues are not reported in responses to Kafka engi
 
 ```
 
+Another useful source of information is the `system.kafka_consumers` table:
+
+```sql
+SELECT * FROM system.kafka_consumers FORMAT Vertical;
+```
+
 ##### Handling malformed messages {#handling-malformed-messages}
 
 Kafka is often used as a "dumping ground" for data. This leads to topics containing mixed message formats and inconsistent field names. Avoid this and utilize Kafka features such as Kafka Streams or ksqlDB to ensure messages are well-formed and consistent before insertion into Kafka. If these options are not possible, ClickHouse has some features that can help.
 
@@ -474,7 +496,6 @@ Multiple ClickHouse instances can all be configured to read from a topic using t
 
 Consider the following when looking to increase Kafka Engine table throughput performance:
 
-
 * The performance will vary depending on the message size, format, and target table types. 100k rows/sec on a single table engine should be considered obtainable. By default, messages are read in blocks, controlled by the parameter kafka_max_block_size. By default, this is set to the [max_insert_block_size](/operations/settings/settings#max_insert_block_size), defaulting to 1,048,576. Unless messages are extremely large, this should nearly always be increased. Values between 500k and 1M are not uncommon. Test and evaluate the effect on throughput performance.
 * The number of consumers for a table engine can be increased using kafka_num_consumers. However, by default, inserts will be linearized in a single thread unless kafka_thread_per_consumer is changed from the default value of 0. Set this to 1 to ensure flushes are performed in parallel. Note that creating a Kafka engine table with N consumers (and kafka_thread_per_consumer=1) is logically equivalent to creating N Kafka engines, each with a materialized view and kafka_thread_per_consumer=0.
 * Increasing consumers is not a free operation. Each consumer maintains its own buffers and threads, increasing the overhead on the server. Be conscious of the overhead of consumers and scale linearly across your cluster first, if possible.
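
To make the consumer and block-size guidance in the last hunk concrete, here is a minimal, hypothetical sketch (not part of the diff above; the table name, columns, broker, topic, and group names are illustrative, and the setting values are starting points to test rather than recommendations):

```sql
-- Hypothetical example: names and values are placeholders, not part of the documented PR.
CREATE TABLE github_queue_tuned
(
    event_type LowCardinality(String),
    actor_login LowCardinality(String)
)
ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka_host:9092',
    kafka_topic_list = 'github',
    kafka_group_name = 'clickhouse',
    kafka_num_consumers = 4,         -- keep at or below the topic's partition count
    kafka_thread_per_consumer = 1,   -- flush each consumer in its own thread so inserts run in parallel
    kafka_max_block_size = 524288,   -- larger insert blocks; test values in the 500k-1M row range
    kafka_format = 'JSONEachRow';    -- kept last, per the kafka_format ordering note above
```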