Skip to content

Commit 3a988e2

Browse files
authored
OAuth Azure IMDS Kafka example (#2083)
1 parent 48ad122 commit 3a988e2

12 files changed

+119
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
v2.12.0 is a feature release with the following enhancements:
77

8+
- Kafka OAuth/OIDC metadata based authentication examples with Azure IMDS (#2083).
9+
810
confluent-kafka-python v2.12.0 is based on librdkafka v2.12.0, see the
911
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0)
1012
for a complete list of changes, enhancements, fixes and upgrade considerations.

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ from confluent_kafka.schema_registry._async.protobuf import AsyncProtobufSeriali
239239
- [sasl_producer.py](sasl_producer.py): Demonstrates SASL Authentication.
240240
- [oauth_producer.py](oauth_producer.py): Demonstrates OAuth Authentication (client credentials).
241241
- [oauth_oidc_ccloud_producer.py](oauth_oidc_ccloud_producer.py): Demonstrates OAuth OIDC authentication with Confluent Cloud.
242+
- [oauth_oidc_ccloud_azure_imds_producer.py](oauth_oidc_ccloud_azure_imds_producer.py): Demonstrates OAuth/OIDC Authentication with Confluent Cloud (Azure IMDS metadata based authentication).
242243
- [oauth_schema_registry.py](oauth_schema_registry.py): Demonstrates OAuth authentication with Schema Registry.
243244

244245
## Admin API Examples

examples/avro_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def delivery_report(err, msg):
7777
Reports the failure or success of a message delivery.
7878
7979
Args:
80-
err (KafkaError): The error that occurred on None on success.
80+
err (KafkaError): The error that occurred, or None on success.
8181
8282
msg (Message): The message that was produced or failed.
8383

examples/avro_producer_encryption.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def delivery_report(err, msg):
9595
Reports the failure or success of a message delivery.
9696
9797
Args:
98-
err (KafkaError): The error that occurred on None on success.
98+
err (KafkaError): The error that occurred, or None on success.
9999
100100
msg (Message): The message that was produced or failed.
101101

examples/json_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def delivery_report(err, msg):
7676
Reports the success or failure of a message delivery.
7777
7878
Args:
79-
err (KafkaError): The error that occurred on None on success.
79+
err (KafkaError): The error that occurred, or None on success.
8080
msg (Message): The message that was produced or failed.
8181
"""
8282

examples/json_producer_encryption.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def delivery_report(err, msg):
9494
Reports the success or failure of a message delivery.
9595
9696
Args:
97-
err (KafkaError): The error that occurred on None on success.
97+
err (KafkaError): The error that occurred, or None on success.
9898
msg (Message): The message that was produced or failed.
9999
"""
100100

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2025 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
# This example uses Azure IMDS for credential-less authentication
20+
# to Kafka on Confluent Cloud
21+
22+
import logging
23+
import argparse
24+
from confluent_kafka import Producer
25+
from confluent_kafka.serialization import (StringSerializer)
26+
27+
28+
def producer_config(args):
29+
logger = logging.getLogger(__name__)
30+
logger.setLevel(logging.DEBUG)
31+
params = {
32+
'bootstrap.servers': args.bootstrap_servers,
33+
'security.protocol': 'SASL_SSL',
34+
'sasl.mechanisms': 'OAUTHBEARER',
35+
'sasl.oauthbearer.method': 'oidc',
36+
'sasl.oauthbearer.metadata.authentication.type': 'azure_imds',
37+
'sasl.oauthbearer.config': f'query={args.query}'
38+
}
39+
# These two parameters are only applicable when producing to
40+
# Confluent Cloud where some sasl extensions are required.
41+
if args.logical_cluster and args.identity_pool_id:
42+
params['sasl.oauthbearer.extensions'] = 'logicalCluster=' + args.logical_cluster + \
43+
',identityPoolId=' + args.identity_pool_id
44+
45+
return params
46+
47+
48+
def delivery_report(err, msg):
49+
"""
50+
Reports the failure or success of a message delivery.
51+
52+
Args:
53+
err (KafkaError): The error that occurred, or None on success.
54+
55+
msg (Message): The message that was produced or failed.
56+
57+
Note:
58+
In the delivery report callback the Message.key() and Message.value()
59+
will be the binary format as encoded by any configured Serializers and
60+
not the same object that was passed to produce().
61+
If you wish to pass the original object(s) for key and value to delivery
62+
report callback we recommend a bound callback or lambda where you pass
63+
the objects along.
64+
65+
"""
66+
if err is not None:
67+
print('Delivery failed for User record {}: {}'.format(msg.key(), err))
68+
return
69+
print('User record {} successfully produced to {} [{}] at offset {}'.format(
70+
msg.key(), msg.topic(), msg.partition(), msg.offset()))
71+
72+
73+
def main(args):
74+
topic = args.topic
75+
producer_conf = producer_config(args)
76+
producer = Producer(producer_conf)
77+
string_serializer = StringSerializer('utf_8')
78+
79+
print('Producing records to topic {}. ^C to exit.'.format(topic))
80+
while True:
81+
# Serve on_delivery callbacks from previous calls to produce()
82+
producer.poll(0.0)
83+
try:
84+
name = input(">")
85+
producer.produce(topic=topic,
86+
key=string_serializer(name),
87+
value=string_serializer(name),
88+
on_delivery=delivery_report)
89+
except KeyboardInterrupt:
90+
break
91+
92+
print('\nFlushing {} records...'.format(len(producer)))
93+
producer.flush()
94+
95+
96+
if __name__ == '__main__':
97+
parser = argparse.ArgumentParser(description="OAuth/OIDC example using Azure IMDS metadata-based authentication")
98+
parser.add_argument('-b', dest="bootstrap_servers", required=True,
99+
help="Bootstrap broker(s) (host[:port])")
100+
parser.add_argument('-t', dest="topic", default="example_producer_oauth",
101+
help="Topic name")
102+
parser.add_argument('--query', dest="query", required=True,
103+
help="Query parameters for Azure IMDS token endpoint")
104+
parser.add_argument('--logical-cluster', dest="logical_cluster", required=False, help="Logical Cluster.")
105+
parser.add_argument('--identity-pool-id', dest="identity_pool_id", required=False, help="Identity Pool ID.")
106+
107+
main(parser.parse_args())

examples/oauth_oidc_ccloud_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def delivery_report(err, msg):
5454
Reports the failure or success of a message delivery.
5555
5656
Args:
57-
err (KafkaError): The error that occurred on None on success.
57+
err (KafkaError): The error that occurred, or None on success.
5858
5959
msg (Message): The message that was produced or failed.
6060

examples/oauth_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def delivery_report(err, msg):
6565
Reports the failure or success of a message delivery.
6666
6767
Args:
68-
err (KafkaError): The error that occurred on None on success.
68+
err (KafkaError): The error that occurred, or None on success.
6969
7070
msg (Message): The message that was produced or failed.
7171

examples/protobuf_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def delivery_report(err, msg):
4646
Reports the failure or success of a message delivery.
4747
4848
Args:
49-
err (KafkaError): The error that occurred on None on success.
49+
err (KafkaError): The error that occurred, or None on success.
5050
msg (Message): The message that was produced or failed.
5151
"""
5252

0 commit comments

Comments
 (0)