diff --git a/esoreader.py b/esoreader.py index aa7b1b3..b511e7c 100644 --- a/esoreader.py +++ b/esoreader.py @@ -5,8 +5,11 @@ # Sustainable Architecture and Building Technologies (Suat) at the Institute of # Technology in Architecture, ETH Zuerich. See http://suat.arch.ethz.ch for # more information. +from typing import Optional -''' +import pandas as pd + +r""" esoreader.py A python module for reading \*.eso files generated by EnergyPlus @@ -49,158 +52,176 @@ 'Zone Ventilation Total Heat Loss Energy')[0] idx = dd.index[frequency, key, variable] time_series = data[idx] -''' - - -def read(eso_file_path): - """Read in an .eso file and return the data dictionary and a dictionary - representing the data. - NOTE: this function is here for backward compatibilty reasons. Use - read_from_path() instead to obtain an EsoFile object. - """ - eso = read_from_path(eso_file_path) - return eso.dd, eso.data - +""" -def read_from_path(eso_file_path): - """ - read in a .eso file and return an EsoFile object that can be used - to read in pandas DataFrame and Series objects. - """ - with open(eso_file_path, 'r') as eso_file: - eso = EsoFile(eso_file) - return eso - -class DataDictionary(object): - def __init__(self, version=None, timestamp=None): - ''' +class DataDictionary: + def __init__(self, version: Optional[str] = None, timestamp: Optional[str] = None): + """ variables = dict of ids, int => [reporting_frequency, key, variable, unit] index = dict {(key, variable, reporting_frequency) => id)} - ''' - self.version = version - self.timestamp = timestamp - self.variables = {} - self.index = {} + """ + self.version: Optional[str] = version + self.timestamp: Optional[str] = timestamp + self.variables: dict[int, list[Optional[str]]] = {} + self.index: dict[tuple[str, Optional[str], str], int] = {} def build_index(self): - """builds a reverse index for finding ids. - """ - for id, value in self.variables.items(): - reporting_frequency, key, variable, unit = value - self.index[reporting_frequency, key, variable] = id + """builds a reverse index for finding ids.""" + for var_id, (frequency, key, name, unit) in self.variables.items(): + self.index[(frequency, key, name)] = var_id - def find_variable(self, search): + def find_variable(self, search: str) -> list[tuple[str, Optional[str], str]]: """returns the coordinates (timestep, key, variable_name) in the data dictionary that can be used to find an index. The search is case insensitive.""" - return [(timestep, key, variable_name) - for timestep, key, variable_name in self.index.keys() - if search.lower() in variable_name.lower()] + return [ + (freq, key, name) + for (freq, key, name) in self.index.keys() + if search.lower() in name.lower() + ] + + def __repr__(self) -> str: + return f"" -class EsoFile(object): +class EsoFile: def __init__(self, eso_file): self.eso_file = eso_file self.dd = self._read_data_dictionary() self.dd.build_index() - self.data = self._read_data() + self.data: dict[int, list[float]] = self._read_data() - def find_variable(self, search, key=None, frequency='TimeStep'): + def find_variable( + self, search: str, key: Optional[str] = None, frequency: str = "TimeStep" + ) -> list[tuple[str, Optional[str], str]]: """returns the coordinates (timestep, key, variable_name) in the data dictionary that can be used to find an index. The search is case insensitive and need only be specified partially.""" variables = self.dd.find_variable(search) - variables = [v for v in variables - if v[0].lower() == frequency.lower()] + variables = [v for v in variables if v[0].lower() == frequency.lower()] if key: - variables = [v for v in variables - if v[1].lower() == key.lower()] + variables = [v for v in variables if v[1] and v[1].lower() == key.lower()] return variables - def to_frame(self, search, key=None, frequency='TimeStep', index=None, use_key_for_columns=True): + def to_frame( + self, + search: str, + key: Optional[str] = None, + frequency: str = "TimeStep", + index: Optional[list] = None, + use_key_for_columns: bool = True, + ) -> pd.DataFrame: """ creates a pandas DataFrame objects with a column for every variable that matches the search pattern and key. An None key matches all keys. NOTE: The frequency *has* to be the same for all variables selected. (uses find_variable to select the variables) """ - from pandas import DataFrame variables = self.find_variable(search, key=key, frequency=frequency) if use_key_for_columns: data = {v[1]: self.data[self.dd.index[v]] for v in variables} else: - # use variable name as column name data = {v[2]: self.data[self.dd.index[v]] for v in variables} - df = DataFrame(data) + df = pd.DataFrame(data) if index is not None: df.index = index return df - def _read_reporting_frequency(self, line): - reporting_frequency = None - if '! ' in line: - line = line.split('! ')[0] - if ' !' in line: - line, reporting_frequency = line.split(' !') - # RunPeriod contains more stuff (" [Value,Min,Month,Day,Hour, - # Minute, Max,Month,Day,Hour,Minute]")split it off - reporting_frequency = reporting_frequency.split()[0] - return line, reporting_frequency - - def _read_variable_unit(self, variable): - unit = None - if '[' in variable: - variable, unit = variable.split('[') - unit = unit[:-1] # remove ']' at the end - variable = variable.strip() - return variable, unit - - def _read_data_dictionary(self): + def to_series( + self, search: str, key: Optional[str] = None, frequency: str = "TimeStep" + ) -> pd.Series: + """ + Returns a pandas Series for the first variable matching the search. + """ + variables = self.find_variable(search, key=key, frequency=frequency) + if not variables: + raise ValueError(f"No variable found for search: {search}") + var = variables[0] + return pd.Series(self.data[self.dd.index[var]]) + + def _read_reporting_frequency(self, line: str) -> tuple[str, Optional[str]]: + if "! " in line: + line = line.split("! ")[0] + if " !" in line: + line, freq = line.split(" !") + freq = freq.split()[0] + return line, freq + return line, None + + def _read_variable_unit(self, variable: str) -> tuple[str, Optional[str]]: + if "[" in variable: + variable, unit = variable.split("[") + return variable.strip(), unit.rstrip("]") + return variable, None + + def _read_data_dictionary(self) -> DataDictionary: """parses the head of the eso_file, returning the data dictionary. the file object eso_file is advanced to the position needed by read_data. """ - version, timestamp = [s.strip() for s - in self.eso_file.readline().split(',')[-2:]] + version, timestamp = [ + s.strip() for s in self.eso_file.readline().split(",")[-2:] + ] dd = DataDictionary(version, timestamp) line = self.eso_file.readline().strip() - while line != 'End of Data Dictionary': - line, reporting_frequency = self._read_reporting_frequency(line) - if reporting_frequency: - fields = [f.strip() for f in line.split(',')] - if len(fields) >= 4: - id, nfields, key, variable = fields[:4] - else: - id, nfields, variable = fields[:3] - key = None - variable, unit = self._read_variable_unit(variable) - dd.variables[int(id)] = [reporting_frequency, key, - variable, unit] - else: - # ignore the lines that aren't report variables - pass + while line != "End of Data Dictionary": + line, freq = self._read_reporting_frequency(line) + if freq: + fields = [f.strip() for f in line.split(",")] + try: + if len(fields) >= 4: + var_id, _, key, name = fields[:4] + else: + var_id, _, name = fields[:3] + key = None + name, unit = self._read_variable_unit(name) + dd.variables[int(var_id)] = [freq, key, name, unit] + except ValueError: + pass # skip malformed lines line = self.eso_file.readline().strip() dd.ids = set(dd.variables.keys()) return dd - def _read_data(self): - '''parse the data from the .eso file returning, + def _read_data(self) -> dict[int, list[float]]: + """parse the data from the .eso file returning, NOTE: eso_file should be the same file object that was passed to - read_data_dictionary(eso_file) to obtain dd.''' - data = {} # id => [value] - for id in self.dd.variables.keys(): - data[id] = [] + read_data_dictionary(eso_file) to obtain dd.""" + data = {var_id: [] for var_id in self.dd.variables} for line in self.eso_file: - if line.startswith('End of Data'): + if line.startswith("End of Data"): break - fields = [f.strip() for f in line.split(',')] - id = int(fields[0]) - if id not in self.dd.ids: - # skip entries that are not output:variables + fields = [f.strip() for f in line.split(",")] + try: + var_id = int(fields[0]) + if var_id in self.dd.variables: + data[var_id].append(float(fields[1])) + except (ValueError, IndexError): continue - data[id].append(float(fields[1])) return data + + def __repr__(self) -> str: + return f"" + + +def read_from_path(eso_file_path: str) -> EsoFile: + """ + read in a .eso file and return an EsoFile object that can be used + to read in pandas DataFrame and Series objects. + """ + with open(eso_file_path, "r") as eso_file: + eso = EsoFile(eso_file) + return eso + + +def read(eso_file_path: str) -> tuple[DataDictionary, dict[int, list[float]]]: + """Read in an .eso file and return the data dictionary and a dictionary + representing the data. + NOTE: this function is here for backward compatibilty reasons. Use + read_from_path() instead to obtain an EsoFile object. + """ + eso = read_from_path(eso_file_path) + return eso.dd, eso.data diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/mock_eplusout.eso b/tests/mock_eplusout.eso new file mode 100644 index 0000000..f9548e2 --- /dev/null +++ b/tests/mock_eplusout.eso @@ -0,0 +1,6 @@ +Version,Timestamp,1.0,2025-09-26 +1,4,Zone1,Zone Ventilation Total Heat Loss Energy [J] !TimeStep +End of Data Dictionary +1,100.0 +1,200.0 +End of Data diff --git a/tests/test_eso_reader.py b/tests/test_eso_reader.py new file mode 100644 index 0000000..39800a7 --- /dev/null +++ b/tests/test_eso_reader.py @@ -0,0 +1,113 @@ +import pandas as pd +import pytest + +from esoreader import read_from_path + + +@pytest.fixture +def eso(): + return read_from_path("tests/mock_eplusout.eso") + + +def test_data_dictionary_parsing(eso): + assert len(eso.dd.variables) == 1 + assert eso.dd.variables[1][2] == "Zone Ventilation Total Heat Loss Energy" + assert eso.dd.variables[1][0] == "TimeStep" + + +def test_data_values(eso): + assert eso.data[1] == [100.0, 200.0] + + +def test_find_variable(eso): + result = eso.find_variable("heat loss") + assert len(result) == 1 + assert result[0][1] == "Zone1" + + +def test_to_frame_output(eso): + df = eso.to_frame("heat loss") + assert df.shape == (2, 1) + assert df.columns[0] == "Zone1" + assert df.iloc[0, 0] == 100.0 + assert df.iloc[1, 0] == 200.0 + + +def test_variable_unit_parsing(eso): + # Check that the unit was correctly parsed from the variable name + assert eso.dd.variables[1][3] == "J" + + +def test_find_variable_case_insensitive(eso): + result = eso.find_variable("HEAT LOSS") + assert len(result) == 1 + assert result[0][2] == "Zone Ventilation Total Heat Loss Energy" + + +def test_find_variable_wrong_key(eso): + result = eso.find_variable("heat loss", key="ZoneX") + assert result == [] + + +def test_find_variable_wrong_frequency(eso): + result = eso.find_variable("heat loss", frequency="Hourly") + assert result == [] + + +def test_to_frame_use_variable_name(eso): + df = eso.to_frame("heat loss", use_key_for_columns=False) + assert "Zone Ventilation Total Heat Loss Energy" in df.columns[0] + + +def test_to_frame_with_custom_index(eso): + df = eso.to_frame("heat loss", index=["t1", "t2"]) + assert list(df.index) == ["t1", "t2"] + + +def test_data_dictionary_build_index(eso): + # Confirm that the reverse index was built correctly + key = ("TimeStep", "Zone1", "Zone Ventilation Total Heat Loss Energy") + assert eso.dd.index[key] == 1 + + +def test_reading_version_and_timestamp(eso): + assert eso.dd.version == "1.0" + assert eso.dd.timestamp == "2025-09-26" + + +def test_data_dictionary_find_variable_partial_match(eso): + matches = eso.dd.find_variable("Ventilation") + assert len(matches) == 1 + assert matches[0][2] == "Zone Ventilation Total Heat Loss Energy" + + +def test_data_dictionary_find_variable_no_match(eso): + matches = eso.dd.find_variable("Nonexistent Variable") + assert matches == [] + + +def test_to_series_output(eso): + series = eso.to_series("heat loss") + assert isinstance(series, pd.Series) + assert series.iloc[0] == 100.0 + assert series.iloc[1] == 200.0 + + +def test_to_series_with_key(eso): + series = eso.to_series("heat loss", key="Zone1") + assert series.tolist() == [100.0, 200.0] + + +def test_to_series_case_insensitive(eso): + series = eso.to_series("HEAT LOSS") + assert series.iloc[0] == 100.0 + + +def test_to_series_wrong_key(eso): + with pytest.raises(ValueError, match="No variable found for search: heat loss"): + eso.to_series("heat loss", key="ZoneX") + + +def test_to_series_wrong_frequency(eso): + with pytest.raises(ValueError, match="No variable found for search: heat loss"): + eso.to_series("heat loss", frequency="Hourly") diff --git a/tests/test_esoreader_unittest.py b/tests/test_esoreader_unittest.py new file mode 100644 index 0000000..aa3b034 --- /dev/null +++ b/tests/test_esoreader_unittest.py @@ -0,0 +1,37 @@ +import unittest +from io import StringIO + +from esoreader import EsoFile + + +class TestEsoReader(unittest.TestCase): + + def setUp(self): + self.mock_eso_content = StringIO( + "Version,Timestamp,1.0,2025-09-26\n" + "1,4,Zone1,Zone Ventilation Total Heat Loss Energy [J] !TimeStep\n" + "End of Data Dictionary\n" + "1,100.0\n" + "1,200.0\n" + "End of Data\n" + ) + self.eso = EsoFile(self.mock_eso_content) + + def test_data_dictionary_parsing(self): + self.assertEqual(len(self.eso.dd.variables), 1) + self.assertIn( + ( + self.eso.dd.variables[1][0], + self.eso.dd.variables[1][1], + self.eso.dd.variables[1][2], + ), + self.eso.dd.index, + ) + + def test_data_parsing(self): + self.assertEqual(self.eso.data[1], [100.0, 200.0]) + + def test_find_variable(self): + result = self.eso.find_variable("heat loss") + self.assertEqual(len(result), 1) + self.assertEqual(result[0][1], "Zone1")