Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions data_validation.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
from prefect import task, flow, get_run_logger
import time as ttime
from tiled.client import from_profile
from prefect.blocks.system import Secret
from tiled.client import from_uri


@task
def get_run(uid, api_key=None):
    """Fetch a Bluesky run from the SRX raw Tiled catalog.

    Parameters
    ----------
    uid : str
        Run identifier used to index the catalog.
    api_key : str, optional
        Tiled API key forwarded to the client.

    Returns
    -------
    The run object found at ``srx/raw`` under *uid*.
    """
    client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
    return client["srx/raw"][uid]


@task
def read_stream(run, stream):
    """Read the named *stream* of *run* into memory and return its data."""
    stream_node = run[stream]
    return stream_node.read()


@task(retries=2, retry_delay_seconds=10)
def read_all_streams(uid, beamline_acronym):
api_key = Secret.load("tiled-srx-api-key", _sync=True).get()
tiled_client = from_profile("nsls2", api_key=api_key)
def read_all_streams(uid, api_key=None):
logger = get_run_logger()
run = tiled_client[beamline_acronym]["raw"][uid]
run = get_run(uid, api_key=api_key)
logger.info(f"Validating uid {run.start['uid']}")
start_time = ttime.monotonic()
for stream in run:
logger.info(f"{stream}:")
stream_start_time = ttime.monotonic()
stream_data = run[stream].read()
stream_data = read_stream(run, stream)
stream_elapsed_time = ttime.monotonic() - stream_start_time
logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
Expand All @@ -24,5 +33,5 @@ def read_all_streams(uid, beamline_acronym):


@flow
def data_validation(uid, api_key=None):
    """Prefect flow: read every stream of the run *uid* to validate it.

    The diff scrape interleaved the pre- and post-change definitions; the
    earlier ``def data_validation(uid)`` was shadowed dead code, so only the
    final version (which threads *api_key* through) is kept.  *api_key*
    defaults to None, so existing single-argument callers still work.
    """
    read_all_streams(uid, api_key=api_key)
33 changes: 20 additions & 13 deletions end_of_run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@

from prefect import task, flow, get_run_logger
from prefect.blocks.notifications import SlackWebhook
from prefect.blocks.system import Secret
from prefect.context import FlowRunContext

from xanes_exporter import xanes_exporter
from xrf_hdf5_exporter import xrf_hdf5_exporter
from logscan import logscan

from tiled.client import from_profile
from dotenv import load_dotenv
import os
from data_validation import get_run

CATALOG_NAME = "srx"


def get_api_key_from_env():
    """Load the Tiled API key from the mounted container secret file.

    Populates the process environment from ``/srv/container.secret`` via
    python-dotenv, then returns the ``TILED_API_KEY`` value.  Raises
    ``KeyError`` if the secret file does not define that variable.
    """
    with open("/srv/container.secret", "r") as secret_file:
        load_dotenv(stream=secret_file)
    return os.environ["TILED_API_KEY"]


def slack(func):
"""
Send a message to mon-prefect slack channel about the flow-run status.
Expand All @@ -24,7 +31,7 @@ def slack(func):
the flow. To keep the naming of workflows consistent, the name of this inner function had to match the expected name.
"""

def end_of_run_workflow(stop_doc):
def end_of_run_workflow(stop_doc, api_key=None):
flow_run_name = FlowRunContext.get().flow_run.dict().get("name")

# Load slack credentials that are saved in Prefect.
Expand All @@ -35,10 +42,8 @@ def end_of_run_workflow(stop_doc):
uid = stop_doc["run_start"]

# Get the scan_id.
api_key = Secret.load("tiled-srx-api-key", _sync=True).get()
tiled_client = from_profile("nsls2", api_key=api_key)[CATALOG_NAME]
tiled_client_raw = tiled_client["raw"]
scan_id = tiled_client_raw[uid].start["scan_id"]
run = get_run(uid, api_key=api_key)
scan_id = run.start["scan_id"]

# Send a message to mon-bluesky if bluesky-run failed.
if stop_doc.get("exit_status") == "fail":
Expand Down Expand Up @@ -74,11 +79,13 @@ def log_completion():

@flow
@slack
def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
    """Prefect flow run at the end of every Bluesky run.

    The diff scrape interleaved the pre- and post-change bodies; only the
    final version is kept (the superseded single-argument calls were dead
    duplicates).  New parameters default so existing callers are unaffected.

    Parameters
    ----------
    stop_doc : dict
        Bluesky RunStop document; ``run_start`` holds the run uid.
    api_key : str, optional
        Tiled API key; loaded from the container secret file when omitted.
    dry_run : bool, optional
        Forwarded to each exporter to skip writing output files.
    """
    uid = stop_doc["run_start"]
    if not api_key:
        api_key = get_api_key_from_env()

    # data_validation(uid, return_state=True, api_key=api_key)
    xanes_exporter(uid, api_key=api_key, dry_run=dry_run)
    xrf_hdf5_exporter(uid, api_key=api_key, dry_run=dry_run)
    logscan(uid, api_key=api_key, dry_run=dry_run)
    log_completion()
24 changes: 11 additions & 13 deletions logscan.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from pathlib import Path
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from tiled.client import from_profile


api_key = Secret.load("tiled-srx-api-key", _sync=True).get()
tiled_client = from_profile("nsls2", api_key=api_key)["srx"]
tiled_client_raw = tiled_client["raw"]
from data_validation import get_run


def find_scanid(logfile_path, scanid):
Expand All @@ -21,10 +16,10 @@ def find_scanid(logfile_path, scanid):


@task
def logscan_detailed(scanid):
def logscan_detailed(scanid, api_key=None, dry_run=False):
logger = get_run_logger()

h = tiled_client_raw[scanid]
h = get_run(scanid, api_key=api_key)

if (
"Beamline Commissioning (beamline staff only)".lower()
Expand Down Expand Up @@ -71,14 +66,17 @@ def logscan_detailed(scanid):
out_str += "\n"

# Write to file
with open(logfile_path, "a") as userlogf:
userlogf.write(out_str)
logger.info(f"Added {h.start['scan_id']} to the logs")
if dry_run:
logger.info(f"Dry run: scan_id: {h.start['scan_id']} output: {out_str}")
else:
with open(logfile_path, "a") as userlogf:
userlogf.write(out_str)
logger.info(f"Added {h.start['scan_id']} to the logs")


@flow(log_prints=True)
def logscan(ref, api_key=None, dry_run=False):
    """Prefect flow: append a log entry for run *ref* to the user logfile.

    The diff scrape interleaved old and new lines; only the final version is
    kept (the superseded ``def logscan(ref)`` and bare ``logscan_detailed(ref)``
    call were dead duplicates).  Defaults keep single-argument callers working.
    """
    logger = get_run_logger()
    logger.info("Start writing logfile...")
    logscan_detailed(ref, api_key=api_key, dry_run=dry_run)
    logger.info("Finish writing logfile.")
4 changes: 1 addition & 3 deletions prefect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ deployments:
schedule: {}
work_pool:
job_variables:
env:
TILED_SITE_PROFILES: /nsls2/software/etc/tiled/profiles
image: ghcr.io/nsls2/srx-workflows:main
image_pull_policy: Always
network: slirp4netns
volumes:
- /nsls2/data/srx/proposals:/nsls2/data/srx/proposals
- /nsls2/software/etc/tiled:/nsls2/software/etc/tiled
- /srv/prefect3-docker-worker-srx/app:/srv
container_create_kwargs:
userns_mode: "keep-id:uid=402949,gid=402949" # workflow-srx:workflow-srx
auto_remove: true
Expand Down
69 changes: 40 additions & 29 deletions xanes_exporter.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from tiled.client import from_profile

from data_validation import get_run
import time as ttime
import numpy as np
import xraylib as xrl
import pandas as pd


api_key = Secret.load("tiled-srx-api-key", _sync=True).get()
tiled_client = from_profile("nsls2", api_key=api_key)["srx"]
tiled_client_raw = tiled_client["raw"]


def xanes_textout(
scanid=-1,
header=[],
Expand All @@ -21,6 +14,7 @@ def xanes_textout(
usercolumn={},
usercolumnname=[],
output=True,
api_key=None,
):
"""
scan: can be scan_id (integer) or uid (string). default=-1 (last scan run)
Expand All @@ -35,7 +29,7 @@ def xanes_textout(

"""

h = tiled_client_raw[scanid]
h = get_run(scanid, api_key=api_key)
if (
"Beamline Commissioning (beamline staff only)".lower()
in h.start["proposal"]["type"].lower()
Expand Down Expand Up @@ -121,13 +115,13 @@ def xanes_textout(


@task
def xas_step_exporter(scanid):
def xas_step_exporter(scanid, api_key=None, dry_run=False):
logger = get_run_logger()

# Custom header list
headeritem = []
# Load header for our scan
h = tiled_client_raw[scanid]
h = get_run(scanid, api_key=api_key)

if h.start["scan"].get("type") != "XAS_STEP":
logger.info("Incorrect document type. Not running exporter on this document.")
Expand Down Expand Up @@ -216,22 +210,26 @@ def xas_step_exporter(scanid):
# usercolumnitem['If-{:02}'.format(i)] = roisum
# usercolumnitem['If-{:02}'.format(i)].round(0)

xanes_textout(
scanid=scanid,
header=headeritem,
userheader=userheaderitem,
column=columnitem,
usercolumn=usercolumnitem,
usercolumnname=usercolumnitem.keys(),
output=False,
)
if dry_run:
logger.info("Dry run: Not exporting xanes")
else:
xanes_textout(
scanid=scanid,
header=headeritem,
userheader=userheaderitem,
column=columnitem,
usercolumn=usercolumnitem,
usercolumnname=usercolumnitem.keys(),
output=False,
api_key=api_key,
)


@task
def xas_fly_exporter(uid):
def xas_fly_exporter(uid, api_key=None, dry_run=False):
logger = get_run_logger()
# Get a scan header
hdr = tiled_client_raw[uid]
hdr = get_run(uid, api_key=api_key)
start_doc = hdr.start

# Get proposal directory location
Expand Down Expand Up @@ -320,27 +318,40 @@ def xas_fly_exporter(uid):
staticheader += "# \n# "

# Export data to file
with open(fname, "w") as f:
f.write(staticheader)
df.to_csv(fname, float_format="%.3f", sep=" ", mode="a")
if dry_run:
logger.info("Dry run: xas fly exporter")
if len(df) >= 2:
logger.info(
"Dry run: first and last row: {pd.concat([df.head(1), df.tail(1)])}"
)
elif len(df) == 1:
logger.info("Dry run: row: {df}")
else:
logger.info("Dry run: (no data)")
else:
with open(fname, "w") as f:
f.write(staticheader)
df.to_csv(fname, float_format="%.3f", sep=" ", mode="a")


@flow(log_prints=True)
def xanes_exporter(ref, api_key=None, dry_run=False):
    """Prefect flow: dispatch run *ref* to the matching XANES exporter.

    The diff scrape interleaved old and new lines; only the final version is
    kept (the superseded def/call lines were dead duplicates).  Looks up the
    scan type from the run's start document and routes to the step-scan or
    fly-scan exporter; unknown types are logged and skipped.
    """
    logger = get_run_logger()
    logger.info("Start writing file with xanes_exporter...")

    # Get scan type; default to "unknown" when the start doc lacks scan/type.
    scan_type = (
        get_run(ref, api_key=api_key).start.get("scan", {}).get("type", "unknown")
    )

    # Redirect to the correct exporter function - or pass
    if scan_type == "XAS_STEP":
        logger.info("Starting xanes step-scan exporter.")
        xas_step_exporter(ref, api_key=api_key, dry_run=dry_run)
        logger.info("Finished writing file with xanes step-scan exporter.")
    elif scan_type == "XAS_FLY":
        logger.info("Starting xanes fly-scan exporter.")
        xas_fly_exporter(ref, api_key=api_key, dry_run=dry_run)
        logger.info("Finished writing file with xanes fly-scan exporter.")
    else:
        logger.info(f"xanes exporter for {scan_type=} not available")
29 changes: 14 additions & 15 deletions xrf_hdf5_exporter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret

import glob
import os
Expand All @@ -8,26 +7,23 @@
import dask
from pyxrf.api import make_hdf

from tiled.client import from_profile
from data_validation import get_run

# from pyxrf.api import make_hdf

CATALOG_NAME = "srx"

api_key = Secret.load("tiled-srx-api-key", _sync=True).get()
tiled_client = from_profile("nsls2", api_key=api_key)[CATALOG_NAME]
tiled_client_raw = tiled_client["raw"]


@task
def export_xrf_hdf5(scanid):
def export_xrf_hdf5(scanid, api_key=None, dry_run=False):
logger = get_run_logger()

logger.info(f"{pyxrf.__file__ = }")

logger.info(f"{dask.__file__ = }")

# Load header for our scan
h = tiled_client_raw[scanid]
h = get_run(scanid, api_key=api_key)

if h.start["scan"]["type"] not in ["XRF_FLY", "XRF_STEP"]:
logger.info(
Expand Down Expand Up @@ -64,17 +60,20 @@ def export_xrf_hdf5(scanid):
os.environ["TILED_API_KEY"] = (
api_key # pyxrf assumes Tiled API key as an environment variable
)
make_hdf(scanid, wd=working_dir, prefix=prefix, catalog_name=CATALOG_NAME)
if dry_run:
logger.info("Dry run: not creating HDF5 file using PyXRF")
else:
make_hdf(scanid, wd=working_dir, prefix=prefix, catalog_name=CATALOG_NAME)

# chmod g+w for created file(s)
# context: https://nsls2.slack.com/archives/C04UUSG88VB/p1718911163624149
for file in glob.glob(f"{working_dir}/{prefix}{scanid}*.h5"):
os.chmod(file, os.stat(file).st_mode | stat.S_IWGRP)
# chmod g+w for created file(s)
# context: https://nsls2.slack.com/archives/C04UUSG88VB/p1718911163624149
for file in glob.glob(f"{working_dir}/{prefix}{scanid}*.h5"):
os.chmod(file, os.stat(file).st_mode | stat.S_IWGRP)


@flow(log_prints=True)
def xrf_hdf5_exporter(scanid, api_key=None, dry_run=False):
    """Prefect flow: export run *scanid* to an XRF HDF5 file via PyXRF.

    The diff scrape interleaved old and new lines; only the final version is
    kept (the superseded def/call lines were dead duplicates).  Defaults keep
    existing single-argument callers working.
    """
    logger = get_run_logger()
    logger.info("Start writing file with xrf_hdf5 exporter...")
    export_xrf_hdf5(scanid, api_key=api_key, dry_run=dry_run)
    logger.info("Finish writing file with xrf_hdf5 exporter.")
Loading