-
Notifications
You must be signed in to change notification settings - Fork 3
Update data validation btp #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
c66575e
c84a030
1f5c8b3
242db8d
a2eef40
e8e9450
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,28 +1,35 @@ | ||
| from prefect import task, flow, get_run_logger | ||
| from prefect.blocks.system import Secret | ||
| from bluesky_tiled_plugins.writing.validator import validate | ||
| import time as ttime | ||
| from tiled.client import from_profile | ||
|
|
||
|
|
||
| @task(retries=2, retry_delay_seconds=10) | ||
| def read_all_streams(uid, beamline_acronym): | ||
| @task | ||
| def check_stream(run): | ||
| logger = get_run_logger() | ||
| api_key = Secret.load("tiled-smi-api-key", _sync=True).get() | ||
| tiled_client = from_profile("nsls2", api_key=api_key) | ||
| run = tiled_client[beamline_acronym]["raw"][uid] | ||
| logger.info(f"Validating uid {uid}") | ||
| start_time = ttime.monotonic() | ||
| for stream in run: | ||
| logger.info(f"{stream}:") | ||
| stream_start_time = ttime.monotonic() | ||
| stream_data = run[stream].read() | ||
| stream_elapsed_time = ttime.monotonic() - stream_start_time | ||
| logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") | ||
| logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") | ||
| elapsed_time = ttime.monotonic() - start_time | ||
| logger.info(f"{elapsed_time = }") | ||
|
|
||
|
|
||
| @flow | ||
| def data_validation(uid): | ||
| read_all_streams(uid, beamline_acronym="smi") | ||
| @flow(retries=2, retry_delay_seconds=10) | ||
| def data_validation(uid, beamline_acronym="smi"): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may still need to keep the old validation task that uses the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so you want to keep the task that reads all data as a check for
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah okay, if improving the performance was the main reason -- maybe just remove it then. I don't know if anyone paid any attention to this reading failing. |
||
| logger = get_run_logger() | ||
| api_key = Secret.load("tiled-smi-api-key", _sync=True).get() | ||
| tiled_client = from_profile("nsls2", api_key=api_key) | ||
| run_client = tiled_client[beamline_acronym]["migration"][uid] | ||
| run_client_raw = tiled_client[beamline_acronym]["raw"][uid] | ||
| logger.info(f"Launching tasks to check streams and validate uid {uid}") | ||
| start_time = ttime.monotonic() | ||
| check_stream_task = check_stream.submit(run) | ||
| validate_task = validate.submit(run_client, fix_errors=True, try_reading=True, raise_on_error=True) | ||
| logger.info("Waiting for tasks to complete") | ||
| check_stream_task.result() | ||
| validate_task.result() | ||
| elapsed_time = ttime.monotonic() - start_time | ||
| logger.info(f"Finished checking and validating data; total {elapsed_time = }") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a conceptual improvement in defining flows directly (instead of flows comprised of tasks)? Asking for a friend. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the idea is that tasks are smaller components and can be retried - https://docs.prefect.io/v3/concepts/flows#why-both-flows-and-tasks
you could say that we could have made reading the individual streams in the previous read_all_streams() as an individual task, while making the main function as a flow.
no restrictions on flows calling tasks and vice versa! (as well as a flow calling a flow and task calling a task)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a note on a task calling a task, it is possible, but not recommended (see discussion here).