184 changes: 184 additions & 0 deletions docs/configure.rst
@@ -222,6 +222,190 @@ In the event that the connection to the AMQP server is lost during message
publish, the Pulsar server can retry the connection, governed by the
``amqp_publish*`` options documented in `app.yml.sample`_.

Message Queue (Pulsar-Proxy)
-----------------------------

Pulsar can also communicate with Galaxy via **pulsar-proxy**, an HTTP-based
message proxy. This mode is similar to the AMQP message queue mode but uses
HTTP long-polling instead of a message broker like RabbitMQ. This is ideal when:

* Galaxy cannot directly reach Pulsar (e.g., due to firewall restrictions)
* You want to avoid deploying and managing a RabbitMQ server
* You prefer HTTP-based communication for simplicity and observability

Architecture
````````````

In this mode:

1. **Galaxy → Proxy**: Galaxy posts control messages (job setup, status
   requests, kill commands) to the proxy via HTTP POST
2. **Proxy → Pulsar**: Pulsar retrieves these control messages by long-polling
   the proxy over HTTP
3. **Pulsar → Proxy**: Pulsar posts job status updates to the proxy
4. **Proxy → Galaxy**: Galaxy polls the proxy to receive the status updates
5. **File Transfers**: Pulsar transfers files directly to/from Galaxy via HTTP
   (not through the proxy)

::

    Galaxy ──POST messages──> pulsar-proxy ──poll──> Pulsar Server
    Galaxy <────────direct HTTP for file transfers─────────┘
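
To make the long-polling leg concrete, the sketch below shows roughly how a
consumer could drain a topic from such a proxy using nothing more than
``requests``. The ``/messages/<topic>`` endpoint, the bearer-token header, and
the ``handle_message`` callback are illustrative assumptions, not the proxy's
actual API::

    import requests

    PROXY = "http://proxy-server.example.org:9000"  # assumed proxy address
    TOPIC = "job_setup"  # topic of the default manager

    def poll_forever(token, handle_message):
        """Long-poll the proxy and hand each received message to a callback."""
        session = requests.Session()
        session.headers["Authorization"] = f"Bearer {token}"
        while True:
            try:
                # The proxy is assumed to hold the request open until a message
                # arrives or its own timeout elapses, then respond with JSON.
                response = session.get(f"{PROXY}/messages/{TOPIC}", timeout=120)
            except requests.Timeout:
                continue  # nothing arrived within the window; poll again
            if response.status_code == 200:
                handle_message(response.json())
            # any other status (e.g. 204 "no content") means "no message yet"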

Pulsar Configuration
````````````````````

To configure Pulsar to use pulsar-proxy, set ``message_queue_url`` in
``app.yml`` to a URL with an ``http://`` or ``https://`` prefix::

    message_queue_url: http://proxy-server.example.org:9000
    message_queue_username: admin
    message_queue_password: your_secure_password

The ``http://`` / ``https://`` prefix tells Pulsar to use the proxy communication mode instead
of AMQP.

.. note::

    Unlike AMQP mode, the pulsar-proxy mode does **not** require the ``kombu``
    Python dependency. It only requires the ``requests`` library, which is a
    standard dependency of Pulsar.
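
For illustration only, mode selection of this kind can key off the URL scheme
alone. The helper below is a simplified sketch of that dispatch, not Pulsar's
actual implementation::

    from urllib.parse import urlparse

    def message_queue_mode(message_queue_url):
        """Pick a transport based on the scheme of message_queue_url."""
        scheme = urlparse(message_queue_url).scheme
        if scheme in ("http", "https"):
            return "proxy"  # pulsar-proxy long-polling mode
        if scheme.startswith("amqp"):
            return "amqp"   # kombu/RabbitMQ mode (amqp:// or amqps://)
        raise ValueError(f"Unrecognized message_queue_url scheme: {scheme!r}")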

Galaxy Configuration
````````````````````

In Galaxy's job configuration (``job_conf.yml``), configure a Pulsar destination
with proxy parameters::

    runners:
      pulsar:
        load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner
        # Proxy connection
        proxy_url: http://proxy-server.example.org:9000
        proxy_username: admin
        proxy_password: your_secure_password

    execution:
      default: pulsar_proxy
      environments:
        pulsar_proxy:
          runner: pulsar
          # Galaxy's URL (for Pulsar to reach back for file transfers)
          url: http://galaxy-server.example.org:8080
          # Remote job staging directory
          jobs_directory: /data/pulsar/staging


Authentication
``````````````

The pulsar-proxy uses JWT (JSON Web Token) authentication. Galaxy and Pulsar
authenticate with the proxy using the username and password provided in the
configuration. Tokens are automatically managed and refreshed as needed.
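
As a rough picture of what "automatically managed and refreshed" means, the
sketch below obtains a token with the configured credentials and requests a new
one shortly before the old one expires. The ``/auth/token`` endpoint, the JSON
field names, and the one-hour lifetime are assumptions for illustration, not
the proxy's documented API::

    import time
    import requests

    PROXY = "https://proxy-server.example.org"  # assumed proxy address

    class TokenManager:
        """Cache a JWT and re-authenticate shortly before it expires."""

        def __init__(self, username, password, lifetime=3600):
            self.username = username
            self.password = password
            self.lifetime = lifetime  # assumed token lifetime in seconds
            self._token = None
            self._obtained_at = 0.0

        def token(self):
            # Refresh a minute before the assumed expiry to avoid races.
            stale = time.time() - self._obtained_at > self.lifetime - 60
            if self._token is None or stale:
                response = requests.post(
                    f"{PROXY}/auth/token",  # hypothetical login endpoint
                    json={"username": self.username, "password": self.password},
                    timeout=30,
                )
                response.raise_for_status()
                self._token = response.json()["token"]  # assumed response field
                self._obtained_at = time.time()
            return self._token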

.. tip::

    In production, always use HTTPS for the proxy URL to encrypt credentials
    and message content during transit::

        message_queue_url: https://proxy-server.example.org:443

Security Considerations
```````````````````````

* **Use HTTPS**: Always use HTTPS for the proxy URL in production
* **Strong Passwords**: Use strong, unique passwords for proxy authentication
* **Network Isolation**: Deploy the proxy in a DMZ accessible to both Galaxy
  and Pulsar
* **Firewall Rules**:

  * Galaxy → Proxy: Allow outbound HTTPS
  * Pulsar → Proxy: Allow outbound HTTPS
  * Pulsar → Galaxy: Allow outbound HTTP/HTTPS for file transfers

Multiple Pulsar Instances
``````````````````````````

You can deploy multiple Pulsar instances with different managers, all using the
same proxy. Messages are routed by topic names that include the manager name.

For example, configure two Pulsar servers:

**Pulsar Server 1** (``app.yml``)::

    message_queue_url: http://proxy-server:9000
    message_queue_username: admin
    message_queue_password: password
    managers:
      cluster_a:
        type: queued_slurm

**Pulsar Server 2** (``app.yml``)::

    message_queue_url: http://proxy-server:9000
    message_queue_username: admin
    message_queue_password: password
    managers:
      cluster_b:
        type: queued_condor

In Galaxy's job configuration, route jobs to specific clusters using the
``manager`` parameter::

    execution:
      environments:
        cluster_a_jobs:
          runner: pulsar
          proxy_url: http://proxy-server:9000
          manager: cluster_a
          # ... other settings

        cluster_b_jobs:
          runner: pulsar
          proxy_url: http://proxy-server:9000
          manager: cluster_b
          # ... other settings

Topic Naming
````````````

Messages are organized by topic with automatic naming based on the manager name:

* Job setup: ``job_setup_{manager_name}`` or ``job_setup`` (for default manager)
* Status requests: ``job_status_request_{manager_name}``
* Kill commands: ``job_kill_{manager_name}``
* Status updates: ``job_status_update_{manager_name}``

This allows multiple Pulsar instances to share the same proxy without message
conflicts.
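
The rule is compact enough to state as code. The helper below mirrors the
construction used by ``ProxyJobClient`` in this change, where the default
manager is internally named ``_default_``::

    def topic_name(prefix, manager_name="_default_"):
        """Build the proxy topic for a message type and manager.

        >>> topic_name("job_setup")
        'job_setup'
        >>> topic_name("job_setup", "cluster_a")
        'job_setup_cluster_a'
        """
        if manager_name == "_default_":
            return prefix
        return f"{prefix}_{manager_name}"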

Comparison with AMQP Mode
``````````````````````````

+------------------------+---------------------------+-------------------------+
| Feature                | AMQP (RabbitMQ)           | Pulsar-Proxy            |
+========================+===========================+=========================+
| Protocol               | AMQP over TCP             | HTTP/HTTPS              |
+------------------------+---------------------------+-------------------------+
| Dependencies           | kombu, RabbitMQ server    | requests (built-in)     |
+------------------------+---------------------------+-------------------------+
| Deployment Complexity  | Moderate (broker setup)   | Simple (HTTP service)   |
+------------------------+---------------------------+-------------------------+
| Message Delivery       | Push-based                | Long-polling            |
+------------------------+---------------------------+-------------------------+
| Observability          | Queue monitoring tools    | HTTP access logs        |
+------------------------+---------------------------+-------------------------+
| SSL/TLS                | Via AMQPS                 | Via HTTPS               |
+------------------------+---------------------------+-------------------------+
| Firewall Friendly      | Moderate                  | High (standard HTTP)    |
+------------------------+---------------------------+-------------------------+

For more information on deploying pulsar-proxy, see the `pulsar-proxy documentation`_.

.. _pulsar-proxy documentation: https://github.com/galaxyproject/pulsar-proxy

Caching (Experimental)
----------------------

72 changes: 72 additions & 0 deletions pulsar/client/client.py
@@ -568,6 +568,78 @@ def kill(self):
pass


class ProxyJobClient(BaseMessageJobClient):
    """Client that communicates with Pulsar via pulsar-proxy.

    This client posts control messages (setup, status, kill) to the proxy,
    which are then consumed by the Pulsar server. File transfers happen
    directly between Pulsar and Galaxy via HTTP.
    """

    def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
               dynamic_file_sources=None, token_endpoint=None):
        """Submit a job by posting a setup message to the proxy.

        Args:
            command_line: Command to execute on Pulsar
            dependencies_description: Tool dependencies
            env: Environment variables
            remote_staging: Remote staging configuration
            job_config: Job configuration
            dynamic_file_sources: Dynamic file sources
            token_endpoint: Token endpoint for file access

        Returns:
            None (async operation)
        """
        launch_params = self._build_setup_message(
            command_line,
            dependencies_description=dependencies_description,
            env=env,
            remote_staging=remote_staging,
            job_config=job_config,
            dynamic_file_sources=dynamic_file_sources,
            token_endpoint=token_endpoint,
        )

        # Determine topic name based on manager
        manager_name = self.client_manager.manager_name
        topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup"

        # Post message to proxy
        self.client_manager.proxy_transport.post_message(topic, launch_params)
        log.info("Job %s published to proxy topic '%s'", self.job_id, topic)
        return None

    def get_status(self):
        """Request job status by posting a status request message to the proxy.

        Returns:
            Cached status if available, None otherwise
        """
        manager_name = self.client_manager.manager_name
        topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request"

        status_params = {
            'job_id': self.job_id,
        }

        self.client_manager.proxy_transport.post_message(topic, status_params)
        log.debug("Job status request for %s published to proxy topic '%s'", self.job_id, topic)

        # Return cached status if available
        return self.client_manager.status_cache.get(self.job_id, {}).get('status', None)

    def kill(self):
        """Kill a job by posting a kill message to the proxy."""
        manager_name = self.client_manager.manager_name
        topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill"

        kill_params = {'job_id': self.job_id}
        self.client_manager.proxy_transport.post_message(topic, kill_params)
        log.info("Job kill request for %s published to proxy topic '%s'", self.job_id, topic)


class ExecutionType(str, Enum):
# containers run one after each other with similar configuration
# like in TES or AWS Batch