|
| 1 | +### KIP-848 - Migration Guide |
| 2 | + |
| 3 | +#### Overview |
| 4 | + |
| 5 | +- **What changed:** |
| 6 | + |
| 7 | + The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**. |
| 8 | + |
| 9 | +- **Requirements:** |
| 10 | + |
| 11 | + - Broker version **4.0.0+** |
| 12 | + - confluent-kafka-python version **2.12.0+**: GA (production-ready) |
| 13 | + |
| 14 | +- **Enablement (client-side):** |
| 15 | + |
| 16 | + - `group.protocol=consumer` |
| 17 | + - `group.remote.assignor=<assignor>` (optional; broker-controlled if `NULL`; default broker assignor is `uniform`) |
| 18 | + |
| 19 | +#### Available Features |
| 20 | + |
| 21 | +All KIP-848 features are supported including: |
| 22 | + |
| 23 | +- Subscription to one or more topics, including **regular expression (regex) subscriptions** |
| 24 | +- Rebalance callbacks (**incremental only**) |
| 25 | +- Static group membership |
| 26 | +- Configurable remote assignor |
| 27 | +- Enforced max poll interval |
| 28 | +- Upgrade from `classic` protocol or downgrade from `consumer` protocol |
| 29 | +- AdminClient changes as per KIP |
| 30 | + |
| 31 | +#### Contract Changes |
| 32 | + |
| 33 | +##### Client Configuration changes |
| 34 | + |
| 35 | +| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement | |
| 36 | +|--------------------------------------------------|-------------------------------------------------------| |
| 37 | +| `partition.assignment.strategy` | `group.remote.assignor` | |
| 38 | +| `session.timeout.ms` | Broker config: `group.consumer.session.timeout.ms` | |
| 39 | +| `heartbeat.interval.ms` | Broker config: `group.consumer.heartbeat.interval.ms` | |
| 40 | +| `group.protocol.type` | Not used in the new protocol | |
| 41 | + |
| 42 | +**Note:** The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are **no longer used** when using the KIP-848 consumer protocol. |
| 43 | + |
| 44 | +##### Rebalance Callback Changes |
| 45 | + |
| 46 | +- The **protocol is fully incremental** in KIP-848. |
| 47 | +- In the **rebalance callbacks**, you **must use**: |
| 48 | + - `consumer.incremental_assign(partitions)` to assign new partitions |
| 49 | + - `consumer.incremental_unassign(partitions)` to revoke partitions |
| 50 | +- **Do not** use `consumer.assign()` or `consumer.unassign()` when using `group.protocol='consumer'` (KIP-848). |
| 51 | +- ⚠️ The `partitions` list passed to `incremental_assign()` and `incremental_unassign()` contains only the **incremental changes** — partitions being **added** or **revoked** — **not the full assignment**, as was the case with `assign()` in the classic protocol. |
| 52 | +- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol. |
| 53 | + |
| 54 | +##### Static Group Membership |
| 55 | + |
| 56 | +- Duplicate `group.instance.id` handling: |
| 57 | + - **Newly joining member** is fenced with **UNRELEASED_INSTANCE_ID (fatal)**. |
| 58 | + - (Classic protocol fenced the **existing** member instead.) |
| 59 | +- Implications: |
| 60 | + - Ensure only **one active instance per** `group.instance.id`. |
| 61 | + - Consumers must shut down cleanly to avoid blocking replacements until session timeout expires. |
| 62 | + |
| 63 | +##### Session Timeout & Fetching |
| 64 | + |
| 65 | +- **Session timeout is broker-controlled**: |
| 66 | + - If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets. |
| 67 | + - Consumer is fenced once a heartbeat response is received from the Coordinator. |
| 68 | +- In the classic protocol, the client stopped fetching when session timeout expired. |
| 69 | + |
| 70 | +##### Closing / Auto-Commit |
| 71 | + |
| 72 | +- On `close()` or unsubscribe with auto-commit enabled: |
| 73 | + - Member retries committing offsets until a timeout expires. |
| 74 | + - Currently uses the **default remote session timeout**. |
| 75 | + - Future **KIP-1092** will allow custom commit timeouts. |
| 76 | + |
| 77 | +##### Error Handling Changes |
| 78 | + |
| 79 | +- `UNKNOWN_TOPIC_OR_PART` (**subscription case**): |
| 80 | + - No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds. |
| 81 | +- `TOPIC_AUTHORIZATION_FAILED`: |
| 82 | + - Reported once per heartbeat or subscription change, even if only one topic is unauthorized. |
| 83 | + |
| 84 | +##### Summary of Key Differences (Classic vs Next-Gen) |
| 85 | + |
| 86 | +- **Assignment:** Classic protocol calculated by **Group Leader (consumer)**; KIP-848 calculated by **Group Coordinator (broker)** |
| 87 | +- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range |
| 88 | +- **Deprecated configs:** Classic client configs are replaced by `group.remote.assignor` and broker-controlled session/heartbeat configs |
| 89 | +- **Static membership fencing:** KIP-848 fences **new member** on duplicate `group.instance.id` |
| 90 | +- **Session timeout:** Classic enforced on client; KIP-848 enforced on broker |
| 91 | +- **Auto-commit on close:** Classic stops at client session timeout; KIP-848 retries until remote timeout |
| 92 | +- **Unknown topics:** KIP-848 does not return error on subscription if topic missing |
| 93 | +- **Upgrade/Downgrade:** KIP-848 supports upgrade/downgrade from/to `classic` and `consumer` protocols |
| 94 | + |
| 95 | +#### Minimal Example Config |
| 96 | + |
| 97 | +##### Classic Protocol |
| 98 | + |
| 99 | +``` properties |
| 100 | +# Optional; default is 'classic' |
| 101 | +group.protocol=classic |
| 102 | + |
| 103 | +partition.assignment.strategy=<range,roundrobin,sticky> |
| 104 | +session.timeout.ms=45000 |
| 105 | +heartbeat.interval.ms=15000 |
| 106 | +``` |
| 107 | + |
| 108 | +##### Next-Gen Protocol / KIP-848 |
| 109 | + |
| 110 | +``` properties |
| 111 | +group.protocol=consumer |
| 112 | + |
| 113 | +# Optional: select a remote assignor |
| 114 | +# Valid options currently: 'uniform' or 'range' |
| 115 | +# group.remote.assignor=<uniform,range> |
| 116 | +# If unset(NULL), broker chooses the assignor (default: 'uniform') |
| 117 | + |
| 118 | +# Session & heartbeat now controlled by broker: |
| 119 | +# group.consumer.session.timeout.ms |
| 120 | +# group.consumer.heartbeat.interval.ms |
| 121 | +``` |
| 122 | + |
| 123 | +#### Rebalance Callback Migration |
| 124 | + |
| 125 | +##### Range Assignor (Classic) |
| 126 | + |
| 127 | +``` python |
| 128 | +# Rebalance Callback for Range Assignor (Classic Protocol) |
| 129 | +def on_assign(consumer, partitions): |
| 130 | + # Full partition list is provided under the classic protocol |
| 131 | + print(f"[Classic] Assigned partitions: {partitions}") |
| 132 | + consumer.assign(partitions) |
| 133 | + |
| 134 | +def on_revoke(consumer, partitions): |
| 135 | + print(f"[Classic] Revoked partitions: {partitions}") |
| 136 | + consumer.unassign() |
| 137 | +``` |
| 138 | + |
| 139 | +##### Incremental Assignor (Including Range in Consumer / KIP-848, Any Protocol) |
| 140 | + |
| 141 | +``` python |
| 142 | +# Rebalance callback for incremental assignor |
| 143 | +def on_assign(consumer, partitions): |
| 144 | + # Only incremental partitions are passed here (not full list) |
| 145 | + print(f"[KIP-848] Incrementally assigning: {partitions}") |
| 146 | + consumer.incremental_assign(partitions) |
| 147 | + |
| 148 | +def on_revoke(consumer, partitions): |
| 149 | + print(f"[KIP-848] Incrementally revoking: {partitions}") |
| 150 | + consumer.incremental_unassign(partitions) |
| 151 | +``` |
| 152 | + |
| 153 | +**Note:** The `partitions` list contains **only partitions being added or revoked**, not the full partition list as in the classic `consumer.assign()`. |
| 154 | + |
| 155 | +#### Upgrade and Downgrade |
| 156 | + |
| 157 | +- A group made up entirely of `classic` consumers runs under the classic protocol. |
| 158 | +- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins. |
| 159 | +- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain. |
| 160 | +- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported. |
| 161 | + |
| 162 | +#### Migration Checklist (Next-Gen Protocol / KIP-848) |
| 163 | + |
| 164 | +1. Upgrade to **confluent-kafka-python ≥ 2.12.0** (GA release) |
| 165 | +2. Run against **Kafka brokers ≥ 4.0.0** |
| 166 | +3. Set `group.protocol=consumer` |
| 167 | +4. Optionally set `group.remote.assignor`; leave `NULL` for broker-controlled (default: `uniform`), valid options: `uniform` or `range` |
| 168 | +5. Replace deprecated configs with new ones |
| 169 | +6. Update rebalance callbacks to **incremental APIs only** |
| 170 | +7. Review static membership handling (`group.instance.id`) |
| 171 | +8. Ensure proper shutdown to avoid fencing issues |
| 172 | +9. Adjust error handling for unknown topics and authorization failures |
0 commit comments