|
59 | 59 | value: 2097152
|
60 | 60 | - name: channelBufferSize # Optional. Advanced setting. The number of events to buffer in internal and external channels.
|
61 | 61 | value: 512
|
| 62 | + - name: consumerGroupRebalanceStrategy # Optional. Advanced setting. The strategy to use for consumer group rebalancing. |
| 63 | + value: sticky |
62 | 64 | - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
|
63 | 65 | value: http://localhost:8081
|
64 | 66 | - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key.
|
|
69 | 71 | value: true
|
70 | 72 | - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
|
71 | 73 | value: 5m
|
| 74 | + - name: useAvroJson # Optional. Enables Avro JSON schema for serialization as opposed to Standard JSON default. Only applicable when the subscription uses valueSchemaType=Avro |
| 75 | + value: "true" |
72 | 76 | - name: escapeHeaders # Optional.
|
73 | 77 | value: false
|
74 | 78 |
|
@@ -115,13 +119,15 @@ spec:
|
115 | 119 | | schemaRegistryAPISecret | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | `ABCDEFGMEADFF` |
|
116 | 120 | | schemaCachingEnabled | N | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is `true` | `true` |
|
117 | 121 | | schemaLatestVersionCacheTTL | N | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min | `5m` |
|
| 122 | +| useAvroJson | N | Enables Avro JSON schema for serialization as opposed to Standard JSON default. Only applicable when the subscription uses valueSchemaType=Avro. Default is `"false"` | `"true"` | |
118 | 123 | | clientConnectionTopicMetadataRefreshInterval | N | The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration. Defaults to `9m`. | `"4m"` |
|
119 | 124 | | clientConnectionKeepAliveInterval | N | The maximum time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely. | `"4m"` |
|
120 | 125 | | consumerFetchMin | N | The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available. The default is `1`, as `0` causes the consumer to spin when no messages are available. Equivalent to the JVM's `fetch.min.bytes`. | `"2"` |
|
121 | 126 | | consumerFetchDefault | N | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
|
122 | 127 | | channelBufferSize | N | The number of events to buffer in internal and external channels. This permits the producer and consumer to continue processing some messages in the background while user code is working, greatly improving throughput. Defaults to `256`. | `"512"` |
|
123 | 128 | | heartbeatInterval | N | The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the `sessionTimeout` value. Defaults to "3s". | `"5s"` |
|
124 | 129 | | sessionTimeout | N | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to "10s". | `"20s"` |
|
| 130 | +| consumerGroupRebalanceStrategy | N | The strategy to use for consumer group rebalancing. Supported values: `range`, `sticky`, `roundrobin`. Default is `range` | `"sticky"` | |
125 | 131 | | escapeHeaders | N | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false`. | `true` |
|
126 | 132 |
|
127 | 133 | The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.
|
@@ -583,7 +589,12 @@ You can configure pub/sub to publish or consume data encoded using [Avro binary
|
583 | 589 |
|
584 | 590 | {{% alert title="Important" color="warning" %}}
|
585 | 591 | Currently, only message value serialization/deserialization is supported. Since cloud events are not supported, the `rawPayload=true` metadata must be passed when publishing Avro messages.
|
| 592 | + |
586 | 593 | Please note that `rawPayload=true` should NOT be set for consumers, as the message value will be wrapped into a CloudEvent and base64-encoded. Leaving `rawPayload` as default (i.e. `false`) will send the Avro-decoded message to the application as a JSON payload.
|
| 594 | + |
| 595 | +When setting the `useAvroJson` component metadata to `true`, the inbound/outbound Avro binary is converted into/from Avro JSON encoding. |
| 596 | +This can be preferable when accurate type mapping is desirable. |
| 597 | +The default is standard JSON which is typically easier to bind to a native type in an application. |
587 | 598 | {{% /alert %}}
|
588 | 599 |
|
589 | 600 | When configuring the Kafka pub/sub component metadata, you must define:
|
@@ -671,7 +682,25 @@ app.include_router(router)
|
671 | 682 |
|
672 | 683 | {{< /tabs >}}
|
673 | 684 |
|
674 |
| - |
| 685 | +### Overriding default consumer group rebalancing |
| 686 | +In Kafka, rebalancing strategies determine how partitions are assigned to consumers within a consumer group. The default strategy is "range", but "roundrobin" and "sticky" are also available. |
| 687 | +- `Range`: |
| 688 | +Partitions are assigned to consumers based on their lexicographical order. |
| 689 | +If you have three partitions (0, 1, 2) and two consumers (A, B), consumer A might get partitions 0 and 1, while consumer B gets partition 2. |
| 690 | +- `RoundRobin`: |
| 691 | +Partitions are assigned to consumers in a round-robin fashion. |
| 692 | +With the same example above, consumer A might get partitions 0 and 2, while consumer B gets partition 1. |
| 693 | +- `Sticky`: |
| 694 | +This strategy aims to preserve previous assignments as much as possible while still maintaining a balanced distribution. |
| 695 | +If a consumer leaves or joins the group, only the affected partitions are reassigned, minimizing disruption. |
| 696 | + |
| 697 | +#### Choosing a Strategy: |
| 698 | +- `Range`: |
| 699 | +Simple to understand and implement, but can lead to uneven distribution if partition sizes vary significantly. |
| 700 | +- `RoundRobin`: |
| 701 | +Provides a good balance in many cases, but might not be optimal if message keys are unevenly distributed. |
| 702 | +- `Sticky`: |
| 703 | +Generally preferred for its ability to minimize disruption during rebalances, especially when dealing with a large number of partitions or frequent consumer group changes. |
675 | 704 |
|
676 | 705 | ## Create a Kafka instance
|
677 | 706 |
|
|
0 commit comments