Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added NonSequentialTimeSeries Model #76

Merged
merged 5 commits into from
Mar 6, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/infrasys/__init__.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
from .system import System
from .time_series_models import (
SingleTimeSeries,
NonSequentialTimeSeries,
TimeSeriesStorageType,
TimeSeriesKey,
SingleTimeSeriesKey,
@@ -26,6 +27,7 @@
"Location",
"NormalizationModel",
"SingleTimeSeries",
"NonSequentialTimeSeries",
"SingleTimeSeriesKey",
"SupplementalAttribute",
"System",
94 changes: 85 additions & 9 deletions src/infrasys/arrow_storage.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
from pathlib import Path
from tempfile import mkdtemp
from typing import Any, Optional
from uuid import UUID
from functools import singledispatchmethod

import numpy as np
from numpy.typing import NDArray
@@ -17,6 +17,8 @@
from infrasys.time_series_models import (
SingleTimeSeries,
SingleTimeSeriesMetadata,
NonSequentialTimeSeries,
NonSequentialTimeSeriesMetadata,
TimeSeriesData,
TimeSeriesMetadata,
TimeSeriesStorageType,
@@ -31,6 +33,7 @@ class ArrowTimeSeriesStorage(TimeSeriesStorageBase):

def __init__(self, directory: Path) -> None:
self._ts_directory = directory
self._ts_metadata: str | None = None

@classmethod
def create_with_temp_directory(
@@ -57,16 +60,37 @@ def add_time_series(
time_series: TimeSeriesData,
connection: Any = None,
) -> None:
if isinstance(time_series, SingleTimeSeries):
self._add_single_time_series(metadata.time_series_uuid, time_series.data_array)
self._add_time_series(time_series)

@singledispatchmethod
def _add_time_series(self, time_series) -> None:
    """Fallback used when no handler is registered for the time series type.

    Raises
    ------
    NotImplementedError
        Always; a handler must be registered for each supported type.
    """
    raise NotImplementedError(
        f"Bug: need to implement add_time_series for {type(time_series)}"
    )

@_add_time_series.register(SingleTimeSeries)
def _(self, time_series):
time_series_data = time_series.data_array
time_series_uuid = time_series.uuid
fpath = self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}")
if not fpath.exists():
arrow_batch = self._convert_to_record_batch_single_time_series(
time_series_data, str(time_series_uuid)
)
with pa.OSFile(str(fpath), "wb") as sink: # type: ignore
with pa.ipc.new_file(sink, arrow_batch.schema) as writer:
writer.write(arrow_batch)
logger.trace("Saving time series to {}", fpath)
logger.debug("Added {} to time series storage", time_series_uuid)
else:
msg = f"Bug: need to implement add_time_series for {type(time_series)}"
raise NotImplementedError(msg)
logger.debug("{} was already stored", time_series_uuid)

def _add_single_time_series(self, time_series_uuid: UUID, time_series_data: NDArray) -> None:
@_add_time_series.register(NonSequentialTimeSeries)
def _(self, time_series):
time_series_data = (time_series.data_array, time_series.timestamps_array)
time_series_uuid = time_series.uuid
fpath = self._ts_directory.joinpath(f"{time_series_uuid}{EXTENSION}")
if not fpath.exists():
arrow_batch = self._convert_to_record_batch(time_series_data, str(time_series_uuid))
arrow_batch = self._convert_to_record_batch_nonsequential_time_series(time_series_data)
with pa.OSFile(str(fpath), "wb") as sink: # type: ignore
with pa.ipc.new_file(sink, arrow_batch.schema) as writer:
writer.write(arrow_batch)
@@ -87,6 +111,8 @@ def get_time_series(
metadata=metadata, start_time=start_time, length=length
)

elif isinstance(metadata, NonSequentialTimeSeriesMetadata):
return self._get_nonsequential_time_series(metadata=metadata)
msg = f"Bug: need to implement get_time_series for {type(metadata)}"
raise NotImplementedError(msg)

@@ -147,12 +173,62 @@ def _get_single_time_series(
normalization=metadata.normalization,
)

def _convert_to_record_batch(self, time_series_array: NDArray, column: str) -> pa.RecordBatch:
"""Create record batch to save array to disk."""
def _get_nonsequential_time_series(
    self,
    metadata: NonSequentialTimeSeriesMetadata,
) -> NonSequentialTimeSeries:
    """Read a NonSequentialTimeSeries back from its Arrow file.

    Parameters
    ----------
    metadata
        Metadata whose ``time_series_uuid`` names the Arrow file and whose
        quantity, variable name and normalization are applied to the result.

    Returns
    -------
    NonSequentialTimeSeries

    Raises
    ------
    Exception
        If the stored record batch does not contain exactly two columns.
    """
    fpath = self._ts_directory.joinpath(f"{metadata.time_series_uuid}{EXTENSION}")
    with pa.memory_map(str(fpath), "r") as source:
        base_ts = pa.ipc.open_file(source).get_record_batch(0)
        logger.trace("Reading time series from {}", fpath)
    columns = base_ts.column_names
    if len(columns) != 2:
        msg = f"Bug: expected two columns: {columns=}"
        raise Exception(msg)
    # Column order is fixed by the writer: data first, timestamps second.
    data_column, timestamps_column = columns[0], columns[1]
    data, timestamps = (
        base_ts[data_column],
        base_ts[timestamps_column],
    )
    if metadata.quantity_metadata is not None:
        np_data_array = metadata.quantity_metadata.quantity_type(
            data, metadata.quantity_metadata.units
        )
    else:
        np_data_array = np.array(data)

    np_time_array = np.array(timestamps)
    if np_time_array.dtype.kind == "M":
        # numpy turns datetime64[ns] into plain integers when cast to object;
        # only units of microseconds or coarser become datetime objects.
        # Normalize first (sub-microsecond precision cannot be represented by
        # datetime anyway).
        np_time_array = np_time_array.astype("datetime64[us]")
    np_time_array = np_time_array.astype("O")  # convert to datetime object
    return NonSequentialTimeSeries(
        uuid=metadata.time_series_uuid,
        variable_name=metadata.variable_name,
        data=np_data_array,
        timestamps=np_time_array,
        normalization=metadata.normalization,
    )

def _convert_to_record_batch_single_time_series(
    self, time_series_array: NDArray, column: str
) -> pa.RecordBatch:
    """Wrap a SingleTimeSeries array in a one-column record batch.

    The single column is named ``column`` and takes the Arrow type inferred
    from ``time_series_array``.
    """
    values = pa.array(time_series_array)
    column_field = pa.field(column, values.type)
    return pa.record_batch([values], schema=pa.schema([column_field]))

def _convert_to_record_batch_nonsequential_time_series(
    self, time_series_array: tuple[NDArray, NDArray]
) -> pa.RecordBatch:
    """Build the two-column record batch stored for a NonSequentialTimeSeries.

    ``time_series_array`` is a ``(data, timestamps)`` pair; the resulting
    batch has a ``"data"`` column followed by a ``"timestamp"`` column.
    """
    values, stamps = time_series_array
    arrow_columns = {
        "data": pa.array(values),
        "timestamp": pa.array(stamps),
    }
    fields = [pa.field(name, column.type) for name, column in arrow_columns.items()]
    return pa.record_batch(list(arrow_columns.values()), schema=pa.schema(fields))


def clean_tmp_folder(folder: Path | str) -> None:
shutil.rmtree(folder)
54 changes: 46 additions & 8 deletions src/infrasys/in_memory_time_series_storage.py
Original file line number Diff line number Diff line change
@@ -2,31 +2,32 @@

from datetime import datetime
from pathlib import Path
from typing import Any
from typing import Any, TypeAlias

from numpy.typing import NDArray
from typing import TypeAlias
from uuid import UUID

from loguru import logger

from infrasys.exceptions import ISNotStored
from infrasys.time_series_models import (
SingleTimeSeries,
SingleTimeSeriesMetadata,
NonSequentialTimeSeries,
NonSequentialTimeSeriesMetadata,
TimeSeriesData,
TimeSeriesMetadata,
)
from infrasys.time_series_storage_base import TimeSeriesStorageBase

DataStoreType: TypeAlias = NDArray
DataStoreType: TypeAlias = NDArray | tuple[NDArray, NDArray]


class InMemoryTimeSeriesStorage(TimeSeriesStorageBase):
"""Stores time series in memory."""

def __init__(self) -> None:
self._arrays: dict[UUID, DataStoreType] = {} # Time series UUID, not metadata UUID
self._ts_metadata_type: str | None = None

def get_time_series_directory(self) -> None:
return None
@@ -37,13 +38,20 @@ def add_time_series(
time_series: TimeSeriesData,
connection: Any = None,
) -> None:
if isinstance(time_series, SingleTimeSeries):
if isinstance(time_series, (SingleTimeSeries, NonSequentialTimeSeries)):
if metadata.time_series_uuid not in self._arrays:
self._arrays[metadata.time_series_uuid] = time_series.data_array
self._arrays[metadata.time_series_uuid] = (
(
time_series.data_array,
time_series.timestamps,
)
if hasattr(time_series, "timestamps")
else time_series.data_array
)
self._ts_metadata_type = metadata.type
logger.debug("Added {} to store", time_series.summary)
else:
logger.debug("{} was already stored", time_series.summary)

else:
msg = f"add_time_series not implemented for {type(time_series)}"
raise NotImplementedError(msg)
@@ -57,6 +65,8 @@ def get_time_series(
) -> TimeSeriesData:
if isinstance(metadata, SingleTimeSeriesMetadata):
return self._get_single_time_series(metadata, start_time, length)
elif isinstance(metadata, NonSequentialTimeSeriesMetadata):
return self._get_nonsequential_time_series(metadata)
raise NotImplementedError(str(metadata.get_time_series_data_type()))

def remove_time_series(self, metadata: TimeSeriesMetadata, connection: Any = None) -> None:
@@ -77,7 +87,8 @@ def _get_single_time_series(
start_time: datetime | None = None,
length: int | None = None,
) -> SingleTimeSeries:
ts_data = self._arrays.get(metadata.time_series_uuid)
ts_data: NDArray | None
ts_data = self._arrays.get(metadata.time_series_uuid) # type: ignore
if ts_data is None:
msg = f"No time series with {metadata.time_series_uuid} is stored"
raise ISNotStored(msg)
@@ -99,3 +110,30 @@ def _get_single_time_series(
data=ts_data,
normalization=metadata.normalization,
)

def _get_nonsequential_time_series(
    self,
    metadata: NonSequentialTimeSeriesMetadata,
) -> NonSequentialTimeSeries:
    """Rebuild a NonSequentialTimeSeries from the in-memory store.

    Raises
    ------
    ISNotStored
        If no data or no timestamps are stored under the metadata's
        time series UUID.
    """
    uuid = metadata.time_series_uuid
    ts_data, ts_timestamps = self._arrays.get(uuid, (None, None))
    # Data is checked before timestamps so the first missing piece is reported.
    for label, stored in (("data", ts_data), ("timestamps", ts_timestamps)):
        if stored is None:
            raise ISNotStored(f"No time series {label} with {uuid} is stored")

    quantity = metadata.quantity_metadata
    if quantity is not None:
        ts_data = quantity.quantity_type(ts_data, quantity.units)
    # Both values are known to be present here; the asserts narrow the types.
    assert ts_data is not None
    assert ts_timestamps is not None
    return NonSequentialTimeSeries(
        uuid=uuid,
        variable_name=metadata.variable_name,
        data=ts_data,
        timestamps=ts_timestamps,
        normalization=metadata.normalization,
    )
36 changes: 25 additions & 11 deletions src/infrasys/time_series_manager.py
Original file line number Diff line number Diff line change
@@ -20,6 +20,9 @@
SingleTimeSeries,
SingleTimeSeriesKey,
SingleTimeSeriesMetadata,
NonSequentialTimeSeries,
NonSequentialTimeSeriesMetadata,
NonSequentialTimeSeriesKey,
TimeSeriesData,
TimeSeriesKey,
TimeSeriesMetadata,
@@ -477,18 +480,19 @@ def convert_storage(self, in_place: bool = True, **kwargs) -> TimeSeriesStorageB
The new storage instance.
"""
new_storage = self.create_new_storage(**kwargs)
for time_series_uuid in self.metadata_store.unique_uuids_by_type(
SingleTimeSeries.__name__
):
metadata = self.metadata_store.list_metadata_with_time_series_uuid(
time_series_uuid, limit=1
)
if len(metadata) != 1:
msg = f"Expected 1 metadata for {time_series_uuid}, got {len(metadata)}"
raise Exception(msg)
for time_series_type in (SingleTimeSeries, NonSequentialTimeSeries):
for time_series_uuid in self.metadata_store.unique_uuids_by_type(
time_series_type.__name__
):
metadata = self.metadata_store.list_metadata_with_time_series_uuid(
time_series_uuid, limit=1
)
if len(metadata) != 1:
msg = f"Expected 1 metadata for {time_series_uuid}, got {len(metadata)}"
raise Exception(msg)

time_series = self._storage.get_time_series(metadata[0])
new_storage.add_time_series(metadata[0], time_series)
time_series = self._storage.get_time_series(metadata[0])
new_storage.add_time_series(metadata[0], time_series)

if in_place:
self._storage = new_storage
@@ -514,6 +518,16 @@ def _(metadata: SingleTimeSeriesMetadata) -> TimeSeriesKey:
)


@make_time_series_key.register(NonSequentialTimeSeriesMetadata)
def _(metadata: NonSequentialTimeSeriesMetadata) -> TimeSeriesKey:
    """Build the key for a NonSequentialTimeSeries from its metadata."""
    key_fields = {
        "length": metadata.length,
        "user_attributes": metadata.user_attributes,
        "variable_name": metadata.variable_name,
        "time_series_type": NonSequentialTimeSeries,
    }
    return NonSequentialTimeSeriesKey(**key_fields)


def _get_data_connection(conn: DatabaseConnection | None) -> Any:
    """Return ``conn.data_conn``, or None when no connection was given."""
    if conn is None:
        return None
    return conn.data_conn

19 changes: 16 additions & 3 deletions src/infrasys/time_series_metadata_store.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,11 @@
SerializedTypeMetadata,
TYPE_METADATA,
)
from infrasys.time_series_models import TimeSeriesMetadata
from infrasys.time_series_models import (
TimeSeriesMetadata,
SingleTimeSeriesMetadataBase,
NonSequentialTimeSeriesMetadataBase,
)
from infrasys.utils.sqlite import execute


@@ -104,13 +108,22 @@ def add(
msg = f"Time series with {metadata=} is already stored."
raise ISAlreadyAttached(msg)

if isinstance(metadata, SingleTimeSeriesMetadataBase):
resolution = str(metadata.resolution)
initial_time = str(metadata.initial_time)
elif isinstance(metadata, NonSequentialTimeSeriesMetadataBase):
resolution = None
initial_time = None
else:
raise NotImplementedError

rows = [
(
None, # auto-assigned by sqlite
str(metadata.time_series_uuid),
metadata.type,
str(metadata.initial_time),
str(metadata.resolution),
initial_time,
resolution,
metadata.variable_name,
str(owner.uuid),
owner.__class__.__name__,
187 changes: 181 additions & 6 deletions src/infrasys/time_series_models.py
Original file line number Diff line number Diff line change
@@ -261,13 +261,11 @@ class TimeSeriesMetadata(InfraSysBaseModel, abc.ABC):
"""Defines common metadata for all time series."""

variable_name: str
initial_time: datetime
resolution: timedelta
time_series_uuid: UUID
user_attributes: dict[str, Any] = {}
quantity_metadata: Optional[QuantityMetadata] = None
normalization: NormalizationModel = None
type: Literal["SingleTimeSeries", "SingleTimeSeriesScalingFactor"]
type: Literal["SingleTimeSeries", "SingleTimeSeriesScalingFactor", "NonSequentialTimeSeries"]

@property
def label(self) -> str:
@@ -290,6 +288,8 @@ class SingleTimeSeriesMetadataBase(TimeSeriesMetadata, abc.ABC):
"""Base class for SingleTimeSeries metadata."""

length: int
initial_time: datetime
resolution: timedelta
type: Literal["SingleTimeSeries", "SingleTimeSeriesScalingFactor"]

@classmethod
@@ -380,20 +380,195 @@ def get_time_series_type_str() -> str:
]


class NonSequentialTimeSeries(TimeSeriesData):
    """Defines a non-sequential time array with a single dimension of floats.

    Values live in ``data`` and their timestamps in ``timestamps``. Timestamps
    must be unique and strictly increasing but need not be evenly spaced.
    """

    data: NDArray | pint.Quantity
    timestamps: NDArray

    @computed_field
    def length(self) -> int:
        """Return the length of the data."""
        return len(self.data)

    def __eq__(self, other: Any) -> bool:
        """Compare the explicitly-set fields of two NonSequentialTimeSeries.

        Returns ``NotImplemented`` for foreign types so that Python can fall
        back to the other operand (and ultimately to ``False``) instead of
        raising from a plain ``==`` or ``in`` check.
        """
        if not isinstance(other, NonSequentialTimeSeries):
            return NotImplemented
        for field in self.model_fields_set:
            if field == "data":
                # Compare shapes first: element-wise == on mismatched shapes
                # raises (or yields a bare bool) rather than an array.
                if self.data.shape != other.data.shape:
                    return False
                # np.all also copes with == collapsing to a single bool.
                if not np.all(self.data == other.data):
                    return False
            elif field == "timestamps":
                # zip() alone would treat a shorter array as equal to its prefix.
                if len(self.timestamps) != len(other.timestamps):
                    return False
                if not all(t1 == t2 for t1, t2 in zip(self.timestamps, other.timestamps)):
                    return False
            elif not getattr(self, field) == getattr(other, field):
                return False
        return True

    @field_validator("data", mode="before")
    @classmethod
    def check_data(cls, data) -> NDArray | pint.Quantity:
        """Check time series data.

        Requires at least two values and coerces the input so that the stored
        value is a numpy array or a quantity whose magnitude is a numpy array.
        """
        if len(data) < 2:
            msg = f"NonSequentialTimeSeries length must be at least 2: {len(data)}"
            raise ValueError(msg)

        if isinstance(data, pint.Quantity):
            if not isinstance(data.magnitude, np.ndarray):
                return type(data)(np.array(data.magnitude), units=data.units)
            return data

        if not isinstance(data, np.ndarray):
            return np.array(data)

        return data

    @field_validator("timestamps", mode="before")
    @classmethod
    def check_timestamp(cls, timestamps: Sequence[datetime] | NDArray) -> NDArray:
        """Check non-sequential timestamps.

        Requires at least two timestamps that are unique and in strictly
        increasing order; non-array input is converted to a numpy array.
        """
        if len(timestamps) < 2:
            msg = f"Time index must have at least 2 timestamps: {len(timestamps)}"
            raise ValueError(msg)

        if len(timestamps) != len(set(timestamps)):
            msg = "Duplicate timestamps found. Timestamps must be unique."
            raise ValueError(msg)

        # The datetime64 view is used only for the ordering check; the
        # original values are what gets stored.
        time_array = np.array(timestamps, dtype="datetime64[ns]")
        if not np.all(np.diff(time_array) > np.timedelta64(0, "s")):
            msg = "Timestamps must be in chronological order."
            raise ValueError(msg)

        if not isinstance(timestamps, np.ndarray):
            return np.array(timestamps)

        return timestamps

    @classmethod
    def from_array(
        cls,
        data: ISArray,
        timestamps: Sequence[datetime] | NDArray,
        variable_name: str,
        normalization: NormalizationModel = None,
    ) -> "NonSequentialTimeSeries":
        """Method of NonSequentialTimeSeries that creates an instance from an array and timestamps.

        Parameters
        ----------
        data
            Sequence that contains the values of the time series
        timestamps
            Sequence that contains the non-sequential timestamps
        variable_name
            Name assigned to the values of the time series (e.g., active_power)
        normalization
            Normalization model to normalize the data

        Returns
        -------
        NonSequentialTimeSeries

        Raises
        ------
        ValueError
            If ``data`` and ``timestamps`` have different lengths, or if
            field validation fails.
        """
        # The field validators check each array on its own, so a length
        # mismatch between the two has to be caught here.
        if len(data) != len(timestamps):
            msg = (
                "data and timestamps must have the same length: "
                f"{len(data)} != {len(timestamps)}"
            )
            raise ValueError(msg)

        if normalization is not None:
            npa = data if isinstance(data, np.ndarray) else np.array(data)
            data = normalization.normalize_array(npa)

        # cls (not the hard-coded class) so subclasses get their own type back.
        return cls(
            data=data,  # type: ignore
            timestamps=timestamps,  # type: ignore
            variable_name=variable_name,
            normalization=normalization,
        )

    @staticmethod
    def get_time_series_metadata_type() -> Type:
        """Get the metadata type of the NonSequentialTimeSeries"""
        return NonSequentialTimeSeriesMetadata

    @property
    def data_array(self) -> NDArray:
        """Get the data array NonSequentialTimeSeries"""
        if isinstance(self.data, pint.Quantity):
            return self.data.magnitude
        return self.data

    @property
    def timestamps_array(self) -> NDArray:
        """Get the timestamps array NonSequentialTimeSeries"""
        return self.timestamps


class NonSequentialTimeSeriesMetadataBase(TimeSeriesMetadata, abc.ABC):
    """Base class for NonSequentialTimeSeries metadata."""

    length: int
    type: Literal["NonSequentialTimeSeries"]

    @classmethod
    def from_data(
        cls, time_series: NonSequentialTimeSeries, **user_attributes
    ) -> "NonSequentialTimeSeriesMetadataBase":
        """Construct a NonSequentialTimeSeriesMetadata from a NonSequentialTimeSeries.

        Quantity metadata is recorded only when the series data is a
        ``pint.Quantity``; otherwise it is left as None.
        """
        series_data = time_series.data
        quantity_metadata = None
        if isinstance(series_data, pint.Quantity):
            quantity_class = type(series_data)
            quantity_metadata = QuantityMetadata(
                module=quantity_class.__module__,
                quantity_type=quantity_class,
                units=str(series_data.units),
            )
        return cls(
            variable_name=time_series.variable_name,
            length=time_series.length,  # type: ignore
            time_series_uuid=time_series.uuid,
            user_attributes=user_attributes,
            quantity_metadata=quantity_metadata,
            normalization=time_series.normalization,
            type=cls.get_time_series_type_str(),  # type: ignore
        )

    @staticmethod
    def get_time_series_data_type() -> Type:
        """Return the time series class this metadata describes."""
        return NonSequentialTimeSeries


class NonSequentialTimeSeriesMetadata(NonSequentialTimeSeriesMetadataBase):
    """Defines the metadata for a NonSequentialTimeSeries."""

    # Fixed to the only literal value allowed for this metadata class.
    type: Literal["NonSequentialTimeSeries"] = "NonSequentialTimeSeries"

    @staticmethod
    def get_time_series_type_str() -> str:
        """Return the string that ``from_data`` stores in the ``type`` field."""
        return "NonSequentialTimeSeries"


class TimeSeriesKey(InfraSysBaseModel):
"""Base class for time series keys."""

variable_name: str
initial_time: datetime
resolution: timedelta
time_series_type: Type[SingleTimeSeries]
time_series_type: Type[TimeSeriesData]
user_attributes: dict[str, Any] = {}


class SingleTimeSeriesKey(TimeSeriesKey):
    """Keys for SingleTimeSeries."""

    # NOTE(review): presumably mirrors the fields of the same names on the
    # SingleTimeSeries metadata - confirm against make_time_series_key.
    length: int
    initial_time: datetime
    resolution: timedelta


class NonSequentialTimeSeriesKey(TimeSeriesKey):
    """Keys for NonSequentialTimeSeries."""

    # Number of values in the time series, copied from the metadata's length.
    length: int


class DatabaseConnection(InfraSysBaseModel):
2 changes: 1 addition & 1 deletion src/infrasys/value_curves.py
Original file line number Diff line number Diff line change
@@ -221,7 +221,7 @@ def to_input_output(

xs = self.function_data.x_coords
ys = np.multiply(xs[1:], self.function_data.y_coords).tolist()
ys.insert(0, c)
ys.insert(0, c) # type:ignore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pesap Do you know why we need this?


return InputOutputCurve(
function_data=PiecewiseLinearData(
19 changes: 18 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
from loguru import logger

from infrasys.location import Location
from infrasys.time_series_models import SingleTimeSeries
from infrasys.time_series_models import SingleTimeSeries, NonSequentialTimeSeries
from .models.simple_system import SimpleSystem, SimpleBus, SimpleGenerator, SimpleSubsystem


@@ -35,6 +35,23 @@ def simple_system_with_time_series(simple_system) -> SimpleSystem:
return simple_system


@pytest.fixture
def simple_system_with_nonsequential_time_series(simple_system) -> SimpleSystem:
    """Return ``simple_system`` with a NonSequentialTimeSeries attached to "test-gen"."""
    num_points = 10
    start = datetime(year=2030, month=1, day=1)
    series = NonSequentialTimeSeries.from_array(
        data=range(num_points),
        variable_name="active_power",
        timestamps=[start + timedelta(seconds=5 * step) for step in range(num_points)],
    )
    generator = simple_system.get_component(SimpleGenerator, "test-gen")
    simple_system.add_time_series(series, generator)
    return simple_system


@pytest.fixture(autouse=True)
def propagate_logs():
"""Enable logging for the package"""
130 changes: 125 additions & 5 deletions tests/test_arrow_storage.py
Original file line number Diff line number Diff line change
@@ -10,7 +10,11 @@
from infrasys.arrow_storage import ArrowTimeSeriesStorage
from infrasys.in_memory_time_series_storage import InMemoryTimeSeriesStorage
from infrasys.system import System
from infrasys.time_series_models import SingleTimeSeries, TimeSeriesStorageType
from infrasys.time_series_models import (
SingleTimeSeries,
NonSequentialTimeSeries,
TimeSeriesStorageType,
)

from .models.simple_system import SimpleSystem, SimpleBus, SimpleGenerator

@@ -24,7 +28,7 @@ def test_system() -> System:
return system


def test_file_creation(test_system: System):
def test_file_creation_with_single_time_series(test_system: System):
gen1 = test_system.get_component(SimpleGenerator, "gen1")
ts = SingleTimeSeries.from_array(
data=range(8784),
@@ -41,7 +45,26 @@ def test_file_creation(test_system: System):
assert time_series_fpath.exists()


def test_copy_files(tmp_path):
def test_file_creation_with_nonsequential_time_series(test_system: System):
    """Adding a NonSequentialTimeSeries writes an ``.arrow`` file named by its UUID."""
    generator = test_system.get_component(SimpleGenerator, "gen1")
    start = datetime(year=2030, month=1, day=1)
    series = NonSequentialTimeSeries.from_array(
        data=range(10),
        timestamps=[start + timedelta(seconds=5 * step) for step in range(10)],
        variable_name="active_power",
    )
    test_system.time_series.add(series, generator, scenario="one", model_year="2030")
    stored = test_system.time_series.get(generator, time_series_type=NonSequentialTimeSeries)
    assert isinstance(test_system.time_series.storage, ArrowTimeSeriesStorage)
    directory = test_system.get_time_series_directory()
    assert isinstance(directory, Path)
    arrow_file = directory.joinpath(str(stored.uuid) + ".arrow")
    assert arrow_file.exists()


def test_copy_files_with_single_time_series(tmp_path):
"""Test that we can copy the time series from tmp to specified folder"""
system = SimpleSystem()
bus = SimpleBus(name="test-bus", voltage=1.1)
@@ -68,7 +91,36 @@ def test_copy_files(tmp_path):
assert time_series_fpath.exists()


def test_read_deserialize_time_series(tmp_path):
def test_copy_files_with_nonsequential_timeseries(tmp_path):
    """Test that the time series file exists after a JSON round trip of the system."""
    system = SimpleSystem()
    bus = SimpleBus(name="test-bus", voltage=1.1)
    gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True)
    system.add_components(bus, gen1)

    start = datetime(year=2030, month=1, day=1)
    series = NonSequentialTimeSeries.from_array(
        data=range(10),
        timestamps=[start + timedelta(seconds=5 * step) for step in range(10)],
        variable_name="active_power",
    )
    system.time_series.add(series, gen1, scenario="two", model_year="2030")

    filename = tmp_path / "system.json"
    system.to_json(filename)

    logger.info("Starting deserialization")
    system2 = SimpleSystem.from_json(filename)
    gen1b = system2.get_component(SimpleGenerator, gen1.name)
    restored = system2.time_series.get(gen1b, time_series_type=NonSequentialTimeSeries)
    arrow_name = str(restored.uuid) + ".arrow"
    time_series_fpath = tmp_path / system2.get_time_series_directory() / arrow_name

    assert time_series_fpath.exists()


def test_read_deserialize_single_time_series(tmp_path):
"""Test that we can read from a deserialized system."""
system = SimpleSystem()
bus = SimpleBus(name="test-bus", voltage=1.1)
@@ -96,7 +148,37 @@ def test_read_deserialize_time_series(tmp_path):
assert np.array_equal(deserialize_ts.data, np.array(range(length)))


def test_copied_storage_system(simple_system_with_time_series):
def test_read_deserialize_nonsequential_time_series(tmp_path):
    """Test that data and timestamps can be read from a deserialized system."""
    system = SimpleSystem()
    bus = SimpleBus(name="test-bus", voltage=1.1)
    gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True)
    system.add_components(bus, gen1)

    start = datetime(year=2030, month=1, day=1)
    timestamps = [start + timedelta(seconds=5 * step) for step in range(10)]
    ts = NonSequentialTimeSeries.from_array(
        data=range(10),
        timestamps=timestamps,
        variable_name="active_power",
    )
    system.time_series.add(ts, gen1, scenario="high", model_year="2030")
    filename = tmp_path / "system.json"
    system.to_json(filename)

    system2 = SimpleSystem.from_json(filename, time_series_directory=tmp_path)
    gen1b = system2.get_component(SimpleGenerator, gen1.name)
    restored = system2.time_series.get(gen1b, time_series_type=NonSequentialTimeSeries)

    assert isinstance(restored, NonSequentialTimeSeries)
    assert isinstance(restored.data, np.ndarray)
    assert isinstance(restored.timestamps, np.ndarray)
    length = ts.length
    assert isinstance(length, int)
    assert np.array_equal(restored.data, np.array(range(length)))
    assert np.array_equal(restored.timestamps, np.array(timestamps))


def test_copied_storage_system_single_time_series(simple_system_with_time_series):
assert isinstance(
simple_system_with_time_series._time_series_mgr._storage, ArrowTimeSeriesStorage
)
@@ -114,3 +196,41 @@ def test_copied_storage_system(simple_system_with_time_series):

data_array_2 = simple_system_with_time_series.list_time_series(gen_component)[0].data
assert np.array_equal(data_array_1, data_array_2)


def test_copied_storage_system_nonsequential_time_series(
    simple_system_with_nonsequential_time_series,
):
    """Data and timestamps survive converting storage from Arrow to in-memory."""
    system = simple_system_with_nonsequential_time_series
    assert isinstance(system._time_series_mgr._storage, ArrowTimeSeriesStorage)
    gen_component = next(system.get_components(SimpleGenerator))

    def first_series():
        # Looked up afresh on every call, exactly like the inline lookups it replaces.
        return system.list_time_series(gen_component, time_series_type=NonSequentialTimeSeries)[0]

    data_array_1 = first_series().data
    timestamps_array_1 = first_series().timestamps

    system.convert_storage(
        time_series_type=NonSequentialTimeSeries,
        time_series_storage_type=TimeSeriesStorageType.MEMORY,
    )

    assert isinstance(system._time_series_mgr._storage, InMemoryTimeSeriesStorage)

    data_array_2 = first_series().data
    timestamps_array_2 = first_series().timestamps
    assert np.array_equal(data_array_1, data_array_2)
    assert np.array_equal(timestamps_array_1, timestamps_array_2)
63 changes: 60 additions & 3 deletions tests/test_in_memory_storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from infrasys.chronify_time_series_storage import ChronifyTimeSeriesStorage
from .models.simple_system import SimpleSystem, SimpleBus, SimpleGenerator
from infrasys.time_series_models import SingleTimeSeries, TimeSeriesStorageType
from infrasys.time_series_models import (
SingleTimeSeries,
NonSequentialTimeSeries,
TimeSeriesStorageType,
)
from infrasys.exceptions import ISAlreadyAttached
from infrasys.arrow_storage import ArrowTimeSeriesStorage
from infrasys.in_memory_time_series_storage import InMemoryTimeSeriesStorage
@@ -44,7 +48,9 @@
),
],
)
def test_convert_storage_time_series(original_kwargs, new_kwargs, original_stype, new_stype):
def test_convert_storage_single_time_series(
original_kwargs, new_kwargs, original_stype, new_stype
):
test_bus = SimpleBus.example()
test_generator = SimpleGenerator.example()
system = SimpleSystem(auto_add_composed_components=True, **original_kwargs)
@@ -67,9 +73,60 @@ def test_convert_storage_time_series(original_kwargs, new_kwargs, original_stype

system.convert_storage(**new_kwargs)

assert isinstance(system.time_series.storage, new_stype)
assert isinstance(system._time_series_mgr._storage, new_stype)

ts2 = system.get_time_series(
test_generator, time_series_type=SingleTimeSeries, variable_name="load"
)
assert np.array_equal(ts2.data_array, test_time_series_data.data_array)


@pytest.mark.parametrize(
    "original_kwargs,new_kwargs,original_stype,new_stype",
    [
        (
            {"time_series_storage_type": TimeSeriesStorageType.MEMORY},
            {},
            InMemoryTimeSeriesStorage,
            ArrowTimeSeriesStorage,
        ),
        (
            {},
            {"time_series_storage_type": TimeSeriesStorageType.MEMORY},
            ArrowTimeSeriesStorage,
            InMemoryTimeSeriesStorage,
        ),
    ],
)
def test_convert_storage_nonsequential_time_series(
    original_kwargs, new_kwargs, original_stype, new_stype
):
    """A NonSequentialTimeSeries is preserved when the storage backend is converted."""
    bus = SimpleBus.example()
    generator = SimpleGenerator.example()
    system = SimpleSystem(auto_add_composed_components=True, **original_kwargs)

    assert isinstance(system._time_series_mgr._storage, original_stype)

    system.get_components()
    system.add_components(bus)
    system.add_components(generator)

    start = datetime(year=2030, month=1, day=1)
    timestamps = np.array(
        [start + timedelta(seconds=5 * step) for step in range(24)],
    )
    original = NonSequentialTimeSeries(
        data=np.arange(24),
        timestamps=timestamps,
        variable_name="load",
    )
    system.add_time_series(original, generator)
    # Attaching the same series to the same component twice must be rejected.
    with pytest.raises(ISAlreadyAttached):
        system.add_time_series(original, generator)
    system.convert_storage(**new_kwargs)

    assert isinstance(system._time_series_mgr._storage, new_stype)
    converted = system.get_time_series(
        generator, time_series_type=NonSequentialTimeSeries, variable_name="load"
    )
    assert np.array_equal(converted.data_array, original.data_array)
    assert np.array_equal(converted.timestamps, original.timestamps)
134 changes: 134 additions & 0 deletions tests/test_nonsequential_time_series.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""Test related to arrow storage module."""

from datetime import datetime, timedelta

import pytest
import numpy as np

from infrasys.normalization import NormalizationMax
from infrasys.quantities import ActivePower
from infrasys.time_series_models import NonSequentialTimeSeries


@pytest.fixture(name="timestamps")
def sample_timestamps():
    """Four timestamps starting 2020-01-01 and spaced four hours apart."""
    start = datetime(year=2020, month=1, day=1)
    step = timedelta(hours=4)
    return [start + index * step for index in range(4)]


@pytest.fixture(name="quantity_data")
def sample_quantity_data():
    """Sample infrasys quantity data: the values 0..3 in kilowatts."""
    values = range(4)
    return ActivePower(values, "kilowatts")


@pytest.fixture(name="data")
def sample_data():
    """Sample data sequence: ``range(4)``."""
    values = range(4)
    return values


@pytest.fixture(name="variable_name")
def sample_variable_name():
    """Sample variable name used by the tests."""
    name = "active_power"
    return name


def test_nonsequential_time_series_attributes(data, timestamps, variable_name):
    """Test NonSequentialTimeSeries built from a plain (non-quantity) data sequence."""
    series = NonSequentialTimeSeries.from_array(
        data=data,
        variable_name=variable_name,
        timestamps=timestamps,
    )
    expected_length = 4
    assert isinstance(series, NonSequentialTimeSeries)
    assert series.length == expected_length
    for array in (series.data, series.timestamps):
        assert isinstance(array, np.ndarray)


def test_invalid_sequence_length(data, timestamps, variable_name):
    """A single-sample series must be rejected: at least two points are required."""
    single_value = [data[0]]
    single_timestamp = [timestamps[0]]
    with pytest.raises(ValueError, match="length must be at least 2"):
        NonSequentialTimeSeries.from_array(
            data=single_value,
            variable_name=variable_name,
            timestamps=single_timestamp,
        )


def test_duplicate_timestamps(data, variable_name):
    """A series whose timestamps contain a repeated entry must be rejected."""
    # May 17 appears twice on purpose.
    repeated_days = (17, 17, 18, 20)
    timestamps = [datetime(2020, 5, day) for day in repeated_days]
    with pytest.raises(ValueError, match="Timestamps must be unique"):
        NonSequentialTimeSeries.from_array(
            data=data,
            variable_name=variable_name,
            timestamps=timestamps,
        )


def test_chronological_timestamps(data, variable_name):
    """A series whose timestamps are not in chronological order must be rejected."""
    # The first entry (June 17) is later than every entry that follows it.
    out_of_order = ((6, 17), (5, 17), (5, 18), (5, 20))
    timestamps = [datetime(2020, month, day) for month, day in out_of_order]
    with pytest.raises(ValueError, match="chronological order"):
        NonSequentialTimeSeries.from_array(
            data=data,
            variable_name=variable_name,
            timestamps=timestamps,
        )


def test_nonsequential_time_series_attributes_with_quantity(
    quantity_data, timestamps, variable_name
):
    """Build a NonSequentialTimeSeries from an infrasys quantity and check its attributes."""
    expected_length = 4
    series = NonSequentialTimeSeries.from_array(
        timestamps=timestamps,
        variable_name=variable_name,
        data=quantity_data,
    )
    assert isinstance(series, NonSequentialTimeSeries)
    assert series.length == expected_length
    # The quantity type is expected to be kept for the data; timestamps are a numpy array.
    assert isinstance(series.data, ActivePower)
    assert isinstance(series.timestamps, np.ndarray)


def test_normalization(data, timestamps, variable_name):
    """Check that NormalizationMax scales each sample by the final input value.

    The expected scale is taken from ``data[-1]``, which is the maximum only
    because the fixture data is ascending.
    """
    expected_length = 4
    scale = data[-1]
    series = NonSequentialTimeSeries.from_array(
        normalization=NormalizationMax(),
        data=data,
        timestamps=timestamps,
        variable_name=variable_name,
    )
    assert isinstance(series, NonSequentialTimeSeries)
    assert series.length == expected_length
    for index, normalized in enumerate(series.data):
        assert normalized == data[index] / scale


def test_normalization_quantity(quantity_data, timestamps, variable_name):
    """Check NormalizationMax on quantity data by comparing against raw magnitudes.

    The expected scale is the last magnitude, which is the maximum only
    because the fixture data is ascending.
    """
    expected_length = 4
    scale = quantity_data.magnitude[-1]
    series = NonSequentialTimeSeries.from_array(
        normalization=NormalizationMax(),
        data=quantity_data,
        timestamps=timestamps,
        variable_name=variable_name,
    )
    assert isinstance(series, NonSequentialTimeSeries)
    assert series.length == expected_length
    for index, normalized in enumerate(series.data):
        assert normalized == quantity_data.magnitude[index] / scale
99 changes: 91 additions & 8 deletions tests/test_serialization.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
import random
import os
from datetime import datetime, timedelta
from typing import Type

import numpy as np
from numpy._typing import NDArray
@@ -11,12 +12,12 @@
from pydantic import WithJsonSchema
from typing_extensions import Annotated

from infrasys import Location, SingleTimeSeries
from infrasys import Location, SingleTimeSeries, NonSequentialTimeSeries
from infrasys.component import Component
from infrasys.quantities import Distance, ActivePower
from infrasys.exceptions import ISOperationNotAllowed
from infrasys.normalization import NormalizationMax
from infrasys.time_series_models import TimeSeriesStorageType
from infrasys.time_series_models import TimeSeriesStorageType, TimeSeriesData
from .models.simple_system import (
SimpleSystem,
SimpleBus,
@@ -30,6 +31,12 @@
TimeSeriesStorageType.MEMORY,
)

# Chronify storage is not yet implemented for NonSequentialTimeSeries.
TS_STORAGE_OPTIONS_NONSEQUENTIAL = (
TimeSeriesStorageType.ARROW,
TimeSeriesStorageType.MEMORY,
)


class ComponentWithPintQuantity(Component):
"""Test component with a container of quantities."""
@@ -86,7 +93,7 @@ def test_serialization(tmp_path):


@pytest.mark.parametrize("time_series_storage_type", TS_STORAGE_OPTIONS)
def test_serialize_time_series(tmp_path, time_series_storage_type):
def test_serialize_single_time_series(tmp_path, time_series_storage_type):
system = SimpleSystem(time_series_storage_type=time_series_storage_type)
bus = SimpleBus(name="test-bus", voltage=1.1)
gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True)
@@ -114,6 +121,8 @@ def check_deserialize_with_read_only_time_series(
gen2_name: str,
variable_name: str,
expected_ts_data: NDArray | pint.Quantity,
expected_ts_timestamps: NDArray | None = None,
time_series_type: Type[TimeSeriesData] = SingleTimeSeries,
):
system = SimpleSystem.from_json(filename, time_series_read_only=True)
system_ts_dir = system.get_time_series_directory()
@@ -123,8 +132,46 @@ def check_deserialize_with_read_only_time_series(
with pytest.raises(ISOperationNotAllowed):
system.remove_time_series(gen1b, variable_name=variable_name)

ts2 = system.get_time_series(gen1b, variable_name=variable_name)
ts2 = system.get_time_series(
gen1b, time_series_type=time_series_type, variable_name=variable_name
)
assert np.array_equal(ts2.data, expected_ts_data)
if expected_ts_timestamps is not None:
assert np.array_equal(ts2.timestamps, expected_ts_timestamps)


@pytest.mark.parametrize("time_series_storage_type", TS_STORAGE_OPTIONS_NONSEQUENTIAL)
def test_serialize_nonsequential_time_series(tmp_path, time_series_storage_type):
    """Serialize a system holding a NonSequentialTimeSeries and check deserialization.

    One series is attached to two generators, the system is written to JSON,
    and the file is then checked by the read-write and read-only helpers.
    """
    system = SimpleSystem(time_series_storage_type=time_series_storage_type)
    bus = SimpleBus(name="test-bus", voltage=1.1)
    gen1, gen2 = (
        SimpleGenerator(name=gen_name, active_power=1.0, rating=1.0, bus=bus, available=True)
        for gen_name in ("gen1", "gen2")
    )
    system.add_components(bus, gen1, gen2)

    variable_name = "active_power"
    length = 10
    # Ten samples spaced five seconds apart, starting 2030-01-01.
    start = datetime(year=2030, month=1, day=1)
    timestamps = [start + timedelta(seconds=5 * offset) for offset in range(length)]
    ts = NonSequentialTimeSeries.from_array(
        data=range(length),
        variable_name=variable_name,
        timestamps=timestamps,
    )
    system.add_time_series(ts, gen1, gen2, scenario="high", model_year="2030")

    filename = tmp_path / "system.json"
    system.to_json(filename)

    check_deserialize_with_read_write_time_series(filename)
    check_deserialize_with_read_only_time_series(
        filename,
        gen1.name,
        gen2.name,
        variable_name,
        ts.data,
        ts.timestamps,
        time_series_type=NonSequentialTimeSeries,
    )


def check_deserialize_with_read_write_time_series(filename):
@@ -161,7 +208,7 @@ def test_serialize_quantity(tmp_path, distance):
assert c2.distance == c1.distance


def test_with_time_series_quantity(tmp_path):
def test_with_single_time_series_quantity(tmp_path):
"""Test serialization of SingleTimeSeries with a Pint quantity."""
system = SimpleSystem(auto_add_composed_components=True)
gen = SimpleGenerator.example()
@@ -179,7 +226,9 @@ def test_with_time_series_quantity(tmp_path):

system2 = SimpleSystem.from_json(sys_file)
gen2 = system2.get_component(SimpleGenerator, gen.name)
ts2 = system2.get_time_series(gen2, variable_name=variable_name)
ts2 = system2.get_time_series(
gen2, time_series_type=SingleTimeSeries, variable_name=variable_name
)
assert isinstance(ts, SingleTimeSeries)
assert ts.length == length
assert ts.resolution == resolution
@@ -188,8 +237,40 @@ def test_with_time_series_quantity(tmp_path):
assert np.array_equal(ts2.data.magnitude, np.array(range(length)))


def test_with_nonsequential_time_series_quantity(tmp_path):
    """Test serialization of NonSequentialTimeSeries with a Pint quantity.

    A series built from an ``ActivePower`` quantity is attached to a
    generator, the system is written to JSON and read back, and the
    deserialized series is compared against the original inputs.
    """
    system = SimpleSystem(auto_add_composed_components=True)
    gen = SimpleGenerator.example()
    system.add_components(gen)
    length = 10
    data = ActivePower(range(length), "watts")
    variable_name = "active_power"
    # Derive the timestamp count from `length` so data and timestamps cannot drift apart.
    timestamps = [
        datetime(year=2030, month=1, day=1) + timedelta(seconds=100 * i) for i in range(length)
    ]
    ts = NonSequentialTimeSeries.from_array(
        data=data, variable_name=variable_name, timestamps=timestamps
    )
    system.add_time_series(ts, gen)

    sys_file = tmp_path / "system.json"
    system.to_json(sys_file)

    system2 = SimpleSystem.from_json(sys_file)
    gen2 = system2.get_component(SimpleGenerator, gen.name)
    ts2 = system2.get_time_series(
        gen2, time_series_type=NonSequentialTimeSeries, variable_name=variable_name
    )
    # Sanity-check the series that was written ...
    assert isinstance(ts, NonSequentialTimeSeries)
    assert ts.length == length
    # ... and verify the round-tripped series, which is what this test is about.
    assert isinstance(ts2, NonSequentialTimeSeries)
    assert ts2.length == length
    assert isinstance(ts2.data.magnitude, np.ndarray)
    assert isinstance(ts2.timestamps, np.ndarray)
    assert np.array_equal(ts2.data.magnitude, np.array(range(length)))
    assert np.array_equal(ts2.timestamps, np.array(timestamps))


@pytest.mark.parametrize("storage_type", TS_STORAGE_OPTIONS)
def test_system_with_time_series_normalization(tmp_path, storage_type):
def test_system_with_single_time_series_normalization(tmp_path, storage_type):
system = SimpleSystem(
name="test-system",
auto_add_composed_components=True,
@@ -211,7 +292,9 @@ def test_system_with_time_series_normalization(tmp_path, storage_type):

system2 = SimpleSystem.from_json(filename)
gen2 = system2.get_component(SimpleGenerator, gen.name)
ts2 = system2.get_time_series(gen2, variable_name=variable_name)
ts2 = system2.get_time_series(
gen2, time_series_type=SingleTimeSeries, variable_name=variable_name
)
assert ts2.normalization.max_value == length - 1


11 changes: 8 additions & 3 deletions tests/test_system.py
Original file line number Diff line number Diff line change
@@ -205,7 +205,7 @@ def check_attached_components(my_sys, parent_type, child_type):
system2.get_component(SimpleBus, gen.bus.name)


def test_time_series_attach_from_array():
def test_single_time_series_attach_from_array():
system = SimpleSystem()
bus = SimpleBus(name="test-bus", voltage=1.1)
gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True)
@@ -220,10 +220,15 @@ def test_time_series_attach_from_array():
system.add_time_series(ts, gen1, gen2)
assert system.has_time_series(gen1, variable_name=variable_name)
assert system.has_time_series(gen2, variable_name=variable_name)
assert np.array_equal(system.get_time_series(gen1, variable_name=variable_name).data, ts.data)
assert np.array_equal(
system.get_time_series(
gen1, time_series_type=SingleTimeSeries, variable_name=variable_name
).data,
ts.data,
)


def test_time_series():
def test_single_time_series():
system = SimpleSystem()
bus = SimpleBus(name="test-bus", voltage=1.1)
gen1 = SimpleGenerator(name="gen1", active_power=1.0, rating=1.0, bus=bus, available=True)