Skip to content

Commit 07c66f9

Browse files
committed
feat: Support SCRAM and OAuth mechanisms in Kafka plugin
Added support for fetching tokens from external Auth Servers. When fetching tokens supports * client_secret_basic * client_secret_post * client_secret_jwt * private_key_jwt These authentication methods are described in RFC 6749 and RFC 7523 Added validations for SCRAM-SHA-256 and SCRAM-SHA-512 With this PR we have complete support for all SASL Mechanisms needed in Kafka * PLAIN * SCRAM-SHA-256 * SCRAM-SHA-512 * GSSAPI * OAUTHBEARER
1 parent 6cdd8dc commit 07c66f9

File tree

7 files changed

+1785
-4
lines changed

7 files changed

+1785
-4
lines changed

.config/manifest.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ extensions/eda/plugins/event_source/file_watch.py
2828
extensions/eda/plugins/event_source/generic.py
2929
extensions/eda/plugins/event_source/journald.py
3030
extensions/eda/plugins/event_source/kafka.py
31+
extensions/eda/plugins/event_source/oauth_tokens.py
3132
extensions/eda/plugins/event_source/pg_listener.py
3233
extensions/eda/plugins/event_source/range.py
3334
extensions/eda/plugins/event_source/README.md

extensions/eda/plugins/event_source/kafka.py

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,29 @@
11
import asyncio
22
import json
33
import logging
4+
import os
5+
import sys
46
from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
57
from typing import Any
68

79
from aiokafka import AIOKafkaConsumer
810
from aiokafka.helpers import create_ssl_context
911

12+
module_path = os.path.abspath(__file__)
13+
module_directory = os.path.dirname(module_path)
14+
sys.path.append(module_directory)
15+
16+
17+
SUPPORTED_SASL_MECHANISMS = [
18+
"PLAIN",
19+
"SCRAM-SHA-256",
20+
"SCRAM-SHA-512",
21+
"GSSAPI",
22+
"OAUTHBEARER",
23+
]
24+
USER_PASSWORD_MECHANISMS = ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]
25+
PLAIN_CREDENTIAL_KEYS = ["sasl_plain_username", "sasl_plain_password"]
26+
1027
DOCUMENTATION = r"""
1128
---
1229
short_description: Receive events via a kafka topic.
@@ -32,7 +49,7 @@
3249
description:
3350
- The optional client certificate file path containing the client
3451
certificate, as well as CA certificates needed to establish
35-
the certificate's authenticity.
52+
the certificates authenticity.
3653
type: str
3754
keyfile:
3855
description:
@@ -121,6 +138,80 @@
121138
description:
122139
- The kerberos REALM
123140
type: str
141+
sasl_oauth_token_endpoint:
142+
description:
143+
- The URL to get the OAuth2 token from your Authorization Server
144+
type: str
145+
sasl_oauth_client_id:
146+
description:
147+
- The id used when fetching the token from Authorization Server
148+
type: str
149+
sasl_oauth_client_secret:
150+
description:
151+
- The secret used when fetching the token from Authorization Server.
152+
This is not needed if you are using private_key_jwt
153+
type: str
154+
sasl_oauth_private_keyfile:
155+
description:
156+
- When using the private_key_jwt this specifies the private key file
157+
This is not needed if you are using regular oauth flow
158+
type: str
159+
sasl_oauth_public_keyfile:
160+
description:
161+
- When using the private_key_jwt this specifies the public key file
162+
This is not needed if you are using regular oauth flow
163+
Some of the Authorization Servers require that the public key
164+
signature be sent instead of the key id
165+
If a public key file is specified the JWT Header will have the
166+
x5t X.509 Certificate SHA-1 Thumb print
167+
x5t#S256 X.509 Certificate SHA-256 Thumb print
168+
If this field is missing we wont set these in the JWT Header
169+
type: str
170+
sasl_oauth_issuer:
171+
description:
172+
- When using the private_key_jwt this specifies the issuer (iss)
173+
that will be set in the JWT header, by default this value
174+
is set to be the same as sasl_oauth_client_id
175+
type: str
176+
default: the same value as sasl_oauth_client_id
177+
sasl_oauth_subject:
178+
description:
179+
- When using the private_key_jwt this specifies the issuer (sub)
180+
that will be set in the JWT header, by default this value
181+
is set to be the same as sasl_oauth_client_id
182+
type: str
183+
default: the same value as sasl_oauth_client_id
184+
sasl_oauth_audience:
185+
description:
186+
- When using the private_key_jwt this specifies the audience (aud)
187+
that will be set in the JWT header, by default this value
188+
is set to be the same as sasl_oauth_token_endpoint
189+
type: str
190+
default: the same value as sasl_oauth_token_endpoint
191+
sasl_oauth_token_duration:
192+
description:
193+
- The life span of the token specified in minutes
194+
The default is 30 minutes
195+
type: int
196+
default: 30
197+
sasl_oauth_algorithm:
198+
description:
199+
- When using the private_key_jwt the algorithm to use in jwt
200+
that will be set in the JWT header, by default this value
201+
is set to be RS256
202+
type: str
203+
default: "RS256"
204+
sasl_oauth_method:
205+
description:
206+
- When fetching a token from a Auth Server you can choose from
207+
client_secret_basic, client_secret_post
208+
client_secret_jwt, private_key_jwt
209+
type: str
210+
default: "client_secret_basic"
211+
sasl_oauth_scope:
212+
description:
213+
- The optional scope when using OAUTHBEARER
214+
type: str
124215
"""
125216

126217
EXAMPLES = r"""
@@ -173,6 +264,30 @@ async def main( # pylint: disable=R0914
173264
offset = args.get("offset", "latest")
174265
encoding = args.get("encoding", "utf-8")
175266
security_protocol = args.get("security_protocol", "PLAINTEXT")
267+
sasl_mechanism = args.get("sasl_mechanism", "PLAIN")
268+
sasl_oauth_token_provider = None
269+
270+
if sasl_mechanism not in SUPPORTED_SASL_MECHANISMS:
271+
msg = (
272+
f"SASL Mechanism {sasl_mechanism} is not supported: "
273+
f"valid mechanisms are {SUPPORTED_SASL_MECHANISMS}"
274+
)
275+
raise ValueError(msg)
276+
277+
if sasl_mechanism == "OAUTHBEARER":
278+
from oauth_tokens import create_oauth_provider
279+
280+
sasl_oauth_token_provider = create_oauth_provider(args)
281+
282+
if sasl_mechanism in USER_PASSWORD_MECHANISMS:
283+
for key in PLAIN_CREDENTIAL_KEYS:
284+
if key not in args:
285+
msg = (
286+
f"For sasl_mechanism {sasl_mechanism}, {key} is missing."
287+
"Please specify all of the following arguments: "
288+
f"{','.join(PLAIN_CREDENTIAL_KEYS)}"
289+
)
290+
raise ValueError(msg)
176291

177292
if offset not in ("latest", "earliest"):
178293
msg = f"Invalid offset option: {offset}"
@@ -215,6 +330,7 @@ async def main( # pylint: disable=R0914
215330
sasl_kerberos_service_name=args.get("sasl_kerberos_service_name"),
216331
sasl_kerberos_domain_name=args.get("sasl_kerberos_domain_name"),
217332
metadata_max_age_ms=int(args.get("metadata_max_age_ms", 300000)),
333+
sasl_oauth_token_provider=sasl_oauth_token_provider,
218334
)
219335

220336
kafka_consumer.subscribe(topics=topics, pattern=topic_pattern)

0 commit comments

Comments
 (0)