Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .config/dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Kasurde
Nikhil
OAUTHBEARER
Oliveira
PKCS
PYTHONUNBUFFERED
Passw
Thycotic
Expand Down
1 change: 1 addition & 0 deletions .config/manifest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extensions/eda/plugins/event_source/file_watch.py
extensions/eda/plugins/event_source/generic.py
extensions/eda/plugins/event_source/journald.py
extensions/eda/plugins/event_source/kafka.py
extensions/eda/plugins/event_source/oauth_tokens.py
extensions/eda/plugins/event_source/pg_listener.py
extensions/eda/plugins/event_source/range.py
extensions/eda/plugins/event_source/README.md
Expand Down
8 changes: 6 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ repos:
- id: mypy
# empty args needed in order to match mypy cli behavior
args: []
language_version: python3.9 # minimal supported version, keeps results consistent
language_version: python3.10 # minimal supported version, keeps results consistent
additional_dependencies:
- aiohttp
- aiokafka
Expand All @@ -68,6 +68,8 @@ repos:
- types-requests
- watchdog>=5.0.0
- xxhash
- pyjwt
- cryptography

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.13.3"
Expand Down Expand Up @@ -109,6 +111,8 @@ repos:
- types-aiobotocore
- watchdog>=5.0.0
- xxhash
- pyjwt
- cryptography
- repo: local
hooks:
- id: ansible-test-sanity
Expand Down Expand Up @@ -138,7 +142,7 @@ repos:
entry: pip-compile --upgrade --no-annotate --strip-extras --unsafe-package=ansible-core --unsafe-package=doctutils --unsafe-package=setuptools --output-file=.config/constraints.txt .config/requirements.txt .config/requirements-test.txt .config/requirements-docs.txt
files: ^.config\/.*requirements.*$
language: python
language_version: "3.9" # minimal we support officially
language_version: "3.10" # minimal we support officially
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do this in a different PR and set 3.11 as minimal.

pass_filenames: false
stages: [manual]
additional_dependencies:
Expand Down
166 changes: 149 additions & 17 deletions extensions/eda/plugins/event_source/kafka.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,43 @@
import asyncio
import json
import logging
import os
import sys
from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from typing import Any

from aiokafka import AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context

try:
from .oauth_tokens import create_oauth_provider
except ImportError:
# Since ansible-rulebook launches the source plugin via
# run_py.run_path it doesn't set the python path and the
# import fails, hence this workaround.
module_directory = os.path.dirname(os.path.abspath(__file__))
sys.path.append(module_directory)
from oauth_tokens import create_oauth_provider # type: ignore

DEFAULT_SASL_MECHANISM = "PLAIN"
DEFAULT_OFFSET = "latest"
DEFAULT_VERIFY_MODE = "CERT_REQUIRED"
VERIFY_MODES = {
"CERT_NONE": CERT_NONE,
"CERT_OPTIONAL": CERT_OPTIONAL,
"CERT_REQUIRED": CERT_REQUIRED,
}

SUPPORTED_SASL_MECHANISMS = [
"PLAIN",
"SCRAM-SHA-256",
"SCRAM-SHA-512",
"GSSAPI",
"OAUTHBEARER",
]
USER_PASSWORD_MECHANISMS = ["SCRAM-SHA-256", "SCRAM-SHA-512"]
PLAIN_CREDENTIAL_KEYS = ["sasl_plain_username", "sasl_plain_password"]

DOCUMENTATION = r"""
---
short_description: Receive events via a kafka topic.
Expand All @@ -32,7 +63,7 @@
description:
- The optional client certificate file path containing the client
certificate, as well as CA certificates needed to establish
the certificate's authenticity.
the certificates authenticity.
type: str
keyfile:
description:
Expand Down Expand Up @@ -121,6 +152,80 @@
description:
- The kerberos REALM
type: str
sasl_oauth_token_endpoint:
Copy link
Contributor

@Alex-Izquierdo Alex-Izquierdo Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would have already 17 sasl related parameters, this is a awful and bad interface IMO. We should nest all these params into their own sasl config object, with backward compatibility of the existing params with deprecation in favor of the new system. Which probably should be correlated with an internal schema for sasl config because dealing with some many fields with a regular dict becomes hard to read and maintain.

description:
- The URL to get the OAuth2 token from your Authorization Server
type: str
sasl_oauth_client_id:
description:
- The id used when fetching the token from Authorization Server
type: str
sasl_oauth_client_secret:
description:
- The secret used when fetching the token from Authorization Server.
This is not needed if you are using private_key_jwt
type: str
sasl_oauth_private_keyfile:
description:
- When using the private_key_jwt this specifies the private key file
This is not needed if you are using regular oauth flow
type: str
sasl_oauth_public_keyfile:
description:
- When using the private_key_jwt this specifies the public key file
This is not needed if you are using regular oauth flow
Some of the Authorization Servers require that the public key
signature be sent instead of the key id
If a public key file is specified the JWT Header will have the
x5t X.509 Certificate SHA-1 Thumb print
x5t#S256 X.509 Certificate SHA-256 Thumb print
If this field is missing we wont set these in the JWT Header
type: str
sasl_oauth_issuer:
description:
- When using the private_key_jwt this specifies the issuer (iss)
that will be set in the JWT header, by default this value
is set to be the same as sasl_oauth_client_id
type: str
default: the same value as sasl_oauth_client_id
sasl_oauth_subject:
description:
- When using the private_key_jwt this specifies the issuer (sub)
that will be set in the JWT header, by default this value
is set to be the same as sasl_oauth_client_id
type: str
default: the same value as sasl_oauth_client_id
sasl_oauth_audience:
description:
- When using the private_key_jwt this specifies the audience (aud)
that will be set in the JWT header, by default this value
is set to be the same as sasl_oauth_token_endpoint
type: str
default: the same value as sasl_oauth_token_endpoint
sasl_oauth_token_duration:
description:
- The life span of the token specified in minutes
The default is 30 minutes
type: int
default: 30
sasl_oauth_algorithm:
description:
- When using the private_key_jwt the algorithm to use in jwt
that will be set in the JWT header, by default this value
is set to be RS256
type: str
default: "RS256"
sasl_oauth_method:
description:
- When fetching a token from a Auth Server you can choose from
client_secret_basic, client_secret_post
client_secret_jwt, private_key_jwt
type: str
default: "client_secret_basic"
sasl_oauth_scope:
description:
- The optional scope when using OAUTHBEARER
type: str
"""

EXAMPLES = r"""
Expand All @@ -142,6 +247,38 @@
"""


def _validate_args(args: dict[str, Any]) -> None:

sasl_mechanism = args.get("sasl_mechanism", DEFAULT_SASL_MECHANISM)
verify_mode = args.get("verify_mode", DEFAULT_VERIFY_MODE)
offset = args.get("offset", DEFAULT_OFFSET)

if sasl_mechanism not in SUPPORTED_SASL_MECHANISMS:
msg = (
f"SASL Mechanism {sasl_mechanism} is not supported: "
f"valid mechanisms are {SUPPORTED_SASL_MECHANISMS}"
)
raise ValueError(msg)

if sasl_mechanism in USER_PASSWORD_MECHANISMS:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

USER_PASSWORD_MECHANISMS does not include PLAIN

for key in PLAIN_CREDENTIAL_KEYS:
if key not in args:
msg = (
f"For sasl_mechanism {sasl_mechanism}, {key} is missing."
"Please specify all of the following arguments: "
f"{','.join(PLAIN_CREDENTIAL_KEYS)}"
)
raise ValueError(msg)

if offset not in ("latest", "earliest"):
msg = f"Invalid offset option: {offset}"
raise ValueError(msg)

if verify_mode not in VERIFY_MODES:
msg = f"Invalid verify_mode option: {verify_mode}"
raise ValueError(msg)


async def main( # pylint: disable=R0914
queue: asyncio.Queue[Any],
args: dict[str, Any],
Expand All @@ -168,28 +305,22 @@ async def main( # pylint: disable=R0914
keyfile = args.get("keyfile")
password = args.get("password")
check_hostname = args.get("check_hostname", True)
verify_mode = args.get("verify_mode", "CERT_REQUIRED")
verify_mode = args.get("verify_mode", DEFAULT_VERIFY_MODE)
group_id = args.get("group_id")
offset = args.get("offset", "latest")
offset = args.get("offset", DEFAULT_OFFSET)
encoding = args.get("encoding", "utf-8")
security_protocol = args.get("security_protocol", "PLAINTEXT")
sasl_mechanism = args.get("sasl_mechanism", DEFAULT_SASL_MECHANISM)
sasl_oauth_token_provider = None

if offset not in ("latest", "earliest"):
msg = f"Invalid offset option: {offset}"
raise ValueError(msg)
_validate_args(args)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaults management is inconsistent because we already set the default values also in main, the default values should be set only in one place.
Encapsulating validation is great but it should be called before declaring any other variable that comes from the args.

I suggest a validate args object, where we can centralize parsing, validation and defaults.


verify_modes = {
"CERT_NONE": CERT_NONE,
"CERT_OPTIONAL": CERT_OPTIONAL,
"CERT_REQUIRED": CERT_REQUIRED,
}
try:
verify_mode = verify_modes[verify_mode]
except KeyError as exc:
msg = f"Invalid verify_mode option: {verify_mode}"
raise ValueError(msg) from exc
if sasl_mechanism == "OAUTHBEARER":

sasl_oauth_token_provider = create_oauth_provider(args)

ssl_context = None
verify_mode = VERIFY_MODES[verify_mode]
if cafile or security_protocol.endswith("SSL"):
security_protocol = security_protocol.replace("PLAINTEXT", "SSL")
ssl_context = create_ssl_context(
Expand All @@ -209,12 +340,13 @@ async def main( # pylint: disable=R0914
auto_offset_reset=offset,
security_protocol=security_protocol,
ssl_context=ssl_context,
sasl_mechanism=args.get("sasl_mechanism", "PLAIN"),
sasl_mechanism=args.get("sasl_mechanism", DEFAULT_SASL_MECHANISM),
sasl_plain_username=args.get("sasl_plain_username"),
sasl_plain_password=args.get("sasl_plain_password"),
sasl_kerberos_service_name=args.get("sasl_kerberos_service_name"),
sasl_kerberos_domain_name=args.get("sasl_kerberos_domain_name"),
metadata_max_age_ms=int(args.get("metadata_max_age_ms", 300000)),
sasl_oauth_token_provider=sasl_oauth_token_provider,
)

kafka_consumer.subscribe(topics=topics, pattern=topic_pattern)
Expand Down
Loading
Loading