Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 120 additions & 99 deletions esoreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
# Sustainable Architecture and Building Technologies (Suat) at the Institute of
# Technology in Architecture, ETH Zuerich. See http://suat.arch.ethz.ch for
# more information.
from typing import Optional

'''
import pandas as pd

r"""
esoreader.py

A python module for reading \*.eso files generated by EnergyPlus
Expand Down Expand Up @@ -49,158 +52,176 @@
'Zone Ventilation Total Heat Loss Energy')[0]
idx = dd.index[frequency, key, variable]
time_series = data[idx]
'''


def read(eso_file_path):
"""Read in an .eso file and return the data dictionary and a dictionary
representing the data.
NOTE: this function is here for backward compatibilty reasons. Use
read_from_path() instead to obtain an EsoFile object.
"""
eso = read_from_path(eso_file_path)
return eso.dd, eso.data

"""

def read_from_path(eso_file_path):
"""
read in a .eso file and return an EsoFile object that can be used
to read in pandas DataFrame and Series objects.
"""
with open(eso_file_path, 'r') as eso_file:
eso = EsoFile(eso_file)
return eso


class DataDictionary(object):
def __init__(self, version=None, timestamp=None):
'''
class DataDictionary:
def __init__(self, version: Optional[str] = None, timestamp: Optional[str] = None):
"""
variables = dict of ids, int => [reporting_frequency,
key, variable, unit]

index = dict {(key, variable, reporting_frequency) => id)}
'''
self.version = version
self.timestamp = timestamp
self.variables = {}
self.index = {}
"""
self.version: Optional[str] = version
self.timestamp: Optional[str] = timestamp
self.variables: dict[int, list[Optional[str]]] = {}
self.index: dict[tuple[str, Optional[str], str], int] = {}

def build_index(self):
"""builds a reverse index for finding ids.
"""
for id, value in self.variables.items():
reporting_frequency, key, variable, unit = value
self.index[reporting_frequency, key, variable] = id
"""builds a reverse index for finding ids."""
for var_id, (frequency, key, name, unit) in self.variables.items():
self.index[(frequency, key, name)] = var_id

def find_variable(self, search):
def find_variable(self, search: str) -> list[tuple[str, Optional[str], str]]:
"""returns the coordinates (timestep, key, variable_name) in the
data dictionary that can be used to find an index. The search is case
insensitive."""
return [(timestep, key, variable_name)
for timestep, key, variable_name in self.index.keys()
if search.lower() in variable_name.lower()]
return [
(freq, key, name)
for (freq, key, name) in self.index.keys()
if search.lower() in name.lower()
]

def __repr__(self) -> str:
return f"<DataDictionary version={self.version} timestamp={self.timestamp} variables={len(self.variables)}>"


class EsoFile(object):
class EsoFile:

def __init__(self, eso_file):
self.eso_file = eso_file
self.dd = self._read_data_dictionary()
self.dd.build_index()
self.data = self._read_data()
self.data: dict[int, list[float]] = self._read_data()

def find_variable(self, search, key=None, frequency='TimeStep'):
def find_variable(
self, search: str, key: Optional[str] = None, frequency: str = "TimeStep"
) -> list[tuple[str, Optional[str], str]]:
"""returns the coordinates (timestep, key, variable_name) in the
data dictionary that can be used to find an index. The search is case
insensitive and need only be specified partially."""
variables = self.dd.find_variable(search)
variables = [v for v in variables
if v[0].lower() == frequency.lower()]
variables = [v for v in variables if v[0].lower() == frequency.lower()]
if key:
variables = [v for v in variables
if v[1].lower() == key.lower()]
variables = [v for v in variables if v[1] and v[1].lower() == key.lower()]
return variables

def to_frame(self, search, key=None, frequency='TimeStep', index=None, use_key_for_columns=True):
def to_frame(
self,
search: str,
key: Optional[str] = None,
frequency: str = "TimeStep",
index: Optional[list] = None,
use_key_for_columns: bool = True,
) -> pd.DataFrame:
"""
creates a pandas DataFrame objects with a column for every variable
that matches the search pattern and key. An None key matches all keys.
NOTE: The frequency *has* to be the same for all variables selected.
(uses find_variable to select the variables)
"""
from pandas import DataFrame
variables = self.find_variable(search, key=key, frequency=frequency)
if use_key_for_columns:
data = {v[1]: self.data[self.dd.index[v]] for v in variables}
else:
# use variable name as column name
data = {v[2]: self.data[self.dd.index[v]] for v in variables}
df = DataFrame(data)
df = pd.DataFrame(data)
if index is not None:
df.index = index
return df

def _read_reporting_frequency(self, line):
reporting_frequency = None
if '! ' in line:
line = line.split('! ')[0]
if ' !' in line:
line, reporting_frequency = line.split(' !')
# RunPeriod contains more stuff (" [Value,Min,Month,Day,Hour,
# Minute, Max,Month,Day,Hour,Minute]")split it off
reporting_frequency = reporting_frequency.split()[0]
return line, reporting_frequency

def _read_variable_unit(self, variable):
unit = None
if '[' in variable:
variable, unit = variable.split('[')
unit = unit[:-1] # remove ']' at the end
variable = variable.strip()
return variable, unit

def _read_data_dictionary(self):
def to_series(
self, search: str, key: Optional[str] = None, frequency: str = "TimeStep"
) -> pd.Series:
"""
Returns a pandas Series for the first variable matching the search.
"""
variables = self.find_variable(search, key=key, frequency=frequency)
if not variables:
raise ValueError(f"No variable found for search: {search}")
var = variables[0]
return pd.Series(self.data[self.dd.index[var]])

def _read_reporting_frequency(self, line: str) -> tuple[str, Optional[str]]:
if "! " in line:
line = line.split("! ")[0]
if " !" in line:
line, freq = line.split(" !")
freq = freq.split()[0]
return line, freq
return line, None

def _read_variable_unit(self, variable: str) -> tuple[str, Optional[str]]:
if "[" in variable:
variable, unit = variable.split("[")
return variable.strip(), unit.rstrip("]")
return variable, None

def _read_data_dictionary(self) -> DataDictionary:
"""parses the head of the eso_file, returning the data dictionary.
the file object eso_file is advanced to the position needed by
read_data.
"""
version, timestamp = [s.strip() for s
in self.eso_file.readline().split(',')[-2:]]
version, timestamp = [
s.strip() for s in self.eso_file.readline().split(",")[-2:]
]
dd = DataDictionary(version, timestamp)
line = self.eso_file.readline().strip()
while line != 'End of Data Dictionary':
line, reporting_frequency = self._read_reporting_frequency(line)
if reporting_frequency:
fields = [f.strip() for f in line.split(',')]
if len(fields) >= 4:
id, nfields, key, variable = fields[:4]
else:
id, nfields, variable = fields[:3]
key = None
variable, unit = self._read_variable_unit(variable)
dd.variables[int(id)] = [reporting_frequency, key,
variable, unit]
else:
# ignore the lines that aren't report variables
pass
while line != "End of Data Dictionary":
line, freq = self._read_reporting_frequency(line)
if freq:
fields = [f.strip() for f in line.split(",")]
try:
if len(fields) >= 4:
var_id, _, key, name = fields[:4]
else:
var_id, _, name = fields[:3]
key = None
name, unit = self._read_variable_unit(name)
dd.variables[int(var_id)] = [freq, key, name, unit]
except ValueError:
pass # skip malformed lines
line = self.eso_file.readline().strip()
dd.ids = set(dd.variables.keys())
return dd

def _read_data(self):
'''parse the data from the .eso file returning,
def _read_data(self) -> dict[int, list[float]]:
"""parse the data from the .eso file returning,
NOTE: eso_file should be the same file object that was passed to
read_data_dictionary(eso_file) to obtain dd.'''
data = {} # id => [value]
for id in self.dd.variables.keys():
data[id] = []
read_data_dictionary(eso_file) to obtain dd."""
data = {var_id: [] for var_id in self.dd.variables}
for line in self.eso_file:
if line.startswith('End of Data'):
if line.startswith("End of Data"):
break
fields = [f.strip() for f in line.split(',')]
id = int(fields[0])
if id not in self.dd.ids:
# skip entries that are not output:variables
fields = [f.strip() for f in line.split(",")]
try:
var_id = int(fields[0])
if var_id in self.dd.variables:
data[var_id].append(float(fields[1]))
except (ValueError, IndexError):
continue
data[id].append(float(fields[1]))
return data

def __repr__(self) -> str:
return f"<EsoFile version={self.dd.version} variables={len(self.dd.variables)}>"


def read_from_path(eso_file_path: str) -> EsoFile:
"""
read in a .eso file and return an EsoFile object that can be used
to read in pandas DataFrame and Series objects.
"""
with open(eso_file_path, "r") as eso_file:
eso = EsoFile(eso_file)
return eso


def read(eso_file_path: str) -> tuple[DataDictionary, dict[int, list[float]]]:
"""Read in an .eso file and return the data dictionary and a dictionary
representing the data.
NOTE: this function is here for backward compatibilty reasons. Use
read_from_path() instead to obtain an EsoFile object.
"""
eso = read_from_path(eso_file_path)
return eso.dd, eso.data
Empty file added tests/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions tests/mock_eplusout.eso
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Version,Timestamp,1.0,2025-09-26
1,4,Zone1,Zone Ventilation Total Heat Loss Energy [J] !TimeStep
End of Data Dictionary
1,100.0
1,200.0
End of Data
Loading