diff --git a/Makefile b/Makefile index 1e5ffc5f..a074a3ef 100644 --- a/Makefile +++ b/Makefile @@ -112,7 +112,7 @@ docs: ready-docs lint-docs: ready-docs if [ -f .venv/bin/activate ]; then . .venv/bin/activate; fi; $(MAKE) -C docs clean - if [ -f .venv/bin/activate ]; then . .venv/bin/activate; fi; ! (make -C docs html 2>&1 | grep -v 'nonlocal image URI found\|included in any toctree' | grep WARNING) + if [ -f .venv/bin/activate ]; then . .venv/bin/activate; fi; ! (make -C docs html 2>&1 | grep -v 'more than one target found\|nonlocal image URI found\|included in any toctree' | grep WARNING) _open-docs: open docs/_build/html/index.html || xdg-open docs/_build/html/index.html diff --git a/docs/configure.rst b/docs/configure.rst index fb1dc748..670080fe 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -113,7 +113,7 @@ You can consult the `Kombu documentation even more information. User Authentication/Authorization -````````````` +````````````````````````````````` You can configure Pulsar to authenticate user during request processing and check if this user is allowed to run a job. @@ -222,6 +222,190 @@ In the event that the connection to the AMQP server is lost during message publish, the Pulsar server can retry the connection, governed by the ``amqp_publish*`` options documented in `app.yml.sample`_. +Message Queue (pulsar-relay) +----------------------------- + +Pulsar can also communicate with Galaxy via an experimental **pulsar-relay** server, +an HTTP-based message proxy. This mode is similar to the AMQP message queue mode but uses +HTTP long-polling instead of a message broker like RabbitMQ. This can help when: + +* Galaxy cannot directly reach Pulsar (e.g., due to firewall restrictions) +* You want to avoid deploying and managing a RabbitMQ server +* You prefer HTTP-based communication for simplicity and observability + +Architecture +```````````` + +In this mode: + +1. **Galaxy → Pulsar**: Galaxy posts control messages (job setup, status requests, + kill commands) to the proxy via HTTP POST +2. **Pulsar → Galaxy**: Pulsar polls the proxy via HTTP long-polling to receive + these messages +3. **Pulsar → Galaxy**: Pulsar posts status updates to the proxy +4. **Galaxy → Pulsar**: Galaxy polls the proxy to receive status updates +5. **File Transfers**: Pulsar transfers files directly to/from Galaxy via HTTP + (not through the proxy) + +:: + + Galaxy ──POST messages──> pulsar-relay ──poll──> Pulsar Server + │ + │ + Galaxy <────────direct HTTP for file transfers─────────┘ + +Pulsar Configuration +```````````````````` + +To configure Pulsar to use pulsar-relay, set the ``message_queue_url`` in +``app.yml`` with a ``http://`` or ``https://`` prefix:: + + message_queue_url: http://proxy-server.example.org:9000 + message_queue_username: admin + message_queue_password: your_secure_password + +The ``http://`` / ``https://`` prefix tells Pulsar to use the proxy communication mode instead +of AMQP. + +.. note:: + + Unlike AMQP mode, the pulsar-relay mode does **not** require the ``kombu`` + Python dependency. It only requires the ``requests`` library, which is a + standard dependency of Pulsar. 
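+
+To sanity-check connectivity from the Pulsar host to the relay, the same exchange
+can be reproduced with plain ``requests`` calls. This is only an illustrative
+sketch: the relay URL and credentials are placeholders, and the endpoint paths are
+the ones used by the new ``pulsar.client.transport.relay`` module in this change::
+
+    import requests
+
+    relay = "https://proxy-server.example.org:9000"
+
+    # Obtain a JWT access token from the relay.
+    token = requests.post(f"{relay}/auth/login", data={
+        "username": "admin",
+        "password": "your_secure_password",
+        "grant_type": "password",
+    }, timeout=10).json()["access_token"]
+    headers = {"Authorization": f"Bearer {token}"}
+
+    # Publish a control message to a topic (what the Galaxy-side client does).
+    requests.post(f"{relay}/api/v1/messages", headers=headers, json={
+        "topic": "job_setup",
+        "payload": {"job_id": "123"},
+    }, timeout=10)
+
+    # Long-poll a topic for new messages (what the Pulsar server does).
+    poll = requests.post(f"{relay}/messages/poll", headers=headers, json={
+        "topics": ["job_setup"],
+        "timeout": 30,
+    }, timeout=35)
+    for message in poll.json().get("messages", []):
+        print(message["topic"], message["payload"])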
+ +Galaxy Configuration +```````````````````` + +In Galaxy's job configuration (``job_conf.yml``), configure a Pulsar destination +with proxy parameters:: + + runners: + pulsar: + load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner + # Proxy connection + proxy_url: http://proxy-server.example.org:9000 + proxy_username: your_username + proxy_password: your_secure_password + + + execution: + default: pulsar_relay + environments: + pulsar_relay: + runner: pulsar + # Galaxy's URL (for Pulsar to reach back for file transfers) + url: http://galaxy-server.example.org:8080 + # Remote job staging directory + jobs_directory: /data/pulsar/staging + + +Authentication +`````````````` + +The pulsar-relay uses JWT (JSON Web Token) authentication. Galaxy and Pulsar +authenticate with the proxy using the username and password provided in the +configuration. Tokens are automatically managed and refreshed as needed. + +.. tip:: + + In production, always use HTTPS for the proxy URL to encrypt credentials + and message content during transit:: + + message_queue_url: https://proxy-server.example.org:443 + +Security Considerations +``````````````````````` + +* **Use HTTPS**: Always use HTTPS for the proxy URL in production +* **Strong Passwords**: Use strong, unique passwords for proxy authentication +* **Network Isolation**: Deploy the proxy in a DMZ accessible to both Galaxy + and Pulsar +* **Firewall Rules**: + * Galaxy → Proxy: Allow outbound HTTPS + * Pulsar → Proxy: Allow outbound HTTPS + * Pulsar → Galaxy: Allow outbound HTTP/HTTPS for file transfers + +Multiple Pulsar Instances +`````````````````````````` + +You can deploy multiple Pulsar instances with different managers, all using the +same proxy. Messages are routed by topic names that include the manager name. + +For example, configure two Pulsar servers: + +**Pulsar Server 1** (``app.yml``):: + + message_queue_url: http://proxy-server:9000 + message_queue_username: admin + message_queue_password: password + managers: + cluster_a: + type: queued_slurm + +**Pulsar Server 2** (``app.yml``):: + + message_queue_url: http://proxy-server:9000 + message_queue_username: admin + message_queue_password: password + managers: + cluster_b: + type: queued_condor + +In Galaxy's job configuration, route jobs to specific clusters using the +``manager`` parameter:: + + execution: + environments: + cluster_a_jobs: + runner: pulsar + proxy_url: http://proxy-server:9000 + manager: cluster_a + # ... other settings + + cluster_b_jobs: + runner: pulsar + proxy_url: http://proxy-server:9000 + manager: cluster_b + # ... other settings + +Topic Naming +```````````` + +Messages are organized by topic with automatic naming based on the manager name: + +* Job setup: ``job_setup_{manager_name}`` or ``job_setup`` (for default manager) +* Status requests: ``job_status_request_{manager_name}`` +* Kill commands: ``job_kill_{manager_name}`` +* Status updates: ``job_status_update_{manager_name}`` + +This allows multiple Pulsar instances to share the same proxy without message +conflicts. 
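+
+The suffixing rule above matches the logic used by both the client and the server
+code in this change; a minimal sketch (``relay_topic`` is only an illustrative
+helper, not part of the Pulsar API)::
+
+    def relay_topic(base: str, manager_name: str) -> str:
+        # The default manager uses the bare topic name; named managers get a suffix.
+        if manager_name == "_default_":
+            return base
+        return f"{base}_{manager_name}"
+
+    relay_topic("job_setup", "_default_")  # -> "job_setup"
+    relay_topic("job_setup", "cluster_a")  # -> "job_setup_cluster_a"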
+ +Comparison with AMQP Mode +`````````````````````````` + ++------------------------+---------------------------+-------------------------+ +| Feature | AMQP (RabbitMQ) | pulsar-relay | ++========================+===========================+=========================+ +| Protocol | AMQP over TCP | HTTP/HTTPS | ++------------------------+---------------------------+-------------------------+ +| Dependencies | kombu, RabbitMQ server | requests (built-in) | ++------------------------+---------------------------+-------------------------+ +| Deployment Complexity | Moderate (broker setup) | Simple (HTTP service) | ++------------------------+---------------------------+-------------------------+ +| Message Delivery | Push-based | Long-polling | ++------------------------+---------------------------+-------------------------+ +| Observability | Queue monitoring tools | HTTP access logs | ++------------------------+---------------------------+-------------------------+ +| SSL/TLS | Via AMQPS | Via HTTPS | ++------------------------+---------------------------+-------------------------+ +| Firewall Friendly | Moderate | High (standard HTTP) | ++------------------------+---------------------------+-------------------------+ + +For more information on deploying pulsar-relay, see the `pulsar-relay documentation`_. + +.. _pulsar-relay documentation: https://github.com/galaxyproject/pulsar-relay + Caching (Experimental) ---------------------- diff --git a/docs/pulsar.client.rst b/docs/pulsar.client.rst index 5d534569..c92b6881 100644 --- a/docs/pulsar.client.rst +++ b/docs/pulsar.client.rst @@ -54,6 +54,14 @@ pulsar.client.config\_util module :undoc-members: :show-inheritance: +pulsar.client.container\_job\_config module +------------------------------------------- + +.. automodule:: pulsar.client.container_job_config + :members: + :undoc-members: + :show-inheritance: + pulsar.client.decorators module ------------------------------- @@ -110,6 +118,14 @@ pulsar.client.path\_mapper module :undoc-members: :show-inheritance: +pulsar.client.relay\_auth module +-------------------------------- + +.. automodule:: pulsar.client.relay_auth + :members: + :undoc-members: + :show-inheritance: + pulsar.client.server\_interface module -------------------------------------- diff --git a/docs/pulsar.client.transport.rst b/docs/pulsar.client.transport.rst index f88852e7..1632a7f6 100644 --- a/docs/pulsar.client.transport.rst +++ b/docs/pulsar.client.transport.rst @@ -20,6 +20,14 @@ pulsar.client.transport.poster module :undoc-members: :show-inheritance: +pulsar.client.transport.relay module +------------------------------------ + +.. automodule:: pulsar.client.transport.relay + :members: + :undoc-members: + :show-inheritance: + pulsar.client.transport.requests module --------------------------------------- diff --git a/docs/pulsar.managers.util.rst b/docs/pulsar.managers.util.rst index 6a4711b9..7d138a82 100644 --- a/docs/pulsar.managers.util.rst +++ b/docs/pulsar.managers.util.rst @@ -39,6 +39,14 @@ pulsar.managers.util.external module :undoc-members: :show-inheritance: +pulsar.managers.util.gcp\_util module +------------------------------------- + +.. 
automodule:: pulsar.managers.util.gcp_util + :members: + :undoc-members: + :show-inheritance: + pulsar.managers.util.kill module -------------------------------- diff --git a/docs/pulsar.messaging.rst b/docs/pulsar.messaging.rst index ea7d1476..3e8c548f 100644 --- a/docs/pulsar.messaging.rst +++ b/docs/pulsar.messaging.rst @@ -12,6 +12,22 @@ pulsar.messaging.bind\_amqp module :undoc-members: :show-inheritance: +pulsar.messaging.bind\_relay module +----------------------------------- + +.. automodule:: pulsar.messaging.bind_relay + :members: + :undoc-members: + :show-inheritance: + +pulsar.messaging.relay\_state module +------------------------------------ + +.. automodule:: pulsar.messaging.relay_state + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/pulsar/client/client.py b/pulsar/client/client.py index 0dcc785b..94fb0f28 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -568,6 +568,78 @@ def kill(self): pass +class RelayJobClient(BaseMessageJobClient): + """Client that communicates with Pulsar via pulsar-relay. + + This client posts control messages (setup, status, kill) to the relay, + which are then consumed by the Pulsar server. File transfers happen + directly between Pulsar and Galaxy via HTTP. + """ + + def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, + dynamic_file_sources=None, token_endpoint=None): + """Submit a job by posting a setup message to the relay. + + Args: + command_line: Command to execute on Pulsar + dependencies_description: Tool dependencies + env: Environment variables + remote_staging: Remote staging configuration + job_config: Job configuration + dynamic_file_sources: Dynamic file sources + token_endpoint: Token endpoint for file access + + Returns: + None (async operation) + """ + launch_params = self._build_setup_message( + command_line, + dependencies_description=dependencies_description, + env=env, + remote_staging=remote_staging, + job_config=job_config, + dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, + ) + + # Determine topic name based on manager + manager_name = self.client_manager.manager_name + topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup" + + # Post message to relay + self.client_manager.relay_transport.post_message(topic, launch_params) + log.info("Job %s published to relay topic '%s'", self.job_id, topic) + return None + + def get_status(self): + """Request job status by posting a status request message to the relay. 
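+        The request itself is asynchronous: Pulsar publishes the resulting status
+        on the status update topic, which the client manager caches.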
+ + Returns: + Cached status if available, None otherwise + """ + manager_name = self.client_manager.manager_name + topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request" + + status_params = { + 'job_id': self.job_id, + } + + self.client_manager.relay_transport.post_message(topic, status_params) + log.debug("Job status request for %s published to relay topic '%s'", self.job_id, topic) + + # Return cached status if available + return self.client_manager.status_cache.get(self.job_id, {}).get('status', None) + + def kill(self): + """Kill a job by posting a kill message to the relay.""" + manager_name = self.client_manager.manager_name + topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill" + + kill_params = {'job_id': self.job_id} + self.client_manager.relay_transport.post_message(topic, kill_params) + log.info("Job kill request for %s published to relay topic '%s'", self.job_id, topic) + + class ExecutionType(str, Enum): # containers run one after each other with similar configuration # like in TES or AWS Batch diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index 2b239212..bb999adf 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -7,13 +7,16 @@ import functools import threading +import time from logging import getLogger from os import getenv from queue import Queue from typing import ( Any, Dict, + Optional, Type, + TYPE_CHECKING, ) from typing_extensions import Protocol @@ -29,6 +32,7 @@ K8sPollingCoexecutionJobClient, MessageCLIJobClient, MessageJobClient, + RelayJobClient, TesMessageCoexecutionJobClient, TesPollingCoexecutionJobClient, ) @@ -40,8 +44,12 @@ PulsarInterface, ) from .transport import get_transport +from .transport.relay import RelayTransport from .util import TransferEventManager +if TYPE_CHECKING: + from pulsar.managers import ManagerInterface + log = getLogger(__name__) DEFAULT_TRANSFER_THREADS = 2 @@ -66,12 +74,11 @@ class ClientManager(ClientManagerInterface): job_manager_interface_class: Type[PulsarInterface] client_class: Type[BaseJobClient] - def __init__(self, **kwds: Dict[str, Any]): + def __init__(self, job_manager: Optional["ManagerInterface"] = None, **kwds: Dict[str, Any]): """Build a HTTP client or a local client that talks directly to a job manger.""" - if 'pulsar_app' in kwds or 'job_manager' in kwds: + if 'pulsar_app' in kwds or job_manager: self.job_manager_interface_class = LocalPulsarInterface pulsar_app = kwds.get('pulsar_app', None) - job_manager = kwds.get('job_manager', None) file_cache = kwds.get('file_cache', None) self.job_manager_interface_args = dict( job_manager=job_manager, @@ -124,9 +131,9 @@ class MessageQueueClientManager(BaseRemoteConfiguredJobClientManager): status_cache: Dict[str, Any] ack_consumer_threads: Dict[str, threading.Thread] - def __init__(self, **kwds: Dict[str, Any]): + def __init__(self, amqp_url: str, **kwds: Dict[str, Any]): super().__init__(**kwds) - self.url = kwds.get('amqp_url') + self.url = amqp_url self.amqp_key_prefix = kwds.get("amqp_key_prefix", None) self.exchange = get_exchange(self.url, self.manager_name, kwds) self.status_cache = {} @@ -248,6 +255,116 @@ def get_client(self, destination_params, job_id, **kwargs): return MessageJobClient(destination_params, job_id, self) +class RelayClientManager(BaseRemoteConfiguredJobClientManager): + """Client manager that communicates with Pulsar via pulsar-relay. 
+ + This manager uses HTTP-based long-polling to receive status updates from + Pulsar through the relay, while posting control messages (setup, status + requests, kill) to the relay for Pulsar to consume. + """ + status_cache: Dict[str, Any] + + def __init__(self, relay_url: str, relay_username: str, relay_password: str, **kwds: Dict[str, Any]): + super().__init__(**kwds) + + if not relay_url: + raise Exception("relay_url is required for RelayClientManager") + + # Initialize relay transport + self.relay_transport = RelayTransport(relay_url, relay_username, relay_password) + self.status_cache = {} + self.callback_lock = threading.Lock() + self.callback_thread = None + self.active = True + + def callback_wrapper(self, callback, message_data): + """Process status update messages from the relay.""" + if not self.active: + log.debug("Obtained update message for inactive client manager, ignoring.") + return + + try: + payload = message_data.get('payload', {}) + if "job_id" in payload: + job_id = payload["job_id"] + self.status_cache[job_id] = payload + log.debug("Handling asynchronous status update from Pulsar via relay.") + callback(payload) + except Exception: + log.exception("Failure processing job status update message.") + except BaseException as e: + log.exception("Failure processing job status update message - BaseException type %s" % type(e)) + + def status_consumer(self, callback_wrapper): + """Long-poll the relay for status update messages.""" + manager_name = self.manager_name + topic = f"job_status_update_{manager_name}" if manager_name != "_default_" else "job_status_update" + + log.info("Starting relay status consumer for topic '%s'", topic) + + while self.active: + try: + # Long poll for status updates (30 second timeout) + messages = self.relay_transport.long_poll([topic], timeout=30) + + for message in messages: + callback_wrapper(message) + + except Exception: + if self.active: + log.exception("Exception while polling for status updates from relay, will retry.") + # Brief sleep before retrying to avoid tight loop on persistent errors + time.sleep(5) + else: + log.debug("Exception during shutdown, ignoring.") + break + + log.debug("Leaving Pulsar client relay status consumer, no additional updates will be processed.") + + def ensure_has_status_update_callback(self, callback): + """Start a thread to poll for status updates if not already running.""" + with self.callback_lock: + if self.callback_thread is not None: + return + + callback_wrapper = functools.partial(self.callback_wrapper, callback) + run = functools.partial(self.status_consumer, callback_wrapper) + thread = threading.Thread( + name="pulsar_client_%s_relay_status_consumer" % self.manager_name, + target=run + ) + thread.daemon = False # Don't interrupt processing + thread.start() + self.callback_thread = thread + + def ensure_has_ack_consumers(self): + """No-op for relay client manager, as acknowledgements are handled via HTTP.""" + pass + + def shutdown(self, ensure_cleanup: bool = False): + """Shutdown the client manager and cleanup resources.""" + self.active = False + if ensure_cleanup: + if self.callback_thread is not None: + self.callback_thread.join() + # Close relay transport + if hasattr(self, 'relay_transport'): + self.relay_transport.close() + + def __nonzero__(self): + return self.active + + __bool__ = __nonzero__ # Both needed Py2 v 3 + + def get_client(self, destination_params, job_id, **kwargs): + """Create a RelayJobClient for the given job.""" + if job_id is None: + raise Exception("Cannot generate 
Pulsar client for empty job_id.") + destination_params = _parse_destination_params(destination_params) + destination_params.update(**kwargs) + return RelayJobClient(destination_params, job_id, self) + + class PollingJobClientManager(BaseRemoteConfiguredJobClientManager): def get_client(self, destination_params, job_id, **kwargs): @@ -269,12 +386,25 @@ def shutdown(self, ensure_cleanup=False): pass -def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface: - if 'job_manager' in kwargs: - return ClientManager(**kwargs) # TODO: Consider more separation here. - elif kwargs.get('amqp_url', None): - return MessageQueueClientManager(**kwargs) - elif kwargs.get("k8s_enabled") or kwargs.get("tes_enabled") or kwargs.get("gcp_batch_enabled"): +def build_client_manager( + job_manager: Optional["ManagerInterface"] = None, + relay_url: Optional[str] = None, + relay_username: Optional[str] = None, + relay_password: Optional[str] = None, + amqp_url: Optional[str] = None, + k8s_enabled: Optional[bool] = None, + tes_enabled: Optional[bool] = None, + gcp_batch_enabled: Optional[bool] = None, + **kwargs +) -> ClientManagerInterface: + if job_manager: + return ClientManager(job_manager=job_manager, **kwargs) # TODO: Consider more separation here. + elif relay_url: + assert relay_password and relay_username, "relay_url set, but relay_username and relay_password must also be set" + return RelayClientManager(relay_url=relay_url, relay_username=relay_username, relay_password=relay_password, **kwargs) + elif amqp_url: + return MessageQueueClientManager(amqp_url=amqp_url, **kwargs) + elif k8s_enabled or tes_enabled or gcp_batch_enabled: return PollingJobClientManager(**kwargs) else: return ClientManager(**kwargs) diff --git a/pulsar/client/relay_auth.py b/pulsar/client/relay_auth.py new file mode 100644 index 00000000..e3421c43 --- /dev/null +++ b/pulsar/client/relay_auth.py @@ -0,0 +1,115 @@ +""" +JWT authentication manager for pulsar-relay. + +Handles token acquisition, caching, and automatic refresh. +""" +import logging +import threading +from typing import cast, Optional +from datetime import datetime, timedelta + +import requests + +log = logging.getLogger(__name__) + + +class RelayAuthManager: + """Manages JWT authentication tokens for pulsar-relay communication. + + Features: + - Thread-safe token caching + - Automatic token refresh before expiry + - Lazy authentication (only authenticates when needed) + """ + + def __init__(self, relay_url: str, username: str, password: str): + """Initialize the authentication manager. + + Args: + relay_url: Base URL of the pulsar-relay server + username: Username for authentication + password: Password for authentication + """ + self.relay_url = relay_url.rstrip('/') + self.username = username + self.password = password + + self._token: Optional[str] = None + self._token_expiry: Optional[datetime] = None + self._lock = threading.Lock() + + # Refresh token 5 minutes before expiry + self._refresh_buffer_seconds = 300 + + def get_token(self) -> str: + """Get a valid JWT token, refreshing if necessary. + + Returns: + Valid JWT access token + + Raises: + Exception: If authentication fails + """ + with self._lock: + if self._is_token_valid(): + return cast(str, self._token) + + # Need to authenticate or refresh + log.debug("Authenticating with pulsar-relay at %s", self.relay_url) + self._authenticate() + return cast(str, self._token) + + def _is_token_valid(self) -> bool: + """Check if current token is valid and not expiring soon. 
+ + Returns: + True if token exists and won't expire soon, False otherwise + """ + if self._token is None or self._token_expiry is None: + return False + + # Check if token will expire within refresh buffer + time_until_expiry = (self._token_expiry - datetime.now()).total_seconds() + return time_until_expiry > self._refresh_buffer_seconds + + def _authenticate(self) -> None: + """Perform authentication and cache the token. + + Raises: + Exception: If authentication fails + """ + + auth_url = f"{self.relay_url}/auth/login" + + try: + response = requests.post( + auth_url, + data={ + 'username': self.username, + 'password': self.password, + 'grant_type': 'password' + }, + headers={'Content-Type': 'application/x-www-form-urlencoded'}, + timeout=10 + ) + response.raise_for_status() + + data = response.json() + self._token = data['access_token'] + expires_in = data['expires_in'] + + # Calculate expiry time + self._token_expiry = datetime.now() + timedelta(seconds=expires_in) + + log.info("Successfully authenticated with pulsar-relay, token expires in %d seconds", expires_in) + + except requests.RequestException as e: + log.error("Failed to authenticate with pulsar-relay: %s", e) + raise Exception(f"pulsar-relay authentication failed: {e}") + + def invalidate(self) -> None: + """Invalidate the current token, forcing re-authentication on next request.""" + with self._lock: + self._token = None + self._token_expiry = None + log.debug("Invalidated pulsar-relay authentication token") diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 6c3f20bd..eade5cfa 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -36,14 +36,12 @@ ) if TYPE_CHECKING: - from ..staging import ClientJobDescription + from pulsar.client.staging import ClientJobDescription log = getLogger(__name__) def submit_job(client, client_job_description: "ClientJobDescription", job_config=None): - """ - """ file_stager = FileStager(client, client_job_description, job_config) rebuilt_command_line = file_stager.get_command_line() job_id = file_stager.job_id diff --git a/pulsar/client/transport/relay.py b/pulsar/client/transport/relay.py new file mode 100644 index 00000000..9f13c9f5 --- /dev/null +++ b/pulsar/client/transport/relay.py @@ -0,0 +1,231 @@ +""" +HTTP transport for communicating with pulsar-relay. + +Provides methods for posting messages, long-polling, and managing +authentication with the relay server. +""" +import logging +from typing import Any, Dict, List, Optional + +import requests + +from ..relay_auth import RelayAuthManager + +log = logging.getLogger(__name__) + + +class RelayTransportError(Exception): + """Raised when communication with pulsar-relay fails.""" + pass + + +class RelayTransport: + """HTTP transport for pulsar-relay communication. + + Handles: + - Message publishing (single and bulk) + - Long-polling for message consumption + - Automatic authentication and retry + """ + + def __init__(self, relay_url: str, username: str, password: str, timeout: int = 30): + """Initialize the relay transport. + + Args: + relay_url: Base URL of the pulsar-relay server + username: Username for authentication + password: Password for authentication + timeout: Default request timeout in seconds + """ + self.relay_url = relay_url.rstrip('/') + self.auth_manager = RelayAuthManager(relay_url, username, password) + self.timeout = timeout + self.session = requests.Session() + + def _get_headers(self) -> Dict[str, str]: + """Get HTTP headers including authentication token. 
+ + Returns: + Dictionary of HTTP headers + """ + token = self.auth_manager.get_token() + return { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json' + } + + def post_message( + self, + topic: str, + payload: Dict[str, Any], + ttl: Optional[int] = None, + metadata: Optional[Dict[str, str]] = None + ) -> Dict[str, Any]: + """Post a single message to the relay. + + Args: + topic: Topic name to publish to + payload: Message payload (must be JSON-serializable) + ttl: Time-to-live in seconds (optional) + metadata: Optional metadata dictionary + + Returns: + Response dictionary with message_id, topic, and timestamp + + Raises: + RelayTransportError: If the request fails + """ + url = f"{self.relay_url}/api/v1/messages" + + message_data: Dict[str, Any] = { + 'topic': topic, + 'payload': payload + } + + if ttl is not None: + message_data['ttl'] = ttl + + if metadata is not None: + message_data['metadata'] = metadata + + try: + response = self.session.post( + url, + json=message_data, + headers=self._get_headers(), + timeout=self.timeout + ) + + if response.status_code == 401: + # Token might have expired, invalidate and retry once + log.debug("Received 401, invalidating token and retrying") + self.auth_manager.invalidate() + response = self.session.post( + url, + json=message_data, + headers=self._get_headers(), + timeout=self.timeout + ) + + response.raise_for_status() + result = response.json() + + log.debug("Posted message to topic '%s': message_id=%s", topic, result.get('message_id')) + return result + + except requests.RequestException as e: + log.error("Failed to post message to topic '%s': %s", topic, e) + raise RelayTransportError(f"Failed to post message: {e}") + + def post_bulk_messages(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]: + """Post multiple messages in a single request. + + Args: + messages: List of message dictionaries, each containing 'topic' and 'payload' + + Returns: + Response dictionary with results and summary + + Raises: + RelayTransportError: If the request fails + """ + url = f"{self.relay_url}/api/v1/messages/bulk" + + request_data = {'messages': messages} + + try: + response = self.session.post( + url, + json=request_data, + headers=self._get_headers(), + timeout=self.timeout + ) + + if response.status_code == 401: + self.auth_manager.invalidate() + response = self.session.post( + url, + json=request_data, + headers=self._get_headers(), + timeout=self.timeout + ) + + response.raise_for_status() + result = response.json() + + log.debug("Posted %d messages in bulk", len(messages)) + return result + + except requests.RequestException as e: + log.error("Failed to post bulk messages: %s", e) + raise RelayTransportError(f"Failed to post bulk messages: {e}") + + def long_poll( + self, + topics: List[str], + since: Optional[Dict[str, str]] = None, + timeout: int = 30 + ) -> List[Dict[str, Any]]: + """Poll for messages from specified topics. + + This is a blocking call that waits up to 'timeout' seconds for new messages. 
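+        An HTTP timeout from the underlying request is treated as an empty poll
+        and returns an empty list rather than raising an error.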
+
+        Args:
+            topics: List of topic names to subscribe to
+            since: Optional dict mapping topic names to last seen message IDs
+            timeout: Maximum seconds to wait for messages (1-60)
+
+        Returns:
+            List of message dictionaries
+
+        Raises:
+            RelayTransportError: If the request fails
+        """
+        url = f"{self.relay_url}/messages/poll"
+
+        poll_data = {
+            'topics': topics,
+            'timeout': min(max(timeout, 1), 60)  # Clamp to 1-60 range
+        }
+
+        if since is not None:
+            poll_data['since'] = since
+
+        try:
+            response = self.session.post(
+                url,
+                json=poll_data,
+                headers=self._get_headers(),
+                timeout=timeout + 5  # Add buffer to request timeout
+            )
+
+            if response.status_code == 401:
+                self.auth_manager.invalidate()
+                response = self.session.post(
+                    url,
+                    json=poll_data,
+                    headers=self._get_headers(),
+                    timeout=timeout + 5
+                )
+
+            response.raise_for_status()
+            result = response.json()
+
+            messages = result.get('messages', [])
+            if messages:
+                log.debug("Received %d messages from long poll", len(messages))
+
+            return messages
+
+        except requests.Timeout:
+            # Timeout is expected in long polling when no messages arrive
+            log.debug("Long poll timeout (no messages)")
+            return []
+
+        except requests.RequestException as e:
+            log.error("Failed to long poll: %s", e)
+            raise RelayTransportError(f"Failed to long poll: {e}")
+
+    def close(self):
+        """Close the transport and cleanup resources."""
+        self.session.close()
diff --git a/pulsar/messaging/__init__.py b/pulsar/messaging/__init__.py
index 6c88cbb1..081f5a90 100644
--- a/pulsar/messaging/__init__.py
+++ b/pulsar/messaging/__init__.py
@@ -5,17 +5,33 @@
 
 import logging
 
-from ..messaging import bind_amqp
+from ..messaging import (
+    bind_amqp,
+    bind_relay,
+)
+from .relay_state import RelayState
 
 log = logging.getLogger(__name__)
 
 
 def bind_app(app, queue_id, conf=None):
     connection_string = __id_to_connection_string(app, queue_id)
-    queue_state = QueueState()
-    for manager in app.managers.values():
-        bind_amqp.bind_manager_to_queue(manager, queue_state, connection_string, conf)
-    return queue_state
+
+    # Check if this is a relay connection
+    if connection_string and connection_string.startswith(('http://', 'https://')):
+        relay_url = connection_string
+        log.info("Detected relay connection string, binding to pulsar-relay at %s", relay_url)
+
+        relay_state = RelayState()
+        for manager in app.managers.values():
+            bind_relay.bind_manager_to_relay(manager, relay_state, relay_url, conf or {})
+        return relay_state
+    else:
+        # Use AMQP binding
+        queue_state = QueueState()
+        for manager in app.managers.values():
+            bind_amqp.bind_manager_to_queue(manager, queue_state, connection_string, conf)
+        return queue_state
 
 
 class QueueState:
@@ -38,7 +54,7 @@ def join(self, timeout=None):
         for t in self.threads:
             t.join(timeout)
             if t.is_alive():
-                log.warn("Failed to join thread [%s]." % t)
+                log.warning("Failed to join thread [%s]." % t)
 
 
 def __id_to_connection_string(app, queue_id):
diff --git a/pulsar/messaging/bind_relay.py b/pulsar/messaging/bind_relay.py
new file mode 100644
index 00000000..2d90fdcf
--- /dev/null
+++ b/pulsar/messaging/bind_relay.py
@@ -0,0 +1,208 @@
+"""Pulsar server-side integration with pulsar-relay.
+
+This module provides functionality to bind Pulsar job managers to the
+pulsar-relay, allowing them to receive control messages (setup, status
+requests, kill) and publish status updates.
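+
+It is the relay counterpart of ``bind_amqp``: control messages are received by
+long-polling the relay over HTTP, and status updates are posted back through the
+same transport.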
+""" +import functools +import logging +import threading +import time + +from pulsar import manager_endpoint_util +from pulsar.client.transport.relay import RelayTransport +from .relay_state import RelayState + +log = logging.getLogger(__name__) + + +def bind_manager_to_relay(manager, relay_state: RelayState, relay_url, conf): + """Bind a specific manager to the relay. + + Args: + manager: Pulsar job manager instance + relay_state: RelayState for managing consumer threads + relay_url: URL of the pulsar-relay server + conf: Configuration dictionary with relay credentials + """ + manager_name = manager.name + log.info("bind_manager_to_relay called for relay [%s] and manager [%s]", relay_url, manager_name) + + # Extract relay credentials + username = conf.get('message_queue_username', 'admin') + password = conf.get('message_queue_password') + if not password: + raise Exception("message_queue_password is required for relay communication") + + # Create relay transport + relay_transport = RelayTransport(relay_url, username, password) + + # Define message handlers + process_setup_messages = functools.partial(__process_setup_message, manager) + process_kill_messages = functools.partial(__process_kill_message, manager) + process_status_messages = functools.partial(__process_status_message, manager) + + # Determine topics based on manager name + setup_topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup" + status_request_topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request" + kill_topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill" + status_update_topic = f"job_status_update_{manager_name}" if manager_name != "_default_" else "job_status_update" + + # Start consumer threads if message_queue_consume is enabled + if conf.get("message_queue_consume", True): + log.info("Starting relay consumer threads for manager '%s'", manager_name) + + # Single consumer thread for all control messages + consumer_thread = start_consumer( + relay_transport, + relay_state, + [setup_topic, status_request_topic, kill_topic], + { + setup_topic: process_setup_messages, + status_request_topic: process_status_messages, + kill_topic: process_kill_messages, + } + ) + + relay_state.threads.append(consumer_thread) + + # Bind status change callback to publish status updates to relay + if conf.get("message_queue_publish", True): + log.info("Binding status change callback for manager '%s'", manager_name) + + def bind_on_status_change(new_status, job_id): + job_id = job_id or 'unknown' + try: + message = "Publishing Pulsar state change with status %s for job_id %s via relay" + log.debug(message, new_status, job_id) + payload = manager_endpoint_util.full_status(manager, new_status, job_id) + relay_transport.post_message(status_update_topic, payload) + except Exception: + log.exception("Failure to publish Pulsar state change for job_id %s via relay." % job_id) + raise + + manager.set_state_change_callback(bind_on_status_change) + + +def start_consumer(relay_transport, relay_state: RelayState, topics, handlers): + """Start a consumer thread that polls for messages. 
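+    A single daemon thread long-polls the relay for all of the given topics and
+    dispatches each received message to the handler registered for its topic.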
+ + Args: + relay_transport: RelayTransport instance + relay_state: RelayState for checking if consumer should continue + topics: List of topics to subscribe to + handlers: Dict mapping topics to handler functions + + Returns: + Thread object + """ + def consume(): + log.info("Starting relay consumer for topics: %s", topics) + + while relay_state.active: + try: + # Long poll for messages (30 second timeout) + messages = relay_transport.long_poll(topics, timeout=30) + + for message in messages: + topic = message.get('topic') + payload = message.get('payload', {}) + + handler = handlers.get(topic) + if handler: + try: + handler(payload) + except Exception: + job_id = payload.get('job_id', 'unknown') + log.exception("Failed to process message for job_id %s from topic %s", job_id, topic) + else: + log.warning("No handler found for topic '%s'", topic) + + except Exception: + if relay_state.active: + log.exception("Exception while polling relay, will retry after delay.") + # Brief sleep before retrying + time.sleep(5) + else: + log.debug("Exception during shutdown, stopping consumer.") + break + + log.info("Finished consuming relay messages - no more messages will be processed.") + + thread = threading.Thread( + name="relay-consumer-%s" % "-".join(topics), + target=consume + ) + thread.daemon = True + thread.start() + return thread + + +def __process_setup_message(manager, body): + """Process a job setup message. + + Args: + manager: Job manager instance + body: Message payload containing job setup parameters + """ + job_id = __client_job_id_from_body(body) + if not job_id: + log.error('Could not parse job id from body: %s', body) + return + + try: + log.info("Processing setup message for job_id %s", job_id) + manager_endpoint_util.submit_job(manager, body) + except Exception: + log.exception("Failed to process setup message for job_id %s", job_id) + + +def __process_status_message(manager, body): + """Process a status request message. + + Args: + manager: Job manager instance + body: Message payload containing job_id + """ + job_id = __client_job_id_from_body(body) + if not job_id: + log.error('Could not parse job id from body: %s', body) + return + + try: + log.debug("Processing status request for job_id %s", job_id) + manager.trigger_state_change_callback(job_id) + except Exception: + log.exception("Failed to process status message for job_id %s", job_id) + + +def __process_kill_message(manager, body): + """Process a job kill message. + + Args: + manager: Job manager instance + body: Message payload containing job_id + """ + job_id = __client_job_id_from_body(body) + if not job_id: + log.error('Could not parse job id from body: %s', body) + return + + try: + log.info("Processing kill request for job_id %s", job_id) + manager.kill(job_id) + except Exception: + log.exception("Failed to process kill message for job_id %s", job_id) + + +def __client_job_id_from_body(body): + """Extract job_id from message body. + + Args: + body: Message payload dictionary + + Returns: + job_id string or None if not found + """ + job_id = body.get("job_id", None) + return job_id diff --git a/pulsar/messaging/relay_state.py b/pulsar/messaging/relay_state.py new file mode 100644 index 00000000..81b441ef --- /dev/null +++ b/pulsar/messaging/relay_state.py @@ -0,0 +1,36 @@ +"""State management for pulsar-relay message consumers. + +Similar to QueueState for AMQP, this manages the lifecycle of relay +consumer threads on the Pulsar server side. 
+""" + + +class RelayState: + """Manages state for pulsar-relay message consumers. + + This object is passed to consumer loops and used to signal when + they should stop processing messages. + """ + + def __init__(self): + """Initialize relay state.""" + self.active = True + self.threads = [] + + def deactivate(self): + """Mark the relay state as inactive, signaling consumers to stop.""" + self.active = False + + def join(self, timeout=None): + """Join all consumer threads. + + Args: + timeout: Optional timeout in seconds for joining threads + """ + import logging + log = logging.getLogger(__name__) + + for t in self.threads: + t.join(timeout) + if t.is_alive(): + log.warning("Failed to join relay consumer thread [%s].", t) diff --git a/requirements.txt b/requirements.txt index 5c135f1b..0e44942b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ paramiko typing-extensions pydantic-tes>=0.1.5 pyjwt +requests tuspy google-cloud-batch