diff --git a/.gitignore b/.gitignore index e367926..4e271e4 100644 --- a/.gitignore +++ b/.gitignore @@ -4,8 +4,7 @@ __pycache__/ src/config.bak.yaml src/config.yaml -src/json/optimize_request.json -src/json/optimize_response.json +src/json/*.json src/interfaces/__pycache__ src/interfaces/config/battery_config.json src/interfaces/config/timeofuse_config.json diff --git a/README.md b/README.md index 7113a47..d97c9d6 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # EOS Connect **EOS Connect** is an open-source tool for intelligent energy management and optimization. -It connects to various smart home platforms (like Home Assistant and OpenHAB) to monitor, forecast, and control your energy flows. -EOS Connect fetches real-time and forecast data (PV, load, prices), processes it via the [EOS (Energy Optimization System)](https://github.com/Akkudoktor-EOS/EOS), and automatically controls devices (such as Fronius inverters or batteries supported by [evcc](https://docs.evcc.io/docs/devices/meters)) to optimize your energy usage and costs. +It supports two optimization backends: the full-featured Akkudoktor EOS (default) and the lightweight EVCC Opt (optional, very fast). +EOS Connect fetches real-time and forecast data, processes it via your chosen optimizer, and controls devices to optimize your energy usage and costs. **Key Features:** - **Automated Energy Optimization:** @@ -159,8 +159,10 @@ Get up and running with EOS Connect in just a few steps! ### 2. Install via Home Assistant Add-on - Add the [ohAnd/ha_addons](https://github.com/ohAnd/ha_addons) repository to your Home Assistant add-on store. -- [if needed] Add the [Duetting/ha_eos_addon](https://github.com/Duetting/ha_eos_addon) (or [thecem/ha_eos_addon](https://github.com/thecem/ha_eos_addon)) repository to your Home Assistant add-on store. -- Install both the **EOS Add-on** and the **EOS Connect Add-on**. +- select your preferred optimization backend: + - [if needed] Add the [Duetting/ha_eos_addon](https://github.com/Duetting/ha_eos_addon) (or [thecem/ha_eos_addon](https://github.com/thecem/ha_eos_addon)) repository to your Home Assistant add-on store. + - [if needed] Add [thecem/hassio-evopt](https://github.com/thecem/hassio-evopt) repository to your Home Assistant add-on store. ([found here](https://github.com/evcc-io/evcc/discussions/23213#3-optimizer-im-home-assistant-ha-addon-nutzen)) +- Install both the **EOS Add-on** (or **evcc opt**) and the **EOS Connect Add-on**. - Configure both add-ons via the Home Assistant UI. - Start both add-ons. The EOS Connect web dashboard will be available at [http://homeassistant.local:8081](http://homeassistant.local:8081) (or your HA IP). diff --git a/src/CONFIG_README.md b/src/CONFIG_README.md index 087aa8c..eca9e7c 100644 --- a/src/CONFIG_README.md +++ b/src/CONFIG_README.md @@ -77,11 +77,14 @@ A default config file will be created with the first start, if there is no `conf ### EOS Server Configuration +- **`eos.source`**: + EOS server source - eos_server, evcc_opt, default (default uses eos_server) + - **`eos.server`**: - EOS server address (e.g., `192.168.1.94`). (Mandatory) + EOS or evcc opt server address (e.g., `192.168.1.94`). (Mandatory) - **`eos.port`**: - Port for the EOS server. Default: `8503`. (Mandatory) + port for EOS server (8503) or evcc opt server (7050) - default: `8503` (Mandatory) - **`timeout`**: Timeout for EOS optimization requests, in seconds. Default: `180`. (Mandatory) @@ -389,6 +392,7 @@ load: additional_load_1_consumption: 1500 # consumption for additional load 1 in Wh - default: 0 (If not needed set to `additional_load_1_sensor: ""`) # EOS server configuration eos: + source: eos_server # EOS server source - eos_server, evcc_opt, default (default uses eos_server) server: 192.168.1.94 # EOS server address port: 8503 # port for EOS server - default: 8503 timeout: 180 # timeout for EOS optimize request in seconds - default: 180 @@ -470,6 +474,7 @@ load: car_charge_load_sensor: Wallbox_Power # item / entity for wallbox power data in watts. (If not needed, set to `load.car_charge_load_sensor: ""`) # EOS server configuration eos: + source: eos_server # EOS server source - eos_server, evcc_opt, default (default uses eos_server) server: 192.168.1.94 # EOS server address port: 8503 # port for EOS server - default: 8503 timeout: 180 # timeout for EOS optimize request in seconds - default: 180 diff --git a/src/config.py b/src/config.py index 245fe66..4007d46 100644 --- a/src/config.py +++ b/src/config.py @@ -55,8 +55,9 @@ def create_default_config(self): ), "eos": CommentedMap( { - "server": "192.168.100.100", # Default EOS server address - "port": 8503, # Default port for EOS server + "source": "default", # EOS server source - eos_server, evcc_opt, default + "server": "192.168.100.100", # EOS or evcc opt server address + "port": 8503, # port for EOS server (8503) or evcc opt server (7050) - default: 8503 "timeout": 180, # Default timeout for EOS optimize request } ), @@ -194,9 +195,14 @@ def create_default_config(self): config.yaml_set_comment_before_after_key( "eos", before="EOS server configuration" ) - config["eos"].yaml_add_eol_comment("EOS server address", "server") config["eos"].yaml_add_eol_comment( - "port for EOS server - default: 8503", "port" + "EOS server source - eos_server, evcc_opt, default (default uses eos_server)", + "source", + ) + config["eos"].yaml_add_eol_comment("EOS or evcc opt server address", "server") + config["eos"].yaml_add_eol_comment( + "port for EOS server (8503) or evcc opt server (7050) - default: 8503", + "port", ) config["eos"].yaml_add_eol_comment( "timeout for EOS optimize request in seconds - default: 180", "timeout" diff --git a/src/eos_connect.py b/src/eos_connect.py index 3ee6fcf..082f7f6 100644 --- a/src/eos_connect.py +++ b/src/eos_connect.py @@ -22,7 +22,7 @@ from interfaces.inverter_fronius import FroniusWR from interfaces.inverter_fronius_v2 import FroniusWRV2 from interfaces.evcc_interface import EvccInterface -from interfaces.eos_interface import EosInterface +from interfaces.optimization_interface import OptimizationInterface from interfaces.price_interface import PriceInterface from interfaces.mqtt_interface import MqttInterface from interfaces.pv_interface import PvInterface @@ -106,11 +106,11 @@ def formatTime(self, record, datefmt=None): LOGLEVEL, ) # initialize eos interface -eos_interface = EosInterface( - eos_server=config_manager.config["eos"]["server"], - eos_port=config_manager.config["eos"]["port"], +eos_interface = OptimizationInterface( + config=config_manager.config["eos"], timezone=time_zone, ) + # initialize base control base_control = BaseControl(config_manager.config, time_zone) # initialize the inverter interface @@ -747,7 +747,7 @@ def __run_optimization_loop(self): mqtt_interface.update_publish_topics( {"optimization/state": {"value": self.get_current_state()["request_state"]}} ) - optimized_response, avg_runtime = eos_interface.eos_set_optimize_request( + optimized_response, avg_runtime = eos_interface.optimize( json_optimize_input, config_manager.config["eos"]["timeout"] ) # Store the runtime for use in sleep calculation @@ -1364,6 +1364,9 @@ def get_controls(): "currency_minor_unit": currency_minor_unit, }, "state": optimization_scheduler.get_current_state(), + "used_optimization_source": config_manager.config.get("eos", {}).get( + "source", "eos_server" + ), "eos_connect_version": __version__, "timestamp": datetime.now(time_zone).isoformat(), "api_version": "0.0.2", diff --git a/src/interfaces/eos_interface.py b/src/interfaces/eos_interface.py deleted file mode 100644 index 9314c02..0000000 --- a/src/interfaces/eos_interface.py +++ /dev/null @@ -1,596 +0,0 @@ -""" -This module provides an interface for interacting with an EOS server. -The `EosInterface` class includes methods for setting configuration values, -sending measurement data, sending optimization requests, saving configurations -to a file, and updating configurations from a file. It uses HTTP requests to -communicate with the EOS server. -Classes: - EosInterface: A class that provides methods to interact with the EOS server. -Dependencies: - - logging: For logging messages. - - time: For measuring elapsed time. - - json: For handling JSON data. - - datetime: For working with date and time. - - requests: For making HTTP requests. -Usage: - Create an instance of the `EosInterface` class by providing the EOS server - address, port, and timezone. Use the provided methods to interact with the - EOS server for various operations such as setting configuration values, - sending measurement data, and managing configurations. -""" - -import logging -import time -import json -from datetime import datetime, timedelta -import requests -import pandas as pd -import numpy as np - -logger = logging.getLogger("__main__") -logger.info("[EOS] loading module ") - - -# EOS_API_PUT_LOAD_SERIES = { -# f"http://{EOS_SERVER}:{EOS_SERVER_PORT}/v1/measurement/load-mr/series/by-name" # -# } # ?name=Household - -# EOS_API_GET_CONFIG_VALUES = {f"http://{EOS_SERVER}:{EOS_SERVER_PORT}/v1/config"} - -# EOS_API_PUT_LOAD_PROFILE = { -# f"http://{EOS_SERVER}:{EOS_SERVER_PORT}/v1/measurement/load-mr/value/by-name" -# } - - -class EosInterface: - """ - EosInterface is a class that provides an interface for interacting with an EOS server. - This class includes methods for setting configuration values, sending measurement data, - sending optimization requests, saving configurations to a file, and updating configurations - from a file. It uses HTTP requests to communicate with the EOS server. - Attributes: - eos_server (str): The hostname or IP address of the EOS server. - eos_port (int): The port number of the EOS server. - base_url (str): The base URL constructed from the server and port. - time_zone (timezone): The timezone used for time-related operations. - Methods: - set_config_value(key, value): - send_measurement_to_eos(dataframe): - Send measurement data to the EOS server. - eos_set_optimize_request(payload, timeout=180): - Send an optimization request to the EOS server. - eos_save_config_to_config_file(): - eos_update_config_from_config_file(): - """ - - def __init__(self, eos_server, eos_port, timezone): - self.eos_server = eos_server - self.eos_port = eos_port - self.base_url = f"http://{eos_server}:{eos_port}" - self.time_zone = timezone - self.last_start_solution = None - self.home_appliance_released = False - self.home_appliance_start_hour = None - self.eos_version = ( - ">=2025-04-09" # use as default value in case version check fails - ) - self.eos_version = self.__retrieve_eos_version() - - self.last_control_data = [ - { - "ac_charge_demand": 0, - "dc_charge_demand": 0, - "discharge_allowed": False, - "error": 0, - "hour": -1, - }, - { - "ac_charge_demand": 0, - "dc_charge_demand": 0, - "discharge_allowed": False, - "error": 0, - "hour": -1, - }, - ] - - self.last_optimization_runtimes = [0] * 5 # list to store last 5 runtimes - self.last_optimization_runtime_number = 0 # index for circular list - self.is_first_run = True # Add flag to track first run - - # EOS basic API helper - def set_config_value(self, key, value): - """ - Set a configuration value on the EOS server. - """ - if isinstance(value, list): - value = json.dumps(value) - params = {"key": key, "value": value} - response = requests.put( - self.base_url + "/v1/config/value", params=params, timeout=10 - ) - response.raise_for_status() - logger.info( - "[EOS] Config value set successfully. Key: {key} \t\t => Value: {value}" - ) - - def send_measurement_to_eos(self, dataframe): - """ - Send the measurement data to the EOS server. - """ - params = { - "data": dataframe.to_json(orient="index"), - "dtype": "float64", - "tz": "UTC", - } - response = requests.put( - self.base_url - + "/v1/measurement/load-mr/series/by-name" - + "?name=Household", - params=params, - timeout=10, - ) - response.raise_for_status() - if response.status_code == 200: - logger.debug("[EOS] Measurement data sent to EOS server successfully.") - else: - logger.debug( - "[EOS]" - "Failed to send data to EOS server. Status code: {response.status_code}" - ", Response: {response.text}" - ) - - def eos_set_optimize_request(self, payload, timeout=180): - """ - Send the optimize request to the EOS server. - """ - headers = {"accept": "application/json", "Content-Type": "application/json"} - request_url = ( - self.base_url - + "/optimize" - + "?start_hour=" - + str(datetime.now(self.time_zone).hour) - ) - logger.info( - "[EOS] OPTIMIZE request optimization with: %s - and with timeout: %s", - request_url, - timeout, - ) - response = None # Initialize response variable - try: - start_time = time.time() - response = requests.post( - request_url, headers=headers, json=payload, timeout=timeout - ) - end_time = time.time() - elapsed_time = end_time - start_time - minutes, seconds = divmod(elapsed_time, 60) - logger.info( - "[EOS] OPTIMIZE response retrieved successfully in %d min %.2f sec for current run", - int(minutes), - seconds, - ) - response.raise_for_status() - # Check if the array is still filled with zeros - if all(runtime == 0 for runtime in self.last_optimization_runtimes): - # Fill all entries with the first real value - self.last_optimization_runtimes = [elapsed_time] * 5 - else: - # Store the runtime in the circular list only if successful - self.last_optimization_runtimes[ - self.last_optimization_runtime_number - ] = elapsed_time - self.last_optimization_runtime_number = ( - self.last_optimization_runtime_number + 1 - ) % 5 - # logger.debug( - # "[EOS] OPTIMIZE Last 5 runtimes in seconds: %s", - # self.last_optimization_runtimes, - # ) - avg_runtime = sum(self.last_optimization_runtimes) / 5 - return response.json(), avg_runtime - except requests.exceptions.Timeout: - logger.error("[EOS] OPTIMIZE Request timed out after %s seconds", timeout) - return {"error": "Request timed out - trying again with next run"} - except requests.exceptions.ConnectionError as e: - logger.error( - "[EOS] OPTIMIZE Connection error - EOS server not reachable at %s " - + "will try again with next cycle - error: %s", - request_url, - str(e), - ) - return { - "error": f"EOS server not reachable at {self.base_url} " - + "will try again with next cycle" - } - except requests.exceptions.RequestException as e: - logger.error("[EOS] OPTIMIZE Request failed: %s", e) - if response is not None: - logger.error("[EOS] OPTIMIZE Response status: %s", response.status_code) - logger.debug( - "[EOS] OPTIMIZE ERROR - response of EOS is:" - + "\n---RESPONSE-------------------------------------------------\n %s" - + "\n------------------------------------------------------------", - response.text, - ) - logger.debug( - "[EOS] OPTIMIZE ERROR - payload for the request was:" - + "\n---REQUEST--------------------------------------------------\n %s" - + "\n------------------------------------------------------------", - payload, - ) - return {"error": str(e)} - - def examine_response_to_control_data(self, optimized_response_in): - """ - Examines the optimized response data for control parameters such as AC charge demand, - DC charge demand, and discharge allowance for the current hour. - Args: - optimized_response_in (dict): A dictionary containing control data with keys - "ac_charge", "dc_charge", and "discharge_allowed". - Each key maps to a list or dictionary where the - current hour's data can be accessed. - Returns: - tuple: A tuple containing: - - ac_charge_demand_relative (float or None): The AC charge demand percentage - for the current hour, or None if not present. - - dc_charge_demand_relative (float or None): The DC charge demand percentage - for the current hour, or None if not present. - - discharge_allowed (bool or None): Whether discharge is allowed for the - current hour, or None if not present. - Logs: - - Debug logs for AC charge demand, DC charge demand, and discharge allowance - values for the current hour if they are present in the input. - - An error log if no control data is found in the optimized response. - """ - current_hour = datetime.now(self.time_zone).hour - ac_charge_demand_relative = None - dc_charge_demand_relative = None - discharge_allowed = None - response_error = False - # ecar_response = None - if "ac_charge" in optimized_response_in: - ac_charge_demand_relative = optimized_response_in["ac_charge"] - self.last_control_data[0]["ac_charge_demand"] = ac_charge_demand_relative[ - current_hour - ] - self.last_control_data[1]["ac_charge_demand"] = ac_charge_demand_relative[ - current_hour + 1 if current_hour < 23 else 0 - ] - # getting entry for current hour - ac_charge_demand_relative = ac_charge_demand_relative[current_hour] - logger.debug( - "[EOS] RESPONSE AC charge demand for current hour %s:00 -> %s %%", - current_hour, - ac_charge_demand_relative * 100, - ) - if "dc_charge" in optimized_response_in: - dc_charge_demand_relative = optimized_response_in["dc_charge"] - self.last_control_data[0]["dc_charge_demand"] = dc_charge_demand_relative[ - current_hour - ] - self.last_control_data[1]["dc_charge_demand"] = dc_charge_demand_relative[ - current_hour + 1 if current_hour < 23 else 0 - ] - - # getting entry for current hour - dc_charge_demand_relative = dc_charge_demand_relative[current_hour] - logger.debug( - "[EOS] RESPONSE DC charge demand for current hour %s:00 -> %s %%", - current_hour, - dc_charge_demand_relative * 100, - ) - if "discharge_allowed" in optimized_response_in: - discharge_allowed = optimized_response_in["discharge_allowed"] - self.last_control_data[0]["discharge_allowed"] = discharge_allowed[ - current_hour - ] - self.last_control_data[1]["discharge_allowed"] = discharge_allowed[ - current_hour + 1 if current_hour < 23 else 0 - ] - # getting entry for current hour - discharge_allowed = bool(discharge_allowed[current_hour]) - logger.debug( - "[EOS] RESPONSE Discharge allowed for current hour %s:00 %s", - current_hour, - discharge_allowed, - ) - # if "eauto_obj" in optimized_response_in: - # eauto_obj = optimized_response_in["eauto_obj"] - - if ( - "start_solution" in optimized_response_in - and len(optimized_response_in["start_solution"]) > 1 - ): - self.set_last_start_solution(optimized_response_in["start_solution"]) - logger.debug( - "[EOS] RESPONSE Start solution for current hour %s:00 %s", - current_hour, - self.get_last_start_solution(), - ) - else: - logger.error("[EOS] RESPONSE No control data in optimized response") - response_error = True - - self.last_control_data[0]["error"] = int(response_error) - self.last_control_data[1]["error"] = int(response_error) - self.last_control_data[0]["hour"] = current_hour - self.last_control_data[1]["hour"] = current_hour + 1 if current_hour < 23 else 0 - - if "washingstart" in optimized_response_in: - self.home_appliance_start_hour = optimized_response_in["washingstart"] - if self.home_appliance_start_hour == current_hour: - self.home_appliance_released = True - else: - self.home_appliance_released = False - logger.debug( - "[EOS] RESPONSE Home appliance - current hour %s:00" - + " - start hour %s - is Released: %s", - current_hour, - self.home_appliance_start_hour, - self.home_appliance_released, - ) - - return ( - ac_charge_demand_relative, - dc_charge_demand_relative, - discharge_allowed, - response_error, - ) - - def eos_save_config_to_config_file(self): - """ - Save the current configuration to the configuration file on the EOS server. - """ - response = requests.put(self.base_url + "/v1/config/file", timeout=10) - response.raise_for_status() - logger.debug("[EOS] CONFIG saved to config file successfully.") - - def eos_update_config_from_config_file(self): - """ - Update the current configuration from the configuration file on the EOS server. - """ - try: - response = requests.post(self.base_url + "/v1/config/update", timeout=10) - response.raise_for_status() - logger.info("[EOS] CONFIG Config updated from config file successfully.") - except requests.exceptions.Timeout: - logger.error( - "[EOS] CONFIG Request timed out while updating config from config file." - ) - except requests.exceptions.RequestException as e: - logger.error( - "[EOS] CONFIG Request failed while updating config from config file: %s", - e, - ) - - def get_last_control_data(self): - """ - Get the last control data for the EOS interface. - - Returns: - list: The last control data. - """ - return self.last_control_data - - def set_last_start_solution(self, last_start_solution): - """ - Set the last start solution for the EOS interface. - - Args: - last_start_solution (str): The last start solution to set. - """ - self.last_start_solution = last_start_solution - - def get_last_start_solution(self): - """ - Get the last start solution for the EOS interface. - - Returns: - str: The last start solution. - """ - return self.last_start_solution - - def get_home_appliance_released(self): - """ - Get the home appliance released status. - - Returns: - bool: True if the home appliance is released, False otherwise. - """ - return self.home_appliance_released - - def get_home_appliance_start_hour(self): - """ - Get the home appliance start hour. - - Returns: - int: The hour when the home appliance starts. - """ - return self.home_appliance_start_hour - - # function that creates a pandas dataframe with a DateTimeIndex with the given average profile - def create_dataframe(self, profile): - """ - Creates a pandas DataFrame with hourly energy values for a given profile. - - Args: - profile (list of tuples): A list of tuples where each tuple contains: - - month (int): The month (1-12). - - weekday (int): The day of the week (0=Monday, 6=Sunday). - - hour (int): The hour of the day (0-23). - - energy (float): The energy value to set. - - Returns: - pandas.DataFrame: A DataFrame with a DateTime index for the year 2025 and a 'Household' - column containing the energy values from the profile. - """ - - # create a list of all dates in the year - dates = pd.date_range(start="1/1/2025", end="31/12/2025", freq="H") - # create an empty dataframe with the dates as index - df = pd.DataFrame(index=dates) - # add a column 'Household' to the dataframe with NaN values - df["Household"] = np.nan - # iterate over the profile and set the energy values in the dataframe - for entry in profile: - month = entry[0] - weekday = entry[1] - hour = entry[2] - energy = entry[3] - # get the dates that match the month, weekday and hour - dates = df[ - (df.index.month == month) - & (df.index.weekday == weekday) - & (df.index.hour == hour) - ].index - # set the energy value for the dates - for date in dates: - df.loc[date, "Household"] = energy - return df - - def __retrieve_eos_version(self): - """ - Get the EOS version from the server. Dirty hack to get something to distinguish between - different versions of the EOS server. - - Returns: - str: The EOS version. - """ - try: - response = requests.get(self.base_url + "/v1/health", timeout=10) - response.raise_for_status() - eos_version = response.json().get("status") - if eos_version == "alive": - eos_version = ">=2025-04-09" - logger.info("[EOS] Getting EOS version: %s", eos_version) - return eos_version - except requests.exceptions.HTTPError as e: - if e.response.status_code == 404: - # if not found, assume version < 2025-04-09 - eos_version = "<2025-04-09" - logger.info("[EOS] Getting EOS version: %s", eos_version) - return eos_version - else: - logger.error( - "[EOS] HTTP error occurred while getting EOS version" - + " - use preset version: %s : %s - Response: %s", - self.eos_version, - e, - e.response.text if e.response else "No response", - ) - return self.eos_version # return preset version if error occurs - except requests.exceptions.ConnectTimeout: - logger.error( - "[EOS] Failed to get EOS version - use preset version: '%s'" - + " - Server not reachable: Connection to %s timed out", - self.eos_version, - self.base_url, - ) - return self.eos_version # return preset version if error occurs - except requests.exceptions.ConnectionError as e: - logger.error( - "[EOS] Failed to get EOS version - use preset version: '%s' - Connection error: %s", - self.eos_version, - e, - ) - return self.eos_version # return preset version if error occurs - except requests.exceptions.RequestException as e: - logger.error( - "[EOS] Failed to get EOS version - use preset version: '%s' - Error: %s ", - self.eos_version, - e, - ) - return self.eos_version # return preset version if error occurs - except json.JSONDecodeError as e: - logger.error( - "[EOS] Failed to decode EOS version - use preset version: '%s' - response: %s ", - self.eos_version, - e, - ) - return self.eos_version # return preset version if error occurs - - def get_eos_version(self): - """ - Get the EOS version from the server. - - Returns: - str: The EOS version. - """ - return self.eos_version - - def calculate_next_run_time(self, current_time, avg_runtime, update_interval): - """ - Calculate the next run time prioritizing quarter-hour alignment with improved gap filling. - """ - # Calculate minimum time between runs - min_gap_seconds = max((update_interval + avg_runtime) * 0.7, 30) - - # Find next quarter-hour from current time - next_quarter = current_time.replace(second=0, microsecond=0) - current_minute = next_quarter.minute - - minutes_past_quarter = current_minute % 15 - if minutes_past_quarter == 0 and current_time.second > 0: - minutes_to_add = 15 - elif minutes_past_quarter == 0: - minutes_to_add = 15 - else: - minutes_to_add = 15 - minutes_past_quarter - - next_quarter += timedelta(minutes=minutes_to_add) - - quarter_aligned_start = next_quarter - timedelta(seconds=avg_runtime) - - # **BUG FIX**: Check if quarter_aligned_start is in the past - if quarter_aligned_start <= current_time: - # Move to the next quarter-hour - next_quarter += timedelta(minutes=15) - quarter_aligned_start = next_quarter - timedelta(seconds=avg_runtime) - logger.debug( - "[OPTIMIZATION] Quarter start was in past, moved to next: %s", - next_quarter.strftime("%H:%M:%S"), - ) - - time_until_quarter_start = ( - quarter_aligned_start - current_time - ).total_seconds() - - # Debug logging - logger.debug( - "[OPTIMIZATION] Debug: current=%s, next_quarter=%s, quarter_start=%s, time_until=%.1fs", - current_time.strftime("%H:%M:%S"), - next_quarter.strftime("%H:%M:%S"), - quarter_aligned_start.strftime("%H:%M:%S"), - time_until_quarter_start, - ) - - # More aggressive gap-filling: if we have at least 2x the update interval, - # try a gap-fill run - if ( - time_until_quarter_start >= (2 * update_interval) - and time_until_quarter_start >= min_gap_seconds - ): - normal_next_start = current_time + timedelta(seconds=update_interval) - logger.info( - "[OPTIMIZATION] Gap-fill run: start %s (quarter-aligned run follows at %s)", - normal_next_start.strftime("%H:%M:%S"), - next_quarter.strftime("%H:%M:%S"), - ) - return normal_next_start - - # Otherwise, use quarter-aligned timing - absolute_min_seconds = max(avg_runtime * 0.5, 30) - if time_until_quarter_start < absolute_min_seconds: - next_quarter += timedelta(minutes=15) - quarter_aligned_start = next_quarter - timedelta(seconds=avg_runtime) - logger.debug( - "[OPTIMIZATION] Quarter too close, moved to next: %s", - next_quarter.strftime("%H:%M:%S"), - ) - - logger.info( - "[OPTIMIZATION] Quarter-hour aligned run: start %s, finish at %s", - quarter_aligned_start.strftime("%H:%M:%S"), - next_quarter.strftime("%H:%M:%S"), - ) - return quarter_aligned_start diff --git a/src/interfaces/optimization_backends/optimization_backend_eos.py b/src/interfaces/optimization_backends/optimization_backend_eos.py new file mode 100644 index 0000000..a8791dd --- /dev/null +++ b/src/interfaces/optimization_backends/optimization_backend_eos.py @@ -0,0 +1,275 @@ +""" +This module provides the EOSBackend class for interacting with the EOS optimization server. +It includes methods for sending optimization requests, managing configuration, and handling +measurement data. +""" + +import logging +import time +import json +from datetime import datetime +import requests +import pandas as pd +import numpy as np + +logger = logging.getLogger("__main__") + + +class EOSBackend: + """ + Backend for direct EOS server optimization. + Accepts and returns EOS-format requests/responses. + """ + + def __init__(self, base_url, time_zone): + self.base_url = base_url + self.time_zone = time_zone + self.last_optimization_runtimes = [0] * 5 + self.last_optimization_runtime_number = 0 + self.eos_version = ">=2025-04-09" # default + self.eos_version = self._retrieve_eos_version() + + def optimize(self, eos_request, timeout=180): + """ + Send the optimize request to the EOS server. + Returns (response_json, avg_runtime) + """ + headers = {"accept": "application/json", "Content-Type": "application/json"} + request_url = ( + self.base_url + + "/optimize" + + "?start_hour=" + + str(datetime.now(self.time_zone).hour) + ) + logger.info( + "[EOS] OPTIMIZE request optimization with: %s - and with timeout: %s", + request_url, + timeout, + ) + response = None + try: + start_time = time.time() + response = requests.post( + request_url, headers=headers, json=eos_request, timeout=timeout + ) + end_time = time.time() + elapsed_time = end_time - start_time + minutes, seconds = divmod(elapsed_time, 60) + logger.info( + "[EOS] OPTIMIZE response retrieved successfully in %d min %.2f sec for current run", + int(minutes), + seconds, + ) + response.raise_for_status() + # Store runtime in circular list + if all(runtime == 0 for runtime in self.last_optimization_runtimes): + self.last_optimization_runtimes = [elapsed_time] * 5 + else: + self.last_optimization_runtimes[ + self.last_optimization_runtime_number + ] = elapsed_time + self.last_optimization_runtime_number = ( + self.last_optimization_runtime_number + 1 + ) % 5 + avg_runtime = sum(self.last_optimization_runtimes) / 5 + return response.json(), avg_runtime + except requests.exceptions.Timeout: + logger.error("[EOS] OPTIMIZE Request timed out after %s seconds", timeout) + return {"error": "Request timed out - trying again with next run"}, None + except requests.exceptions.ConnectionError as e: + logger.error( + "[EOS] OPTIMIZE Connection error - EOS server not reachable at %s " + "will try again with next cycle - error: %s", + request_url, + str(e), + ) + return { + "error": f"EOS server not reachable at {self.base_url} " + "will try again with next cycle" + }, None + except requests.exceptions.RequestException as e: + logger.error("[EOS] OPTIMIZE Request failed: %s", e) + if response is not None: + logger.error("[EOS] OPTIMIZE Response status: %s", response.status_code) + logger.debug( + "[EOS] OPTIMIZE ERROR - response of EOS is:\n%s", + response.text, + ) + logger.debug( + "[EOS] OPTIMIZE ERROR - payload for the request was:\n%s", + eos_request, + ) + return {"error": str(e)}, None + + def set_config_value(self, key, value): + """ + Set a configuration value on the EOS server. + """ + if isinstance(value, list): + value = json.dumps(value) + params = {"key": key, "value": value} + response = requests.put( + self.base_url + "/v1/config/value", params=params, timeout=10 + ) + response.raise_for_status() + logger.info( + "[EOS] Config value set successfully. Key: %s => Value: %s", key, value + ) + + def send_measurement_to_eos(self, dataframe): + """ + Send the measurement data to the EOS server. + """ + params = { + "data": dataframe.to_json(orient="index"), + "dtype": "float64", + "tz": "UTC", + } + response = requests.put( + self.base_url + + "/v1/measurement/load-mr/series/by-name" + + "?name=Household", + params=params, + timeout=10, + ) + response.raise_for_status() + if response.status_code == 200: + logger.debug("[EOS] Measurement data sent to EOS server successfully.") + else: + logger.debug( + "[EOS] Failed to send data to EOS server. Status code: %s, Response: %s", + response.status_code, + response.text, + ) + + def save_config_to_config_file(self): + """ + Save the current configuration to the configuration file on the EOS server. + """ + response = requests.put(self.base_url + "/v1/config/file", timeout=10) + response.raise_for_status() + logger.debug("[EOS] CONFIG saved to config file successfully.") + + def update_config_from_config_file(self): + """ + Update the current configuration from the configuration file on the EOS server. + """ + try: + response = requests.post(self.base_url + "/v1/config/update", timeout=10) + response.raise_for_status() + logger.info("[EOS] CONFIG updated from config file successfully.") + except requests.exceptions.Timeout: + logger.error( + "[EOS] CONFIG Request timed out while updating config from config file." + ) + except requests.exceptions.RequestException as e: + logger.error( + "[EOS] CONFIG Request failed while updating config from config file: %s", + e, + ) + + def _retrieve_eos_version(self): + """ + Get the EOS version from the server. + Returns: str + """ + try: + response = requests.get(self.base_url + "/v1/health", timeout=10) + response.raise_for_status() + eos_version = response.json().get("status") + if eos_version == "alive": + eos_version = ">=2025-04-09" + logger.info("[EOS] Getting EOS version: %s", eos_version) + return eos_version + except requests.exceptions.HTTPError as e: + if hasattr(e, "response") and e.response and e.response.status_code == 404: + eos_version = "<2025-04-09" + logger.info("[EOS] Getting EOS version: %s", eos_version) + return eos_version + else: + logger.error( + "[EOS] HTTP error occurred while getting EOS version - use preset version:" + + " %s : %s - Response: %s", + self.eos_version, + e, + ( + e.response.text + if hasattr(e, "response") and e.response + else "No response" + ), + ) + return self.eos_version + except requests.exceptions.ConnectTimeout: + logger.error( + "[EOS] Failed to get EOS version - use preset version: '%s' - Server not " + + "reachable: Connection to %s timed out", + self.eos_version, + self.base_url, + ) + return self.eos_version + except requests.exceptions.ConnectionError as e: + logger.error( + "[EOS] Failed to get EOS version - use preset version: '%s' - Connection error: %s", + self.eos_version, + e, + ) + return self.eos_version + except requests.exceptions.RequestException as e: + logger.error( + "[EOS] Failed to get EOS version - use preset version: '%s' - Error: %s ", + self.eos_version, + e, + ) + return self.eos_version + except json.JSONDecodeError as e: + logger.error( + "[EOS] Failed to decode EOS version - use preset version: '%s' - response: %s ", + self.eos_version, + e, + ) + return self.eos_version + + def get_eos_version(self): + """ + Get the EOS version from the server. + Returns: str + """ + return self.eos_version + + def create_dataframe(self, profile): + """ + Creates a pandas DataFrame with hourly energy values for a given profile. + Args: + profile (list of tuples): Each tuple: (month, weekday, hour, energy) + Returns: + pandas.DataFrame: DateTime index for 2025, 'Household' column. + """ + dates = pd.date_range(start="1/1/2025", end="31/12/2025", freq="H") + df = pd.DataFrame(index=dates) + df["Household"] = np.nan + for entry in profile: + month, weekday, hour, energy = entry + matching_dates = df[ + (df.index.month == month) + & (df.index.weekday == weekday) + & (df.index.hour == hour) + ].index + for date in matching_dates: + df.loc[date, "Household"] = energy + return df + + def _validate_eos_input(self, eos_request): + """ + Validate EOS-format optimization request. + Returns: (bool, list[str]) - valid, errors + """ + errors = [] + if not isinstance(eos_request, dict): + errors.append("Request must be a dictionary.") + # Add more checks as needed + # Example: check required keys + required_keys = ["ems", "pv_akku"] + for key in required_keys: + if key not in eos_request: + errors.append(f"Missing required key: {key}") + return len(errors) == 0, errors diff --git a/src/interfaces/optimization_backends/optimization_backend_evcc_opt.py b/src/interfaces/optimization_backends/optimization_backend_evcc_opt.py new file mode 100644 index 0000000..6783e57 --- /dev/null +++ b/src/interfaces/optimization_backends/optimization_backend_evcc_opt.py @@ -0,0 +1,601 @@ +""" +Module: optimization_backend_evcc_opt +This module provides the EVCCOptBackend class, which acts as a backend for EVCC Opt optimization. +It accepts EOS-format optimization requests, transforms them into the EVCC Opt format, sends them +to the EVCC Opt server, +and transforms the responses back into EOS-format responses. +Classes: + EVCCOptBackend: Handles the transformation, communication, and response processing for + EVCC Opt optimization. +Typical usage example: + backend = EVCCOptBackend(base_url="http://evcc-opt-server", + time_zone=pytz.timezone("Europe/Berlin")) + eos_response, avg_runtime = backend.optimize(eos_request) +""" + +import logging +import time +import json +import os +from datetime import datetime +import requests + +logger = logging.getLogger("__main__") + + +class EVCCOptBackend: + """ + Backend for EVCC Opt optimization. + Accepts EOS-format requests, transforms to EVCC Opt format, and returns EOS-format responses. + """ + + def __init__(self, base_url, time_zone): + self.base_url = base_url + self.time_zone = time_zone + self.last_optimization_runtimes = [0] * 5 + self.last_optimization_runtime_number = 0 + + def optimize(self, eos_request, timeout=180): + """ + Accepts EOS-format request, transforms to EVCC Opt format, sends request, + transforms response back to EOS-format, and returns (response_json, avg_runtime). + """ + evcc_request, errors = self._transform_request_to_evcc(eos_request) + if errors: + logger.error("[EVCC OPT] Request transformation errors: %s", errors) + # Optionally, write transformed payload to json file for debugging + debug_path = os.path.join( + os.path.dirname(__file__), + "..", + "..", + "json", + "optimize_request_evcc_opt.json", + ) + debug_path = os.path.abspath(debug_path) + try: + with open(debug_path, "w", encoding="utf-8") as fh: + json.dump(evcc_request, fh, indent=2, ensure_ascii=False) + except OSError as e: + logger.warning("[EVCC OPT] Could not write debug file: %s", e) + + request_url = self.base_url + "/optimize/charge-schedule" + logger.info( + "[EVCC OPT] Request optimization with: %s - and with timeout: %s", + request_url, + timeout, + ) + headers = {"accept": "application/json", "Content-Type": "application/json"} + response = None + try: + start_time = time.time() + response = requests.post( + request_url, headers=headers, json=evcc_request, timeout=timeout + ) + end_time = time.time() + elapsed_time = end_time - start_time + minutes, seconds = divmod(elapsed_time, 60) + logger.info( + "[EVCC OPT] Response retrieved successfully in %d min %.2f sec for current run", + int(minutes), + seconds, + ) + response.raise_for_status() + # Store runtime in circular list + if all(runtime == 0 for runtime in self.last_optimization_runtimes): + self.last_optimization_runtimes = [elapsed_time] * 5 + else: + self.last_optimization_runtimes[ + self.last_optimization_runtime_number + ] = elapsed_time + self.last_optimization_runtime_number = ( + self.last_optimization_runtime_number + 1 + ) % 5 + avg_runtime = sum(self.last_optimization_runtimes) / 5 + evcc_response = response.json() + eos_response = self._transform_response_from_evcc( + evcc_response, evcc_request + ) + return eos_response, avg_runtime + except requests.exceptions.Timeout: + logger.error("[EVCC OPT] Request timed out after %s seconds", timeout) + return {"error": "Request timed out - trying again with next run"}, None + except requests.exceptions.ConnectionError as e: + logger.error( + "[EVCC OPT] Connection error - server not reachable at %s " + "will try again with next cycle - error: %s", + request_url, + str(e), + ) + return { + "error": f"EVCC OPT server not reachable at {self.base_url} " + "will try again with next cycle" + }, None + except requests.exceptions.RequestException as e: + logger.error("[EVCC OPT] Request failed: %s", e) + if response is not None: + logger.error("[EVCC OPT] Response status: %s", response.status_code) + logger.debug( + "[EVCC OPT] ERROR - response of server is:\n%s", + response.text, + ) + logger.debug( + "[EVCC OPT] ERROR - payload for the request was:\n%s", + evcc_request, + ) + return {"error": str(e)}, None + + def _transform_request_to_evcc(self, eos_request): + """ + Translate EOS request -> EVCC request. + Returns (evcc_req: dict, external_errors: list[str]) + """ + eos_request = eos_request or {} + errors = [] + + ems = eos_request.get("ems", {}) or {} + pv_series = ems.get("pv_prognose_wh", []) or [] + price_series = ems.get("strompreis_euro_pro_wh", []) or [] + feed_series = ems.get("einspeiseverguetung_euro_pro_wh", []) or [] + load_series = ems.get("gesamtlast", []) or [] + + current_hour = datetime.now(self.time_zone).hour + pv_series = ( + pv_series[current_hour:] if len(pv_series) > current_hour else pv_series + ) + price_series = ( + price_series[current_hour:] + if len(price_series) > current_hour + else price_series + ) + feed_series = ( + feed_series[current_hour:] + if len(feed_series) > current_hour + else feed_series + ) + load_series = ( + load_series[current_hour:] + if len(load_series) > current_hour + else load_series + ) + + lengths = [ + len(s) + for s in (pv_series, price_series, feed_series, load_series) + if len(s) > 0 + ] + n = min(lengths) if lengths else 1 + + def normalize(arr): + return [float(x) for x in arr[:n]] if arr else [0.0] * n + + pv_ts = normalize(pv_series) + price_ts = normalize(price_series) + feed_ts = normalize(feed_series) + load_ts = normalize(load_series) + + pv_akku = eos_request.get("pv_akku") or {} + batt_capacity_wh = float(pv_akku.get("capacity_wh", 0)) + batt_initial_pct = float(pv_akku.get("initial_soc_percentage", 0)) + batt_min_pct = float(pv_akku.get("min_soc_percentage", 0)) + batt_max_pct = float(pv_akku.get("max_soc_percentage", 100)) + batt_c_max = float(pv_akku.get("max_charge_power_w", 0)) + batt_eta_c = float(pv_akku.get("charging_efficiency", 0.95)) + batt_eta_d = float(pv_akku.get("discharging_efficiency", 0.95)) + + s_min = batt_capacity_wh * (batt_min_pct / 100.0) + s_max = batt_capacity_wh * (batt_max_pct / 100.0) + s_initial = batt_capacity_wh * (batt_initial_pct / 100.0) + + batteries = [] + if batt_capacity_wh > 0: + batteries.append( + { + "device_id": pv_akku.get("device_id", "akku1"), + "charge_from_grid": True, + "discharge_to_grid": True, + "s_min": s_min, + "s_max": s_max, + "s_initial": s_initial, + "p_demand": [0.0] * n, + # "s_goal": [s_initial] * n, + "s_goal": [0.0] * n, + "c_min": 0.0, + "c_max": batt_c_max, + "d_max": batt_c_max, + "p_a": 0.0, + } + ) + + p_max_imp = 10000 + p_max_exp = 10000 + + evcc_req = { + "strategy": { + "charging_strategy": "charge_before_export", + "discharging_strategy": "discharge_before_import", + }, + "grid": { + "p_max_imp": p_max_imp, + "p_max_exp": p_max_exp, + "prc_p_imp_exc": 0, + }, + "batteries": batteries, + "time_series": { + "dt": [3600.0] * n, + "gt": [float(x) for x in load_ts], + "ft": [float(x) for x in pv_ts], + "p_N": [float(x) for x in price_ts], + "p_E": [float(x) for x in feed_ts], + }, + "eta_c": batt_eta_c if batt_capacity_wh > 0 else 0.95, + "eta_d": batt_eta_d if batt_capacity_wh > 0 else 0.95, + } + + return evcc_req, errors + + def _transform_response_from_evcc(self, evcc_resp, evcc_req=None): + """ + Translate EVCC optimizer response -> EOS-style optimize response. + + Produces a fuller EOS-shaped response using the sample `src/json/optimize_response.json` + as guidance. The mapping is conservative and uses available EVCC fields: + + - ac_charge, dc_charge, discharge_allowed, start_solution + - result.* arrays: Last_Wh_pro_Stunde, EAuto_SoC_pro_Stunde, + Einnahmen_Euro_pro_Stunde, Kosten_Euro_pro_Stunde, Netzbezug_Wh_pro_Stunde, + Netzeinspeisung_Wh_pro_Stunde, Verluste_Pro_Stunde, akku_soc_pro_stunde, + Electricity_price + - numeric summaries: Gesamt_Verluste, Gesamtbilanz_Euro, Gesamteinnahmen_Euro, + Gesamtkosten_Euro + - eauto_obj, washingstart, timestamp + + evcc_resp: dict (raw EVCC JSON) or {"response": {...}}. + evcc_req: optional EVCC request dict (used to read p_N, p_E, eta_c, eta_d, + battery s_max/c_max). + """ + # defensive guard + if not isinstance(evcc_resp, dict): + logger.debug( + "[EOS] EVCC transform response - input not a dict, returning empty dict" + ) + return {} + + # EVCC might wrap actual payload under "response" + resp = evcc_resp.get("response", evcc_resp) + + # Set total hours and slice for future + current_hour = datetime.now(self.time_zone).hour + n_total = 48 # Total hours from midnight today to midnight tomorrow + n_future = n_total - current_hour # Hours from now to end of tomorrow + n = n_future # Override n to focus on future horizon + + # # determine horizon length n + # n = 0 + # if isinstance(resp.get("grid_import"), list): + # n = len(resp.get("grid_import")) + # elif isinstance(resp.get("grid_export"), list): + # n = len(resp.get("grid_export")) + # else: + # # try batteries[*].charging_power + # b_list = resp.get("batteries") or [] + # if isinstance(b_list, list) and len(b_list) > 0: + # b0 = b_list[0] + # if isinstance(b0.get("charging_power"), list): + # n = len(b0.get("charging_power")) + # if n == 0: + # n = 24 + + # primary battery arrays (first battery) + batteries_resp = resp.get("batteries") or [] + first_batt = batteries_resp[0] if batteries_resp else {} + charging_power = list(first_batt.get("charging_power") or [0.0] * n)[:n] + discharging_power = list(first_batt.get("discharging_power") or [0.0] * n)[:n] + soc_wh = list(first_batt.get("state_of_charge") or [])[:n] + + # grid arrays + grid_import = list(resp.get("grid_import") or [0.0] * n)[:n] + grid_export = list(resp.get("grid_export") or [0.0] * n)[:n] + + # harvest pricing from evcc_req when available (per-Wh units) + p_n = None + p_e = None + electricity_price = [None] * n + if isinstance(evcc_req, dict): + ts = evcc_req.get("time_series", {}) or {} + p_n = ts.get("p_N") + p_e = ts.get("p_E") + # if p_N/p_E are lists, normalize length + if isinstance(p_n, list): + p_n = ( + [float(x) for x in p_n[:n]] + + [float(p_n[-1])] * max(0, n - len(p_n)) + if p_n + else None + ) + if isinstance(p_e, list): + p_e = ( + [float(x) for x in p_e[:n]] + + [float(p_e[-1])] * max(0, n - len(p_e)) + if p_e + else None + ) + if isinstance(p_n, list): + electricity_price = [float(x) for x in p_n[:n]] + elif isinstance(p_n, (int, float)): + electricity_price = [float(p_n)] * n + + # fallback price arrays if missing + if not any(isinstance(x, (int, float)) for x in electricity_price): + electricity_price = [0.0] * n + if p_n is None: + p_n = electricity_price + if p_e is None: + p_e = [0.0] * n + + # battery parameters from request if present (s_max in Wh, eta_c, eta_d) + s_max_req = None + eta_c = None + eta_d = None + if isinstance(evcc_req, dict): + breq = evcc_req.get("batteries") + if isinstance(breq, list) and len(breq) > 0: + b0r = breq[0] + try: + s_max_req = float(b0r.get("s_max", 0.0)) + except (ValueError, TypeError): + s_max_req = None + try: + eta_c = float(evcc_req.get("eta_c", b0r.get("eta_c", 0.95) or 0.95)) + except (ValueError, TypeError): + eta_c = 0.95 + try: + eta_d = float(evcc_req.get("eta_d", b0r.get("eta_d", 0.95) or 0.95)) + except (ValueError, TypeError): + eta_d = 0.95 + # Set defaults + eta_c = eta_c if eta_c is not None else 0.95 + eta_d = eta_d if eta_d is not None else 0.95 + s_max_val = s_max_req if s_max_req not in (None, 0) else None + + # compute ac_charge fraction: charging_power normalized to c_max from request + # or observed max + c_max = None + d_max = None + if isinstance(evcc_req, dict): + breq = evcc_req.get("batteries") + if isinstance(breq, list) and len(breq) > 0: + try: + c_max = float(breq[0].get("c_max", 0.0)) + except (ValueError, TypeError): + c_max = None + try: + d_max = float(breq[0].get("d_max", 0.0)) + except (ValueError, TypeError): + d_max = None + # fallback observed maxima + try: + if not c_max: + observed_max_ch = ( + max([float(x) for x in charging_power]) if charging_power else 0.0 + ) + c_max = observed_max_ch if observed_max_ch > 0 else 1.0 + if not d_max: + observed_max_dch = ( + max([float(x) for x in discharging_power]) + if discharging_power + else 0.0 + ) + d_max = observed_max_dch if observed_max_dch > 0 else 1.0 + except (ValueError, TypeError): + c_max = c_max or 1.0 + d_max = d_max or 1.0 + + ac_charge = [] + for v in charging_power: + try: + frac = float(v) / float(c_max) if float(c_max) > 0 else 0.0 + except (ValueError, TypeError): + frac = 0.0 + if frac != frac: + frac = 0.0 + ac_charge.append(max(0.0, min(1.0, frac))) + + # Adjust ac_charge: set to 0 if no grid import (PV-only charging) + for i in range(n): + if grid_import[i] <= 0: + ac_charge[i] = 0.0 + + # dc_charge: mark 1.0 if charging_power > 0 (conservative) + dc_charge = [1.0 if float(v) > 0.0 else 0.0 for v in charging_power] + + # discharge_allowed: 1 if discharging_power > tiny epsilon + discharge_allowed = [1 if float(v) > 1e-9 else 0 for v in discharging_power] + + # start_solution: prefer resp['start_solution'] if present, else try + # eauto_obj.charge_array -> ints, otherwise zeros + start_solution = None + if isinstance(resp.get("start_solution"), list): + # coerce to numbers + start_solution = [ + float(x) if isinstance(x, (int, float)) else 0 + for x in resp.get("start_solution")[:n] + ] + else: + eauto_obj = resp.get("eauto_obj") or evcc_resp.get("eauto_obj") + if isinstance(eauto_obj, dict) and isinstance( + eauto_obj.get("charge_array"), list + ): + # map boolean/float charge_array to integers (placeholder) + start_solution = [ + int(1 if float(x) > 0 else 0) + for x in eauto_obj.get("charge_array")[:n] + ] + if start_solution is None: + start_solution = [0] * n + + # washingstart if present + washingstart = resp.get("washingstart") + + # compute per-hour costs and revenues in Euro (using €/Wh units from p_N/p_E) + kosten_per_hour = [] + einnahmen_per_hour = [] + for i in range(n): + gi = float(grid_import[i]) if i < len(grid_import) else 0.0 + ge = float(grid_export[i]) if i < len(grid_export) else 0.0 + pr = ( + float(p_n[i]) + if isinstance(p_n, list) and i < len(p_n) + else float(p_n[i]) if isinstance(p_n, list) and len(p_n) > 0 else 0.0 + ) + pe = ( + float(p_e[i]) + if isinstance(p_e, list) and i < len(p_e) + else (float(p_e[i]) if isinstance(p_e, list) and len(p_e) > 0 else 0.0) + ) + # if p_N/p_E are scalars (should be lists), handle above; fallback zero if missing + if isinstance(p_n, (int, float)): + pr = float(p_n) + if isinstance(p_e, (int, float)): + pe = float(p_e) + + kosten = gi * pr + einnahmen = ge * pe + kosten_per_hour.append(kosten) + einnahmen_per_hour.append(einnahmen) + + # estimate per-hour battery losses: charging_loss + discharging_loss + verluste_per_hour = [] + for i in range(n): + ch = float(charging_power[i]) if i < len(charging_power) else 0.0 + dch = float(discharging_power[i]) if i < len(discharging_power) else 0.0 + loss = ch * (1.0 - eta_c) + dch * (1.0 - eta_d) + verluste_per_hour.append(loss) + + # Akku SoC percent per hour (if soc_wh available): convert to percent using s_max_req + # or inferred max + akku_soc_pct = [] + if soc_wh: + # determine s_max reference: use s_max_req if provided, otherwise attempt to + # infer from soc_wh max + ref = s_max_val + if not ref: + try: + ref = max([float(x) for x in soc_wh]) if soc_wh else None + except (ValueError, TypeError): + ref = None + for v in soc_wh: + try: + if ref and ref > 0: + pct = float(v) / float(ref) * 100.0 + else: + pct = float(v) + except (ValueError, TypeError): + pct = 0.0 + akku_soc_pct.append(pct) + else: + akku_soc_pct = [] + + # totals + gesamt_kosten = sum(kosten_per_hour) if kosten_per_hour else 0.0 + gesamt_einnahmen = sum(einnahmen_per_hour) if einnahmen_per_hour else 0.0 + gesamt_verluste = sum(verluste_per_hour) if verluste_per_hour else 0.0 + gesamt_bilanz = gesamt_einnahmen - gesamt_kosten + + # build result dict like optimize_response.json + result = {} + # Prefer household load ('gt') from the EVCC request if available, + # otherwise fall back to EVCC response grid_import (parity with previous behavior). + last_wh = None + if isinstance(evcc_req, dict): + ts = evcc_req.get("time_series", {}) or {} + gt = ts.get("gt") + if isinstance(gt, list) and len(gt) > 0: + # normalize/trim/pad gt to length n (similar to other normalizations) + if len(gt) >= n: + last_wh = [float(x) for x in gt[:n]] + else: + last_val = float(gt[-1]) + last_wh = [float(x) for x in gt] + [last_val] * (n - len(gt)) + # fallback to grid_import if gt not present or invalid + if last_wh is None: + last_wh = [float(x) for x in grid_import[:n]] + + result["Last_Wh_pro_Stunde"] = last_wh + + if akku_soc_pct: + # EAuto_SoC_pro_Stunde - fallback to eauto object SOC or same as akku + # percent if appropriate + result["EAuto_SoC_pro_Stunde"] = ( + [ + float(x) + for x in ( + evcc_resp.get("eauto_obj", {}).get("soc_wh") + or ( + [] + if not isinstance(evcc_resp.get("eauto_obj", {}), dict) + else [] + ) + )[:n] + ] + if evcc_resp.get("eauto_obj") + else [] + ) + # Einnahmen & Kosten per hour + result["Einnahmen_Euro_pro_Stunde"] = [float(x) for x in einnahmen_per_hour] + result["Kosten_Euro_pro_Stunde"] = [float(x) for x in kosten_per_hour] + result["Gesamt_Verluste"] = float(gesamt_verluste) + result["Gesamtbilanz_Euro"] = float(gesamt_bilanz) + result["Gesamteinnahmen_Euro"] = float(gesamt_einnahmen) + result["Gesamtkosten_Euro"] = float(gesamt_kosten) + # Home appliance placeholder (zeros) + result["Home_appliance_wh_per_hour"] = [0.0] * n + result["Netzbezug_Wh_pro_Stunde"] = [float(x) for x in grid_import[:n]] + result["Netzeinspeisung_Wh_pro_Stunde"] = [float(x) for x in grid_export[:n]] + result["Verluste_Pro_Stunde"] = [float(x) for x in verluste_per_hour] + if akku_soc_pct: + result["akku_soc_pro_stunde"] = [float(x) for x in akku_soc_pct[:n]] + # Electricity price array + result["Electricity_price"] = [float(x) for x in electricity_price[:n]] + + # Pad past hours with zeros for control arrays + pad_past = [0.0] * current_hour + + eos_resp = { + "ac_charge": pad_past + [float(x) for x in ac_charge], + "dc_charge": pad_past + [float(x) for x in dc_charge], + "discharge_allowed": pad_past + [int(x) for x in discharge_allowed], + "eautocharge_hours_float": None, + "result": result, # result arrays remain unpadded (start from current time) + } + + # attach eauto_obj if present in resp + if "eauto_obj" in resp: + eos_resp["eauto_obj"] = resp.get("eauto_obj") + + # map start_solution and washingstart if present + eos_resp["start_solution"] = pad_past + start_solution + if washingstart is not None: + eos_resp["washingstart"] = pad_past + washingstart + + # timestamp + try: + eos_resp["timestamp"] = datetime.now(self.time_zone).isoformat() + except (ValueError, TypeError): + eos_resp["timestamp"] = datetime.now().isoformat() + + return eos_resp + + def _validate_evcc_request(self, evcc_req): + """ + Validate EVCC Opt-format optimization request. + Returns: (bool, list[str]) - valid, errors + """ + errors = [] + if not isinstance(evcc_req, dict): + errors.append("EVCC request must be a dictionary.") + # Example: check required keys + required_keys = ["strategy", "grid", "batteries", "time_series"] + for key in required_keys: + if key not in evcc_req: + errors.append(f"Missing required key: {key}") + return len(errors) == 0, errors diff --git a/src/interfaces/optimization_interface.py b/src/interfaces/optimization_interface.py new file mode 100644 index 0000000..903c8dc --- /dev/null +++ b/src/interfaces/optimization_interface.py @@ -0,0 +1,300 @@ +""" +optimization_interface.py +This module provides the OptimizationInterface class, which serves as the main abstraction layer +for interacting with different optimization backends. It accepts and returns requests and responses +in the EOS format, handles backend selection, and delegates transformation logic to the selected +backend. The interface also manages control data, home appliance scheduling, and calculates the next +optimal run time for the optimization process. +Classes: + OptimizationInterface: Abstraction for optimization backends supporting EOS-format requests and + responses. +Usage: + Instantiate OptimizationInterface with configuration and timezone, then use its methods to + perform optimization, retrieve control data, and manage scheduling. +Example: + interface = OptimizationInterface(config, timezone) + response, avg_runtime = interface.optimize(eos_request) +""" + +import logging +from datetime import datetime, timedelta +from .optimization_backends.optimization_backend_eos import EOSBackend +from .optimization_backends.optimization_backend_evcc_opt import EVCCOptBackend + +logger = logging.getLogger("__main__") + + +class OptimizationInterface: + """ + Main abstraction for optimization backends. + Accepts and returns EOS-format requests/responses. + Handles backend selection and delegates all transformation logic to the backend. + """ + + def __init__(self, config, timezone): + self.eos_source = config.get("source", "eos_server") + self.base_url = ( + f"http://{config.get('server', '192.168.1.1')}:{config.get('port', 8503)}" + ) + self.time_zone = timezone + + if self.eos_source == "evcc_opt": + self.backend = EVCCOptBackend(self.base_url, self.time_zone) + self.backend_type = "evcc_opt" + logger.info("[OPT] Using EVCC Opt backend") + elif self.eos_source == "eos_server": + self.backend = EOSBackend(self.base_url, self.time_zone) + self.backend_type = "eos_server" + logger.info("[OPT] Using EOS Server backend") + else: + raise ValueError(f"Unknown backend source: {self.eos_source}") + + self.last_start_solution = None + self.home_appliance_released = False + self.home_appliance_start_hour = None + self.last_control_data = [ + { + "ac_charge_demand": 0, + "dc_charge_demand": 0, + "discharge_allowed": False, + "error": 0, + "hour": -1, + }, + { + "ac_charge_demand": 0, + "dc_charge_demand": 0, + "discharge_allowed": False, + "error": 0, + "hour": -1, + }, + ] + + def optimize(self, eos_request, timeout=180): + """ + Main entry point for optimization. + Accepts EOS-format request, returns EOS-format response. + """ + eos_response, avg_runtime = self.backend.optimize(eos_request, timeout) + return eos_response, avg_runtime + + def examine_response_to_control_data(self, optimized_response_in): + """ + Examines the optimized response data for control parameters. + Returns tuple: (ac_charge, dc_charge, discharge_allowed, response_error) + """ + current_hour = datetime.now(self.time_zone).hour + ac_charge_demand_relative = None + dc_charge_demand_relative = None + discharge_allowed = None + response_error = False + + if "ac_charge" in optimized_response_in: + ac_charge_demand_relative = optimized_response_in["ac_charge"] + self.last_control_data[0]["ac_charge_demand"] = ac_charge_demand_relative[ + current_hour + ] + self.last_control_data[1]["ac_charge_demand"] = ac_charge_demand_relative[ + current_hour + 1 if current_hour < 23 else 0 + ] + ac_charge_demand_relative = ac_charge_demand_relative[current_hour] + logger.debug( + "[OPT] AC charge demand for current hour %s:00 -> %s %%", + current_hour, + ac_charge_demand_relative * 100, + ) + if "dc_charge" in optimized_response_in: + dc_charge_demand_relative = optimized_response_in["dc_charge"] + self.last_control_data[0]["dc_charge_demand"] = dc_charge_demand_relative[ + current_hour + ] + self.last_control_data[1]["dc_charge_demand"] = dc_charge_demand_relative[ + current_hour + 1 if current_hour < 23 else 0 + ] + dc_charge_demand_relative = dc_charge_demand_relative[current_hour] + logger.debug( + "[OPT] DC charge demand for current hour %s:00 -> %s %%", + current_hour, + dc_charge_demand_relative * 100, + ) + if "discharge_allowed" in optimized_response_in: + discharge_allowed = optimized_response_in["discharge_allowed"] + self.last_control_data[0]["discharge_allowed"] = discharge_allowed[ + current_hour + ] + self.last_control_data[1]["discharge_allowed"] = discharge_allowed[ + current_hour + 1 if current_hour < 23 else 0 + ] + discharge_allowed = bool(discharge_allowed[current_hour]) + logger.debug( + "[OPT] Discharge allowed for current hour %s:00 %s", + current_hour, + discharge_allowed, + ) + + if ( + "start_solution" in optimized_response_in + and len(optimized_response_in["start_solution"]) > 1 + ): + self.set_last_start_solution(optimized_response_in["start_solution"]) + logger.debug( + "[OPT] Start solution for current hour %s:00 %s", + current_hour, + self.get_last_start_solution(), + ) + else: + logger.error("[OPT] No control data in optimized response") + response_error = True + + self.last_control_data[0]["error"] = int(response_error) + self.last_control_data[1]["error"] = int(response_error) + self.last_control_data[0]["hour"] = current_hour + self.last_control_data[1]["hour"] = current_hour + 1 if current_hour < 23 else 0 + + if "washingstart" in optimized_response_in: + self.home_appliance_start_hour = optimized_response_in["washingstart"] + self.home_appliance_released = ( + self.home_appliance_start_hour == current_hour + ) + logger.debug( + "[OPT] Home appliance - current hour %s:00 - start hour %s - is Released: %s", + current_hour, + self.home_appliance_start_hour, + self.home_appliance_released, + ) + + return ( + ac_charge_demand_relative, + dc_charge_demand_relative, + discharge_allowed, + response_error, + ) + + def set_last_start_solution(self, last_start_solution): + """ + Sets the last start solution for the optimization process. + + Args: + last_start_solution: The solution to be stored as the last start solution. + """ + self.last_start_solution = last_start_solution + + def get_last_start_solution(self): + """ + Returns the last start solution used in the optimization process. + + Returns: + Any: The last start solution stored in the instance. + """ + return self.last_start_solution + + def get_last_control_data(self): + """ + Retrieve the most recent control data. + + Returns: + Any: The last control data stored in the instance. + """ + return self.last_control_data + + def get_home_appliance_released(self): + """ + Returns the value of the home_appliance_released attribute. + + Returns: + Any: The current value of home_appliance_released. + """ + return self.home_appliance_released + + def get_home_appliance_start_hour(self): + """ + Returns the start hour for the home appliance. + + Returns: + int: The hour at which the home appliance is scheduled to start. + """ + return self.home_appliance_start_hour + + def calculate_next_run_time(self, current_time, avg_runtime, update_interval): + """ + Calculate the next run time prioritizing quarter-hour alignment with improved gap filling. + """ + # Calculate minimum time between runs + min_gap_seconds = max((update_interval + avg_runtime) * 0.7, 30) + + # Find next quarter-hour from current time + next_quarter = current_time.replace(second=0, microsecond=0) + current_minute = next_quarter.minute + + minutes_past_quarter = current_minute % 15 + if minutes_past_quarter == 0 and current_time.second > 0: + minutes_to_add = 15 + elif minutes_past_quarter == 0: + minutes_to_add = 15 + else: + minutes_to_add = 15 - minutes_past_quarter + + next_quarter += timedelta(minutes=minutes_to_add) + + quarter_aligned_start = next_quarter - timedelta(seconds=avg_runtime) + + # **BUG FIX**: Check if quarter_aligned_start is in the past + if quarter_aligned_start <= current_time: + # Move to the next quarter-hour + next_quarter += timedelta(minutes=15) + quarter_aligned_start = next_quarter - timedelta(seconds=avg_runtime) + logger.debug( + "[OPTIMIZATION] Quarter start was in past, moved to next: %s", + next_quarter.strftime("%H:%M:%S"), + ) + + time_until_quarter_start = ( + quarter_aligned_start - current_time + ).total_seconds() + + # Debug logging + logger.debug( + "[OPTIMIZATION] Debug: current=%s, next_quarter=%s, quarter_start=%s, time_until=%.1fs", + current_time.strftime("%H:%M:%S"), + next_quarter.strftime("%H:%M:%S"), + quarter_aligned_start.strftime("%H:%M:%S"), + time_until_quarter_start, + ) + + # More aggressive gap-filling: if we have at least 2x the update interval, + # try a gap-fill run + if ( + time_until_quarter_start >= (2 * update_interval) + and time_until_quarter_start >= min_gap_seconds + ): + normal_next_start = current_time + timedelta(seconds=update_interval) + logger.info( + "[OPTIMIZATION] Gap-fill run: start %s (quarter-aligned run follows at %s)", + normal_next_start.strftime("%H:%M:%S"), + next_quarter.strftime("%H:%M:%S"), + ) + return normal_next_start + + # Otherwise, use quarter-aligned timing + absolute_min_seconds = max(avg_runtime * 0.5, 30) + if time_until_quarter_start < absolute_min_seconds: + next_quarter += timedelta(minutes=15) + quarter_aligned_start = next_quarter - timedelta(seconds=avg_runtime) + logger.debug( + "[OPTIMIZATION] Quarter too close, moved to next: %s", + next_quarter.strftime("%H:%M:%S"), + ) + + logger.info( + "[OPTIMIZATION] Quarter-hour aligned run: start %s, finish at %s", + quarter_aligned_start.strftime("%H:%M:%S"), + next_quarter.strftime("%H:%M:%S"), + ) + return quarter_aligned_start + + def get_eos_version(self): + """ + Returns the EOS version from the backend if available. + """ + if hasattr(self.backend, "get_eos_version"): + return self.backend.get_eos_version() + return None diff --git a/src/web/index.html b/src/web/index.html index ddbcc14..97baee4 100644 --- a/src/web/index.html +++ b/src/web/index.html @@ -321,6 +321,25 @@ +
diff --git a/src/web/js/controls.js b/src/web/js/controls.js index a6d090d..50e3ad6 100644 --- a/src/web/js/controls.js +++ b/src/web/js/controls.js @@ -427,6 +427,12 @@ class ControlsManager { // Update mode icon and click handler this.updateModeIcon(inverterModeNum, overrideActive, controlsData.battery.max_charge_power_dyn); + + if (controlsData.used_optimization_source === "evcc_opt") { + document.getElementById("experimental-banner").style.display = "flex"; + } else { + document.getElementById("experimental-banner").style.display = "none"; + } } /** @@ -475,6 +481,7 @@ class ControlsManager { document.getElementById('control_discharge_allowed').innerText = states.current_discharge_allowed ? "Yes" : "No"; document.getElementById('current_controls_box').style.border = ""; + } /** diff --git a/tests/interfaces/test_eos_interface.py b/tests/interfaces/test_eos_interface.py deleted file mode 100644 index d90a6b6..0000000 --- a/tests/interfaces/test_eos_interface.py +++ /dev/null @@ -1,267 +0,0 @@ -""" -Module: test_eos_interface -========================== -This module contains tests for the `EosInterface` class, specifically focusing on the scheduling -algorithm implemented in `calculate_next_run_time`. The tests validate the algorithm's behavior -under various scenarios, ensuring reasonable and consistent scheduling for EOS data collection -jobs. -Fixtures: ---------- -- `patch_eos_version`: Automatically patches the EOS version retrieval to return a mocked - value for all tests. -Tests: ------- -- `test_calculate_next_run_time_combinations`: Parametrized test that checks the scheduling - algorithm across a wide range of input combinations. Validates output type, timing - constraints, quarter-hour alignment, performance, and consistency. -- `test_calculate_next_run_time_patterns`: Examines the scheduling patterns over multiple - consecutive runs for different scenarios, ensuring expected behavior such as quarter-hour - alignment or gap-filling. -- `test_algorithm_behavior_showcase`: Demonstrates the algorithm's scheduling decisions for - documentation purposes, printing actual run times and types for various intervals and runtimes. -- `test_simulation_over_time`: Simulates a sequence of scheduled runs over time for different - update intervals, printing results to showcase the algorithm's long-term behavior and - alignment patterns. -Usage: ------- -These tests are designed to validate and document the scheduling logic of `EosInterface`, -ensuring robust and predictable job timing for EOS data collection processes. -""" - -import time -from datetime import datetime, timedelta -import pytest -from src.interfaces.eos_interface import EosInterface - - -@pytest.fixture(autouse=True) -def patch_eos_version(monkeypatch): - """ - Patches the EosInterface class to mock the retrieval of the EOS version. - - This function uses the provided monkeypatch fixture to replace the - '_EosInterface__retrieve_eos_version' method of the EosInterface class - with a lambda that returns a fixed string "mocked_version". - - Args: - monkeypatch: The pytest monkeypatch fixture used to modify class behavior. - """ - monkeypatch.setattr( - EosInterface, - "_EosInterface__retrieve_eos_version", - lambda self: "mocked_version", - ) - - -@pytest.mark.parametrize( - "current_time", - [ - datetime(2025, 1, 1, 0, 0), - datetime(2025, 1, 1, 0, 5), - datetime(2025, 1, 1, 0, 7), - datetime(2025, 1, 1, 0, 10), - datetime(2025, 1, 1, 0, 14), - datetime(2025, 1, 1, 0, 15), - datetime(2025, 1, 1, 0, 16), - datetime(2025, 1, 1, 0, 18), - datetime(2025, 1, 1, 0, 22), - datetime(2025, 1, 1, 0, 25), - datetime(2025, 1, 1, 0, 27, 30), - datetime(2025, 1, 1, 0, 29), - datetime(2025, 1, 1, 0, 30), - datetime(2025, 1, 1, 0, 31), - datetime(2025, 1, 1, 0, 36), - datetime(2025, 1, 1, 0, 45), - datetime(2025, 1, 1, 23, 59), - datetime(2025, 1, 1, 13, 14, 58), # specific real-world edge case - ], -) -@pytest.mark.parametrize( - "avg_runtime", [60, 87, 90, 300, 600, 900] # in seconds - added 87s from real logs -) -@pytest.mark.parametrize("update_interval", [60, 300, 600, 899, 900, 1200]) -@pytest.mark.parametrize("is_first_run", [True, False]) -def test_calculate_next_run_time_combinations( - current_time, avg_runtime, update_interval, is_first_run -): - """ - Test the algorithm's actual behavior without trying to predict the exact timing. - Just validate that the output is reasonable and consistent. - """ - ei = EosInterface("localhost", 1234, None) - ei.is_first_run = is_first_run - - start = time.perf_counter() - next_run = ei.calculate_next_run_time(current_time, avg_runtime, update_interval) - duration = time.perf_counter() - start - - # Basic validation - assert isinstance(next_run, datetime) - assert next_run > current_time - - finish_time = next_run + timedelta(seconds=avg_runtime) - time_until_start = (next_run - current_time).total_seconds() - - # Test 1: Not scheduled too soon (minimum 30 seconds) - assert ( - time_until_start >= 25 - ), f"Scheduled too soon: {time_until_start}s from {current_time}" - - # Test 2: Not scheduled unreasonably far in future - max_reasonable_wait = max(3600, update_interval * 3) # 1 hour or 3x interval - assert ( - time_until_start <= max_reasonable_wait - ), f"Scheduled too far: {time_until_start}s > {max_reasonable_wait}s" - - # Test 3: If it claims to be quarter-aligned, verify it actually is - is_quarter_aligned = finish_time.minute % 15 == 0 and finish_time.second == 0 - if is_quarter_aligned: - # If quarter-aligned, the finish time should be exactly on a quarter-hour - assert finish_time.minute in [ - 0, - 15, - 30, - 45, - ], f"Claims quarter-aligned but finishes at {finish_time.strftime('%H:%M:%S')}" - - # Test 4: Performance check - assert duration < 0.1, f"Calculation too slow: {duration}s" - - # Test 5: Consistency check - running again immediately should give same or later time - next_run_2 = ei.calculate_next_run_time(current_time, avg_runtime, update_interval) - time_diff = abs((next_run_2 - next_run).total_seconds()) - assert ( - time_diff < 1 - ), f"Inconsistent results: {next_run} vs {next_run_2} (diff: {time_diff}s)" - - -@pytest.mark.parametrize( - "scenario", - [ - # (current_time, update_interval, avg_runtime, expected_pattern) - (datetime(2025, 1, 1, 0, 0), 300, 60, "mixed"), - (datetime(2025, 1, 1, 0, 13), 300, 60, "mixed"), - (datetime(2025, 1, 1, 0, 0), 900, 60, "quarter_heavy"), - (datetime(2025, 1, 1, 0, 0), 60, 60, "gap_fill_heavy"), - ], -) -def test_calculate_next_run_time_patterns(scenario): - """ - Test patterns over multiple runs without being too prescriptive about exact behavior. - """ - current_time, update_interval, avg_runtime, expected_pattern = scenario - - ei = EosInterface("localhost", 1234, None) - - # Simulate multiple runs to see the pattern - runs = [] - sim_time = current_time - - for _ in range(8): - next_run = ei.calculate_next_run_time(sim_time, avg_runtime, update_interval) - finish_time = next_run + timedelta(seconds=avg_runtime) - - # Determine run type - is_quarter = finish_time.minute % 15 == 0 and finish_time.second == 0 - - # Check if it's a gap-fill (approximately update_interval from last finish) - if runs: - time_since_last = (next_run - runs[-1]["finish"]).total_seconds() - is_gap_fill = abs(time_since_last - update_interval) < 120 # More tolerance - else: - is_gap_fill = False - - runs.append( - { - "start": next_run, - "finish": finish_time, - "is_quarter": is_quarter, - "is_gap_fill": is_gap_fill, - } - ) - sim_time = finish_time + timedelta(seconds=1) # Move just past the finish - - # Count patterns - quarter_count = sum(1 for r in runs if r["is_quarter"]) - gap_fill_count = sum(1 for r in runs if r["is_gap_fill"]) - - # Validate patterns with relaxed expectations - if expected_pattern == "quarter_heavy": - assert ( - quarter_count >= 4 - ), f"Expected many quarter-aligned runs, got {quarter_count}/8" - - elif expected_pattern == "gap_fill_heavy": - assert ( - gap_fill_count >= 4 - ), f"Expected many gap-fill runs, got {gap_fill_count}/8" - - elif expected_pattern == "mixed": - # Just ensure we get some reasonable mix and no crazy gaps - total_time = (runs[-1]["finish"] - runs[0]["start"]).total_seconds() - avg_gap = total_time / (len(runs) - 1) if len(runs) > 1 else 0 - assert ( - avg_gap < update_interval * 2 - ), f"Average gap too large: {avg_gap}s (update_interval: {update_interval}s)" - - # Universal checks: no run should be scheduled unreasonably - for i, run in enumerate(runs): - if i > 0: - gap = (run["start"] - runs[i - 1]["finish"]).total_seconds() - assert gap >= 0, f"Overlapping runs at index {i}: gap={gap}s" - assert gap < 3600, f"Gap too large at index {i}: {gap}s" - - -def test_simulation_over_time(): - """ - Show how the algorithm behaves over several consecutive runs. - """ - ei = EosInterface("localhost", 1234, None) - - scenarios = [ - ("1min", 60), - ("5min", 300), - ("10min", 600), - ("15min", 900), - ] - - for interval_name, update_interval in scenarios: - print(f"\n=== {interval_name} interval simulation ===") - - sim_time = datetime(2025, 1, 1, 0, 0) - avg_runtime = 75 - - run_count = 0 - quarter_count = 0 - - # Run simulation for 12 iterations or until we see a clear pattern - while run_count < 12: - next_run = ei.calculate_next_run_time( - sim_time, avg_runtime, update_interval - ) - finish_time = next_run + timedelta(seconds=avg_runtime) - - is_quarter = finish_time.minute % 15 == 0 and finish_time.second == 0 - if is_quarter: - quarter_count += 1 - - wait_time = (next_run - sim_time).total_seconds() - run_type = "Q" if is_quarter else "G" # Q=Quarter, G=Gap-fill - - print( - f"Run {run_count+1:2d}: {sim_time.strftime('%H:%M:%S')} → " - f"{next_run.strftime('%H:%M:%S')} → " - f"{finish_time.strftime('%H:%M:%S')} " - f"({run_type}, wait: {wait_time:3.0f}s)" - ) - - # Move to just after the finish time for next iteration - sim_time = finish_time + timedelta(seconds=1) - run_count += 1 - - print( - f"Summary: {quarter_count}/{run_count} quarter-aligned runs " - f"({quarter_count/run_count*100:.1f}%)" - ) - - assert True # Always pass - this is for documentation diff --git a/tests/interfaces/test_optimization_interface.py b/tests/interfaces/test_optimization_interface.py new file mode 100644 index 0000000..9917dac --- /dev/null +++ b/tests/interfaces/test_optimization_interface.py @@ -0,0 +1,326 @@ +""" +Unit tests for the OptimizationInterface class, which provides an abstraction layer for optimization +backends such as EOS and EVCC Opt. These tests validate correct integration, response handling, and +configuration management for different backend sources. +Fixtures: + - eos_server_config: Supplies a sample configuration dictionary for the EOS backend. + - evcc_opt_config: Supplies a sample configuration dictionary for the EVCC Opt backend. + - berlin_timezone: Provides a pytz timezone object for Europe/Berlin. + - sample_eos_request: Supplies a representative optimization request payload in EOS format. +Test Cases: + - test_eos_server_optimize: Verifies optimization with the EOS backend, ensuring the response + structure and runtime value are as expected. + - test_evcc_opt_optimize: Verifies optimization with the EVCC Opt backend, checking response + structure and runtime value. + - test_control_data_tracking: Checks the extraction and type correctness of control data from + optimization responses. + - test_get_eos_version: Ensures the EOS backend version retrieval returns the expected version + string. + - test_backend_selection_eos: Verifies that the correct backend is selected for the EOS server. + - test_backend_selection_evcc: Verifies that the correct backend is selected for the EVCC Opt. + - test_backend_selection_unknown: Confirms that an error is raised for an unknown backend + source. +Mocks: + - Uses unittest.mock.patch to replace backend optimization and version retrieval methods, + allowing isolated testing of the interface logic without requiring actual backend servers. +Usage: + Run with pytest to execute all test cases and validate the OptimizationInterface integration + with supported backends. + +""" + +from unittest.mock import patch +import pytz +import pytest +from src.interfaces.optimization_interface import OptimizationInterface + + +@pytest.fixture(name="eos_server_config") +def fixture_eos_server_config(): + """ + Provides a sample EOS server configuration dictionary. + Returns: + dict: Configuration for EOS backend. + """ + return { + "source": "eos_server", + "server": "localhost", + "port": 8503, + } + + +@pytest.fixture(name="evcc_opt_config") +def fixture_evcc_opt_config(): + """ + Provides a sample EVCC Opt server configuration dictionary. + Returns: + dict: Configuration for EVCC Opt backend. + """ + return { + "source": "evcc_opt", + "server": "localhost", + "port": 7050, + } + + +@pytest.fixture(name="berlin_timezone") +def fixture_berlin_timezone(): + """ + Provides a timezone object for Europe/Berlin. + Returns: + pytz.timezone: Timezone object. + """ + + return pytz.timezone("Europe/Berlin") + + +@pytest.fixture(name="sample_eos_request") +def fixture_sample_eos_request(): + """ + Provides a sample EOS-format optimization request. + Returns: + dict: Sample request payload. + """ + return { + "ems": { + "pv_prognose_wh": [0.0] * 48, + "strompreis_euro_pro_wh": [0.0003] * 48, + "einspeiseverguetung_euro_pro_wh": [0.000075] * 48, + "gesamtlast": [400.0] * 48, + }, + "pv_akku": { + "device_id": "battery1", + "capacity_wh": 20000, + "charging_efficiency": 0.9, + "discharging_efficiency": 0.9, + "max_charge_power_w": 10000, + "initial_soc_percentage": 20, + "min_soc_percentage": 5, + "max_soc_percentage": 100, + }, + } + + +def test_eos_server_optimize(eos_server_config, berlin_timezone, sample_eos_request): + """ + Test optimization with EOS backend. + Ensures the response is a dict and contains expected keys. + """ + with patch( + "src.interfaces.optimization_backends.optimization_backend_eos.EOSBackend.optimize" + ) as mock_opt: + mock_opt.return_value = ( + { + "ac_charge": [0.1] * 48, + "dc_charge": [0.2] * 48, + "discharge_allowed": [1] * 48, + "start_solution": [0] * 48, + }, + 1.0, + ) + interface = OptimizationInterface(eos_server_config, berlin_timezone) + response, avg_runtime = interface.optimize(sample_eos_request) + assert isinstance(response, dict) + assert avg_runtime == 1.0 + assert "ac_charge" in response + + +def test_evcc_opt_optimize(evcc_opt_config, berlin_timezone, sample_eos_request): + """ + Test optimization with EVCC Opt backend. + Ensures the response is a dict and contains expected keys. + """ + with patch( + "src.interfaces.optimization_backends.optimization_backend_evcc_opt.EVCCOptBackend.optimize" + ) as mock_opt: + mock_opt.return_value = ( + { + "ac_charge": [0.1] * 48, + "dc_charge": [0.2] * 48, + "discharge_allowed": [1] * 48, + "start_solution": [0] * 48, + }, + 1.0, + ) + interface = OptimizationInterface(evcc_opt_config, berlin_timezone) + response, avg_runtime = interface.optimize(sample_eos_request) + assert isinstance(response, dict) + assert avg_runtime == 1.0 + assert "ac_charge" in response + + +def test_control_data_tracking(eos_server_config, berlin_timezone, sample_eos_request): + """ + Test control data tracking and response examination. + Ensures correct types for control values. + """ + with patch( + "src.interfaces.optimization_backends.optimization_backend_eos.EOSBackend.optimize" + ) as mock_opt: + mock_opt.return_value = ( + { + "ac_charge": [0.1] * 48, + "dc_charge": [0.2] * 48, + "discharge_allowed": [1] * 48, + "start_solution": [0] * 48, + }, + 1.0, + ) + interface = OptimizationInterface(eos_server_config, berlin_timezone) + response, _ = interface.optimize(sample_eos_request) + ac, dc, discharge, error = interface.examine_response_to_control_data(response) + assert isinstance(ac, float) + assert isinstance(dc, float) + assert isinstance(discharge, bool) + assert isinstance(error, bool) or isinstance(error, int) + + +def test_get_eos_version(eos_server_config, berlin_timezone): + """ + Test EOS version retrieval from the backend. + Ensures the correct version string is returned. + """ + with patch( + "src.interfaces.optimization_backends.optimization_backend_eos.EOSBackend.get_eos_version" + ) as mock_ver: + mock_ver.return_value = "2025-04-09" + interface = OptimizationInterface(eos_server_config, berlin_timezone) + assert interface.get_eos_version() == "2025-04-09" + + +def test_backend_selection_eos(eos_server_config, berlin_timezone): + """ + Test that EOSBackend is selected for 'eos_server' source. + """ + interface = OptimizationInterface(eos_server_config, berlin_timezone) + assert interface.backend_type == "eos_server" + + +def test_backend_selection_evcc(evcc_opt_config, berlin_timezone): + """ + Test that EVCCOptBackend is selected for 'evcc_opt' source. + """ + interface = OptimizationInterface(evcc_opt_config, berlin_timezone) + assert interface.backend_type == "evcc_opt" + + +def test_backend_selection_unknown(berlin_timezone): + """ + Test that an unknown backend source raises an error or uses a default. + """ + unknown_config = {"source": "unknown_backend", "server": "localhost", "port": 9999} + with pytest.raises(Exception): + OptimizationInterface(unknown_config, berlin_timezone) + + +def test_interface_methods_exist(eos_server_config, berlin_timezone): + """ + Test that OptimizationInterface exposes required methods. + """ + interface = OptimizationInterface(eos_server_config, berlin_timezone) + for method in [ + "optimize", + "examine_response_to_control_data", + "get_last_control_data", + "get_last_start_solution", + "get_home_appliance_released", + "get_home_appliance_start_hour", + "calculate_next_run_time", + "get_eos_version", + ]: + assert hasattr(interface, method) + + +class DummyBackend: + """ + A dummy backend class for testing optimization interfaces. + Attributes: + base_url (str): The base URL for the backend. + time_zone (str): The time zone associated with the backend. + backend_type (str): The type of backend, set to "dummy". + Methods: + optimize(eos_request, timeout=180): + Simulates an optimization process and returns dummy results. + """ + + def __init__(self, base_url, time_zone): + self.base_url = base_url + self.time_zone = time_zone + self.backend_type = "dummy" + + def optimize(self, eos_request, timeout=180): + """ + Optimizes the given EOS request and returns the optimization results. + + Args: + eos_request: The request object containing EOS parameters for optimization. + timeout (int, optional): Maximum time allowed for the optimization process + in seconds. Defaults to 180. + + Returns: + tuple: A tuple containing: + - dict: Optimization results with key 'ac_charge' mapped to a list + of 48 float values. + - float: The objective value of the optimization. + """ + return {"ac_charge": [0.5] * 48}, 0.5 + + +def test_dummy_backend_integration(monkeypatch, berlin_timezone): + """ + Test that a new backend can be integrated without breaking the interface. + """ + config = {"source": "dummy", "server": "localhost", "port": 1234} + # Monkeypatch the OptimizationInterface to use DummyBackend for 'dummy' + + orig_init = OptimizationInterface.__init__ + + def patched_init(self, config, timezone): + self.eos_source = config.get("source", "eos_server") + self.base_url = ( + f"http://{config.get('server', 'localhost')}:{config.get('port', 8503)}" + ) + self.time_zone = timezone + if self.eos_source == "dummy": + self.backend = DummyBackend(self.base_url, self.time_zone) + self.backend_type = "dummy" + else: + orig_init(self, config, timezone) + self.last_start_solution = None + self.home_appliance_released = False + self.home_appliance_start_hour = None + self.last_control_data = [ + { + "ac_charge_demand": 0, + "dc_charge_demand": 0, + "discharge_allowed": False, + "error": 0, + "hour": -1, + }, + { + "ac_charge_demand": 0, + "dc_charge_demand": 0, + "discharge_allowed": False, + "error": 0, + "hour": -1, + }, + ] + + monkeypatch.setattr(OptimizationInterface, "__init__", patched_init) + interface = OptimizationInterface(config, berlin_timezone) + response, avg_runtime = interface.optimize({}) + assert response["ac_charge"][0] == 0.5 + assert avg_runtime == 0.5 + + +def test_backend_error_handling(eos_server_config, berlin_timezone): + """ + Test that backend errors are handled and do not crash the interface. + """ + with patch( + "src.interfaces.optimization_backends.optimization_backend_eos.EOSBackend.optimize" + ) as mock_opt: + mock_opt.side_effect = Exception("Backend error") + interface = OptimizationInterface(eos_server_config, berlin_timezone) + with pytest.raises(Exception): + interface.optimize({})