Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
35a1ad8
Starting to work on ability to request runengine metadata from RE worker
jwlodek Jan 23, 2026
c47cc80
Merge branch 'add-request-re-md' of https://github.com/jwlodek/bluesk…
jwlodek Jan 23, 2026
9190f65
Add some metadata to example profile RE
jwlodek Jan 23, 2026
b7c7544
Add handler for re_metadata message to manager
jwlodek Jan 23, 2026
f4bf1b9
Response should be in the form of a dict, not a string
jwlodek Jan 23, 2026
e24f2a8
Remove unnecessary conditional
jwlodek Jan 23, 2026
cca925f
Add test case for new re_metadata message
jwlodek Jan 23, 2026
c2d15a0
Expand test to confirm that changes to the RE.md are reflected in sub…
jwlodek Jan 23, 2026
86f0800
Add line to test re metadata cli
jwlodek Jan 23, 2026
8296527
Add docstring for _request_re_metadata_handler
jwlodek Jan 23, 2026
44b503c
Change logging message to debug for RE metadata handler
jwlodek Jan 23, 2026
fd7d81f
Minor cleanup of CLI help message
jwlodek Jan 23, 2026
853d49d
Linting fixes
jwlodek Jan 23, 2026
6bed2ca
Adjust unhelpful error messages in unit test
jwlodek Jan 23, 2026
d934656
Add a test covering a situation when runengine metadata is not serial…
jwlodek Jan 23, 2026
340706e
Correctly handle non-serializable type with msgpack encoding, update …
jwlodek Jan 23, 2026
0c99c51
Merge branch 'main' of https://github.com/bluesky/bluesky-queueserver…
jwlodek Jan 23, 2026
461c743
Add descriptive custom error messages for serializtion errors
jwlodek Feb 2, 2026
cad67ae
Remove unneeded .upper()
jwlodek Feb 2, 2026
50591f4
Handle additional edge cases brought up in review
jwlodek Feb 5, 2026
747aa0c
Update the documentation
jwlodek Feb 5, 2026
8d27042
Fix docs build issue
jwlodek Feb 5, 2026
0adb287
Apply suggestion from @dmgav
dmgav Feb 6, 2026
32ffde5
Apply suggestion from @dmgav
dmgav Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions bluesky_queueserver/manager/manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import copy
import enum
import json
import logging
import time as ttime
import uuid
Expand Down Expand Up @@ -1586,6 +1587,18 @@ async def _worker_request_plans_and_devices_list(self):
plans_and_devices_list, err_msg = None, "Timeout occurred while processing the request"
return plans_and_devices_list, err_msg

async def _worker_request_runengine_metadata(self):
try:
runengine_metadata = await self._comm_to_worker.send_msg("request_runengine_metadata")
err_msg = ""
if runengine_metadata is None:
err_msg = "Failed to obtain the RE metadata from the worker"

except CommTimeoutError:
runengine_metadata, err_msg = None, "Timeout occurred while processing the request"

return runengine_metadata, err_msg

async def _worker_request_task_results(self):
try:
tt = self._comm_to_worker_timeout_long
Expand Down Expand Up @@ -3351,6 +3364,48 @@ async def _re_runs_handler(self, request):

return {"success": success, "msg": msg, "run_list": run_list, "run_list_uid": run_list_uid}

async def _re_metadata_handler(self, request):
"""
Returns the runengine metadata from the RE worker process
"""

# Clients may ask for RE metadata frequently to update some state,
# so make this message debug only.
logger.debug("Returning the runengine metadata dictionary ...")

success, msg, re_metadata = True, "", {}
try:
self._check_request_for_unsupported_params(request=request, param_names=[])

if self._environment_exists:
re_metadata, msg = await self._worker_request_runengine_metadata()

if re_metadata is None:
success, re_metadata = False, {}
else:
# Make sure the metadata is serializable given the encoding
try:
if self._zmq_encoding == ZMQEncoding.JSON:
json.dumps(re_metadata)
else:
msgpack.packb(re_metadata)
except Exception as ex:
success, msg, re_metadata = (
False,
f"Failed to serialize RE metadata with {self._zmq_encoding.name}: {ex}",
{},
)
else:
success, msg, re_metadata = (
False,
"Environment does not exist. Cannot retrieve RE metadata.",
{},
)
except Exception as ex:
success, msg, re_metadata = False, f"Error: {ex}", {}

return {"success": success, "msg": msg, "re_metadata": re_metadata}

def _lock_key_invalid_msg(self):
"""
Format error message, which reports an invalid lock key.
Expand Down Expand Up @@ -3684,6 +3739,7 @@ async def _zmq_execute(self, msg):
"re_abort": "_re_abort_handler",
"re_halt": "_re_halt_handler",
"re_runs": "_re_runs_handler",
"re_metadata": "_re_metadata_handler",
"lock": "_lock_handler",
"lock_info": "_lock_info_handler",
"unlock": "_unlock_handler",
Expand Down
3 changes: 2 additions & 1 deletion bluesky_queueserver/manager/qserver_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class QServerExitCodes(enum.Enum):
qserver re stop # STOP execution of a paused plan
qserver re abort # ABORT execution of a paused plan
qserver re halt # HALT execution of a paused plan
qserver re metadata # Request RunEngine metadata

qserver re runs # Get the list of active runs (runs generated by the currently running plans)
qserver re runs active # Get the list of active runs
Expand Down Expand Up @@ -1135,7 +1136,7 @@ def create_msg(params, *, lock_key):
elif command == "re":
if len(params) < 1:
raise CommandParameterError(f"Request '{command}' must include at least one parameter")
supported_params = ("pause", "resume", "stop", "abort", "halt", "runs")
supported_params = ("pause", "resume", "stop", "abort", "halt", "runs", "metadata")
if params[0] in supported_params:
if params[0] == "pause":
method, prms = msg_re_pause(params)
Expand Down
2 changes: 2 additions & 0 deletions bluesky_queueserver/manager/tests/test_qserver_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def test_qserver_cli_and_manager(re_manager): # noqa: F811

assert sp_call(["qserver", "re", "resume"]) == SUCCESS

assert sp_call(["qserver", "re", "metadata"]) == SUCCESS

assert wait_for_condition(
time=60, condition=condition_queue_processing_finished
), "Timeout while waiting for process to finish"
Expand Down
155 changes: 155 additions & 0 deletions bluesky_queueserver/manager/tests/test_zmq_api_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5971,6 +5971,161 @@ def test_zmq_api_re_runs_1(re_manager_pc_copy, tmp_path, test_with_manager_resta
assert wait_for_condition(time=5, condition=condition_environment_closed)


def test_zmq_api_re_metadata_1(re_manager_pc_copy, tmp_path): # noqa: F811
"""
Tests `re_metadata` functionality. First checks if the request correctly fails if the environment is not open.
Then checks to make sure initial metadata could be retrieved. Then, executes a plan that adds `scan_id` and
`versions` keys to the metadata, and makes sure we can read those back, as well as the original key/value.
"""

_, pc_path = re_manager_pc_copy

resp, _ = zmq_request("re_metadata")
assert resp["success"] is False, f"{resp =}"
assert resp["msg"] == "Environment does not exist. Cannot retrieve RE metadata."

# Open the environment
resp, _ = zmq_request("environment_open")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_created)

# Get the initial run_engine metadata
resp, _ = zmq_request("re_metadata")
assert resp["success"] is True, f"{resp =}"

def check_initial_metadata(re_md):
"""Function that checks if initial metadata matches what we expect"""

assert "metadata_key" in re_md, pprint.pformat(re_md)
assert re_md["metadata_key"] == "metadata_value"
assert "versions" in re_md, pprint.pformat(re_md)
assert "bluesky" in re_md["versions"], pprint.pformat(re_md)

re_md = resp["re_metadata"]
check_initial_metadata(re_md)
assert len(re_md) == 2, "Only two metadata keys should be present initially!"

# Add the first 'count' plan
resp, _ = zmq_request("queue_item_add", {"item": _plan1, "user": _user, "user_group": _user_group})
assert resp["success"] is True

# Execute the count plan. This should add some additional metadata items
resp, _ = zmq_request("queue_start")
assert resp["success"] is True
assert wait_for_condition(20, condition=condition_manager_idle)

resp, _ = zmq_request("re_metadata")
assert resp["success"] is True, f"{resp =}"
re_md = resp["re_metadata"]

# Make sure we can retrieve scan ID, and it's an int
# the `count` plan should add this key, and it should start at `1`
assert "scan_id" in re_md, pprint.pformat(re_md)
assert re_md["scan_id"] == 1, "Scan ID for first plan should be 1"

# Make sure our original metadata key is still there
check_initial_metadata(re_md)

resp, _ = zmq_request("environment_close")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_closed)


_add_datetime_to_md = """
from datetime import datetime
RE.md['date'] = datetime.now()
"""


def test_zmq_api_re_metadata_2_non_serializable_md(re_manager_pc_copy, tmp_path): # noqa: F811

_, pc_path = re_manager_pc_copy
# Add a non-serializable type to the runengine metadata
append_code_to_last_startup_file(pc_path, additional_code=_add_datetime_to_md)

encoding = use_zmq_encoding_for_tests()

resp, _ = zmq_request("environment_open")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_created)

resp, _ = zmq_request("re_metadata")
assert resp["success"] is False

# Check that the error message is correct depending on encoding
if encoding == "json":
assert resp["msg"].startswith("Failed to serialize RE metadata with JSON:"), resp
elif encoding == "msgpack":
assert resp["msg"].startswith("Failed to serialize RE metadata with MSGPACK:"), resp

resp, _ = zmq_request("environment_close")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_closed)


def test_zmq_api_re_metadata_3_no_re(re_manager_pc_copy, tmp_path): # noqa: F811
"""
Tests `re_metadata` functionality when Run Engine is not present in the environment.
"""

_, pc_path = re_manager_pc_copy
remove_run_engine_config_from_startup(pc_path)

resp, _ = zmq_request("environment_open")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_created)

resp, _ = zmq_request("re_metadata")
assert resp["success"] is False, f"{resp =}"
assert "Run Engine does not exist in worker environment" in resp["msg"], resp

resp, _ = zmq_request("environment_close")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_closed)


def test_zmq_api_re_metadata_4_no_md(re_manager_pc_copy, tmp_path): # noqa: F811
"""
Tests `re_metadata` functionality when Run Engine does not have 'md' attribute.
"""

_, pc_path = re_manager_pc_copy
append_code_to_last_startup_file(pc_path, additional_code="del RE.md")

resp, _ = zmq_request("environment_open")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_created)

resp, _ = zmq_request("re_metadata")
assert resp["success"] is False, f"{resp =}"
assert "Run Engine does not have a metadata attribute" in resp["msg"], resp

resp, _ = zmq_request("environment_close")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_closed)


def test_zmq_api_re_metadata_5_non_mapping_md(re_manager_pc_copy, tmp_path): # noqa: F811
"""
Tests `re_metadata` functionality when Run Engine has non-mapping type as 'md' attribute.
"""

_, pc_path = re_manager_pc_copy
append_code_to_last_startup_file(pc_path, additional_code="RE.md = 12345")

resp, _ = zmq_request("environment_open")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_created)

resp, _ = zmq_request("re_metadata")
assert resp["success"] is False, f"{resp =}"
assert "Failed to convert Run Engine metadata to dictionary" in resp["msg"], resp

resp, _ = zmq_request("environment_close")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_closed)


# fmt: off
@pytest.mark.parametrize("em_lock_code", [False, True])
# fmt: on
Expand Down
16 changes: 16 additions & 0 deletions bluesky_queueserver/manager/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,21 @@ def _request_plans_and_devices_list_handler(self):
self._existing_plans_and_devices_changed = False
return msg_out

def _request_runengine_metadata_handler(self):
"""
Returns the current state of the Run Engine metadata as a dictionary
"""

if self._RE is None:
raise RuntimeError("Run Engine does not exist in worker environment")
elif not hasattr(self._RE, "md"):
raise AttributeError("Run Engine does not have a metadata attribute")

try:
return dict(self._RE.md)
except Exception as ex:
raise RuntimeError(f"Failed to convert Run Engine metadata to dictionary: {ex}") from ex

def _command_close_env_handler(self):
"""
Close RE Worker environment in orderly way.
Expand Down Expand Up @@ -1341,6 +1356,7 @@ def _worker_prepare_for_startup(self):
self._comm_to_manager.add_method(
self._request_plans_and_devices_list_handler, "request_plans_and_devices_list"
)
self._comm_to_manager.add_method(self._request_runengine_metadata_handler, "request_runengine_metadata")
self._comm_to_manager.add_method(self._request_task_results_handler, "request_task_results")
self._comm_to_manager.add_method(self._command_close_env_handler, "command_close_env")
self._comm_to_manager.add_method(self._command_confirm_exit_handler, "command_confirm_exit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
except Exception:
pass

RE = RunEngine()
RE = RunEngine({"metadata_key": "metadata_value"})

bec = BestEffortCallback()
if not is_ipython_mode() or not ipython_matplotlib:
Expand Down
2 changes: 2 additions & 0 deletions docs/source/cli_tools.rst
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,8 @@ periodically requests and displays the status of Queue Server.
qserver re runs open # Get the list of open runs (subset of active runs)
qserver re runs closed # Get the list of closed runs (subset of active runs)

qserver re metadata # Get Run Engine metadata

qserver history get # Request plan history
qserver history clear # Clear plan history
qserver history clear 200 # Clear the history, leave the latest 200 items
Expand Down
5 changes: 5 additions & 0 deletions docs/source/qserver_quick_ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ commands and HTTP API::
qserver re runs open # Get the list of open runs (subset of active runs)
qserver re runs closed # Get the list of closed runs (subset of active runs)

As long as the environment is opened, you may also query the current state of the runengine
metadata dictionary.

qserver re metadata # Get the current runengine metadata dictionary

The queue can be stopped at any time. Stopping the queue is a safe operation. When the stopping
sequence is initiated, the currently running plan is finished and the next plan is not be started.
The stopping sequence can be cancelled if it was activated by mistake or decision was changed::
Expand Down
40 changes: 38 additions & 2 deletions docs/source/re_manager_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ Monitor the list of active runs:

- :ref:`method_re_runs`

Get Run Engine metadata:

- :ref:`method_re_metadata`

Run tasks in RE Worker namespace:

- :ref:`method_script_upload`
Expand Down Expand Up @@ -1676,7 +1680,7 @@ Description Request the list of active runs generated by the currently execute
included in the returned parameters
------------ -----------------------------------------------------------------------------------------
Parameters **option**: *'active'*, *'open'* or *'closed'* (optional)
select between full list of 'active' (default) runs, the list of 'open' or 'closed'
Select between full list of 'active' (default) runs, the list of 'open' or 'closed'
Comment thread
dmgav marked this conversation as resolved.
Outdated
runs.
------------ -----------------------------------------------------------------------------------------
Returns **success**: *boolean*
Expand All @@ -1686,7 +1690,7 @@ Returns **success**: *boolean*
error message in case of failure, empty string ('') otherwise.

**run_list**: *list(dict)*
the requested list of runs, list items are dictionaries with keys 'uid' (str),
The requested list of runs, list items are dictionaries with keys 'uid' (str),
Comment thread
dmgav marked this conversation as resolved.
Outdated
'scan_id' (int), 'is_open' (boolean) and 'exit_status' (str or None).
See Bluesky documentation for 'exit_status' values.

Expand All @@ -1698,6 +1702,38 @@ Execution Immediate: no follow-up requests are required.
============ =========================================================================================



.. _method_re_metadata:


**'re_metadata'**
^^^^^^^^^^^^^^^^^

============ =========================================================================================
Method **'re_metadata'**
------------ -----------------------------------------------------------------------------------------
Description Request the current state of the runengine metadata dictionary. The metadata dictionary may be
modified by plans during execution, or by external processes if the metadata is tied to an external
source like a Redis server. The API allows clients to monitor changes
in the metadata to display specific metadata key values to users. By default the bluesky Run Engine
will update the metadata to be an in-process dictionary with a 'versions' key that tracks the versions of bluesky and
related libraries. A transient 'scan_id' key is also added when a plan is run.
------------ -----------------------------------------------------------------------------------------
Parameters ---
------------ -----------------------------------------------------------------------------------------
Returns **success**: *boolean*
indicates if the request was processed successfully.

**msg**: *str*
error message in case of failure, empty string ('') otherwise.

**re_metadata**: *dict*
The requested runengine metadata dictionary. If the request fails, the dictionary is empty.
------------ -----------------------------------------------------------------------------------------
Execution Immediate: no follow-up requests are required.
============ =========================================================================================


.. _method_script_upload:

**'script_upload'**
Expand Down
Loading