Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions extensions/eda/plugins/event_source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from typing import Any
from typing import Any, Optional

from aiokafka import AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
Expand All @@ -14,15 +14,21 @@
- An ansible-rulebook event source plugin for receiving events via a kafka topic.
options:
host:
description:
- The host where the kafka topic is hosted.
type: str
required: true
required: false
port:
description:
- The port where the kafka server is listening.
type: str
required: true
type: false
brokers:
description:
- A list of host[:port] strings
that the consumer should contact to bootstrap initial cluster metadata.
This does not have to be the full node list.
It just needs to have at least one broker that will respond to a Metadata API Request
type: list(str)
required: false
cafile:
description:
- The optional certificate authority file path containing certificates
Expand Down Expand Up @@ -126,7 +132,11 @@
EXAMPLES = r"""
- ansible.eda.kafka:
host: "localhost"
port: "9092"
port: 9092
brokers:
- broker-1:9092
- broker-2:9093
- broker-3:9094
check_hostname: true
verify_mode: "CERT_OPTIONAL"
encoding: "utf-8"
Expand All @@ -142,6 +152,22 @@
"""


def _host_or_broker_validation(
host: Optional[str], port: Optional[int], brokers: Optional[list[str]]
) -> None:
if host and brokers:
msg = "Only one of host and brokers parameter must be set"
raise ValueError(msg)

if (host and not port) or (port and not host):
msg = "Host and port must be set"
raise ValueError(msg)

if not host and not brokers:
msg = "host and port or brokers must be set"
raise ValueError(msg)


async def main( # pylint: disable=R0914
queue: asyncio.Queue[Any],
args: dict[str, Any],
Expand All @@ -163,6 +189,7 @@ async def main( # pylint: disable=R0914

host = args.get("host")
port = args.get("port")
brokers = args.get("brokers")
cafile = args.get("cafile")
certfile = args.get("certfile")
keyfile = args.get("keyfile")
Expand All @@ -174,6 +201,8 @@ async def main( # pylint: disable=R0914
encoding = args.get("encoding", "utf-8")
security_protocol = args.get("security_protocol", "PLAINTEXT")

_host_or_broker_validation(host, port, brokers)

if offset not in ("latest", "earliest"):
msg = f"Invalid offset option: {offset}"
raise ValueError(msg)
Expand Down Expand Up @@ -202,7 +231,7 @@ async def main( # pylint: disable=R0914
ssl_context.verify_mode = verify_mode

kafka_consumer = AIOKafkaConsumer(
bootstrap_servers=f"{host}:{port}",
bootstrap_servers=brokers if brokers else f"{host}:{port}",
group_id=group_id,
enable_auto_commit=True,
max_poll_records=1,
Expand Down
40 changes: 39 additions & 1 deletion tests/unit/event_source/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import json
import re
from typing import Any
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -67,7 +68,7 @@ def test_receive_from_kafka_place_in_queue(
{
topic_type: topic_value,
"host": "localhost",
"port": "9092",
"port": 9092,
"group_id": "test",
},
)
Expand Down Expand Up @@ -96,3 +97,40 @@ def test_mixed_topics_and_patterns(
match="Exactly one of topic, topics, or topic_pattern must be provided.",
):
asyncio.run(kafka_main(myqueue, topic_args))


@pytest.mark.parametrize(
"args, error_msg",
[
# Only host set
(
{"host": "localhost", "port": None, "brokers": None, "topic": "eda"},
"Host and port must be set",
),
# Only port set
(
{"host": None, "port": 9092, "brokers": None, "topic": "eda"},
"Host and port must be set",
),
# Neither host nor brokers set
(
{"host": None, "port": None, "brokers": None, "topic": "eda"},
"host and port or brokers must be set",
),
# Both host and brokers set
(
{
"host": "localhost",
"port": 9092,
"brokers": ["localhost:9092"],
"topic": "eda",
},
"Only one of host and brokers parameter must be set",
),
],
)
def test_host_port_brokers_combinations(
myqueue: MockQueue, args: dict[str, Any], error_msg: str
) -> None:
with pytest.raises(ValueError, match=re.escape(error_msg)):
asyncio.run(kafka_main(myqueue, args))