Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
35a1ad8
Starting to work on ability to request runengine metadata from RE worker
jwlodek Jan 23, 2026
c47cc80
Merge branch 'add-request-re-md' of https://github.com/jwlodek/bluesk…
jwlodek Jan 23, 2026
9190f65
Add some metadata to example profile RE
jwlodek Jan 23, 2026
b7c7544
Add handler for re_metadata message to manager
jwlodek Jan 23, 2026
f4bf1b9
Response should be in the form of a dict, not a string
jwlodek Jan 23, 2026
e24f2a8
Remove unnecessary conditional
jwlodek Jan 23, 2026
cca925f
Add test case for new re_metadata message
jwlodek Jan 23, 2026
c2d15a0
Expand test to confirm that changes to the RE.md are reflected in sub…
jwlodek Jan 23, 2026
86f0800
Add line to test re metadata cli
jwlodek Jan 23, 2026
8296527
Add docstring for _request_re_metadata_handler
jwlodek Jan 23, 2026
44b503c
Change logging message to debug for RE metadata handler
jwlodek Jan 23, 2026
fd7d81f
Minor cleanup of CLI help message
jwlodek Jan 23, 2026
853d49d
Linting fixes
jwlodek Jan 23, 2026
6bed2ca
Adjust unhelpful error messages in unit test
jwlodek Jan 23, 2026
d934656
Add a test covering a situation when runengine metadata is not serial…
jwlodek Jan 23, 2026
340706e
Correctly handle non-serializable type with msgpack encoding, update …
jwlodek Jan 23, 2026
0c99c51
Merge branch 'main' of https://github.com/bluesky/bluesky-queueserver…
jwlodek Jan 23, 2026
461c743
Add descriptive custom error messages for serializtion errors
jwlodek Feb 2, 2026
cad67ae
Remove unneeded .upper()
jwlodek Feb 2, 2026
50591f4
Handle additional edge cases brought up in review
jwlodek Feb 5, 2026
747aa0c
Update the documentation
jwlodek Feb 5, 2026
8d27042
Fix docs build issue
jwlodek Feb 5, 2026
0adb287
Apply suggestion from @dmgav
dmgav Feb 6, 2026
32ffde5
Apply suggestion from @dmgav
dmgav Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions bluesky_queueserver/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,16 @@ async def _worker_request_plans_and_devices_list(self):
plans_and_devices_list, err_msg = None, "Timeout occurred while processing the request"
return plans_and_devices_list, err_msg

async def _worker_request_runengine_metadata(self):
try:
runengine_metadata = await self._comm_to_worker.send_msg("request_runengine_metadata")
err_msg = ""
if runengine_metadata is None:
err_msg = "Failed to obtain the RE metadata from the worker"
except CommTimeoutError:
runengine_metadata, err_msg = None, "Timeout occurred while processing the request"
return runengine_metadata, err_msg

async def _worker_request_task_results(self):
try:
tt = self._comm_to_worker_timeout_long
Expand Down Expand Up @@ -3351,6 +3361,35 @@ async def _re_runs_handler(self, request):

return {"success": success, "msg": msg, "run_list": run_list, "run_list_uid": run_list_uid}

async def _re_metadata_handler(self, request):
"""
Returns the runengine metadata from the RE worker process
"""

# Clients may ask for RE metadata frequently to update some state,
# so make this message debug only.
logger.debug("Returning the runengine metadata dictionary ...")

success, msg, re_metadata = True, "", {}
try:
self._check_request_for_unsupported_params(request=request, param_names=[])

if self._environment_exists:
re_metadata, msg = await self._worker_request_runengine_metadata()

if re_metadata is None:
success, re_metadata = False, {}
else:
success, msg, re_metadata = (
False,
"Environment does not exist. Cannot retrieve Run Engine metadata.",
{},
)
except Exception as ex:
success, msg = False, f"Error: {ex}"

return {"success": success, "msg": msg, "re_metadata": re_metadata}

def _lock_key_invalid_msg(self):
"""
Format error message, which reports an invalid lock key.
Expand Down Expand Up @@ -3684,6 +3723,7 @@ async def _zmq_execute(self, msg):
"re_abort": "_re_abort_handler",
"re_halt": "_re_halt_handler",
"re_runs": "_re_runs_handler",
"re_metadata": "_re_metadata_handler",
"lock": "_lock_handler",
"lock_info": "_lock_info_handler",
"unlock": "_unlock_handler",
Expand Down
3 changes: 2 additions & 1 deletion bluesky_queueserver/manager/qserver_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class QServerExitCodes(enum.Enum):
qserver re stop # STOP execution of a paused plan
qserver re abort # ABORT execution of a paused plan
qserver re halt # HALT execution of a paused plan
qserver re metadata # Request RunEngine metadata

qserver re runs # Get the list of active runs (runs generated by the currently running plans)
qserver re runs active # Get the list of active runs
Expand Down Expand Up @@ -1135,7 +1136,7 @@ def create_msg(params, *, lock_key):
elif command == "re":
if len(params) < 1:
raise CommandParameterError(f"Request '{command}' must include at least one parameter")
supported_params = ("pause", "resume", "stop", "abort", "halt", "runs")
supported_params = ("pause", "resume", "stop", "abort", "halt", "runs", "metadata")
if params[0] in supported_params:
if params[0] == "pause":
method, prms = msg_re_pause(params)
Expand Down
2 changes: 2 additions & 0 deletions bluesky_queueserver/manager/tests/test_qserver_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def test_qserver_cli_and_manager(re_manager): # noqa: F811

assert sp_call(["qserver", "re", "resume"]) == SUCCESS

assert sp_call(["qserver", "re", "metadata"]) == SUCCESS

assert wait_for_condition(
time=60, condition=condition_queue_processing_finished
), "Timeout while waiting for process to finish"
Expand Down
63 changes: 63 additions & 0 deletions bluesky_queueserver/manager/tests/test_zmq_api_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5971,6 +5971,69 @@ def test_zmq_api_re_runs_1(re_manager_pc_copy, tmp_path, test_with_manager_resta
assert wait_for_condition(time=5, condition=condition_environment_closed)


# fmt: off
@pytest.mark.parametrize("test_with_manager_restart", [False, True])
# fmt: on
def test_zmq_api_re_metadata_1(re_manager_pc_copy, tmp_path, test_with_manager_restart): # noqa: F811
Comment thread
jwlodek marked this conversation as resolved.
Outdated
"""
Tests `re_metadata` functionality. First checks if the request correctly fails if the environment is not open.
Then checks to make sure initial metadata could be retrieved. Then, executes a plan that adds `scan_id` and
`versions` keys to the metadata, and makes sure we can read those back, as well as the original key/value.
"""

_, pc_path = re_manager_pc_copy

resp, _ = zmq_request("re_metadata")
assert resp["success"] is False, f"{resp =}"
assert resp["msg"] == "Environment does not exist. Cannot retrieve Run Engine metadata."

# Open the environment
resp, _ = zmq_request("environment_open")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_created)

# Get the initial run_engine metadata
resp, _ = zmq_request("re_metadata")
assert resp["success"] is True, f"{resp =}"

def check_initial_metadata(re_md):
"""Function that checks if initial metadata matches what we expect"""

assert "metadata_key" in re_md, "Expected key not found in RE metadata"
Comment thread
jwlodek marked this conversation as resolved.
Outdated
assert re_md["metadata_key"] == "metadata_value"
assert "versions" in re_md, "Expected to find versions dictionary in metadata"
assert "bluesky" in re_md["versions"], "Expected to find bluesky version in metadata"

re_md = resp["re_metadata"]
check_initial_metadata(re_md)
assert len(re_md) == 2, "Only two metadata keys should be present initially!"

# Add the first 'count' plan
resp, _ = zmq_request("queue_item_add", {"item": _plan1, "user": _user, "user_group": _user_group})
assert resp["success"] is True

# Execute the count plan. This should add some additional metadata items
resp, _ = zmq_request("queue_start")
assert resp["success"] is True
assert wait_for_condition(20, condition=condition_manager_idle)

resp, _ = zmq_request("re_metadata")
assert resp["success"] is True, f"{resp =}"
re_md = resp["re_metadata"]

# Make sure we can retrieve scan ID, and it's an int
# the `count` plan should add this key, and it should start at `1`
assert "scan_id" in re_md, "Scan ID not found in RE metadata"
assert re_md["scan_id"] == 1, "Scan ID for first plan should be 1"

# Make sure our original metadata key is still there
check_initial_metadata(re_md)

resp, _ = zmq_request("environment_close")
assert resp["success"] is True, f"{resp =}"
assert wait_for_condition(time=5, condition=condition_environment_closed)


# fmt: off
@pytest.mark.parametrize("em_lock_code", [False, True])
# fmt: on
Expand Down
8 changes: 8 additions & 0 deletions bluesky_queueserver/manager/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,13 @@ def _request_plans_and_devices_list_handler(self):
self._existing_plans_and_devices_changed = False
return msg_out

def _request_runengine_metadata_handler(self):
"""
Returns the current state of the Run Engine metadata as a dictionary
"""

return dict(self._RE.md)

def _command_close_env_handler(self):
"""
Close RE Worker environment in orderly way.
Expand Down Expand Up @@ -1341,6 +1348,7 @@ def _worker_prepare_for_startup(self):
self._comm_to_manager.add_method(
self._request_plans_and_devices_list_handler, "request_plans_and_devices_list"
)
self._comm_to_manager.add_method(self._request_runengine_metadata_handler, "request_runengine_metadata")
self._comm_to_manager.add_method(self._request_task_results_handler, "request_task_results")
self._comm_to_manager.add_method(self._command_close_env_handler, "command_close_env")
self._comm_to_manager.add_method(self._command_confirm_exit_handler, "command_confirm_exit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
except Exception:
pass

RE = RunEngine()
RE = RunEngine({"metadata_key": "metadata_value"})

bec = BestEffortCallback()
if not is_ipython_mode() or not ipython_matplotlib:
Expand Down
Loading