diff --git a/src/infrasys/__init__.py b/src/infrasys/__init__.py index bac6ac6..f38a191 100644 --- a/src/infrasys/__init__.py +++ b/src/infrasys/__init__.py @@ -13,6 +13,7 @@ from .system import System from .time_series_models import ( SingleTimeSeries, + NonSequentialTimeSeries, TimeSeriesStorageType, TimeSeriesKey, SingleTimeSeriesKey, @@ -26,6 +27,7 @@ "Location", "NormalizationModel", "SingleTimeSeries", + "NonSequentialTimeSeries", "SingleTimeSeriesKey", "SupplementalAttribute", "System", diff --git a/src/infrasys/arrow_storage.py b/src/infrasys/arrow_storage.py index 857598b..89705f4 100644 --- a/src/infrasys/arrow_storage.py +++ b/src/infrasys/arrow_storage.py @@ -6,7 +6,7 @@ from pathlib import Path from tempfile import mkdtemp from typing import Any, Optional -from uuid import UUID +from functools import singledispatchmethod import numpy as np from numpy.typing import NDArray @@ -17,6 +17,8 @@ from infrasys.time_series_models import ( SingleTimeSeries, SingleTimeSeriesMetadata, + NonSequentialTimeSeries, + NonSequentialTimeSeriesMetadata, TimeSeriesData, TimeSeriesMetadata, TimeSeriesStorageType, @@ -31,6 +33,7 @@ class ArrowTimeSeriesStorage(TimeSeriesStorageBase): def __init__(self, directory: Path) -> None: self._ts_directory = directory + self._ts_metadata: str | None = None @classmethod def create_with_temp_directory( @@ -57,16 +60,37 @@ def add_time_series( time_series: TimeSeriesData, connection: Any = None, ) -> None: - if isinstance(time_series, SingleTimeSeries): - self._add_single_time_series(metadata.time_series_uuid, time_series.data_array) + self._add_time_series(time_series) + + @singledispatchmethod + def _add_time_series(self, time_series) -> None: + msg = f"Bug: need to implement add_time_series for {type(time_series)}" + raise NotImplementedError(msg) + + @_add_time_series.register(SingleTimeSeries) + def _(self, time_series): + time_series_data = time_series.data_array + time_series_uuid = time_series.uuid + fpath = 
self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}") + if not fpath.exists(): + arrow_batch = self._convert_to_record_batch_single_time_series( + time_series_data, str(time_series_uuid) + ) + with pa.OSFile(str(fpath), "wb") as sink: # type: ignore + with pa.ipc.new_file(sink, arrow_batch.schema) as writer: + writer.write(arrow_batch) + logger.trace("Saving time series to {}", fpath) + logger.debug("Added {} to time series storage", time_series_uuid) else: - msg = f"Bug: need to implement add_time_series for {type(time_series)}" - raise NotImplementedError(msg) + logger.debug("{} was already stored", time_series_uuid) - def _add_single_time_series(self, time_series_uuid: UUID, time_series_data: NDArray) -> None: + @_add_time_series.register(NonSequentialTimeSeries) + def _(self, time_series): + time_series_data = (time_series.data_array, time_series.timestamps_array) + time_series_uuid = time_series.uuid fpath = self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}") if not fpath.exists(): - arrow_batch = self._convert_to_record_batch(time_series_data, str(time_series_uuid)) + arrow_batch = self._convert_to_record_batch_nonsequential_time_series(time_series_data) with pa.OSFile(str(fpath), "wb") as sink: # type: ignore with pa.ipc.new_file(sink, arrow_batch.schema) as writer: writer.write(arrow_batch) @@ -87,6 +111,8 @@ def get_time_series( metadata=metadata, start_time=start_time, length=length ) + elif isinstance(metadata, NonSequentialTimeSeriesMetadata): + return self._get_nonsequential_time_series(metadata=metadata) msg = f"Bug: need to implement get_time_series for {type(metadata)}" raise NotImplementedError(msg) @@ -147,12 +173,62 @@ def _get_single_time_series( normalization=metadata.normalization, ) - def _convert_to_record_batch(self, time_series_array: NDArray, column: str) -> pa.RecordBatch: - """Create record batch to save array to disk.""" + def _get_nonsequential_time_series( + self, + metadata: NonSequentialTimeSeriesMetadata, + ) -> 
NonSequentialTimeSeries: + fpath = self._ts_directory.joinpath(f"{metadata.time_series_uuid}{EXTENSION}") + with pa.memory_map(str(fpath), "r") as source: + base_ts = pa.ipc.open_file(source).get_record_batch(0) + logger.trace("Reading time series from {}", fpath) + columns = base_ts.column_names + if len(columns) != 2: + msg = f"Bug: expected two columns: {columns=}" + raise Exception(msg) + data_column, timestamps_column = columns[0], columns[1] + data, timestamps = ( + base_ts[data_column], + base_ts[timestamps_column], + ) + if metadata.quantity_metadata is not None: + np_data_array = metadata.quantity_metadata.quantity_type( + data, metadata.quantity_metadata.units + ) + else: + np_data_array = np.array(data) + np_time_array = np.array(timestamps).astype("O") # convert to datetime object + return NonSequentialTimeSeries( + uuid=metadata.time_series_uuid, + variable_name=metadata.variable_name, + data=np_data_array, + timestamps=np_time_array, + normalization=metadata.normalization, + ) + + def _convert_to_record_batch_single_time_series( + self, time_series_array: NDArray, column: str + ) -> pa.RecordBatch: + """Create record batch for SingleTimeSeries to save array to disk.""" pa_array = pa.array(time_series_array) schema = pa.schema([pa.field(column, pa_array.type)]) return pa.record_batch([pa_array], schema=schema) + def _convert_to_record_batch_nonsequential_time_series( + self, time_series_array: tuple[NDArray, NDArray] + ) -> pa.RecordBatch: + """Create record batch for NonSequentialTimeSeries to save array to disk.""" + data_array, timestamps_array = time_series_array + pa_data_array = pa.array(data_array) + pa_timestamps_array = pa.array(timestamps_array) + + schema = pa.schema( + [ + pa.field("data", pa_data_array.type), + pa.field("timestamp", pa_timestamps_array.type), + ] + ) + return pa.record_batch([pa_data_array, pa_timestamps_array], schema=schema) + def clean_tmp_folder(folder: Path | str) -> None: shutil.rmtree(folder) diff --git 
a/src/infrasys/in_memory_time_series_storage.py b/src/infrasys/in_memory_time_series_storage.py index 76be811..a777464 100644 --- a/src/infrasys/in_memory_time_series_storage.py +++ b/src/infrasys/in_memory_time_series_storage.py @@ -2,24 +2,24 @@ from datetime import datetime from pathlib import Path -from typing import Any +from typing import Any, TypeAlias from numpy.typing import NDArray -from typing import TypeAlias from uuid import UUID - from loguru import logger from infrasys.exceptions import ISNotStored from infrasys.time_series_models import ( SingleTimeSeries, SingleTimeSeriesMetadata, + NonSequentialTimeSeries, + NonSequentialTimeSeriesMetadata, TimeSeriesData, TimeSeriesMetadata, ) from infrasys.time_series_storage_base import TimeSeriesStorageBase -DataStoreType: TypeAlias = NDArray +DataStoreType: TypeAlias = NDArray | tuple[NDArray, NDArray] class InMemoryTimeSeriesStorage(TimeSeriesStorageBase): @@ -27,6 +27,7 @@ class InMemoryTimeSeriesStorage(TimeSeriesStorageBase): def __init__(self) -> None: self._arrays: dict[UUID, DataStoreType] = {} # Time series UUID, not metadata UUID + self._ts_metadata_type: str | None = None def get_time_series_directory(self) -> None: return None @@ -37,13 +38,20 @@ def add_time_series( time_series: TimeSeriesData, connection: Any = None, ) -> None: - if isinstance(time_series, SingleTimeSeries): + if isinstance(time_series, (SingleTimeSeries, NonSequentialTimeSeries)): if metadata.time_series_uuid not in self._arrays: - self._arrays[metadata.time_series_uuid] = time_series.data_array + self._arrays[metadata.time_series_uuid] = ( + ( + time_series.data_array, + time_series.timestamps, + ) + if hasattr(time_series, "timestamps") + else time_series.data_array + ) + self._ts_metadata_type = metadata.type logger.debug("Added {} to store", time_series.summary) else: logger.debug("{} was already stored", time_series.summary) - else: msg = f"add_time_series not implemented for {type(time_series)}" raise 
NotImplementedError(msg) @@ -57,6 +65,8 @@ def get_time_series( ) -> TimeSeriesData: if isinstance(metadata, SingleTimeSeriesMetadata): return self._get_single_time_series(metadata, start_time, length) + elif isinstance(metadata, NonSequentialTimeSeriesMetadata): + return self._get_nonsequential_time_series(metadata) raise NotImplementedError(str(metadata.get_time_series_data_type())) def remove_time_series(self, metadata: TimeSeriesMetadata, connection: Any = None) -> None: @@ -77,7 +87,8 @@ def _get_single_time_series( start_time: datetime | None = None, length: int | None = None, ) -> SingleTimeSeries: - ts_data = self._arrays.get(metadata.time_series_uuid) + ts_data: NDArray | None + ts_data = self._arrays.get(metadata.time_series_uuid) # type: ignore if ts_data is None: msg = f"No time series with {metadata.time_series_uuid} is stored" raise ISNotStored(msg) @@ -99,3 +110,30 @@ def _get_single_time_series( data=ts_data, normalization=metadata.normalization, ) + + def _get_nonsequential_time_series( + self, + metadata: NonSequentialTimeSeriesMetadata, + ) -> NonSequentialTimeSeries: + ts_data, ts_timestamps = self._arrays.get(metadata.time_series_uuid, (None, None)) + if ts_data is None: + msg = f"No time series data with {metadata.time_series_uuid} is stored" + raise ISNotStored(msg) + + if ts_timestamps is None: + msg = f"No time series timestamps with {metadata.time_series_uuid} is stored" + raise ISNotStored(msg) + + if metadata.quantity_metadata is not None: + ts_data = metadata.quantity_metadata.quantity_type( + ts_data, metadata.quantity_metadata.units + ) + assert ts_data is not None + assert ts_timestamps is not None + return NonSequentialTimeSeries( + uuid=metadata.time_series_uuid, + variable_name=metadata.variable_name, + data=ts_data, + timestamps=ts_timestamps, + normalization=metadata.normalization, + ) diff --git a/src/infrasys/time_series_manager.py b/src/infrasys/time_series_manager.py index 60d84aa..412ba8d 100644 --- 
a/src/infrasys/time_series_manager.py +++ b/src/infrasys/time_series_manager.py @@ -20,6 +20,9 @@ SingleTimeSeries, SingleTimeSeriesKey, SingleTimeSeriesMetadata, + NonSequentialTimeSeries, + NonSequentialTimeSeriesMetadata, + NonSequentialTimeSeriesKey, TimeSeriesData, TimeSeriesKey, TimeSeriesMetadata, @@ -477,18 +480,19 @@ def convert_storage(self, in_place: bool = True, **kwargs) -> TimeSeriesStorageB The new storage instance. """ new_storage = self.create_new_storage(**kwargs) - for time_series_uuid in self.metadata_store.unique_uuids_by_type( - SingleTimeSeries.__name__ - ): - metadata = self.metadata_store.list_metadata_with_time_series_uuid( - time_series_uuid, limit=1 - ) - if len(metadata) != 1: - msg = f"Expected 1 metadata for {time_series_uuid}, got {len(metadata)}" - raise Exception(msg) + for time_series_type in (SingleTimeSeries, NonSequentialTimeSeries): + for time_series_uuid in self.metadata_store.unique_uuids_by_type( + time_series_type.__name__ + ): + metadata = self.metadata_store.list_metadata_with_time_series_uuid( + time_series_uuid, limit=1 + ) + if len(metadata) != 1: + msg = f"Expected 1 metadata for {time_series_uuid}, got {len(metadata)}" + raise Exception(msg) - time_series = self._storage.get_time_series(metadata[0]) - new_storage.add_time_series(metadata[0], time_series) + time_series = self._storage.get_time_series(metadata[0]) + new_storage.add_time_series(metadata[0], time_series) if in_place: self._storage = new_storage @@ -514,6 +518,16 @@ def _(metadata: SingleTimeSeriesMetadata) -> TimeSeriesKey: ) +@make_time_series_key.register(NonSequentialTimeSeriesMetadata) +def _(metadata: NonSequentialTimeSeriesMetadata) -> TimeSeriesKey: + return NonSequentialTimeSeriesKey( + length=metadata.length, + user_attributes=metadata.user_attributes, + variable_name=metadata.variable_name, + time_series_type=NonSequentialTimeSeries, + ) + + def _get_data_connection(conn: DatabaseConnection | None) -> Any: return None if conn is None else 
conn.data_conn diff --git a/src/infrasys/time_series_metadata_store.py b/src/infrasys/time_series_metadata_store.py index e054e8a..827fb78 100644 --- a/src/infrasys/time_series_metadata_store.py +++ b/src/infrasys/time_series_metadata_store.py @@ -19,7 +19,11 @@ SerializedTypeMetadata, TYPE_METADATA, ) -from infrasys.time_series_models import TimeSeriesMetadata +from infrasys.time_series_models import ( + TimeSeriesMetadata, + SingleTimeSeriesMetadataBase, + NonSequentialTimeSeriesMetadataBase, +) from infrasys.utils.sqlite import execute @@ -104,13 +108,22 @@ def add( msg = f"Time series with {metadata=} is already stored." raise ISAlreadyAttached(msg) + if isinstance(metadata, SingleTimeSeriesMetadataBase): + resolution = str(metadata.resolution) + initial_time = str(metadata.initial_time) + elif isinstance(metadata, NonSequentialTimeSeriesMetadataBase): + resolution = None + initial_time = None + else: + raise NotImplementedError + rows = [ ( None, # auto-assigned by sqlite str(metadata.time_series_uuid), metadata.type, - str(metadata.initial_time), - str(metadata.resolution), + initial_time, + resolution, metadata.variable_name, str(owner.uuid), owner.__class__.__name__, diff --git a/src/infrasys/time_series_models.py b/src/infrasys/time_series_models.py index bdecbbf..39dc571 100644 --- a/src/infrasys/time_series_models.py +++ b/src/infrasys/time_series_models.py @@ -261,13 +261,11 @@ class TimeSeriesMetadata(InfraSysBaseModel, abc.ABC): """Defines common metadata for all time series.""" variable_name: str - initial_time: datetime - resolution: timedelta time_series_uuid: UUID user_attributes: dict[str, Any] = {} quantity_metadata: Optional[QuantityMetadata] = None normalization: NormalizationModel = None - type: Literal["SingleTimeSeries", "SingleTimeSeriesScalingFactor"] + type: Literal["SingleTimeSeries", "SingleTimeSeriesScalingFactor", "NonSequentialTimeSeries"] @property def label(self) -> str: @@ -290,6 +288,8 @@ class 
SingleTimeSeriesMetadataBase(TimeSeriesMetadata, abc.ABC): """Base class for SingleTimeSeries metadata.""" length: int + initial_time: datetime + resolution: timedelta type: Literal["SingleTimeSeries", "SingleTimeSeriesScalingFactor"] @classmethod @@ -380,13 +380,180 @@ def get_time_series_type_str() -> str: ] +class NonSequentialTimeSeries(TimeSeriesData): + """Defines a non-sequential time array with a single dimension of floats.""" + + data: NDArray | pint.Quantity + timestamps: NDArray + + @computed_field + def length(self) -> int: + """Return the length of the data.""" + return len(self.data) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, NonSequentialTimeSeries): + raise NotImplementedError + is_equal = True + for field in self.model_fields_set: + if field == "data": + if not (self.data == other.data).all(): + is_equal = False + break + elif field == "timestamps": + if not all(t1 == t2 for t1, t2 in zip(self.timestamps, other.timestamps)): + is_equal = False + break + else: + if not getattr(self, field) == getattr(other, field): + is_equal = False + break + return is_equal + + @field_validator("data", mode="before") + @classmethod + def check_data(cls, data) -> NDArray | pint.Quantity: + """Check time series data.""" + if len(data) < 2: + msg = f"NonSequentialTimeSeries length must be at least 2: {len(data)}" + raise ValueError(msg) + + if isinstance(data, pint.Quantity): + if not isinstance(data.magnitude, np.ndarray): + return type(data)(np.array(data.magnitude), units=data.units) + return data + + if not isinstance(data, np.ndarray): + return np.array(data) + + return data + + @field_validator("timestamps", mode="before") + @classmethod + def check_timestamp(cls, timestamps: Sequence[datetime] | NDArray) -> NDArray: + """Check non-sequential timestamps.""" + if len(timestamps) < 2: + msg = f"Time index must have at least 2 timestamps: {len(timestamps)}" + raise ValueError(msg) + + if len(timestamps) != len(set(timestamps)): + msg = 
"Duplicate timestamps found. Timestamps must be unique." + raise ValueError(msg) + + time_array = np.array(timestamps, dtype="datetime64[ns]") + if not np.all(np.diff(time_array) > np.timedelta64(0, "s")): + msg = "Timestamps must be in chronological order." + raise ValueError(msg) + + if not isinstance(timestamps, np.ndarray): + return np.array(timestamps) + + return timestamps + + @classmethod + def from_array( + cls, + data: ISArray, + timestamps: Sequence[datetime] | NDArray, + variable_name: str, + normalization: NormalizationModel = None, + ) -> "NonSequentialTimeSeries": + """Method of NonSequentialTimeSeries that creates an instance from an array and timestamps. + + Parameters + ---------- + data + Sequence that contains the values of the time series + timestamps + Sequence that contains the non-sequential timestamps + variable_name + Name assigned to the values of the time series (e.g., active_power) + normalization + Normalization model to normalize the data + + Returns + ------- + NonSequentialTimeSeries + """ + if normalization is not None: + npa = data if isinstance(data, np.ndarray) else np.array(data) + data = normalization.normalize_array(npa) + + return NonSequentialTimeSeries( + data=data, # type: ignore + timestamps=timestamps, # type: ignore + variable_name=variable_name, + normalization=normalization, + ) + + @staticmethod + def get_time_series_metadata_type() -> Type: + "Get the metadata type of the NonSequentialTimeSeries" + return NonSequentialTimeSeriesMetadata + + @property + def data_array(self) -> NDArray: + "Get the data array NonSequentialTimeSeries" + if isinstance(self.data, pint.Quantity): + return self.data.magnitude + return self.data + + @property + def timestamps_array(self) -> NDArray: + "Get the timestamps array NonSequentialTimeSeries" + return self.timestamps + + +class NonSequentialTimeSeriesMetadataBase(TimeSeriesMetadata, abc.ABC): + """Base class for NonSequentialTimeSeries metadata.""" + + length: int + type: 
Literal["NonSequentialTimeSeries"] + + @classmethod + def from_data( + cls, time_series: NonSequentialTimeSeries, **user_attributes + ) -> "NonSequentialTimeSeriesMetadataBase": + """Construct a NonSequentialTimeSeriesMetadata from a NonSequentialTimeSeries.""" + quantity_metadata = ( + QuantityMetadata( + module=type(time_series.data).__module__, + quantity_type=type(time_series.data), + units=str(time_series.data.units), + ) + if isinstance(time_series.data, pint.Quantity) + else None + ) + return cls( + variable_name=time_series.variable_name, + length=time_series.length, # type: ignore + time_series_uuid=time_series.uuid, + user_attributes=user_attributes, + quantity_metadata=quantity_metadata, + normalization=time_series.normalization, + type=cls.get_time_series_type_str(), # type: ignore + ) + + @staticmethod + def get_time_series_data_type() -> Type: + return NonSequentialTimeSeries + + +class NonSequentialTimeSeriesMetadata(NonSequentialTimeSeriesMetadataBase): + """Defines the metadata for a NonSequentialTimeSeries.""" + + type: Literal["NonSequentialTimeSeries"] = "NonSequentialTimeSeries" + + @staticmethod + def get_time_series_type_str() -> str: + return "NonSequentialTimeSeries" + + class TimeSeriesKey(InfraSysBaseModel): """Base class for time series keys.""" variable_name: str - initial_time: datetime - resolution: timedelta - time_series_type: Type[SingleTimeSeries] + time_series_type: Type[TimeSeriesData] user_attributes: dict[str, Any] = {} @@ -394,6 +561,14 @@ class SingleTimeSeriesKey(TimeSeriesKey): """Keys for SingleTimeSeries.""" length: int + initial_time: datetime + resolution: timedelta + + +class NonSequentialTimeSeriesKey(TimeSeriesKey): + """Keys for SingleTimeSeries.""" + + length: int class DatabaseConnection(InfraSysBaseModel): diff --git a/src/infrasys/value_curves.py b/src/infrasys/value_curves.py index 2693d85..ffd0ae2 100644 --- a/src/infrasys/value_curves.py +++ b/src/infrasys/value_curves.py @@ -221,7 +221,7 @@ def 
to_input_output( xs = self.function_data.x_coords ys = np.multiply(xs[1:], self.function_data.y_coords).tolist() - ys.insert(0, c) + ys.insert(0, c) # type:ignore return InputOutputCurve( function_data=PiecewiseLinearData( diff --git a/tests/conftest.py b/tests/conftest.py index 8f4a51f..3e630f4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,7 +5,7 @@ from loguru import logger from infrasys.location import Location -from infrasys.time_series_models import SingleTimeSeries +from infrasys.time_series_models import SingleTimeSeries, NonSequentialTimeSeries from .models.simple_system import SimpleSystem, SimpleBus, SimpleGenerator, SimpleSubsystem @@ -35,6 +35,23 @@ def simple_system_with_time_series(simple_system) -> SimpleSystem: return simple_system +@pytest.fixture +def simple_system_with_nonsequential_time_series(simple_system) -> SimpleSystem: + """Creates a system with time series data.""" + variable_name = "active_power" + length = 10 + df = range(length) + timestamps = [ + datetime(year=2030, month=1, day=1) + timedelta(seconds=5 * i) for i in range(length) + ] + ts = NonSequentialTimeSeries.from_array( + data=df, variable_name=variable_name, timestamps=timestamps + ) + gen = simple_system.get_component(SimpleGenerator, "test-gen") + simple_system.add_time_series(ts, gen) + return simple_system + + @pytest.fixture(autouse=True) def propagate_logs(): """Enable logging for the package""" diff --git a/tests/test_arrow_storage.py b/tests/test_arrow_storage.py index e24d9a0..c414567 100644 --- a/tests/test_arrow_storage.py +++ b/tests/test_arrow_storage.py @@ -10,7 +10,11 @@ from infrasys.arrow_storage import ArrowTimeSeriesStorage from infrasys.in_memory_time_series_storage import InMemoryTimeSeriesStorage from infrasys.system import System -from infrasys.time_series_models import SingleTimeSeries, TimeSeriesStorageType +from infrasys.time_series_models import ( + SingleTimeSeries, + NonSequentialTimeSeries, + TimeSeriesStorageType, +) from 
.models.simple_system import SimpleSystem, SimpleBus, SimpleGenerator @@ -24,7 +28,7 @@ def test_system() -> System: return system -def test_file_creation(test_system: System): +def test_file_creation_with_single_time_series(test_system: System): gen1 = test_system.get_component(SimpleGenerator, "gen1") ts = SingleTimeSeries.from_array( data=range(8784), @@ -41,7 +45,26 @@ def test_file_creation(test_system: System): assert time_series_fpath.exists() -def test_copy_files(tmp_path): +def test_file_creation_with_nonsequential_time_series(test_system: System): + gen1 = test_system.get_component(SimpleGenerator, "gen1") + timestamps = [ + datetime(year=2030, month=1, day=1) + timedelta(seconds=5 * i) for i in range(10) + ] + ts = NonSequentialTimeSeries.from_array( + data=range(10), + timestamps=timestamps, + variable_name="active_power", + ) + test_system.time_series.add(ts, gen1, scenario="one", model_year="2030") + time_series = test_system.time_series.get(gen1, time_series_type=NonSequentialTimeSeries) + assert isinstance(test_system.time_series.storage, ArrowTimeSeriesStorage) + base_directory = test_system.get_time_series_directory() + assert isinstance(base_directory, Path) + time_series_fpath = base_directory.joinpath(str(time_series.uuid) + ".arrow") + assert time_series_fpath.exists() + + +def test_copy_files_with_single_time_series(tmp_path): """Test that we can copy the time series from tmp to specified folder""" system = SimpleSystem() bus = SimpleBus(name="test-bus", voltage=1.1) @@ -68,7 +91,36 @@ def test_copy_files(tmp_path): assert time_series_fpath.exists() -def test_read_deserialize_time_series(tmp_path): +def test_copy_files_with_nonsequential_timeseries(tmp_path): + """Test that we can copy the time series from tmp to specified folder""" + system = SimpleSystem() + bus = SimpleBus(name="test-bus", voltage=1.1) + gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True) + system.add_components(bus, gen1) + 
timestamps = [ + datetime(year=2030, month=1, day=1) + timedelta(seconds=5 * i) for i in range(10) + ] + ts = NonSequentialTimeSeries.from_array( + data=range(10), + timestamps=timestamps, + variable_name="active_power", + ) + system.time_series.add(ts, gen1, scenario="two", model_year="2030") + filename = tmp_path / "system.json" + system.to_json(filename) + + logger.info("Starting deserialization") + system2 = SimpleSystem.from_json(filename) + gen1b = system2.get_component(SimpleGenerator, gen1.name) + time_series = system2.time_series.get(gen1b, time_series_type=NonSequentialTimeSeries) + time_series_fpath = ( + tmp_path / system2.get_time_series_directory() / (str(time_series.uuid) + ".arrow") + ) + + assert time_series_fpath.exists() + + +def test_read_deserialize_single_time_series(tmp_path): """Test that we can read from a deserialized system.""" system = SimpleSystem() bus = SimpleBus(name="test-bus", voltage=1.1) @@ -96,7 +148,37 @@ def test_read_deserialize_time_series(tmp_path): assert np.array_equal(deserialize_ts.data, np.array(range(length))) -def test_copied_storage_system(simple_system_with_time_series): +def test_read_deserialize_nonsequential_time_series(tmp_path): + """Test that we can read from a deserialized system.""" + system = SimpleSystem() + bus = SimpleBus(name="test-bus", voltage=1.1) + gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True) + system.add_components(bus, gen1) + timestamps = [ + datetime(year=2030, month=1, day=1) + timedelta(seconds=5 * i) for i in range(10) + ] + ts = NonSequentialTimeSeries.from_array( + data=range(10), + timestamps=timestamps, + variable_name="active_power", + ) + system.time_series.add(ts, gen1, scenario="high", model_year="2030") + filename = tmp_path / "system.json" + system.to_json(filename) + + system2 = SimpleSystem.from_json(filename, time_series_directory=tmp_path) + gen1b = system2.get_component(SimpleGenerator, gen1.name) + deserialize_ts = 
system2.time_series.get(gen1b, time_series_type=NonSequentialTimeSeries) + assert isinstance(deserialize_ts, NonSequentialTimeSeries) + assert isinstance(deserialize_ts.data, np.ndarray) + assert isinstance(deserialize_ts.timestamps, np.ndarray) + length = ts.length + assert isinstance(length, int) + assert np.array_equal(deserialize_ts.data, np.array(range(length))) + assert np.array_equal(deserialize_ts.timestamps, np.array(timestamps)) + + +def test_copied_storage_system_single_time_series(simple_system_with_time_series): assert isinstance( simple_system_with_time_series._time_series_mgr._storage, ArrowTimeSeriesStorage ) @@ -114,3 +196,41 @@ def test_copied_storage_system(simple_system_with_time_series): data_array_2 = simple_system_with_time_series.list_time_series(gen_component)[0].data assert np.array_equal(data_array_1, data_array_2) + + +def test_copied_storage_system_nonsequential_time_series( + simple_system_with_nonsequential_time_series, +): + assert isinstance( + simple_system_with_nonsequential_time_series._time_series_mgr._storage, + ArrowTimeSeriesStorage, + ) + gen_component = next( + simple_system_with_nonsequential_time_series.get_components(SimpleGenerator) + ) + + data_array_1 = simple_system_with_nonsequential_time_series.list_time_series( + gen_component, time_series_type=NonSequentialTimeSeries + )[0].data + timestamps_array_1 = simple_system_with_nonsequential_time_series.list_time_series( + gen_component, time_series_type=NonSequentialTimeSeries + )[0].timestamps + + simple_system_with_nonsequential_time_series.convert_storage( + time_series_type=NonSequentialTimeSeries, + time_series_storage_type=TimeSeriesStorageType.MEMORY, + ) + + assert isinstance( + simple_system_with_nonsequential_time_series._time_series_mgr._storage, + InMemoryTimeSeriesStorage, + ) + + data_array_2 = simple_system_with_nonsequential_time_series.list_time_series( + gen_component, time_series_type=NonSequentialTimeSeries + )[0].data + timestamps_array_2 = 
simple_system_with_nonsequential_time_series.list_time_series( + gen_component, time_series_type=NonSequentialTimeSeries + )[0].timestamps + assert np.array_equal(data_array_1, data_array_2) + assert np.array_equal(timestamps_array_1, timestamps_array_2) diff --git a/tests/test_in_memory_storage.py b/tests/test_in_memory_storage.py index 336cb6c..81564b9 100644 --- a/tests/test_in_memory_storage.py +++ b/tests/test_in_memory_storage.py @@ -1,6 +1,10 @@ from infrasys.chronify_time_series_storage import ChronifyTimeSeriesStorage from .models.simple_system import SimpleSystem, SimpleBus, SimpleGenerator -from infrasys.time_series_models import SingleTimeSeries, TimeSeriesStorageType +from infrasys.time_series_models import ( + SingleTimeSeries, + NonSequentialTimeSeries, + TimeSeriesStorageType, +) from infrasys.exceptions import ISAlreadyAttached from infrasys.arrow_storage import ArrowTimeSeriesStorage from infrasys.in_memory_time_series_storage import InMemoryTimeSeriesStorage @@ -44,7 +48,9 @@ ), ], ) -def test_convert_storage_time_series(original_kwargs, new_kwargs, original_stype, new_stype): +def test_convert_storage_single_time_series( + original_kwargs, new_kwargs, original_stype, new_stype +): test_bus = SimpleBus.example() test_generator = SimpleGenerator.example() system = SimpleSystem(auto_add_composed_components=True, **original_kwargs) @@ -67,9 +73,60 @@ def test_convert_storage_time_series(original_kwargs, new_kwargs, original_stype system.convert_storage(**new_kwargs) - assert isinstance(system.time_series.storage, new_stype) + assert isinstance(system._time_series_mgr._storage, new_stype) ts2 = system.get_time_series( test_generator, time_series_type=SingleTimeSeries, variable_name="load" ) assert np.array_equal(ts2.data_array, test_time_series_data.data_array) + + +@pytest.mark.parametrize( + "original_kwargs,new_kwargs,original_stype,new_stype", + [ + ( + {"time_series_storage_type": TimeSeriesStorageType.MEMORY}, + {}, + InMemoryTimeSeriesStorage, 
+ ArrowTimeSeriesStorage, + ), + ( + {}, + {"time_series_storage_type": TimeSeriesStorageType.MEMORY}, + ArrowTimeSeriesStorage, + InMemoryTimeSeriesStorage, + ), + ], +) +def test_convert_storage_nonsequential_time_series( + original_kwargs, new_kwargs, original_stype, new_stype +): + test_bus = SimpleBus.example() + test_generator = SimpleGenerator.example() + system = SimpleSystem(auto_add_composed_components=True, **original_kwargs) + + assert isinstance(system._time_series_mgr._storage, original_stype) + + system.get_components() + system.add_components(test_bus) + system.add_components(test_generator) + + timestamps = np.array( + [datetime(year=2030, month=1, day=1) + timedelta(seconds=5 * i) for i in range(24)], + ) + test_time_series_data = NonSequentialTimeSeries( + data=np.arange(24), + timestamps=timestamps, + variable_name="load", + ) + system.add_time_series(test_time_series_data, test_generator) + with pytest.raises(ISAlreadyAttached): + system.add_time_series(test_time_series_data, test_generator) + system.convert_storage(**new_kwargs) + + assert isinstance(system._time_series_mgr._storage, new_stype) + ts2 = system.get_time_series( + test_generator, time_series_type=NonSequentialTimeSeries, variable_name="load" + ) + assert np.array_equal(ts2.data_array, test_time_series_data.data_array) + assert np.array_equal(ts2.timestamps, test_time_series_data.timestamps) diff --git a/tests/test_nonsequential_time_series.py b/tests/test_nonsequential_time_series.py new file mode 100644 index 0000000..cd18507 --- /dev/null +++ b/tests/test_nonsequential_time_series.py @@ -0,0 +1,134 @@ +"""Test related to arrow storage module.""" + +from datetime import datetime, timedelta + +import pytest +import numpy as np + +from infrasys.normalization import NormalizationMax +from infrasys.quantities import ActivePower +from infrasys.time_series_models import NonSequentialTimeSeries + + +@pytest.fixture(name="timestamps") +def sample_timestamps(): + "Sample timestamps 
sequence" + base_datetime = datetime(year=2020, month=1, day=1) + return [base_datetime + timedelta(hours=4 * i) for i in range(4)] + + +@pytest.fixture(name="quantity_data") +def sample_quantity_data(): + "Sample infrasys quantity data" + return ActivePower(range(4), "kilowatts") + + +@pytest.fixture(name="data") +def sample_data(): + "Sample data sequence" + return range(4) + + +@pytest.fixture(name="variable_name") +def sample_variable_name(): + "Sample variable name" + return "active_power" + + +def test_nonsequential_time_series_attributes(data, timestamps, variable_name): + "Test NOnSequentialTimeseries with Infrasys Quantities as data" + length = 4 + ts = NonSequentialTimeSeries.from_array( + data=data, + variable_name=variable_name, + timestamps=timestamps, + ) + assert isinstance(ts, NonSequentialTimeSeries) + assert ts.length == length + assert isinstance(ts.data, np.ndarray) + assert isinstance(ts.timestamps, np.ndarray) + + +def test_invalid_sequence_length(data, timestamps, variable_name): + """Check that time series has at least 2 elements.""" + with pytest.raises(ValueError, match="length must be at least 2"): + NonSequentialTimeSeries.from_array( + data=[data[0]], variable_name=variable_name, timestamps=[timestamps[0]] + ) + + +def test_duplicate_timestamps(data, variable_name): + """Check that time series has unique timestamps""" + timestamps = [ + datetime(2020, 5, 17), + datetime(2020, 5, 17), + datetime(2020, 5, 18), + datetime(2020, 5, 20), + ] + with pytest.raises(ValueError, match="Timestamps must be unique"): + NonSequentialTimeSeries.from_array( + data=data, variable_name=variable_name, timestamps=timestamps + ) + + +def test_chronological_timestamps(data, variable_name): + """Check that time series has unique timestamps""" + timestamps = [ + datetime(2020, 6, 17), + datetime(2020, 5, 17), + datetime(2020, 5, 18), + datetime(2020, 5, 20), + ] + with pytest.raises(ValueError, match="chronological order"): + 
NonSequentialTimeSeries.from_array( + data=data, variable_name=variable_name, timestamps=timestamps + ) + + +def test_nonsequential_time_series_attributes_with_quantity( + quantity_data, timestamps, variable_name +): + "Test NonSequentialTimeseries with Infrasys Quantities as data" + length = 4 + + ts = NonSequentialTimeSeries.from_array( + data=quantity_data, + variable_name=variable_name, + timestamps=timestamps, + ) + assert isinstance(ts, NonSequentialTimeSeries) + assert ts.length == length + assert isinstance(ts.data, ActivePower) + assert isinstance(ts.timestamps, np.ndarray) + + +def test_normalization(data, timestamps, variable_name): + "Test normalization approach on sample data for NonSequentialTimeSeries" + length = 4 + max_val = data[-1] + ts = NonSequentialTimeSeries.from_array( + data=data, + timestamps=timestamps, + variable_name=variable_name, + normalization=NormalizationMax(), + ) + assert isinstance(ts, NonSequentialTimeSeries) + assert ts.length == length + for i, val in enumerate(ts.data): + assert val == data[i] / max_val + + +def test_normalization_quantity(quantity_data, timestamps, variable_name): + "Test normalization approach on sample quantity data for NonSequentialTimeSeries" + length = 4 + max_val = quantity_data.magnitude[-1] + ts = NonSequentialTimeSeries.from_array( + data=quantity_data, + timestamps=timestamps, + variable_name=variable_name, + normalization=NormalizationMax(), + ) + assert isinstance(ts, NonSequentialTimeSeries) + assert ts.length == length + for i, val in enumerate(ts.data): + assert val == quantity_data.magnitude[i] / max_val diff --git a/tests/test_serialization.py b/tests/test_serialization.py index eedc5e3..4ced325 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -3,6 +3,7 @@ import random import os from datetime import datetime, timedelta +from typing import Type import numpy as np from numpy._typing import NDArray @@ -11,12 +12,12 @@ from pydantic import WithJsonSchema from 
typing_extensions import Annotated -from infrasys import Location, SingleTimeSeries +from infrasys import Location, SingleTimeSeries, NonSequentialTimeSeries from infrasys.component import Component from infrasys.quantities import Distance, ActivePower from infrasys.exceptions import ISOperationNotAllowed from infrasys.normalization import NormalizationMax -from infrasys.time_series_models import TimeSeriesStorageType +from infrasys.time_series_models import TimeSeriesStorageType, TimeSeriesData from .models.simple_system import ( SimpleSystem, SimpleBus, @@ -30,6 +31,12 @@ TimeSeriesStorageType.MEMORY, ) +# chronify not yet implemented for nonsequentialtimeseries +TS_STORAGE_OPTIONS_NONSEQUENTIAL = ( + TimeSeriesStorageType.ARROW, + TimeSeriesStorageType.MEMORY, +) + class ComponentWithPintQuantity(Component): """Test component with a container of quantities.""" @@ -86,7 +93,7 @@ def test_serialization(tmp_path): @pytest.mark.parametrize("time_series_storage_type", TS_STORAGE_OPTIONS) -def test_serialize_time_series(tmp_path, time_series_storage_type): +def test_serialize_single_time_series(tmp_path, time_series_storage_type): system = SimpleSystem(time_series_storage_type=time_series_storage_type) bus = SimpleBus(name="test-bus", voltage=1.1) gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True) @@ -114,6 +121,8 @@ def check_deserialize_with_read_only_time_series( gen2_name: str, variable_name: str, expected_ts_data: NDArray | pint.Quantity, + expected_ts_timestamps: NDArray | None = None, + time_series_type: Type[TimeSeriesData] = SingleTimeSeries, ): system = SimpleSystem.from_json(filename, time_series_read_only=True) system_ts_dir = system.get_time_series_directory() @@ -123,8 +132,46 @@ def check_deserialize_with_read_only_time_series( with pytest.raises(ISOperationNotAllowed): system.remove_time_series(gen1b, variable_name=variable_name) - ts2 = system.get_time_series(gen1b, variable_name=variable_name) + ts2 = 
system.get_time_series( + gen1b, time_series_type=time_series_type, variable_name=variable_name + ) assert np.array_equal(ts2.data, expected_ts_data) + if expected_ts_timestamps is not None: + assert np.array_equal(ts2.timestamps, expected_ts_timestamps) + + +@pytest.mark.parametrize("time_series_storage_type", TS_STORAGE_OPTIONS_NONSEQUENTIAL) +def test_serialize_nonsequential_time_series(tmp_path, time_series_storage_type): + "Test serialization of NonSequentialTimeSeries" + system = SimpleSystem(time_series_storage_type=time_series_storage_type) + bus = SimpleBus(name="test-bus", voltage=1.1) + gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True) + gen2 = SimpleGenerator(name="gen2", active_power=1.0, rating=1.0, bus=bus, available=True) + system.add_components(bus, gen1, gen2) + + variable_name = "active_power" + length = 10 + data = range(length) + timestamps = [ + datetime(year=2030, month=1, day=1) + timedelta(seconds=5 * i) for i in range(length) + ] + ts = NonSequentialTimeSeries.from_array( + data=data, variable_name=variable_name, timestamps=timestamps + ) + system.add_time_series(ts, gen1, gen2, scenario="high", model_year="2030") + filename = tmp_path / "system.json" + system.to_json(filename) + + check_deserialize_with_read_write_time_series(filename) + check_deserialize_with_read_only_time_series( + filename, + gen1.name, + gen2.name, + variable_name, + ts.data, + ts.timestamps, + time_series_type=NonSequentialTimeSeries, + ) def check_deserialize_with_read_write_time_series(filename): @@ -161,7 +208,7 @@ def test_serialize_quantity(tmp_path, distance): assert c2.distance == c1.distance -def test_with_time_series_quantity(tmp_path): +def test_with_single_time_series_quantity(tmp_path): """Test serialization of SingleTimeSeries with a Pint quantity.""" system = SimpleSystem(auto_add_composed_components=True) gen = SimpleGenerator.example() @@ -179,7 +226,9 @@ def test_with_time_series_quantity(tmp_path): system2 = 
SimpleSystem.from_json(sys_file) gen2 = system2.get_component(SimpleGenerator, gen.name) - ts2 = system2.get_time_series(gen2, variable_name=variable_name) + ts2 = system2.get_time_series( + gen2, time_series_type=SingleTimeSeries, variable_name=variable_name + ) assert isinstance(ts, SingleTimeSeries) assert ts.length == length assert ts.resolution == resolution @@ -188,8 +237,40 @@ def test_with_time_series_quantity(tmp_path): assert np.array_equal(ts2.data.magnitude, np.array(range(length))) +def test_with_nonsequential_time_series_quantity(tmp_path): + """Test serialization of NonSequentialTimeSeries with a Pint quantity.""" + system = SimpleSystem(auto_add_composed_components=True) + gen = SimpleGenerator.example() + system.add_components(gen) + length = 10 + data = ActivePower(range(length), "watts") + variable_name = "active_power" + timestamps = [ + datetime(year=2030, month=1, day=1) + timedelta(seconds=100 * i) for i in range(10) + ] + ts = NonSequentialTimeSeries.from_array( + data=data, variable_name=variable_name, timestamps=timestamps + ) + system.add_time_series(ts, gen) + + sys_file = tmp_path / "system.json" + system.to_json(sys_file) + + system2 = SimpleSystem.from_json(sys_file) + gen2 = system2.get_component(SimpleGenerator, gen.name) + ts2 = system2.get_time_series( + gen2, time_series_type=NonSequentialTimeSeries, variable_name=variable_name + ) + assert isinstance(ts, NonSequentialTimeSeries) + assert ts.length == length + assert isinstance(ts2.data.magnitude, np.ndarray) + assert isinstance(ts2.timestamps, np.ndarray) + assert np.array_equal(ts2.data.magnitude, np.array(range(length))) + assert np.array_equal(ts2.timestamps, np.array(timestamps)) + + @pytest.mark.parametrize("storage_type", TS_STORAGE_OPTIONS) -def test_system_with_time_series_normalization(tmp_path, storage_type): +def test_system_with_single_time_series_normalization(tmp_path, storage_type): system = SimpleSystem( name="test-system", auto_add_composed_components=True, @@ -211,7 
+292,9 @@ def test_system_with_time_series_normalization(tmp_path, storage_type): system2 = SimpleSystem.from_json(filename) gen2 = system2.get_component(SimpleGenerator, gen.name) - ts2 = system2.get_time_series(gen2, variable_name=variable_name) + ts2 = system2.get_time_series( + gen2, time_series_type=SingleTimeSeries, variable_name=variable_name + ) assert ts2.normalization.max_value == length - 1 diff --git a/tests/test_system.py b/tests/test_system.py index 827f92f..1dcc432 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -205,7 +205,7 @@ def check_attached_components(my_sys, parent_type, child_type): system2.get_component(SimpleBus, gen.bus.name) -def test_time_series_attach_from_array(): +def test_single_time_series_attach_from_array(): system = SimpleSystem() bus = SimpleBus(name="test-bus", voltage=1.1) gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True) @@ -220,10 +220,15 @@ def test_time_series_attach_from_array(): system.add_time_series(ts, gen1, gen2) assert system.has_time_series(gen1, variable_name=variable_name) assert system.has_time_series(gen2, variable_name=variable_name) - assert np.array_equal(system.get_time_series(gen1, variable_name=variable_name).data, ts.data) + assert np.array_equal( + system.get_time_series( + gen1, time_series_type=SingleTimeSeries, variable_name=variable_name + ).data, + ts.data, + ) -def test_time_series(): +def test_single_time_series(): system = SimpleSystem() bus = SimpleBus(name="test-bus", voltage=1.1) gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True)