-
Notifications
You must be signed in to change notification settings - Fork 9
Tiled integration #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
genematx
wants to merge
19
commits into
bluesky:main
Choose a base branch
from
genematx:tiled-integration
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
1fcc11d
ENH: allow OSX-64 platform
genematx b5627ef
ENH: install bluesky, tiled, ophyd-async; downdgrade python to 3.11.10
genematx d7da5a2
ENH: install ophyd-async
genematx d99f22c
MNT: install bluesky from working branch
genematx 3df3463
ENH: draft of the stream plotting tutorial
genematx 3df35f3
MNT: remove changes
genematx 44c51c3
MNT: fix typo
genematx 749b1a2
ENH: Tiled integration tutorial
genematx 99a99b6
Use PyPI because conda is behind and missing a dep.
danielballan 7505563
Less context
danielballan fa862d5
Tweak title
danielballan f07b832
Run tiled server on thread
danielballan 386e765
Merge pull request #1 from danielballan/tiled-integration
genematx 57bdf7b
MNT: remove pytest
genematx 754bb98
ENH: add threaded Tiled server
genematx 7983bf8
ENH: use threaded Tiled server in the notebook
genematx 7376d55
ENH: set default port to 0
genematx 8d63a68
FIX: rename TiledServer to emphasize it's temporary
genematx 965d3e9
MNT: update Bluesky to the released version
genematx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,4 +13,4 @@ kernelspec: | |
|
|
||
| # Flyscanning Basics | ||
|
|
||
| TODO | ||
| TODO | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| --- | ||
| jupytext: | ||
| text_representation: | ||
| extension: .md | ||
| format_name: myst | ||
| format_version: 0.13 | ||
| jupytext_version: 1.16.4 | ||
| kernelspec: | ||
| display_name: Python 3 (ipykernel) | ||
| language: python | ||
| name: python3 | ||
| --- | ||
|
|
||
| # Saving Bluesky Data in Tiled | ||
|
|
||
| +++ | ||
|
|
||
| In the standard Bluesky data model, it is assumed that each data point produced during an experiment is emitted in a separate Event document. During the acquisition, such high granularity enables fast feedback and supports low-latency downstream agents (e.g. live plotting), however the naive storage of individual documents as records in a database or lines in a file is not optimized for the retrieval of blocks of data spanning multiple rows. Instead, the recently developed solution presented here parses the Event data into a tabular format during ingestion. | ||
|
|
||
| This tutorial introduces TiledWriter -- a specialized callback in Bluesky designed to aggregate and store the incoming data in a way that would facilitate future random access. TiledWriter consumes Bluesky documents (all types of them) stores their contents at rest via API calls into a Tiled server. It transforms the data streams into a tabular form, suitable for the efficient retrieval of columns. | ||
|
|
||
| +++ | ||
|
|
||
| ### Initial Set-Up | ||
|
|
||
| ```{code-cell} ipython3 | ||
| import bluesky.plans as bp | ||
| from ophyd.sim import det, motor | ||
| from bluesky.run_engine import RunEngine | ||
| from bluesky.callbacks.tiled_writer import TiledWriter | ||
| from tiled.client import from_uri | ||
| from pprint import pprint | ||
| ``` | ||
|
|
||
| ```{code-cell} ipython3 | ||
| # Start a local Tiled server | ||
| # The following is equivalent to 'tiled server catalog --temp --api-key=secret' | ||
|
|
||
| from tiled_server import TiledServer | ||
|
|
||
| server = TiledServer(port=8000, api_key='secret', dir_path='tiled_data') | ||
| server.run() | ||
| ``` | ||
|
|
||
| ```{code-cell} ipython3 | ||
| # Initialize a Tiled client | ||
|
|
||
| client = from_uri("http://localhost:8000", api_key="secret") | ||
| client | ||
| ``` | ||
|
|
||
| ```{code-cell} ipython3 | ||
| # Initialize RunEngine and subscribe it to TiledWriter | ||
|
|
||
| RE = RunEngine({}) | ||
| tw = TiledWriter(client) | ||
| RE.subscribe(tw) | ||
|
|
||
| # Keep the documents for monitoring/debug | ||
| docs = [] | ||
| RE.subscribe(lambda name, doc : docs.append( (name, doc) )) | ||
| ``` | ||
|
|
||
| ### Running the Acquisition | ||
|
|
||
| ```{code-cell} ipython3 | ||
| # Run the acquisition | ||
| docs.clear() | ||
| scan_id, = RE(bp.scan([det], motor, -5, 5, 10)) | ||
| print(f"Finished aquisition: {scan_id=}") | ||
| ``` | ||
|
|
||
| Executing the above cell would have produced a stream of Bluesky documents, which have been saved and can be inspected in the `docs` list. Specifically, the Descriptor (`docs[1]`) contains specification of all available `data_keys` produced by the experinment (values of detector, `det`, `motor` and `motor_setpoint`). | ||
|
|
||
| ```{code-cell} ipython3 | ||
| # Data specification from the Descriptor document | ||
| stream_name = docs[1][1]['name'] | ||
| print(f"Stream name: {stream_name}\n") | ||
| print("Specifications of recorded data:") | ||
| pprint(docs[1][1]['data_keys']) | ||
| ``` | ||
|
|
||
| The actual data values, along with their corresponding timestamps, are emmitted via Event documents (`docs[2:-1]`) | ||
|
|
||
| ```{code-cell} ipython3 | ||
| # Example of Event documents | ||
| pprint(docs[2:4]) | ||
| ``` | ||
|
|
||
| ### Accessing Data in Tiled | ||
|
|
||
| +++ | ||
|
|
||
| The data produced during the experiment has been ingested into Tiled and stored in a container under the correpoding scan uuid. | ||
|
|
||
| NOTE: The internal structure of the Bluesky container is not finzalized yet and will change in the future releases. | ||
|
|
||
| ```{code-cell} ipython3 | ||
| bs_run = client[scan_id] | ||
| bs_stream = bs_run[stream_name] | ||
| event_data = bs_stream['internal/events'] | ||
| event_data | ||
| ``` | ||
|
|
||
| It can now be accessed as a usual `pandas.DataFrame`: | ||
|
|
||
| ```{code-cell} ipython3 | ||
| event_data.read() | ||
| ``` | ||
|
|
||
| ### Reading Data Directly from File | ||
|
|
||
| ```{code-cell} ipython3 | ||
| import pandas as pd | ||
|
|
||
| data_fpath = f'./tiled_data/data/{scan_id}/{stream_name}/internal/events/partition-0.csv' | ||
| df = pd.read_csv(data_fpath) | ||
| df | ||
| ``` | ||
|
|
||
| ### Exploring the Tiled Catalog | ||
|
|
||
| ```{code-cell} ipython3 | ||
| import sqlite3 | ||
|
|
||
| db_fpath = f'./tiled_data/catalog.db' | ||
| con = sqlite3.connect(db_fpath) | ||
| cur = con.cursor() | ||
| ``` | ||
|
|
||
| ```{code-cell} ipython3 | ||
| res = cur.execute("SELECT name FROM sqlite_master WHERE type='table';") | ||
| res.fetchall() | ||
| ``` | ||
|
|
||
| ```{code-cell} ipython3 | ||
| res = cur.execute("PRAGMA table_info('assets');") | ||
| res.fetchall() | ||
| ``` | ||
|
|
||
| ```{code-cell} ipython3 | ||
| res = cur.execute("SELECT * FROM assets;") | ||
| res.fetchone() | ||
| ``` | ||
|
|
||
| ```{code-cell} ipython3 | ||
|
|
||
| ``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| import contextlib | ||
| import threading | ||
| import time | ||
| import uvicorn | ||
| import tempfile | ||
| import pathlib | ||
|
|
||
| from tiled.server.app import build_app | ||
| from tiled.catalog import from_uri as catalog_from_uri | ||
|
|
||
|
|
||
| class ThreadedServer(uvicorn.Server): | ||
|
|
||
| def install_signal_handlers(self): | ||
| pass | ||
|
|
||
| @contextlib.contextmanager | ||
| def run_in_thread(self): | ||
| thread = threading.Thread(target=self.run) | ||
| thread.start() | ||
| try: | ||
| # Wait for server to start up, or raise TimeoutError. | ||
| for _ in range(100): | ||
| time.sleep(0.1) | ||
| if self.started: | ||
| break | ||
| else: | ||
| raise TimeoutError("Server did not start in 10 seconds.") | ||
| host, port = self.servers[0].sockets[0].getsockname() | ||
| yield f"http://{host}:{port}" | ||
| finally: | ||
| self.should_exit = True | ||
| thread.join() | ||
|
|
||
|
|
||
| class TiledServer: | ||
|
|
||
| def __init__(self, port=8000, dir_path=None, api_key="secret"): | ||
| if dir_path is None: | ||
| dir_path = pathlib.Path(tempfile.TemporaryDirectory().name) | ||
| else: | ||
| dir_path = pathlib.Path(dir_path) | ||
| dir_path.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| self.catalog = catalog_from_uri( | ||
| dir_path / "catalog.db", | ||
| writable_storage=dir_path / "data", | ||
| init_if_not_exists=True, | ||
| ) | ||
| self.app = build_app(self.catalog, authentication={"single_user_api_key": api_key}) | ||
| self._cm = ThreadedServer(uvicorn.Config(self.app, port=port, loop="asyncio")).run_in_thread() | ||
|
|
||
| def run(self): | ||
| return self._cm.__enter__() | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.