
Commit ea02654

Add chronify as time series storage option (#73)

Parent: 118eb00

12 files changed: +536 -14

.github/workflows/ci.yml (+1 -1)

@@ -27,7 +27,7 @@ jobs:
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
-          python -m pip install ".[dev]"
+          python -m pip install ".[chronify,dev]"
       - name: Run pytest with coverage
         run: |
           pytest -v --cov --cov-report=xml

.github/workflows/gh-pages.yml (+1 -1)

@@ -16,7 +16,7 @@ jobs:
       - name: install dependencies
         run: |
           python -m pip install --upgrade pip
-          python -m pip install ".[dev]"
+          python -m pip install ".[chronify,dev]"
      - name: build documentation
        run: |
          cd docs

pyproject.toml (+5 -0)

@@ -29,17 +29,22 @@ classifiers = [
 dependencies = [
     "loguru~=0.7.2",
     "numpy >= 2, < 3",
+    "pandas >= 2, < 3",
     "pint~=0.23",
     "pyarrow~=19.0",
     "pydantic >= 2.7, < 3",
     "rich~=13.7.1",
 ]
 [project.optional-dependencies]
+chronify = [
+    "chronify ~= 0.2.3",
+]
 dev = [
     "autodoc_pydantic~=2.0",
     "furo",
     "mypy >=1.13, < 2",
     "myst_parser",
+    "pandas-stubs",
     "pre-commit",
     "pyarrow-stubs",
     "pytest",
New file (+350): chronify time series storage implementation

"""Implementation of chronify storage for time series."""

import atexit
from contextlib import contextmanager
from datetime import datetime, timedelta
from functools import singledispatch
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Generator, Self
from uuid import UUID

import pandas as pd
import pint
from chronify import DatetimeRange, Store, TableSchema
from loguru import logger
from sqlalchemy import Connection

from infrasys.exceptions import ISFileExists, ISInvalidParameter
from infrasys.id_manager import IDManager
from infrasys.time_series_models import (
    SingleTimeSeries,
    SingleTimeSeriesKey,
    SingleTimeSeriesMetadata,
    TimeSeriesData,
    TimeSeriesKey,
    TimeSeriesMetadata,
    TimeSeriesStorageType,
)
from infrasys.time_series_storage_base import TimeSeriesStorageBase
from infrasys.utils.path_utils import delete_if_exists


_SINGLE_TIME_SERIES_BASE_NAME = "single_time_series"
_TIME_SERIES_FILENAME = "time_series_data.db"


class ChronifyTimeSeriesStorage(TimeSeriesStorageBase):
    """Stores time series in a chronify database."""

    def __init__(
        self,
        store: Store,
        id_manager: IDManager,
        read_only: bool = False,
        uuid_lookup: dict[UUID, int] | None = None,
    ) -> None:
        self._store = store
        self._read_only = read_only
        # infrasys currently uses UUIDs as unique identifiers for components and time series.
        # Those will eventually use integer IDs instead.
        # We don't want to store UUIDs in the chronify database.
        # Integer IDs are much smaller and faster for search.
        # Manage a mapping of UUIDs to integer IDs until we can remove UUIDs (#80).
        self._uuid_lookup: dict[UUID, int] = uuid_lookup or {}
        self._id_manager = id_manager

    @classmethod
    def create_with_temp_directory(
        cls,
        base_directory: Path | None = None,
        engine_name: str = "duckdb",
        read_only: bool = False,
    ) -> Self:
        """Construct ChronifyTimeSeriesStorage with a temporary directory."""
        with NamedTemporaryFile(dir=base_directory, suffix=".db") as f:
            dst_file = Path(f.name)
        logger.debug("Creating database at {}", dst_file)
        atexit.register(delete_if_exists, dst_file)
        store = Store(engine_name=engine_name, file_path=dst_file)
        id_manager = IDManager(next_id=1)
        return cls(store, id_manager, read_only=read_only)

    @classmethod
    def create_with_permanent_directory(
        cls,
        base_directory: Path,
        engine_name: str = "duckdb",
        read_only: bool = False,
    ) -> Self:
        """Construct ChronifyTimeSeriesStorage with a permanent directory."""
        dst_file = base_directory / _TIME_SERIES_FILENAME
        if dst_file.exists():
            msg = f"time series database already exists: {dst_file}"
            raise ISFileExists(msg)
        logger.debug("Creating database at {}", dst_file)
        store = Store(engine_name=engine_name, file_path=dst_file)
        id_manager = IDManager(next_id=1)
        return cls(store, id_manager, read_only=read_only)

    @classmethod
    def from_file_to_tmp_file(
        cls,
        data: dict[str, Any],
        dst_dir: Path | None = None,
        read_only: bool = False,
    ) -> Self:
        """Construct ChronifyTimeSeriesStorage after copying from an existing database file."""
        id_manager, uuid_lookup = cls._deserialize_ids(data)
        with NamedTemporaryFile(dir=dst_dir, suffix=".db") as f:
            dst_file = Path(f.name)
        orig_store = Store(engine_name=data["engine_name"], file_path=data["filename"])
        orig_store.backup(dst_file)
        new_store = Store(engine_name=data["engine_name"], file_path=dst_file)
        atexit.register(delete_if_exists, dst_file)
        return cls(new_store, id_manager, read_only=read_only, uuid_lookup=uuid_lookup)

    @classmethod
    def from_file(cls, data: dict[str, Any], read_only: bool = False) -> Self:
        """Construct ChronifyTimeSeriesStorage with an existing database file."""
        id_manager, uuid_lookup = cls._deserialize_ids(data)
        store = Store(engine_name=data["engine_name"], file_path=Path(data["filename"]))
        return cls(store, id_manager, read_only=read_only, uuid_lookup=uuid_lookup)

    @staticmethod
    def _deserialize_ids(data: dict[str, Any]) -> tuple[IDManager, dict[UUID, int]]:
        uuid_lookup: dict[UUID, int] = {}
        max_id = 0
        for key, val in data["uuid_lookup"].items():
            uuid_lookup[UUID(key)] = val
            if val > max_id:
                max_id = val
        id_manager = IDManager(next_id=max_id + 1)
        return id_manager, uuid_lookup
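
    # Illustration (hypothetical values): the serialized "uuid_lookup" maps stringified
    # UUIDs to integer IDs, e.g. {"6f1e...": 1, "a2b3...": 2}; _deserialize_ids would
    # then return an IDManager whose next_id is 3.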

    def get_database_url(self) -> str:
        """Return the path to the underlying database."""
        # We don't expect to use an in-memory db.
        assert self._store.engine.url.database is not None
        return self._store.engine.url.database

    def get_time_series_directory(self) -> Path:
        assert self._store.engine.url.database is not None
        return Path(self._store.engine.url.database).parent

    def add_time_series(
        self,
        metadata: TimeSeriesMetadata,
        time_series: TimeSeriesData,
        connection: Connection | None = None,
    ) -> None:
        if not isinstance(time_series, SingleTimeSeries):
            msg = f"Bug: need to implement add_time_series for {type(time_series)}"
            raise NotImplementedError(msg)

        if time_series.uuid in self._uuid_lookup:
            msg = f"Bug: time series {time_series.uuid} already stored"
            raise Exception(msg)

        db_id = self._id_manager.get_next_id()
        df = self._to_dataframe(time_series, db_id)
        schema = _make_table_schema(time_series, _get_table_name(time_series))
        # There is no reason to run time checks because we are generating the timestamps
        # from initial_time, resolution, and length, so they are guaranteed to be correct.
        self._store.ingest_table(df, schema, connection=connection, skip_time_checks=True)
        self._uuid_lookup[time_series.uuid] = db_id
        logger.debug("Added {} to time series storage", time_series.summary)

    def check_timestamps(self, key: TimeSeriesKey, connection: Connection | None = None) -> None:
        table_name = _get_table_name(key)
        self._store.check_timestamps(table_name, connection=connection)

    def get_engine_name(self) -> str:
        """Return the name of the underlying database engine."""
        return self._store.engine.name

    def get_time_series(
        self,
        metadata: TimeSeriesMetadata,
        start_time: datetime | None = None,
        length: int | None = None,
        connection: Connection | None = None,
    ) -> Any:
        if isinstance(metadata, SingleTimeSeriesMetadata):
            return self._get_single_time_series(
                metadata=metadata,
                start_time=start_time,
                length=length,
                connection=connection,
            )

        msg = f"Bug: need to implement get_time_series for {type(metadata)}"
        raise NotImplementedError(msg)

    def remove_time_series(
        self, metadata: TimeSeriesMetadata, connection: Connection | None = None
    ) -> None:
        db_id = self._get_db_id(metadata.time_series_uuid)
        table_name = _get_table_name(metadata)
        num_deleted = self._store.delete_rows(table_name, {"id": db_id}, connection=connection)
        if num_deleted < 1:
            msg = f"Failed to delete rows in the chronify database for {metadata.time_series_uuid}"
            raise ISInvalidParameter(msg)

    def serialize(
        self, data: dict[str, Any], dst: Path | str, src: Path | str | None = None
    ) -> None:
        ts_dir = dst if isinstance(dst, Path) else Path(dst)
        path = ts_dir / _TIME_SERIES_FILENAME
        assert not path.exists(), path
        self._store.backup(path)
        data["filename"] = str(path)
        data["time_series_storage_type"] = TimeSeriesStorageType.CHRONIFY.value
        data["engine_name"] = self._store.engine.name
        data["uuid_lookup"] = {str(k): v for k, v in self._uuid_lookup.items()}

    def _get_single_time_series(
        self,
        metadata: SingleTimeSeriesMetadata,
        start_time: datetime | None = None,
        length: int | None = None,
        connection: Connection | None = None,
    ) -> SingleTimeSeries:
        table_name = _get_table_name(metadata)
        db_id = self._get_db_id(metadata.time_series_uuid)
        _, required_len = metadata.get_range(start_time=start_time, length=length)
        where_clauses = ["id = ?"]
        params: list[Any] = [db_id]
        if start_time is not None:
            where_clauses.append("timestamp >= ?")
            params.append(start_time)
        where_clause = " AND ".join(where_clauses)
        limit = "" if length is None else f" LIMIT {required_len}"
        query = f"""
            SELECT timestamp, value
            FROM {table_name}
            WHERE {where_clause}
            ORDER BY timestamp
            {limit}
        """
        df = self._store.read_query(
            table_name,
            query,
            params=tuple(params),
            connection=connection,
        )
        if len(df) != required_len:
            msg = f"Bug: {len(df)=} {length=} {required_len=}"
            raise Exception(msg)
        values = df["value"].values
        if metadata.quantity_metadata is not None:
            np_array = metadata.quantity_metadata.quantity_type(
                values, metadata.quantity_metadata.units
            )
        else:
            np_array = values
        return SingleTimeSeries(
            uuid=metadata.time_series_uuid,
            variable_name=metadata.variable_name,
            resolution=metadata.resolution,
            initial_time=start_time or metadata.initial_time,
            data=np_array,
            normalization=metadata.normalization,
        )

    @contextmanager
    def open_time_series_store(self) -> Generator[Connection, None, None]:
        with self._store.engine.begin() as conn:
            yield conn
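
    # Usage sketch (names hypothetical): methods that accept `connection` can share
    # one transaction opened here, e.g.
    #   with storage.open_time_series_store() as conn:
    #       storage.add_time_series(metadata, ts, connection=conn)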

    def _to_dataframe(self, time_series: SingleTimeSeries, db_id: int) -> pd.DataFrame:
        if isinstance(time_series.data, pint.Quantity):
            array = time_series.data.magnitude
        else:
            array = time_series.data
        df = pd.DataFrame({"timestamp": time_series.make_timestamps(), "value": array})
        df["id"] = db_id
        return df

    def _get_db_id(self, time_series_uuid: UUID) -> int:
        db_id = self._uuid_lookup.get(time_series_uuid)
        if db_id is None:
            msg = f"Bug: time series {time_series_uuid} not stored"
            raise Exception(msg)
        return db_id


@singledispatch
def _get_table_name(time_series) -> str:
    msg = f"Bug: {type(time_series)}"
    raise NotImplementedError(msg)


@_get_table_name.register(SingleTimeSeries)
def _(time_series) -> str:
    return _get_single_time_series_table_name(
        time_series.initial_time, time_series.resolution, time_series.length
    )


@_get_table_name.register(SingleTimeSeriesMetadata)
def _(metadata) -> str:
    return _get_single_time_series_table_name(
        metadata.initial_time, metadata.resolution, metadata.length
    )


@_get_table_name.register(SingleTimeSeriesKey)
def _(key) -> str:
    return _get_single_time_series_table_name(key.initial_time, key.resolution, key.length)


def _get_single_time_series_table_name(
    initial_time: datetime,
    resolution: timedelta,
    length: int,
) -> str:
    return "_".join(
        (
            _SINGLE_TIME_SERIES_BASE_NAME,
            initial_time.isoformat().replace("-", "_").replace(":", "_"),
            str(resolution.seconds),
            str(length),
        )
    )
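
# Example (illustrative): initial_time=datetime(2020, 1, 1), resolution=timedelta(hours=1),
# length=8760 yields "single_time_series_2020_01_01T00_00_00_3600_8760". Note that
# resolution.seconds drops any days component, so sub-day resolutions are assumed here.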

@singledispatch
def _get_table_base_name(time_series) -> str:
    msg = f"Bug: need to implement _get_table_base_name for {type(time_series)}"
    raise NotImplementedError(msg)


@_get_table_base_name.register(SingleTimeSeries)
def _(time_series: SingleTimeSeries) -> str:
    return _SINGLE_TIME_SERIES_BASE_NAME


@singledispatch
def _make_time_config(time_series) -> Any:
    msg = f"Bug: need to implement _make_time_config for {type(time_series)}"
    raise NotImplementedError(msg)


@_make_time_config.register(SingleTimeSeries)
def _(time_series: SingleTimeSeries) -> DatetimeRange:
    return DatetimeRange(
        start=time_series.initial_time,
        resolution=time_series.resolution,
        length=len(time_series.data),
        time_column="timestamp",
    )


def _make_table_schema(time_series: TimeSeriesData, table_name: str) -> TableSchema:
    return TableSchema(
        name=table_name,
        value_column="value",
        time_array_id_columns=["id"],
        time_config=_make_time_config(time_series),
    )
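
For orientation, a minimal end-to-end sketch of the new backend (not part of the commit; it assumes SingleTimeSeries.from_array and SingleTimeSeriesMetadata.from_data from infrasys.time_series_models behave as in the existing storage backends, so adjust if those helpers differ):

from datetime import datetime, timedelta

from infrasys.time_series_models import SingleTimeSeries, SingleTimeSeriesMetadata

# Create a throwaway DuckDB-backed store and round-trip one hourly series.
storage = ChronifyTimeSeriesStorage.create_with_temp_directory()
ts = SingleTimeSeries.from_array(
    data=list(range(24)),
    variable_name="active_power",
    initial_time=datetime(2020, 1, 1),
    resolution=timedelta(hours=1),
)
metadata = SingleTimeSeriesMetadata.from_data(ts)
storage.add_time_series(metadata, ts)
retrieved = storage.get_time_series(metadata)
assert len(retrieved.data) == 24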
