|
| 1 | +Starting with **confluent-kafka-python 2.12.0** (GA release), the next generation consumer group rebalance protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is **production-ready**. |
| 2 | + |
| 3 | +**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. There are few contract change associated with the new protocol and might cause breaking changes. `group.protocol` configuration property dictates whether to use the new `consumer` protocol or older `classic` protocol. It defaults to `classic` if not provided. |
| 4 | + |
| 5 | +# Overview |
| 6 | + |
| 7 | +- **What changed:** |
| 8 | + |
| 9 | + The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**. |
| 10 | + |
| 11 | +- **Requirements:** |
| 12 | + |
| 13 | + - Broker version **4.0.0+** |
| 14 | + - confluent-kafka-python version **2.12.0+**: GA (production-ready) |
| 15 | + |
| 16 | +- **Enablement (client-side):** |
| 17 | + |
| 18 | + - `group.protocol=consumer` |
| 19 | + - `group.remote.assignor=<assignor>` (optional; broker-controlled if unset; default broker assignor is `uniform`) |
| 20 | + |
| 21 | +# Available Features |
| 22 | + |
| 23 | +All [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) features are supported including: |
| 24 | + |
| 25 | +- Subscription to one or more topics, including **regular expression (regex) subscriptions** |
| 26 | +- Rebalance callbacks (**incremental only**) |
| 27 | +- Static group membership |
| 28 | +- Configurable remote assignor |
| 29 | +- Enforced max poll interval |
| 30 | +- Upgrade from `classic` protocol or downgrade from `consumer` protocol |
| 31 | +- AdminClient changes as per KIP |
| 32 | + |
| 33 | +# Contract Changes |
| 34 | + |
| 35 | +## Client Configuration changes |
| 36 | + |
| 37 | +| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement | |
| 38 | +|--------------------------------------------------|-------------------------------------------------------| |
| 39 | +| `partition.assignment.strategy` | `group.remote.assignor` | |
| 40 | +| `session.timeout.ms` | Broker config: `group.consumer.session.timeout.ms` | |
| 41 | +| `heartbeat.interval.ms` | Broker config: `group.consumer.heartbeat.interval.ms` | |
| 42 | +| `group.protocol.type` | Not used in the new protocol | |
| 43 | + |
| 44 | +**Note:** The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are **no longer used** when using the KIP-848 consumer protocol. |
| 45 | + |
| 46 | +## Rebalance Callback Changes |
| 47 | + |
| 48 | +- The **protocol is fully incremental** in KIP-848. |
| 49 | +- In the **rebalance callbacks**, you **must use**: |
| 50 | + - `consumer.incremental_assign(partitions)` to assign new partitions |
| 51 | + - `consumer.incremental_unassign(partitions)` to revoke partitions |
| 52 | +- **Do not** use `consumer.assign()` or `consumer.unassign()` when using `group.protocol='consumer'` (KIP-848). |
| 53 | +- ⚠️ The `partitions` list passed to `incremental_assign()` and `incremental_unassign()` contains only the **incremental changes** — partitions being **added** or **revoked** — **not the full assignment**, as was the case with `assign()` in the classic protocol. |
| 54 | +- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol. |
| 55 | + |
| 56 | +## Static Group Membership |
| 57 | + |
| 58 | +- Duplicate `group.instance.id` handling: |
| 59 | + - **Newly joining member** is fenced with **UNRELEASED_INSTANCE_ID (fatal)**. |
| 60 | + - (Classic protocol fenced the **existing** member instead.) |
| 61 | +- Implications: |
| 62 | + - Ensure only **one active instance per** `group.instance.id`. |
| 63 | + - Consumers must shut down cleanly to avoid blocking replacements until session timeout expires. |
| 64 | + |
| 65 | +## Session Timeout & Fetching |
| 66 | + |
| 67 | +- **Session timeout is broker-controlled**: |
| 68 | + - If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets. |
| 69 | + - Consumer is fenced once a heartbeat response is received from the Coordinator. |
| 70 | +- In the classic protocol, the client stopped fetching when session timeout expired. |
| 71 | + |
| 72 | +## Closing / Auto-Commit |
| 73 | + |
| 74 | +- On `close()` or unsubscribe with auto-commit enabled: |
| 75 | + - Member retries committing offsets until a timeout expires. |
| 76 | + - Currently uses the **default remote session timeout**. |
| 77 | + - Future **KIP-1092** will allow custom commit timeouts. |
| 78 | + |
| 79 | +## Error Handling Changes |
| 80 | + |
| 81 | +- `UNKNOWN_TOPIC_OR_PART` (**subscription case**): |
| 82 | + - No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds. |
| 83 | +- `TOPIC_AUTHORIZATION_FAILED`: |
| 84 | + - Reported once per heartbeat or subscription change, even if only one topic is unauthorized. |
| 85 | + |
| 86 | +## Summary of Key Differences (Classic vs Next-Gen) |
| 87 | + |
| 88 | +- **Assignment:** Classic protocol calculated by **Group Leader (consumer)**; KIP-848 calculated by **Group Coordinator (broker)** |
| 89 | +- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range |
| 90 | +- **Deprecated configs:** Classic client configs are replaced by `group.remote.assignor` and broker-controlled session/heartbeat configs |
| 91 | +- **Static membership fencing:** KIP-848 fences **new member** on duplicate `group.instance.id` |
| 92 | +- **Session timeout:** Classic enforced on client; KIP-848 enforced on broker |
| 93 | +- **Auto-commit on close:** Classic stops at client session timeout; KIP-848 retries until remote timeout |
| 94 | +- **Unknown topics:** KIP-848 does not return error on subscription if topic missing |
| 95 | +- **Upgrade/Downgrade:** KIP-848 supports upgrade/downgrade from/to `classic` and `consumer` protocols |
| 96 | + |
| 97 | +# Minimal Example Config |
| 98 | + |
| 99 | +## Classic Protocol |
| 100 | + |
| 101 | +``` properties |
| 102 | +# Optional; default is 'classic' |
| 103 | +group.protocol=classic |
| 104 | + |
| 105 | +partition.assignment.strategy=<range,roundrobin,sticky> |
| 106 | +session.timeout.ms=45000 |
| 107 | +heartbeat.interval.ms=15000 |
| 108 | +``` |
| 109 | + |
| 110 | +## Next-Gen Protocol / KIP-848 |
| 111 | + |
| 112 | +``` properties |
| 113 | +group.protocol=consumer |
| 114 | + |
| 115 | +# Optional: select a remote assignor |
| 116 | +# Valid options currently: 'uniform' or 'range' |
| 117 | +# group.remote.assignor=<uniform,range> |
| 118 | +# If unset, broker chooses the assignor (default: 'uniform') |
| 119 | + |
| 120 | +# Session & heartbeat now controlled by broker: |
| 121 | +# group.consumer.session.timeout.ms |
| 122 | +# group.consumer.heartbeat.interval.ms |
| 123 | +``` |
| 124 | + |
| 125 | +# Rebalance Callback Migration |
| 126 | + |
| 127 | +## Range Assignor (Classic) |
| 128 | + |
| 129 | +``` python |
| 130 | +# Rebalance Callback for Range Assignor (Classic Protocol) |
| 131 | +def on_assign(consumer, partitions): |
| 132 | + # Full partition list is provided under the classic protocol |
| 133 | + print(f"[Classic] Assigned partitions: {partitions}") |
| 134 | + consumer.assign(partitions) |
| 135 | + |
| 136 | +def on_revoke(consumer, partitions): |
| 137 | + print(f"[Classic] Revoked partitions: {partitions}") |
| 138 | + consumer.unassign() |
| 139 | +``` |
| 140 | + |
| 141 | +## Incremental Assignor (Including Range in Consumer / KIP-848, Any Protocol) |
| 142 | + |
| 143 | +``` python |
| 144 | +# Rebalance callback for incremental assignor |
| 145 | +def on_assign(consumer, partitions): |
| 146 | + # Only incremental partitions are passed here (not full list) |
| 147 | + print(f"[KIP-848] Incrementally assigning: {partitions}") |
| 148 | + consumer.incremental_assign(partitions) |
| 149 | + |
| 150 | +def on_revoke(consumer, partitions): |
| 151 | + print(f"[KIP-848] Incrementally revoking: {partitions}") |
| 152 | + consumer.incremental_unassign(partitions) |
| 153 | +``` |
| 154 | + |
| 155 | +**Note:** The `partitions` list contains **only partitions being added or revoked**, not the full partition list as in the classic `consumer.assign()`. |
| 156 | + |
| 157 | +# Upgrade and Downgrade |
| 158 | + |
| 159 | +- A group made up entirely of `classic` consumers runs under the classic protocol. |
| 160 | +- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins. |
| 161 | +- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain. |
| 162 | +- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported. |
| 163 | + |
| 164 | +# Migration Checklist (Next-Gen Protocol / [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)) |
| 165 | + |
| 166 | +1. Upgrade to **confluent-kafka-python ≥ 2.12.0** (GA release) |
| 167 | +2. Run against **Kafka brokers ≥ 4.0.0** |
| 168 | +3. Set `group.protocol=consumer` |
| 169 | +4. Optionally set `group.remote.assignor`; leave unspecified for broker-controlled (default: `uniform`), valid options: `uniform` or `range` |
| 170 | +5. Replace deprecated configs with new ones |
| 171 | +6. Update rebalance callbacks to **incremental APIs only** |
| 172 | +7. Review static membership handling (`group.instance.id`) |
| 173 | +8. Ensure proper shutdown to avoid fencing issues |
| 174 | +9. Adjust error handling for unknown topics and authorization failures |
0 commit comments