Skip to content

Latest commit

 

History

History
200 lines (161 loc) · 7.54 KB

README.md

File metadata and controls

200 lines (161 loc) · 7.54 KB

AMQP Client Python

Client with high level of abstraction for manipulation of messages in the event bus RabbitMQ.


License Package version Supported Python versions Downloads Vulnerabilities Releases


Documentation: https://nutes-uepb.github.io/amqp-client-python

Source Code: https://github.com/nutes-uepb/amqp-client-python


Features:

  • Automatic creation and management of queues, exchanges and channels;
  • Connection persistence and auto reconnect;
  • Support for direct, topic and fanout exchanges;
  • Publish;
  • Subscribe;
  • Support for a Remote procedure call (RPC).

Table of Compatibility

version compatible with
0.2.0 0.2.0
0.1.14 ~0.1.12

Examples:

you can use sync , async eventbus and sync wrapper of async eventbus

async usage
# basic configuration
from amqp_client_python import (
    AsyncEventbusRabbitMQ,
    Config, Options
)
config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = AsyncEventbusRabbitMQ(config)
# publish

eventbus.publish("rpc_exchange", "routing.key", "message_content")
# subscribe
async def subscribe_handler(body) -> None:
    print(body, type(body), flush=True) # handle messages
await eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler)
# rpc_publish
response = await eventbus.rpc_client("rpc_exchange", "user.find", "message_content")
# provider
async def rpc_provider_handler(body) -> bytes:
    print(f"body: {body}")
    return b"content"
await eventbus.provide_resource("user.find", rpc_provider_handler)
sync usage(deprecated)
from amqp_client_python import (
    EventbusRabbitMQ,
    Config, Options
)
from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler
from examples.default import queue, rpc_queue, rpc_exchange, rpc_routing_key


class ExampleEvent(IntegrationEvent):
    EVENT_NAME: str = "ExampleEvent"
    ROUTING_KEY: str = rpc_routing_key

    def __init__(self, event_type: str, message = []) -> None:
        super().__init__(self.EVENT_NAME, event_type)
        self.message = message
        self.routing_key = self.ROUTING_KEY


class ExampleEventHandler(IntegrationEventHandler):
    def handle(self, body) -> None:
        print(body,"subscribe")


config = Config(Options(queue, rpc_queue, rpc_exchange))
eventbus = EventbusRabbitMQ(config=config)

class ExampleEvent(IntegrationEvent):
    EVENT_NAME: str = "ExampleEvent"
    def __init__(self, event_type: str, message = []) -> None:
        super().__init__(self.EVENT_NAME, event_type)
        self.message = message

from time import sleep
from random import randint
def handle(*body):
    print(body[0], "rpc_provider")
    return f"{body[0]}".encode("utf-8")

subscribe_event = ExampleEvent(rpc_exchange)
publish_event = ExampleEvent(rpc_exchange, ["message"])
subscribe_event_handle = ExampleEventHandler()
eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key)
eventbus.provide_resource(rpc_routing_key+"2", handle)
count = 0
running = True
from concurrent.futures import TimeoutError
while running:
    try:
        count += 1
        if str(count) != eventbus.rpc_client(rpc_exchange, rpc_routing_key+"2", [f"{count}"]).decode("utf-8"):
            running = False
        #eventbus.publish(publish_event, rpc_routing_key, "message_content")
        #running = False
    except TimeoutError as err:
        print("timeout!!!: ", str(err))
    except KeyboardInterrupt:
        running=False
    except BaseException as err:
        print("Err:", err)
sync wrapper usage
from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options

config = Config(Options("queue", "rpc_queue", "rpc_exchange"))
eventbus = EventbusWrapperRabbitMQ(config=config)

async def subscribe_handler(body) -> None:
    print(f"{body}", type(body), flush=True)

async def rpc_provider_handler(body) -> bytes:
    print(f"handle - {body}", type(body), flush=True)
    return f"{body}".encode("utf-8")

# rpc_provider
eventbus.provide_resource("user.find", rpc_provider_handler).result()
# subscribe
eventbus.subscribe("rpc_exchange", "routing.key", subscribe_handler).result()
count = 0
running = True
while running:
    try:
        count += 1
        # rpc_client call
        eventbus.rpc_client("rpc_exchange", "user.find", count).result().decode("utf-8")
        # publish
        eventbus.publish("rpc_exchange", "routing.key", "message_content").result()
        #running = False
    except KeyboardInterrupt:
        running=False
    except BaseException as err:
        print("Err:", err)

Sponsors

The library is provided by NUTES-UEPB.

Know Limitations:

basic eventbus

When using EventbusRabbitMQ Should not use rpc call inside of rpc provider or subscribe handlers, it may block the ioloop
#/obs: fixed on other kinds of eventbus, will be removed on nexts releases