From b06dde1a5f7c9275d48d244b964907dd19f29bdd Mon Sep 17 00:00:00 2001 From: Peter Beaucage Date: Wed, 4 Dec 2024 16:01:13 -0500 Subject: [PATCH 1/8] Add basic reduction flow to PyHyper --- pyhyper.py | 104 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 45 deletions(-) diff --git a/pyhyper.py b/pyhyper.py index c59a907..061ae0a 100644 --- a/pyhyper.py +++ b/pyhyper.py @@ -3,16 +3,66 @@ import httpx -# import pyFAI -# import PyHyperScattering -from prefect import get_run_logger, task -# from tiled.client import from_profile +import PyHyperScattering +from prefect import get_run_logger, task PATH = "/nsls2/data/dssi/scratch/prefect-outputs/rsoxs/" DATA_SESSION_PATTERN = re.compile("[passGUCP]*-([0-9]+)") +import PyHyperScattering +import xarray as xr +from tiled.client import from_uri +from tiled.client.xarray import write_xarray_dataset + +@task +def load_and_reduce(scanid,override_bcx = None, override_bcy = None, override_dist = None, override_mask = None): + scan = load.loadRun(scanid) + scan_us = scan.unstack('system') # this is expensive, do it only once + if 'energy' in scan_us.dims: + integrator = PyHyperScattering.integrate.PFEnergySeriesIntegrator + else: + integrator = PyHyperScattering.integrate.PFGeneralIntegrator + integrator = integrator(integration_method='csr',geomethod='template_xr',template_xr=scan) + if override_bcx is not None: + integrator.ni_beamcenter_x = override_bcx + if override_bcy is not None: + integrator.ni_beamcenter_y = override_bcy + if override_dist is not None: + integrator.ni_distance = override_dist + if override_mask is not None: + integrator.mask = override_mask + return integrator.integrateImageStack(scan).to_dataset(name='reduced').unstack('system') + + + + +load = PyHyperScattering.load.SST1RSoXSDB(corr_mode='none') +writable_tiled = from_profile('rsoxs')['reduced_sandbox'] +@task +def load_reduce_and_write_to_tiled(scanid): + logger = get_run_logger() + try: + scan = load_and_reduce(scanid) + except Exception as e: + logger.warning(f"Exception during reduction {e}") + write_xarray_dataset(writable_tiled,scan) + return scan + +''' +def auto_reduce_recent_data_if_not_reduced(number): + for scan_ref in range (-number,-1): + try: + if load.c[scan_ref].stop is not None: # if the scan is currently running, stop will be None. + local_scan_id = load.c[scan_ref].metadata['summary']['scan_id'] + if len(analyzed_tiled.search(Eq('attrs.start.scan_id',local_scan_id))) > 0: + continue + load_reduce_and_write_to_tiled(scan_ref) + except Exception as e: + print(f'error in processing {scan_ref}, {e}') + +''' def lookup_directory(start_doc): """ @@ -51,47 +101,11 @@ def lookup_directory(start_doc): # Convert it to a pathlib.Path. return Path(paths[0]) - -####################################################################### -# WIP: Commenting out this function to avoid masking real linter errors -# OK to uncomment when development resumes. -####################################################################### -# @task -# def write_run_artifacts(scan_id): -# """ -# Example live-analysis function -# -# Parameters: -# run_to_plot (int): the local scan id from DataBroker -# """ -# start_doc = tiled_client_raw[scan_id].start -# directory = ( -# lookup_directory(start_doc) -# / start_doc["project_name"] -# / f"{start_doc['scan_id']}" -# ) -# directory.mkdir(parents=True, exist_ok=True) -# -# logger = get_run_logger() -# logger.info(f"starting pyhyper export to {directory}") -# -# logger.info(f"{PyHyperScattering.__version__}") -# -# c = from_profile("nsls2") -# logger.info("Loaded RSoXS Profile...") -# -# logger.info("created RSoXS catalog loader...") -# -# # except Exception: -# # logger.warning("Couldn't save as NeXus file.") -# logger.info("Done!") -# return integratedimages -# -# -# @flow -# def pyhyper_flow(scan_id=36106): -# write_run_artifacts(scan_id) -# log_status() +@flow +def pyhyper_flow(scan_id=36106): + scan = load_reduce_and_write_to_tiled(scan_id) + # TODO: decide and save these artifacts write_run_artifacts(scan) + log_status() @task From 5c7a6fd4ee150d9879d33ce6c6021a0bce359e3a Mon Sep 17 00:00:00 2001 From: Peter Beaucage Date: Wed, 4 Dec 2024 16:04:32 -0500 Subject: [PATCH 2/8] Trigger PyHyper at end of run --- end_of_run_workflow.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index e60af27..6f782a9 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -2,6 +2,7 @@ # from data_validation import general_data_validation from export import export +from pyhyper import @task @@ -15,4 +16,5 @@ def end_of_run_workflow(stop_doc): uid = stop_doc["run_start"] # general_data_validation(uid) export(uid) + pyhyper_flow(uid) log_completion() From 7570154c04a266293604d6fbeda950b5081dcf36 Mon Sep 17 00:00:00 2001 From: Peter Beaucage Date: Wed, 4 Dec 2024 16:05:01 -0500 Subject: [PATCH 3/8] Fix import --- end_of_run_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 6f782a9..5ec4ecf 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -2,7 +2,7 @@ # from data_validation import general_data_validation from export import export -from pyhyper import +from pyhyper import pyhyper_flow @task From f14a07b195dc36da2396b91743dbbb9925ce707a Mon Sep 17 00:00:00 2001 From: Peter Beaucage Date: Wed, 4 Dec 2024 16:11:20 -0500 Subject: [PATCH 4/8] Tidy up imports --- pyhyper.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pyhyper.py b/pyhyper.py index 061ae0a..e248c52 100644 --- a/pyhyper.py +++ b/pyhyper.py @@ -10,9 +10,7 @@ PATH = "/nsls2/data/dssi/scratch/prefect-outputs/rsoxs/" DATA_SESSION_PATTERN = re.compile("[passGUCP]*-([0-9]+)") -import PyHyperScattering -import xarray as xr -from tiled.client import from_uri +from tiled.client import from_profile from tiled.client.xarray import write_xarray_dataset @task From 46061aac905867552cc3a5e3dec8b3b7ecb8a5cf Mon Sep 17 00:00:00 2001 From: Peter Beaucage Date: Wed, 4 Dec 2024 16:15:45 -0500 Subject: [PATCH 5/8] Lint --- pyhyper.py | 67 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/pyhyper.py b/pyhyper.py index e248c52..2b21d8a 100644 --- a/pyhyper.py +++ b/pyhyper.py @@ -2,8 +2,6 @@ from pathlib import Path import httpx - - import PyHyperScattering from prefect import get_run_logger, task @@ -13,30 +11,38 @@ from tiled.client import from_profile from tiled.client.xarray import write_xarray_dataset + @task -def load_and_reduce(scanid,override_bcx = None, override_bcy = None, override_dist = None, override_mask = None): - scan = load.loadRun(scanid) - scan_us = scan.unstack('system') # this is expensive, do it only once - if 'energy' in scan_us.dims: - integrator = PyHyperScattering.integrate.PFEnergySeriesIntegrator - else: - integrator = PyHyperScattering.integrate.PFGeneralIntegrator - integrator = integrator(integration_method='csr',geomethod='template_xr',template_xr=scan) - if override_bcx is not None: - integrator.ni_beamcenter_x = override_bcx - if override_bcy is not None: - integrator.ni_beamcenter_y = override_bcy - if override_dist is not None: - integrator.ni_distance = override_dist - if override_mask is not None: - integrator.mask = override_mask - return integrator.integrateImageStack(scan).to_dataset(name='reduced').unstack('system') - - - - -load = PyHyperScattering.load.SST1RSoXSDB(corr_mode='none') -writable_tiled = from_profile('rsoxs')['reduced_sandbox'] +def load_and_reduce( + scanid, override_bcx=None, override_bcy=None, override_dist=None, override_mask=None +): + scan = load.loadRun(scanid) + scan_us = scan.unstack("system") # this is expensive, do it only once + if "energy" in scan_us.dims: + integrator = PyHyperScattering.integrate.PFEnergySeriesIntegrator + else: + integrator = PyHyperScattering.integrate.PFGeneralIntegrator + integrator = integrator( + integration_method="csr", geomethod="template_xr", template_xr=scan + ) + if override_bcx is not None: + integrator.ni_beamcenter_x = override_bcx + if override_bcy is not None: + integrator.ni_beamcenter_y = override_bcy + if override_dist is not None: + integrator.ni_distance = override_dist + if override_mask is not None: + integrator.mask = override_mask + return ( + integrator.integrateImageStack(scan) + .to_dataset(name="reduced") + .unstack("system") + ) + + +load = PyHyperScattering.load.SST1RSoXSDB(corr_mode="none") +writable_tiled = from_profile("rsoxs")["reduced_sandbox"] + @task def load_reduce_and_write_to_tiled(scanid): @@ -45,10 +51,11 @@ def load_reduce_and_write_to_tiled(scanid): scan = load_and_reduce(scanid) except Exception as e: logger.warning(f"Exception during reduction {e}") - write_xarray_dataset(writable_tiled,scan) + write_xarray_dataset(writable_tiled, scan) return scan - -''' + + +""" def auto_reduce_recent_data_if_not_reduced(number): for scan_ref in range (-number,-1): try: @@ -60,7 +67,8 @@ def auto_reduce_recent_data_if_not_reduced(number): except Exception as e: print(f'error in processing {scan_ref}, {e}') -''' +""" + def lookup_directory(start_doc): """ @@ -99,6 +107,7 @@ def lookup_directory(start_doc): # Convert it to a pathlib.Path. return Path(paths[0]) + @flow def pyhyper_flow(scan_id=36106): scan = load_reduce_and_write_to_tiled(scan_id) From 8d4a5d499472366166ce258958bebf07b4ef72c9 Mon Sep 17 00:00:00 2001 From: Peter Beaucage Date: Wed, 4 Dec 2024 16:23:26 -0500 Subject: [PATCH 6/8] More lint --- pyhyper.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/pyhyper.py b/pyhyper.py index 2b21d8a..f33584d 100644 --- a/pyhyper.py +++ b/pyhyper.py @@ -3,13 +3,14 @@ import httpx import PyHyperScattering -from prefect import get_run_logger, task +from prefect import get_run_logger, task, flow +from tiled.client import from_profile +from tiled.client.xarray import write_xarray_dataset + PATH = "/nsls2/data/dssi/scratch/prefect-outputs/rsoxs/" DATA_SESSION_PATTERN = re.compile("[passGUCP]*-([0-9]+)") -from tiled.client import from_profile -from tiled.client.xarray import write_xarray_dataset @task @@ -57,15 +58,15 @@ def load_reduce_and_write_to_tiled(scanid): """ def auto_reduce_recent_data_if_not_reduced(number): - for scan_ref in range (-number,-1): - try: - if load.c[scan_ref].stop is not None: # if the scan is currently running, stop will be None. - local_scan_id = load.c[scan_ref].metadata['summary']['scan_id'] - if len(analyzed_tiled.search(Eq('attrs.start.scan_id',local_scan_id))) > 0: - continue - load_reduce_and_write_to_tiled(scan_ref) - except Exception as e: - print(f'error in processing {scan_ref}, {e}') + for scan_ref in range (-number,-1): + try: + if load.c[scan_ref].stop is not None: # if the scan is currently running, stop will be None. + local_scan_id = load.c[scan_ref].metadata['summary']['scan_id'] + if len(analyzed_tiled.search(Eq('attrs.start.scan_id',local_scan_id))) > 0: + continue + load_reduce_and_write_to_tiled(scan_ref) + except Exception as e: + print(f'error in processing {scan_ref}, {e}') """ @@ -110,7 +111,8 @@ def lookup_directory(start_doc): @flow def pyhyper_flow(scan_id=36106): - scan = load_reduce_and_write_to_tiled(scan_id) + load_reduce_and_write_to_tiled(scan_id) + # scan = load_reduce_and_write_to_tiled(scan_id) # TODO: decide and save these artifacts write_run_artifacts(scan) log_status() From fa646ec7d06cd03801c20608f2b10ed52daa14cc Mon Sep 17 00:00:00 2001 From: Peter Beaucage Date: Wed, 4 Dec 2024 16:25:26 -0500 Subject: [PATCH 7/8] black changes --- pyhyper.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyhyper.py b/pyhyper.py index f33584d..a1b0020 100644 --- a/pyhyper.py +++ b/pyhyper.py @@ -3,11 +3,10 @@ import httpx import PyHyperScattering -from prefect import get_run_logger, task, flow +from prefect import flow, get_run_logger, task from tiled.client import from_profile from tiled.client.xarray import write_xarray_dataset - PATH = "/nsls2/data/dssi/scratch/prefect-outputs/rsoxs/" DATA_SESSION_PATTERN = re.compile("[passGUCP]*-([0-9]+)") From 88f0ead8e9ceb06d4ea946cbd59bfa22a9b16627 Mon Sep 17 00:00:00 2001 From: Peter Beaucage Date: Thu, 5 Dec 2024 14:48:20 -0500 Subject: [PATCH 8/8] Switch to string lookup so UID works --- pyhyper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyhyper.py b/pyhyper.py index a1b0020..f9894e8 100644 --- a/pyhyper.py +++ b/pyhyper.py @@ -16,7 +16,7 @@ def load_and_reduce( scanid, override_bcx=None, override_bcy=None, override_dist=None, override_mask=None ): - scan = load.loadRun(scanid) + scan = load.loadRun(load.c[scanid]) scan_us = scan.unstack("system") # this is expensive, do it only once if "energy" in scan_us.dims: integrator = PyHyperScattering.integrate.PFEnergySeriesIntegrator