Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions quixstreams/sinks/community/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class KafkaReplicatorSink(BaseSink):

```python
from quixstreams import Application
from quixstreams.sinks.community.kafka import KafkaSink
from quixstreams.sinks.community.kafka import KafkaReplicatorSink

app = Application(
consumer_group="group",
Expand All @@ -38,7 +38,7 @@ class KafkaReplicatorSink(BaseSink):
topic = app.topic("input-topic")

# Define the external Kafka cluster configuration
kafka_sink = KafkaSink(
kafka_sink = KafkaReplicatorSink(
broker_address="external-kafka:9092",
topic_name="output-topic",
value_serializer="json",
Expand Down Expand Up @@ -206,4 +206,6 @@ def flush(self) -> None:
)
raise SinkBackpressureError(retry_after=10.0)

logger.debug(f'Successfully flushed KafkaSink for topic "{self._topic_name}"')
logger.debug(
f'Successfully flushed KafkaReplicatorSink for topic "{self._topic_name}"'
)