
Commit f443ea6

MSeal and pranavrth authored

Moved async into experimental package for regular release (#2089)

* Moved async into experimental package for regular release
* Style fixes

Co-authored-by: Pranav Rathi <[email protected]>

1 parent adc1b81 · commit f443ea6

25 files changed: +89 -83 lines

CHANGELOG.md

Lines changed: 2 additions & 2 deletions

@@ -6,15 +6,15 @@
 
 ### Added
 
-- AsyncIO Producer (experimental): Introduces `confluent_kafka.aio.AIOProducer` for
+- AsyncIO Producer (experimental): Introduces beta class `AIOProducer` for
   asynchronous message production in asyncio applications. This API offloads
   blocking librdkafka calls to a thread pool and schedules common callbacks
   (`error_cb`, `throttle_cb`, `stats_cb`, `oauth_cb`, `logger`) onto the event
   loop for safe usage inside async frameworks.
 
 ### Features
 
-- Batched async produce: `await aio.AIOProducer(...).produce(topic, value=...)`
+- Batched async produce: `await AIOProducer(...).produce(topic, value=...)`
   buffers messages and flushes when the buffer threshold or timeout is reached.
 - Async lifecycle: `await producer.flush()`, `await producer.purge()`, and
   transactional operations (`init_transactions`, `begin_transaction`,
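The changelog entry above describes the core pattern behind `AIOProducer`: blocking client calls run on a thread pool, and user callbacks are marshalled back onto the event loop. A minimal, library-free sketch of that pattern (the `MiniAsyncProducer` class and `blocking_produce` helper are illustrative stand-ins, not the real librdkafka binding):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_produce(topic, value):
    """Stand-in for a blocking librdkafka call (hypothetical)."""
    time.sleep(0.01)  # simulate a blocking C call
    return f"delivered {value!r} to {topic}"


class MiniAsyncProducer:
    """Sketch of the offload pattern: run blocking work in a thread pool,
    then schedule any user callback back onto the event loop."""

    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    async def produce(self, topic, value, on_delivery=None):
        loop = asyncio.get_running_loop()
        # Offload the blocking call so the event loop keeps running.
        result = await loop.run_in_executor(
            self._pool, blocking_produce, topic, value)
        if on_delivery is not None:
            # Run the callback on the event loop, never in the worker
            # thread, so it can safely touch asyncio state.
            loop.call_soon(on_delivery, result)
        return result


async def main():
    p = MiniAsyncProducer()
    return await asyncio.gather(
        *(p.produce("mytopic", f"msg-{i}") for i in range(3)))


print(asyncio.run(main()))
```

Because the three `produce` calls overlap on the thread pool, total wall time is roughly one blocking call, not three.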

DEVELOPER.md

Lines changed: 4 additions & 4 deletions

@@ -62,7 +62,7 @@ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
 ## Project layout
 
 - `src/confluent_kafka/` — core sync client APIs
-- `src/confluent_kafka/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
+- `src/confluent_kafka/experimental/aio/` — AsyncIO Producer/Consumer (first-class asyncio, not generated)
 - `src/confluent_kafka/schema_registry/` — Schema Registry clients and serdes
 - `tests/` — unit and integration tests (including async producer tests)
 - `examples/` — runnable samples (includes asyncio example)

@@ -103,14 +103,14 @@ python3 tools/unasync.py --check
 
 If you make any changes to the async code (in `src/confluent_kafka/schema_registry/_async` and `tests/integration/schema_registry/_async`), you **must** run this script to generate the sync counterparts (in `src/confluent_kafka/schema_registry/_sync` and `tests/integration/schema_registry/_sync`). Otherwise, this script will be run in CI with the `--check` flag and fail the build.
 
-Note: The AsyncIO Producer/Consumer under `src/confluent_kafka/aio/` are first-class asyncio implementations and are not generated using `unasync`.
+Note: The AsyncIO Producer/Consumer under `src/confluent_kafka/experimental/aio/` are first-class asyncio implementations and are not generated using `unasync`.
 
 ## AsyncIO Producer development (AIOProducer)
 
 Source:
 
-- `src/confluent_kafka/aio/producer/_AIOProducer.py` (public async API)
-- Internal modules in `src/confluent_kafka/aio/producer/` and helpers in `src/confluent_kafka/aio/_common.py`
+- `src/confluent_kafka/experimental/aio/producer/_AIOProducer.py` (public async API)
+- Internal modules in `src/confluent_kafka/experimental/aio/producer/` and helpers in `src/confluent_kafka/experimental/aio/_common.py`
 
 For a complete usage example, see [`examples/asyncio_example.py`](examples/asyncio_example.py).
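The DEVELOPER.md hunk above explains that sync Schema Registry code is generated from its async counterpart. As a toy illustration of what a unasync-style async-to-sync rewrite does (the real `tools/unasync.py` has its own, more careful rules; this regex sketch only conveys the idea):

```python
import re

# Toy async -> sync rewrite rules, illustrating the unasync idea.
# The real tools/unasync.py in the repo is more careful than this.
RULES = [
    (re.compile(r"\basync def "), "def "),
    (re.compile(r"\bawait "), ""),
    (re.compile(r"\basync for\b"), "for"),
    (re.compile(r"\basync with\b"), "with"),
    # e.g. AsyncAvroSerializer -> AvroSerializer
    (re.compile(r"\bAsync([A-Z]\w*)"), r"\1"),
]


def unasync(source: str) -> str:
    """Apply each rewrite rule in order to produce the sync variant."""
    for pattern, repl in RULES:
        source = pattern.sub(repl, source)
    return source


async_src = "async def get(self):\n    return await self._client.fetch()\n"
print(unasync(async_src))
```

Running the generator after editing `_async` code keeps the `_sync` tree in lockstep, which is exactly what the CI `--check` mode verifies.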

README.md

Lines changed: 4 additions & 5 deletions

@@ -27,7 +27,7 @@ Unlike the basic Apache Kafka Python client, `confluent-kafka-python` provides:
 
 - **High Performance & Reliability**: Built on [`librdkafka`](https://github.com/confluentinc/librdkafka), the battle-tested C client for Apache Kafka, ensuring maximum throughput, low latency, and stability. The client is supported by Confluent and is trusted in mission-critical production environments.
 - **Comprehensive Kafka Support**: Full support for the Kafka protocol, transactions, and administration APIs.
-- **AsyncIO Producer**: A fully asynchronous producer (`AIOProducer`) for seamless integration with modern Python applications using `asyncio`.
+- **Experimental; AsyncIO Producer**: An experimental fully asynchronous producer (`AIOProducer`) for seamless integration with modern Python applications using `asyncio`.
 - **Seamless Schema Registry Integration**: Synchronous and asynchronous clients for Confluent Schema Registry to handle schema management and serialization (Avro, Protobuf, JSON Schema).
 - **Improved Error Handling**: Detailed, context-aware error messages and exceptions to speed up debugging and troubleshooting.
 - **[Confluent Cloud] Automatic Zone Detection**: Producers automatically connect to brokers in the same availability zone, reducing latency and data transfer costs without requiring manual configuration.

@@ -60,7 +60,7 @@ Use the AsyncIO `Producer` inside async applications to avoid blocking the event
 
 ```python
 import asyncio
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 
 async def main():
     p = AIOProducer({"bootstrap.servers": "mybroker"})

@@ -97,7 +97,6 @@ For a more detailed example that includes both an async producer and consumer, s
 
 The AsyncIO producer and consumer integrate seamlessly with async Schema Registry serializers. See the [Schema Registry Integration](#schema-registry-integration) section below for full details.
 
-**Migration Note:** If you previously used custom AsyncIO wrappers, you can now migrate to the official `AIOProducer` which handles thread pool management, callback scheduling, and cleanup automatically. See the [blog post](https://www.confluent.io/blog/kafka-python-asyncio-integration/) for migration guidance.
 ### Basic Producer example
 
 ```python

@@ -178,7 +177,7 @@ producer.flush()
 Use the `AsyncSchemaRegistryClient` and `Async` serializers with `AIOProducer` and `AIOConsumer`. The configuration is the same as the synchronous client.
 
 ```python
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
 from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
 

@@ -316,7 +315,7 @@ For source install, see the *Install from source* section in [INSTALL.md](INSTAL
 ## Broker compatibility
 
 The Python client (as well as the underlying C library librdkafka) supports
-all broker versions &gt;= 0.8.
+all broker versions >= 0.8.
 But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it
 is not safe for a client to assume what protocol version is actually supported
 by the broker, thus you will need to hint the Python client what protocol
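The broker-compatibility paragraph above is cut off in this view, but the hint it refers to is librdkafka's protocol-version configuration. A sketch of the relevant properties for pre-0.10 brokers (values are illustrative; match them to your actual cluster version):

```python
# Illustrative client configuration for 0.8/0.9 brokers.
conf = {
    "bootstrap.servers": "mybroker",
    # 0.8/0.9 brokers do not understand the ApiVersionRequest,
    # so disable it...
    "api.version.request": False,
    # ...and tell librdkafka which protocol feature set to assume instead.
    "broker.version.fallback": "0.9.0.1",
}
```

Brokers >= 0.10 report their supported API versions, so neither property is needed there.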

aio_producer_simple_diagram.md

Lines changed: 3 additions & 3 deletions

@@ -77,9 +77,9 @@ The `AIOProducer` implements a multi-component architecture designed for high-pe
 
 ### Source Code Location
 
-- **Main Implementation**: `src/confluent_kafka/aio/producer/_AIOProducer.py`
-- **Supporting Modules**: `src/confluent_kafka/aio/producer/` directory
-- **Common Utilities**: `src/confluent_kafka/aio/_common.py`
+- **Main Implementation**: `src/confluent_kafka/experimental/aio/producer/_AIOProducer.py`
+- **Supporting Modules**: `src/confluent_kafka/experimental/aio/producer/` directory
+- **Common Utilities**: `src/confluent_kafka/experimental/aio/_common.py`
 
 ### Design Principles

examples/README.md

Lines changed: 6 additions & 6 deletions

@@ -11,7 +11,7 @@ The scripts in this directory provide various examples of using the Confluent Py
 
 ## AsyncIO Examples
 
-- [asyncio_example.py](asyncio_example.py): Comprehensive AsyncIO example demonstrating both AIOProducer and AIOConsumer with transactional operations, batched async produce, proper event loop integration, signal handling, and async callback patterns.
+- [asyncio_example.py](asyncio_example.py): Experimental comprehensive AsyncIO example demonstrating both AIOProducer and AIOConsumer with transactional operations, batched async produce, proper event loop integration, signal handling, and async callback patterns.
 - [asyncio_avro_producer.py](asyncio_avro_producer.py): Minimal AsyncIO Avro producer using `AsyncSchemaRegistryClient` and `AsyncAvroSerializer` (supports Confluent Cloud using `--sr-api-key`/`--sr-api-secret`).
 
 **Architecture:** For implementation details and component design, see the [AIOProducer Architecture Overview](../aio_producer_simple_diagram.md).

@@ -24,7 +24,7 @@ The AsyncIO producer works seamlessly with popular Python web frameworks:
 
 ```python
 from fastapi import FastAPI
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 
 app = FastAPI()
 producer = None

@@ -45,7 +45,7 @@ async def create_event(data: dict):
 
 ```python
 from aiohttp import web
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 
 async def init_app():
     app = web.Application()

@@ -66,7 +66,7 @@ For more details, see [Integrating Apache Kafka With Python Asyncio Web Applicat
 The AsyncIO producer and consumer work seamlessly with async Schema Registry serializers:
 
 ```python
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
 from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
 

@@ -163,15 +163,15 @@ producer_conf = {
 producer = Producer(producer_conf)
 ```
 
-### Asynchronous usage (AsyncIO)
+### Asynchronous usage (Experimental AsyncIO)
 
 Use async serializers with `AIOProducer` and `AIOConsumer`. Note that you must
 instantiate the serializer and then call it to serialize the data *before*
 producing.
 
 ```python
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
 from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
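The examples/README.md hunk above stresses the ordering: instantiate the serializer once, then call it to turn the payload into bytes *before* handing those bytes to the producer. A library-free sketch of that ordering, with stand-in names (`ToyAsyncSerializer`, `produce_stub`) that are not part of the confluent_kafka API:

```python
import asyncio
import json


class ToyAsyncSerializer:
    """Stand-in async serializer: instantiate once, then call per record."""

    async def __call__(self, obj) -> bytes:
        return json.dumps(obj, sort_keys=True).encode("utf-8")


async def produce_stub(topic: str, value: bytes) -> None:
    # Stand-in for `await producer.produce(...)`: by the time the
    # producer sees the record, it is already serialized bytes.
    assert isinstance(value, bytes)


async def main() -> bytes:
    serializer = ToyAsyncSerializer()       # 1. instantiate once
    payload = await serializer({"id": 1})   # 2. serialize *before* producing
    await produce_stub("mytopic", payload)  # 3. produce the serialized bytes
    return payload


print(asyncio.run(main()))
```

Keeping serialization as an explicit awaited step (rather than hiding it inside produce) is what lets async Schema Registry lookups happen without blocking the event loop.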

examples/asyncio_avro_producer.py

Lines changed: 1 addition & 1 deletion

@@ -20,7 +20,7 @@
 import argparse
 import asyncio
 
-from confluent_kafka.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOProducer
 from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
 from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
 from confluent_kafka.serialization import SerializationContext, MessageField

examples/asyncio_example.py

Lines changed: 2 additions & 2 deletions

@@ -16,8 +16,8 @@
 
 import asyncio
 import sys
-from confluent_kafka.aio import AIOProducer
-from confluent_kafka.aio import AIOConsumer
+from confluent_kafka.experimental.aio import AIOProducer
+from confluent_kafka.experimental.aio import AIOConsumer
 import random
 import logging
 import signal

src/confluent_kafka/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -49,6 +49,7 @@
 __all__ = [
     "admin",
     "Consumer",
+    "experimental",
     "KafkaError",
     "KafkaException",
     "kafkatest",

src/confluent_kafka/aio/producer/__init__.py

Lines changed: 0 additions & 40 deletions

This file was deleted.

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
+# Copyright 2025 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Experimental APIs for confluent_kafka.
+
+These APIs are subject to change and may be removed or modified in
+incompatible ways in future releases.
+"""
