Skip to content

Commit c59dce8

Browse files
committed
Added migration guide for KIP-848
1 parent adc1b81 commit c59dce8

File tree

4 files changed

+670
-6
lines changed

4 files changed

+670
-6
lines changed

CHANGELOG.md

Lines changed: 193 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,225 @@
11
# Confluent Python Client for Apache Kafka - CHANGELOG
22

3-
## Unreleased
4-
53
## v2.12.0b1 - 2025-10-01
64

7-
### Added
5+
- **General Availability for Next Generation Consumer Group Protocol (KIP-848)**:
6+
Starting with __confluent-kafka-python 2.12.0__, the next generation consumer group rebalance protocol defined in **[KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)** is **production-ready** (GA release).
7+
- **AsyncIO Producer (experimental):** Introduces beta class `AIOProducer` for
8+
asynchronous message production in asyncio applications.
9+
10+
11+
confluent-kafka-python v2.12.0 is based on librdkafka v2.12.0, see the
12+
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0)
13+
for a complete list of changes, enhancements, fixes and upgrade considerations.
14+
15+
<details>
16+
<summary> KIP-848 - Next Generation Consumer Group Rebalance Protocol (Classic to Consumer) Migration Guide </summary>
17+
18+
#### Overview
19+
- **What changed:**
20+
The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**.
21+
22+
- **Requirements:**
23+
- Broker version **4.0.0+**
24+
- confluent-kafka-python version **2.12.0+**: GA (production-ready)
25+
26+
- **Enablement (client-side):**
27+
- `group.protocol=consumer`
28+
- `group.remote.assignor=<assignor>` (optional; broker-controlled if `NULL`; default broker assignor is **`uniform`**)
29+
30+
#### Available Features
31+
32+
All KIP-848 features are supported including:
33+
34+
- Subscription to one or more topics, including **regular expression (regex) subscriptions**
35+
- Rebalance callbacks (**incremental only**)
36+
- Static group membership
37+
- Configurable remote assignor
38+
- Enforced max poll interval
39+
- Upgrade from `classic` protocol or downgrade from `consumer` protocol
40+
- AdminClient changes as per KIP
41+
42+
#### Contract Changes
43+
44+
##### Client Configuration changes
45+
46+
| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement |
47+
| ------------------------------------------------ | ----------------------------------------------------- |
48+
| `partition.assignment.strategy` | `group.remote.assignor` |
49+
| `session.timeout.ms` | Broker config: `group.consumer.session.timeout.ms` |
50+
| `heartbeat.interval.ms` | Broker config: `group.consumer.heartbeat.interval.ms` |
51+
| `group.protocol.type` | Not used in the new protocol |
52+
53+
**Note:** The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are **no longer used** when using the KIP-848 consumer protocol.
54+
55+
##### Rebalance Callback Changes
56+
57+
- The **protocol is fully incremental** in KIP-848.
58+
- In the **rebalance callbacks**, you **must use**:
59+
- `consumer.incremental_assign(partitions)` to assign new partitions
60+
- `consumer.incremental_unassign(partitions)` to revoke partitions
61+
- **Do not** use `consumer.assign()` or `consumer.unassign()` when using `group.protocol='consumer'` (KIP-848).
62+
- ⚠️ The `partitions` list passed to `incremental_assign()` and `incremental_unassign()` contains only the **incremental changes** — partitions being **added** or **revoked****not the full assignment**, as was the case with `assign()` in the classic protocol.
63+
- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol.
64+
65+
66+
##### Static Group Membership
67+
68+
- Duplicate `group.instance.id` handling:
69+
- **Newly joining member** is fenced with **UNRELEASED_INSTANCE_ID (fatal)**.
70+
- (Classic protocol fenced the **existing** member instead.)
71+
- Implications:
72+
- Ensure only **one active instance per `group.instance.id`**.
73+
- Consumers must shut down cleanly to avoid blocking replacements until session timeout expires.
74+
75+
##### Session Timeout & Fetching
76+
77+
- **Session timeout is broker-controlled**:
78+
- If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets.
79+
- Consumer is fenced once a heartbeat response is received from the Coordinator.
80+
- In the classic protocol, the client stopped fetching when session timeout expired.
81+
82+
##### Closing / Auto-Commit
83+
84+
- On `close()` or unsubscribe with auto-commit enabled:
85+
- Member retries committing offsets until a timeout expires.
86+
- Currently uses the **default remote session timeout**.
87+
- Future **KIP-1092** will allow custom commit timeouts.
88+
89+
##### Error Handling Changes
90+
91+
- `UNKNOWN_TOPIC_OR_PART` (**subscription case**):
92+
- No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds.
93+
- `TOPIC_AUTHORIZATION_FAILED`:
94+
- Reported once per heartbeat or subscription change, even if only one topic is unauthorized.
95+
96+
##### Summary of Key Differences (Classic vs Next-Gen)
97+
98+
- **Assignment:** Classic protocol calculated by **Group Leader (consumer)**; KIP-848 calculated by **Group Coordinator (broker)**
99+
- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range
100+
- **Deprecated configs:** Classic client configs are replaced by `group.remote.assignor` and broker-controlled session/heartbeat configs
101+
- **Static membership fencing:** KIP-848 fences **new member** on duplicate `group.instance.id`
102+
- **Session timeout:** Classic enforced on client; KIP-848 enforced on broker
103+
- **Auto-commit on close:** Classic stops at client session timeout; KIP-848 retries until remote timeout
104+
- **Unknown topics:** KIP-848 does not return error on subscription if topic missing
105+
- **Upgrade/Downgrade:** KIP-848 supports upgrade/downgrade from/to `classic` and `consumer` protocols
106+
107+
#### Minimal Example Config
108+
109+
##### Classic Protocol
110+
```properties
111+
# Optional; default is 'classic'
112+
group.protocol=classic
113+
114+
partition.assignment.strategy=<range,roundrobin,sticky>
115+
session.timeout.ms=45000
116+
heartbeat.interval.ms=15000
117+
```
118+
119+
##### Next-Gen Protocol / KIP-848
120+
```properties
121+
group.protocol=consumer
122+
123+
# Optional: select a remote assignor
124+
# Valid options currently: 'uniform' or 'range'
125+
# group.remote.assignor=<uniform,range>
126+
# If unset(NULL), broker chooses the assignor (default: 'uniform')
127+
128+
# Session & heartbeat now controlled by broker:
129+
# group.consumer.session.timeout.ms
130+
# group.consumer.heartbeat.interval.ms
131+
```
132+
133+
#### Rebalance Callback Migration
134+
135+
##### Range Assignor (Classic)
136+
137+
```python
138+
# Rebalance Callback for Range Assignor (Classic Protocol)
139+
def on_assign(consumer, partitions):
140+
# Full partition list is provided under the classic protocol
141+
print(f"[Classic] Assigned partitions: {partitions}")
142+
consumer.assign(partitions)
143+
144+
def on_revoke(consumer, partitions):
145+
print(f"[Classic] Revoked partitions: {partitions}")
146+
consumer.unassign()
147+
```
148+
149+
##### Incremental Assignor (Including Range in Consumer / KIP-848, Any Protocol)
150+
151+
```python
152+
# Rebalance callback for incremental assignor
153+
def on_assign(consumer, partitions):
154+
# Only incremental partitions are passed here (not full list)
155+
print(f"[KIP-848] Incrementally assigning: {partitions}")
156+
consumer.incremental_assign(partitions)
157+
158+
def on_revoke(consumer, partitions):
159+
print(f"[KIP-848] Incrementally revoking: {partitions}")
160+
consumer.incremental_unassign(partitions)
161+
```
162+
**Note:**
163+
- The `partitions` list contains **only partitions being added or revoked**, not the full partition list as in the classic `consumer.assign()`.
164+
- Incremental assignors (including range) are **supported in both classic and KIP-848 protocols**, but this callback is required for KIP-848.
165+
166+
#### Upgrade and Downgrade
167+
168+
- A group made up entirely of `classic` consumers runs under the classic protocol.
169+
- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins.
170+
- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain.
171+
- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported.
172+
173+
174+
#### Migration Checklist (Next-Gen Protocol / KIP-848)
175+
176+
1. Upgrade to **confluent-kafka-python ≥ 2.12.0** (GA release)
177+
2. Run against **Kafka brokers ≥ 4.0.0**
178+
3. Set `group.protocol=consumer`
179+
4. Optionally set `group.remote.assignor`; leave `NULL` for broker-controlled (default: `uniform`), valid options: `uniform` or `range`
180+
5. Replace deprecated configs with new ones
181+
6. Update rebalance callbacks to **incremental APIs only**
182+
7. Review static membership handling (`group.instance.id`)
183+
8. Ensure proper shutdown to avoid fencing issues
184+
9. Adjust error handling for unknown topics and authorization failures
185+
186+
</details>
187+
188+
189+
<details>
190+
<summary> AsyncIO Producer (experimental): </summary>
191+
192+
#### Added
8193

9194
- AsyncIO Producer (experimental): Introduces `confluent_kafka.aio.AIOProducer` for
10195
asynchronous message production in asyncio applications. This API offloads
11196
blocking librdkafka calls to a thread pool and schedules common callbacks
12197
(`error_cb`, `throttle_cb`, `stats_cb`, `oauth_cb`, `logger`) onto the event
13198
loop for safe usage inside async frameworks.
14199

15-
### Features
200+
#### Features
16201

17202
- Batched async produce: `await aio.AIOProducer(...).produce(topic, value=...)`
18203
buffers messages and flushes when the buffer threshold or timeout is reached.
19204
- Async lifecycle: `await producer.flush()`, `await producer.purge()`, and
20205
transactional operations (`init_transactions`, `begin_transaction`,
21206
`commit_transaction`, `abort_transaction`).
22207

23-
### Limitations
208+
#### Limitations
24209

25210
- Per-message headers are not supported in the current batched async produce
26211
path. If headers are required, use the synchronous `Producer.produce(...)` or
27212
offload a sync produce call to a thread executor within your async app.
28213

29-
### Guidance
214+
#### Guidance
30215

31216
- Use the AsyncIO Producer inside async apps/servers (FastAPI/Starlette, aiohttp,
32217
asyncio tasks) to avoid blocking the event loop.
33218
- For batch jobs, scripts, or highest-throughput pipelines without an event
34219
loop, the synchronous `Producer` remains recommended.
35220

221+
</details>
222+
36223
## v2.11.1 - 2025-08-18
37224

38225
v2.11.1 is a maintenance release with the following fixes:

docs/index.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ A reliable, performant and feature-rich Python client for Apache Kafka v0.8 and
66
Guides
77
- :ref:`Configuration Guide <pythonclient_configuration>`
88
- :ref:`Transactional API <pythonclient_transactional>`
9+
- :ref:`KIP-848 Migration Guide <pythonclient_migration_kip848>`
910

1011
Client API
1112
- :ref:`Producer <pythonclient_producer>`
@@ -1090,3 +1091,8 @@ addition to the properties dictated by the underlying librdkafka C library:
10901091

10911092
For the full range of configuration properties, please consult librdkafka's documentation:
10921093
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
1094+
1095+
1096+
.. _pythonclient_migration_kip848:
1097+
1098+
.. include:: kip-848-migration-guide.rst

0 commit comments

Comments
 (0)