Skip to content

Added NonSequentialTimeSeries Model #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/infrasys/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .system import System
from .time_series_models import (
SingleTimeSeries,
NonSequentialTimeSeries,
TimeSeriesStorageType,
TimeSeriesKey,
SingleTimeSeriesKey,
Expand All @@ -26,6 +27,7 @@
"Location",
"NormalizationModel",
"SingleTimeSeries",
"NonSequentialTimeSeries",
"SingleTimeSeriesKey",
"SupplementalAttribute",
"System",
Expand Down
94 changes: 85 additions & 9 deletions src/infrasys/arrow_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path
from tempfile import mkdtemp
from typing import Any, Optional
from uuid import UUID
from functools import singledispatchmethod

import numpy as np
from numpy.typing import NDArray
Expand All @@ -17,6 +17,8 @@
from infrasys.time_series_models import (
SingleTimeSeries,
SingleTimeSeriesMetadata,
NonSequentialTimeSeries,
NonSequentialTimeSeriesMetadata,
TimeSeriesData,
TimeSeriesMetadata,
TimeSeriesStorageType,
Expand All @@ -31,6 +33,7 @@ class ArrowTimeSeriesStorage(TimeSeriesStorageBase):

def __init__(self, directory: Path) -> None:
    """Set up the storage around an existing directory.

    Args:
        directory: Directory in which the time series files are written and read.
    """
    # No metadata type string is recorded at construction time.
    self._ts_metadata: str | None = None
    self._ts_directory = directory

@classmethod
def create_with_temp_directory(
Expand All @@ -57,16 +60,37 @@ def add_time_series(
time_series: TimeSeriesData,
connection: Any = None,
) -> None:
if isinstance(time_series, SingleTimeSeries):
self._add_single_time_series(metadata.time_series_uuid, time_series.data_array)
self._add_time_series(time_series)

@singledispatchmethod
def _add_time_series(self, time_series) -> None:
    """Fallback for time series types that have no registered writer.

    Concrete types register their own implementation on this dispatcher;
    reaching this body means the given type is not supported.

    Raises:
        NotImplementedError: Always, naming the unsupported type.
    """
    raise NotImplementedError(
        f"Bug: need to implement add_time_series for {type(time_series)}"
    )

@_add_time_series.register(SingleTimeSeries)
def _(self, time_series):
time_series_data = time_series.data_array
time_series_uuid = time_series.uuid
fpath = self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}")
if not fpath.exists():
arrow_batch = self._convert_to_record_batch_single_time_series(
time_series_data, str(time_series_uuid)
)
with pa.OSFile(str(fpath), "wb") as sink: # type: ignore
with pa.ipc.new_file(sink, arrow_batch.schema) as writer:
writer.write(arrow_batch)
logger.trace("Saving time series to {}", fpath)
logger.debug("Added {} to time series storage", time_series_uuid)
else:
msg = f"Bug: need to implement add_time_series for {type(time_series)}"
raise NotImplementedError(msg)
logger.debug("{} was already stored", time_series_uuid)

def _add_single_time_series(self, time_series_uuid: UUID, time_series_data: NDArray) -> None:
@_add_time_series.register(NonSequentialTimeSeries)
def _(self, time_series):
time_series_data = (time_series.data_array, time_series.timestamps_array)
time_series_uuid = time_series.uuid
fpath = self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}")
if not fpath.exists():
arrow_batch = self._convert_to_record_batch(time_series_data, str(time_series_uuid))
arrow_batch = self._convert_to_record_batch_nonsequential_time_series(time_series_data)
with pa.OSFile(str(fpath), "wb") as sink: # type: ignore
with pa.ipc.new_file(sink, arrow_batch.schema) as writer:
writer.write(arrow_batch)
Expand All @@ -87,6 +111,8 @@ def get_time_series(
metadata=metadata, start_time=start_time, length=length
)

elif isinstance(metadata, NonSequentialTimeSeriesMetadata):
return self._get_nonsequential_time_series(metadata=metadata)
msg = f"Bug: need to implement get_time_series for {type(metadata)}"
raise NotImplementedError(msg)

Expand Down Expand Up @@ -147,12 +173,62 @@ def _get_single_time_series(
normalization=metadata.normalization,
)

def _convert_to_record_batch(self, time_series_array: NDArray, column: str) -> pa.RecordBatch:
"""Create record batch to save array to disk."""
def _get_nonsequential_time_series(
    self,
    metadata: NonSequentialTimeSeriesMetadata,
) -> NonSequentialTimeSeries:
    """Read a NonSequentialTimeSeries back from its Arrow file.

    The file is located from ``metadata.time_series_uuid`` inside the storage
    directory. The first record batch must hold exactly two columns, read in
    order as the data values and the timestamps.

    Args:
        metadata: Metadata describing the time series to load.

    Returns:
        The time series rebuilt from the stored arrays and the metadata.

    Raises:
        Exception: If the record batch does not contain exactly two columns.
    """
    ts_file = self._ts_directory.joinpath(f"{metadata.time_series_uuid}{EXTENSION}")
    with pa.memory_map(str(ts_file), "r") as source:
        record_batch = pa.ipc.open_file(source).get_record_batch(0)
        logger.trace("Reading time series from {}", ts_file)

    # NOTE: the local must stay named ``columns``; the f-string below prints the name.
    columns = record_batch.column_names
    if len(columns) != 2:
        msg = f"Bug: expected two columns: {columns=}"
        raise Exception(msg)

    data_name, timestamps_name = columns
    raw_values = record_batch[data_name]
    raw_timestamps = record_batch[timestamps_name]

    quantity = metadata.quantity_metadata
    if quantity is None:
        values = np.array(raw_values)
    else:
        # Re-wrap the values in the quantity type recorded in the metadata.
        values = quantity.quantity_type(raw_values, quantity.units)

    # Object dtype is intended to yield Python datetime objects.
    timestamps = np.array(raw_timestamps).astype("O")

    return NonSequentialTimeSeries(
        uuid=metadata.time_series_uuid,
        variable_name=metadata.variable_name,
        data=values,
        timestamps=timestamps,
        normalization=metadata.normalization,
    )

def _convert_to_record_batch_single_time_series(
    self, time_series_array: NDArray, column: str
) -> pa.RecordBatch:
    """Wrap a SingleTimeSeries array in a one-column Arrow record batch.

    Args:
        time_series_array: Values to be written to disk.
        column: Name given to the single column of the batch.

    Returns:
        A record batch whose only column holds the converted array.
    """
    values = pa.array(time_series_array)
    value_field = pa.field(column, values.type)
    return pa.record_batch([values], schema=pa.schema([value_field]))

def _convert_to_record_batch_nonsequential_time_series(
    self, time_series_array: tuple[NDArray, NDArray]
) -> pa.RecordBatch:
    """Pack NonSequentialTimeSeries arrays into a two-column Arrow record batch.

    Args:
        time_series_array: ``(data, timestamps)`` pair of arrays.

    Returns:
        A record batch with a ``data`` column followed by a ``timestamp`` column.
    """
    values = pa.array(time_series_array[0])
    stamps = pa.array(time_series_array[1])
    fields = [
        pa.field("data", values.type),
        pa.field("timestamp", stamps.type),
    ]
    return pa.record_batch([values, stamps], schema=pa.schema(fields))


def clean_tmp_folder(folder: Path | str) -> None:
shutil.rmtree(folder)
Expand Down
54 changes: 46 additions & 8 deletions src/infrasys/in_memory_time_series_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,32 @@

from datetime import datetime
from pathlib import Path
from typing import Any
from typing import Any, TypeAlias

from numpy.typing import NDArray
from typing import TypeAlias
from uuid import UUID

from loguru import logger

from infrasys.exceptions import ISNotStored
from infrasys.time_series_models import (
SingleTimeSeries,
SingleTimeSeriesMetadata,
NonSequentialTimeSeries,
NonSequentialTimeSeriesMetadata,
TimeSeriesData,
TimeSeriesMetadata,
)
from infrasys.time_series_storage_base import TimeSeriesStorageBase

DataStoreType: TypeAlias = NDArray
DataStoreType: TypeAlias = NDArray | tuple[NDArray, NDArray]


class InMemoryTimeSeriesStorage(TimeSeriesStorageBase):
"""Stores time series in memory."""

def __init__(self) -> None:
    """Start with an empty store and no recorded metadata type."""
    self._ts_metadata_type: str | None = None
    # Keyed by the time series UUID, not the metadata UUID.
    self._arrays: dict[UUID, DataStoreType] = {}

def get_time_series_directory(self) -> None:
    """Return ``None``: this method reports no directory for the in-memory store."""
    return None
Expand All @@ -37,13 +38,20 @@ def add_time_series(
time_series: TimeSeriesData,
connection: Any = None,
) -> None:
if isinstance(time_series, SingleTimeSeries):
if isinstance(time_series, (SingleTimeSeries, NonSequentialTimeSeries)):
if metadata.time_series_uuid not in self._arrays:
self._arrays[metadata.time_series_uuid] = time_series.data_array
self._arrays[metadata.time_series_uuid] = (
(
time_series.data_array,
time_series.timestamps,
)
if hasattr(time_series, "timestamps")
else time_series.data_array
)
self._ts_metadata_type = metadata.type
logger.debug("Added {} to store", time_series.summary)
else:
logger.debug("{} was already stored", time_series.summary)

else:
msg = f"add_time_series not implemented for {type(time_series)}"
raise NotImplementedError(msg)
Expand All @@ -57,6 +65,8 @@ def get_time_series(
) -> TimeSeriesData:
if isinstance(metadata, SingleTimeSeriesMetadata):
return self._get_single_time_series(metadata, start_time, length)
elif isinstance(metadata, NonSequentialTimeSeriesMetadata):
return self._get_nonsequential_time_series(metadata)
raise NotImplementedError(str(metadata.get_time_series_data_type()))

def remove_time_series(self, metadata: TimeSeriesMetadata, connection: Any = None) -> None:
Expand All @@ -77,7 +87,8 @@ def _get_single_time_series(
start_time: datetime | None = None,
length: int | None = None,
) -> SingleTimeSeries:
ts_data = self._arrays.get(metadata.time_series_uuid)
ts_data: NDArray | None
ts_data = self._arrays.get(metadata.time_series_uuid) # type: ignore
if ts_data is None:
msg = f"No time series with {metadata.time_series_uuid} is stored"
raise ISNotStored(msg)
Expand All @@ -99,3 +110,30 @@ def _get_single_time_series(
data=ts_data,
normalization=metadata.normalization,
)

def _get_nonsequential_time_series(
    self,
    metadata: NonSequentialTimeSeriesMetadata,
) -> NonSequentialTimeSeries:
    """Rebuild a NonSequentialTimeSeries from the arrays held in memory.

    Args:
        metadata: Metadata whose ``time_series_uuid`` selects the stored arrays.

    Returns:
        The time series built from the stored data and timestamps.

    Raises:
        ISNotStored: If no data or no timestamps are stored for the UUID.
    """
    ts_uuid = metadata.time_series_uuid
    # A missing entry unpacks to a pair of ``None`` values.
    values, stamps = self._arrays.get(ts_uuid, (None, None))
    if values is None:
        raise ISNotStored(f"No time series data with {ts_uuid} is stored")
    if stamps is None:
        raise ISNotStored(f"No time series timestamps with {ts_uuid} is stored")

    quantity = metadata.quantity_metadata
    if quantity is not None:
        # Re-wrap the values in the quantity type recorded in the metadata.
        values = quantity.quantity_type(values, quantity.units)

    assert values is not None
    assert stamps is not None
    return NonSequentialTimeSeries(
        uuid=ts_uuid,
        variable_name=metadata.variable_name,
        data=values,
        timestamps=stamps,
        normalization=metadata.normalization,
    )
36 changes: 25 additions & 11 deletions src/infrasys/time_series_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
SingleTimeSeries,
SingleTimeSeriesKey,
SingleTimeSeriesMetadata,
NonSequentialTimeSeries,
NonSequentialTimeSeriesMetadata,
NonSequentialTimeSeriesKey,
TimeSeriesData,
TimeSeriesKey,
TimeSeriesMetadata,
Expand Down Expand Up @@ -477,18 +480,19 @@ def convert_storage(self, in_place: bool = True, **kwargs) -> TimeSeriesStorageB
The new storage instance.
"""
new_storage = self.create_new_storage(**kwargs)
for time_series_uuid in self.metadata_store.unique_uuids_by_type(
SingleTimeSeries.__name__
):
metadata = self.metadata_store.list_metadata_with_time_series_uuid(
time_series_uuid, limit=1
)
if len(metadata) != 1:
msg = f"Expected 1 metadata for {time_series_uuid}, got {len(metadata)}"
raise Exception(msg)
for time_series_type in (SingleTimeSeries, NonSequentialTimeSeries):
for time_series_uuid in self.metadata_store.unique_uuids_by_type(
time_series_type.__name__
):
metadata = self.metadata_store.list_metadata_with_time_series_uuid(
time_series_uuid, limit=1
)
if len(metadata) != 1:
msg = f"Expected 1 metadata for {time_series_uuid}, got {len(metadata)}"
raise Exception(msg)

time_series = self._storage.get_time_series(metadata[0])
new_storage.add_time_series(metadata[0], time_series)
time_series = self._storage.get_time_series(metadata[0])
new_storage.add_time_series(metadata[0], time_series)

if in_place:
self._storage = new_storage
Expand All @@ -514,6 +518,16 @@ def _(metadata: SingleTimeSeriesMetadata) -> TimeSeriesKey:
)


@make_time_series_key.register(NonSequentialTimeSeriesMetadata)
def _(metadata: NonSequentialTimeSeriesMetadata) -> TimeSeriesKey:
    """Build the NonSequentialTimeSeriesKey matching the given metadata."""
    key_fields = {
        "length": metadata.length,
        "user_attributes": metadata.user_attributes,
        "variable_name": metadata.variable_name,
        "time_series_type": NonSequentialTimeSeries,
    }
    return NonSequentialTimeSeriesKey(**key_fields)


def _get_data_connection(conn: DatabaseConnection | None) -> Any:
return None if conn is None else conn.data_conn

Expand Down
19 changes: 16 additions & 3 deletions src/infrasys/time_series_metadata_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
SerializedTypeMetadata,
TYPE_METADATA,
)
from infrasys.time_series_models import TimeSeriesMetadata
from infrasys.time_series_models import (
TimeSeriesMetadata,
SingleTimeSeriesMetadataBase,
NonSequentialTimeSeriesMetadataBase,
)
from infrasys.utils.sqlite import execute


Expand Down Expand Up @@ -104,13 +108,22 @@ def add(
msg = f"Time series with {metadata=} is already stored."
raise ISAlreadyAttached(msg)

if isinstance(metadata, SingleTimeSeriesMetadataBase):
resolution = str(metadata.resolution)
initial_time = str(metadata.initial_time)
elif isinstance(metadata, NonSequentialTimeSeriesMetadataBase):
resolution = None
initial_time = None
else:
raise NotImplementedError

rows = [
(
None, # auto-assigned by sqlite
str(metadata.time_series_uuid),
metadata.type,
str(metadata.initial_time),
str(metadata.resolution),
initial_time,
resolution,
metadata.variable_name,
str(owner.uuid),
owner.__class__.__name__,
Expand Down
Loading