Skip to content

Commit 47e823d

Browse files
authored
Merge pull request #77 from NREL/dt/convert-storage-generic
Make TimeSeriesStorage.convert_storage more generic
2 parents 776ea2f + 318cd97 commit 47e823d

6 files changed

+34
-57
lines changed

src/infrasys/arrow_storage.py

-12
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,6 @@ def _get_single_time_series(
139139
normalization=metadata.normalization,
140140
)
141141

142-
def get_raw_single_time_series(self, time_series_uuid: UUID) -> NDArray:
143-
fpath = self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}")
144-
with pa.OSFile(str(fpath), "r") as source: # type: ignore
145-
base_ts = pa.ipc.open_file(source).get_record_batch(0)
146-
logger.trace("Reading time series from {}", fpath)
147-
columns = base_ts.column_names
148-
if len(columns) != 1:
149-
msg = f"Bug: expected a single column: {columns=}"
150-
raise Exception(msg)
151-
column = columns[0]
152-
return base_ts[column].to_numpy()
153-
154142
def _convert_to_record_batch(self, time_series_array: NDArray, column: str) -> pa.RecordBatch:
155143
"""Create record batch to save array to disk."""
156144
pa_array = pa.array(time_series_array)

src/infrasys/in_memory_time_series_storage.py

+1-18
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@
22

33
from datetime import datetime
44
from pathlib import Path
5-
import numpy as np
65
from numpy.typing import NDArray
76
from typing import Optional, TypeAlias
87
from uuid import UUID
98

109
from loguru import logger
1110
from infrasys.arrow_storage import ArrowTimeSeriesStorage
1211

13-
from infrasys.exceptions import ISNotStored, ISOperationNotAllowed
12+
from infrasys.exceptions import ISNotStored
1413
from infrasys.time_series_models import (
1514
SingleTimeSeries,
1615
SingleTimeSeriesMetadata,
@@ -43,15 +42,6 @@ def add_time_series(self, metadata: TimeSeriesMetadata, time_series: TimeSeriesD
4342
msg = f"add_time_series not implemented for {type(time_series)}"
4443
raise NotImplementedError(msg)
4544

46-
def add_raw_single_time_series(
47-
self, time_series_uuid: UUID, time_series_data: DataStoreType
48-
) -> None:
49-
if time_series_uuid not in self._arrays:
50-
self._arrays[time_series_uuid] = time_series_data
51-
logger.debug("Added {} to store", time_series_uuid)
52-
else:
53-
logger.debug("{} was already stored", time_series_uuid)
54-
5545
def get_time_series(
5646
self,
5747
metadata: TimeSeriesMetadata,
@@ -62,13 +52,6 @@ def get_time_series(
6252
return self._get_single_time_series(metadata, start_time, length)
6353
raise NotImplementedError(str(metadata.get_time_series_data_type()))
6454

65-
def get_raw_single_time_series(self, time_series_uuid: UUID) -> NDArray:
66-
data_array = self._arrays[time_series_uuid]
67-
if not isinstance(data_array, np.ndarray):
68-
msg = f"Can't retrieve type: {type(data_array)} as single_time_series"
69-
raise ISOperationNotAllowed(msg)
70-
return data_array
71-
7255
def remove_time_series(self, uuid: UUID) -> None:
7356
time_series = self._arrays.pop(uuid, None)
7457
if time_series is None:

src/infrasys/time_series_manager.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,15 @@ def convert_storage(self, **kwargs) -> None:
308308
for time_series_uuid in self.metadata_store.unique_uuids_by_type(
309309
SingleTimeSeries.__name__
310310
):
311-
new_storage.add_raw_single_time_series(
312-
time_series_uuid, self._storage.get_raw_single_time_series(time_series_uuid)
311+
metadata = self.metadata_store.list_metadata_with_time_series_uuid(
312+
time_series_uuid, limit=1
313313
)
314+
if len(metadata) != 1:
315+
msg = f"Expected 1 metadata for {time_series_uuid}, got {len(metadata)}"
316+
raise Exception(msg)
317+
318+
time_series = self._storage.get_time_series(metadata[0])
319+
new_storage.add_time_series(metadata[0], time_series)
314320

315321
self._storage = new_storage
316322
return None

src/infrasys/time_series_metadata_store.py

+19
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,25 @@ def list_metadata(
288288
rows = execute(cur, query, params=params).fetchall()
289289
return [_deserialize_time_series_metadata(x[0]) for x in rows]
290290

291+
def list_metadata_with_time_series_uuid(
292+
self, time_series_uuid: UUID, limit: int | None = None
293+
) -> list[TimeSeriesMetadata]:
294+
"""Return metadata attached to the given time_series_uuid.
295+
296+
Parameters
297+
----------
298+
time_series_uuid
299+
The UUID of the time series.
300+
limit
301+
The maximum number of metadata to return. If None, all metadata are returned.
302+
"""
303+
params = (str(time_series_uuid),)
304+
limit_str = "" if limit is None else f"LIMIT {limit}"
305+
query = f"SELECT metadata FROM {self.TABLE_NAME} WHERE time_series_uuid = ? {limit_str}"
306+
cur = self._con.cursor()
307+
rows = execute(cur, query, params=params).fetchall()
308+
return [_deserialize_time_series_metadata(x[0]) for x in rows]
309+
291310
def _list_metadata_no_sql_json(
292311
self,
293312
*components: Component | SupplementalAttribute,

src/infrasys/time_series_storage_base.py

+1-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import abc
44
from datetime import datetime
55
from pathlib import Path
6-
from typing import Optional, Any
6+
from typing import Optional
77
from uuid import UUID
88

99
from infrasys.time_series_models import TimeSeriesData, TimeSeriesMetadata
@@ -16,14 +16,6 @@ class TimeSeriesStorageBase(abc.ABC):
1616
def add_time_series(self, metadata: TimeSeriesMetadata, time_series: TimeSeriesData) -> None:
1717
"""Store a time series array."""
1818

19-
@abc.abstractmethod
20-
def add_raw_single_time_series(self, time_series_uuid: UUID, time_series_data: Any) -> None:
21-
"""Store a time series array from raw data."""
22-
23-
@abc.abstractmethod
24-
def get_raw_single_time_series(self, time_series_uuid: UUID) -> Any:
25-
"""Get the raw time series data for a given uuid."""
26-
2719
@abc.abstractmethod
2820
def get_time_series_directory(self) -> Path | None:
2921
"""Return the directory containing time series files. Will be none for in-memory time

tests/test_in_memory_storage.py

+5-16
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,6 @@
88
import pytest
99

1010

11-
def get_data_and_uuids(system):
12-
uuids = system._time_series_mgr.metadata_store.unique_uuids_by_type(SingleTimeSeries.__name__)
13-
data = {
14-
uuid: system._time_series_mgr._storage.get_raw_single_time_series(uuid) for uuid in uuids
15-
}
16-
return uuids, data
17-
18-
1911
@pytest.mark.parametrize(
2012
"original_kwargs,new_kwargs,original_stype,new_stype",
2113
[
@@ -46,14 +38,11 @@ def test_memory_convert_storage_time_series(
4638
with pytest.raises(ISAlreadyAttached):
4739
system.add_time_series(test_time_series_data, test_generator)
4840

49-
original_uuids, original_data = get_data_and_uuids(system)
50-
5141
system.convert_storage(**new_kwargs)
5242

53-
assert isinstance(system._time_series_mgr._storage, new_stype)
54-
new_uuids, new_data = get_data_and_uuids(system)
43+
assert isinstance(system.time_series.storage, new_stype)
5544

56-
assert set(original_uuids) == set(new_uuids)
57-
58-
for uuid in new_uuids:
59-
assert np.array_equal(original_data[uuid], new_data[uuid])
45+
ts2 = system.get_time_series(
46+
test_generator, time_series_type=SingleTimeSeries, variable_name="load"
47+
)
48+
assert np.array_equal(ts2.data_array, test_time_series_data.data_array)

0 commit comments

Comments
 (0)