Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions plugins/event_source/gcp_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
gcp_pubsub.py

An ansible-events event source module for receiving events from GCP Pub/Sub

Arguments:
project_id: The GCP project name
subscription_id: The name of the topic to pull messages from
max_messages: The number of messages to retreive
Default 3
retry_deadline: How long to keep retrying in seconds
Default 300

Example:

- ansible.eda.gpc_pubsub:
project_id: "{{ project_id }}"
subscription_id: "{{ subscription_id }}"
max_messages: "{{ max_messages }}"
retry_deadline: "{{ retry_deadline }}"

"""

import asyncio
from typing import Any, Dict

from google.api_core import retry
from google.cloud import pubsub_v1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does pubsub_v1 support asyncio? Typically pull-and-sleep is not the best approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I see this plugin would block the python thread. It could implement futures + callbacks or, I think easier, use an asyncio library compatible



async def main(queue: asyncio.Queue, args: Dict[str, Any]):
subscriber = pubsub_v1.SubscriberClient()

subscription_path = subscriber.subscription_path(args.get("project_id"), args.get("subscription_id"))

with subscriber:
while True:
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": args.get("max_messages", 3)},
retry=retry.Retry(deadline=args.get("retry_deadline", 300)),
)

if len(response.received_messages) == 0:
continue

ack_ids = []
for received_message in response.received_messages:
data = {"message": received_message.message.data.decode(),
"attributes": dict(received_message.message.attributes)}

await queue.put(data)

ack_ids.append(received_message.ack_id)

subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)

await asyncio.sleep(1)

if __name__ == "__main__":
class MockQueue:
@staticmethod
async def put(event):
print(event)

asyncio.run(main(MockQueue(), {"project_id": "lab", "subscription_id": "eda",
"max_messages": 3, "retry_deadline": 300}))
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ aiohttp
aiokafka
watchdog
asyncio
dpath
google-cloud-pubsub
protobuf==3.20.*
dpath

20 changes: 20 additions & 0 deletions rulebooks/gcp-pubsub-test-rules.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
- name: Listen for events on a GPC Pub/Sub topic
hosts: all

## Define our source for events

sources:
- ansible.eda.gcp_pubsub:
project_id: "eda"
subscription_id: "test"

## Define the conditions we are looking for

rules:
- name: Message attribute action equals test
condition: event.attributes.action == "test"

## Define the action we should take should the condition be met
action:
debug: