-
-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy patheos_connect.py
More file actions
1750 lines (1559 loc) · 66.5 KB
/
eos_connect.py
File metadata and controls
1750 lines (1559 loc) · 66.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
This module fetches energy data from OpenHAB, processes it, and creates a load profile.
"""
import os
import sys
from datetime import datetime, timedelta
import time
import logging
import json
import threading
import pytz
import requests
from flask import Flask, Response, render_template_string, request, send_from_directory
from version import __version__
from config import ConfigManager
from log_handler import MemoryLogHandler
from constants import CURRENCY_SYMBOL_MAP, CURRENCY_MINOR_UNIT_MAP
from interfaces.base_control import BaseControl
from interfaces.load_interface import LoadInterface
from interfaces.battery_interface import BatteryInterface
from interfaces.inverter_fronius import FroniusWR
from interfaces.inverter_fronius_v2 import FroniusWRV2
from interfaces.evcc_interface import EvccInterface
from interfaces.optimization_interface import OptimizationInterface
from interfaces.price_interface import PriceInterface
from interfaces.mqtt_interface import MqttInterface
from interfaces.pv_interface import PvInterface
from interfaces.port_interface import PortInterface
# Check Python version early
if sys.version_info < (3, 11):
sys.stderr.write(
f"ERROR: Python 3.11 or higher is required. "
f"You are running Python {sys.version_info.major}.{sys.version_info.minor}\n"
)
sys.stderr.write("Please upgrade your Python installation.\n")
sys.exit(1)
EOS_TGT_DURATION = 48
###################################################################################################
# Custom formatter to use the configured timezone
class TimezoneFormatter(logging.Formatter):
"""
A custom logging formatter that formats log timestamps according to a specified timezone.
"""
def __init__(self, fmt=None, datefmt=None, tz=None):
super().__init__(fmt, datefmt)
self.tz = tz
def formatTime(self, record, datefmt=None):
# Convert the record's timestamp to the configured timezone
record_time = datetime.fromtimestamp(record.created, self.tz)
return record_time.strftime(datefmt or self.default_time_format)
##################################################################################################
LOGLEVEL = logging.DEBUG # start before reading the config file
logger = logging.getLogger(__name__)
# Basic formatter for startup logging (before config/timezone is available)
basic_formatter = logging.Formatter(
"%(asctime)s %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S"
)
streamhandler = logging.StreamHandler(sys.stdout)
streamhandler.setFormatter(basic_formatter)
logger.addHandler(streamhandler)
logger.setLevel(LOGLEVEL)
logger.info("[Main] Starting eos_connect - version: %s", __version__)
###################################################################################################
base_path = os.path.dirname(os.path.abspath(__file__))
# get param to set a specific path
if len(sys.argv) > 1:
current_dir = sys.argv[1]
else:
current_dir = base_path
###################################################################################################
config_manager = ConfigManager(current_dir)
time_zone = pytz.timezone(config_manager.config["time_zone"])
LOGLEVEL = config_manager.config["log_level"].upper()
logger.setLevel(LOGLEVEL)
# Now upgrade to timezone-aware formatter after config is loaded
timezone_formatter = TimezoneFormatter(
"%(asctime)s %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S", tz=time_zone
)
streamhandler.setFormatter(timezone_formatter)
memory_handler = MemoryLogHandler(
max_records=10000, # All log entries (mixed levels)
max_alerts=2000, # Dedicated alert buffer (WARNING/ERROR/CRITICAL only)
)
memory_handler.setFormatter(timezone_formatter) # Use timezone formatter for web logs
logger.addHandler(memory_handler)
logger.debug("[Main] Memory log handler initialized successfully")
logger.info(
"[Main] set user defined time zone to %s and loglevel to %s",
config_manager.config["time_zone"],
LOGLEVEL,
)
# initialize eos interface
eos_interface = OptimizationInterface(
config=config_manager.config["eos"],
timezone=time_zone,
)
# initialize base control
base_control = BaseControl(config_manager.config, time_zone)
# initialize the inverter interface
inverter_interface = None
# Handle backward compatibility for old interface names
inverter_type = config_manager.config["inverter"]["type"]
if inverter_type == "fronius_gen24_v2":
logger.warning(
"[Config] Interface name 'fronius_gen24_v2' is deprecated. "
"Please update your config.yaml to use 'fronius_gen24' instead. "
"Using enhanced interface for compatibility."
)
inverter_type = "fronius_gen24" # Auto-migrate to new name
if inverter_type == "fronius_gen24":
# Enhanced V2 interface (default for existing users)
logger.info(
"[Inverter] Using enhanced Fronius GEN24 interface with firmware-based authentication"
)
inverter_config = {
"address": config_manager.config["inverter"]["address"],
"max_grid_charge_rate": config_manager.config["inverter"][
"max_grid_charge_rate"
],
"max_pv_charge_rate": config_manager.config["inverter"]["max_pv_charge_rate"],
"user": config_manager.config["inverter"]["user"],
"password": config_manager.config["inverter"]["password"],
}
inverter_interface = FroniusWRV2(inverter_config)
elif inverter_type == "fronius_gen24_legacy":
# Legacy V1 interface (for corner cases)
logger.info(
"[Inverter] Using legacy Fronius GEN24 interface (V1) for compatibility"
)
inverter_config = {
"address": config_manager.config["inverter"]["address"],
"max_grid_charge_rate": config_manager.config["inverter"][
"max_grid_charge_rate"
],
"max_pv_charge_rate": config_manager.config["inverter"]["max_pv_charge_rate"],
"user": config_manager.config["inverter"]["user"],
"password": config_manager.config["inverter"]["password"],
}
inverter_interface = FroniusWR(inverter_config)
elif inverter_type == "evcc":
logger.info(
"[Inverter] Inverter type %s - using the universal evcc external battery control.",
inverter_type,
)
else:
logger.info(
"[Inverter] Inverter type %s - no external connection."
+ " Changing to show only mode.",
config_manager.config["inverter"]["type"],
)
# callback function for evcc interface
def charging_state_callback(new_state):
"""
Callback function that gets triggered when the charging state changes.
"""
# update the base control with the new charging state
base_control.set_current_evcc_charging_state(evcc_interface.get_charging_state())
base_control.set_current_evcc_charging_mode(evcc_interface.get_charging_mode())
logger.info("[MAIN] EVCC Event - Charging state changed to: %s", new_state)
change_control_state()
# callback function for battery interface
def battery_state_callback():
"""
Callback function that gets triggered when the battery state changes.
"""
logger.debug(
"[MAIN] Battery Event - State of charge changed to: %s",
battery_interface.get_current_soc(),
)
# update the base control with the new battery state of charge
change_control_state()
# callback function for mqtt interface
def mqtt_control_callback(command):
"""
Handles MQTT control commands by parsing the command dictionary and updating the system's state.
Args:
command (dict): Contains "duration" (str, "HH:MM"), "mode" (str/int),
and "grid_charge_power" (str/int).
Side Effects:
- Updates base control mode override.
- Publishes updated control topics to MQTT.
- Logs the event and triggers a control state change.
"""
# Default to "02:00" if empty or None
duration_string = command.get("duration", "02:00") or "02:00"
duration_hh = duration_string.split(":")[0]
duration_mm = duration_string.split(":")[1]
duration = int(duration_hh) * 60 + int(duration_mm)
# Default to 0 if empty or None
charge_power = command.get("charge_power", 0) or 0
charge_power = int(charge_power) / 1000 # convert to kW
# update the base control with the new charging state
base_control.set_mode_override(int(command["mode"]), duration, charge_power)
mqtt_interface.update_publish_topics(
{
"control/override_charge_power": {"value": charge_power * 1000},
"control/override_active": {
"value": base_control.get_override_active_and_endtime()[0]
},
"control/override_end_time": {
"value": (
datetime.fromtimestamp(
base_control.get_override_active_and_endtime()[1], time_zone
)
).isoformat()
},
}
)
logger.info("[MAIN] MQTT Event - control command to: %s", command["mode"])
change_control_state()
mqtt_interface = MqttInterface(
config_mqtt=config_manager.config["mqtt"], on_mqtt_command=None
)
evcc_interface = EvccInterface(
url=config_manager.config.get("evcc", {}).get("url", ""),
ext_bat_mode=config_manager.config["inverter"]["type"] == "evcc",
update_interval=10,
on_charging_state_change=None,
)
# intialize the load interface
load_interface = LoadInterface(
config_manager.config.get("load", {}),
time_zone,
)
battery_interface = BatteryInterface(
config_manager.config["battery"],
on_bat_max_changed=None,
)
price_interface = PriceInterface(config_manager.config["price"], time_zone)
pv_interface = PvInterface(
config_manager.config["pv_forecast_source"],
config_manager.config["pv_forecast"],
config_manager.config.get("evcc", {}),
config_manager.config.get("time_zone", "UTC"),
)
# wait for the interfaces to initialize - depend on entries for pv_forecast
init_time = 3 + 1 * len(config_manager.config["pv_forecast"])
logger.info("[Main] Waiting %s seconds for interfaces to initialize", init_time)
time.sleep(init_time)
# pv_interface.test_output()
# sys.exit(0) # exit if the interfaces are not initialized correctly
# summarize all date
def create_optimize_request():
"""
Creates an optimization request payload for energy management systems.
Args:
api_version (str): The API version to use for the request. Defaults to "new".
Returns:
dict: A dictionary containing the payload for the optimization request.
"""
def get_dst_change_in_next_48(tz, start_dt=None):
"""
Returns:
0 if no DST change in next 48 hours,
+N if DST fallback (extra hour) at Nth hour from now,
-N if DST spring forward (missing hour) at Nth hour from now.
"""
if start_dt is None:
start_dt = datetime.now(tz)
if start_dt.tzinfo is None:
start_dt = tz.localize(start_dt)
prev_offset = start_dt.utcoffset()
for i in range(1, 49):
check_dt = tz.normalize(start_dt + timedelta(hours=i))
offset = check_dt.utcoffset()
if offset != prev_offset:
# DST change detected
if offset > prev_offset:
logger.debug("[DST] Spring forward detected at hour %s: -%s", i, i)
return -i # hour lost
logger.debug("[DST] Fall back detected at hour %s: +%s", i, i)
return i # hour gained
prev_offset = offset
logger.debug("[DST] No DST change detected in next 48 hours (0)")
return 0
# def adjust_forecast_array_for_dst(data_array, dst_change_detected):
# """
# Adjusts the forecast array for Daylight Saving Time (DST) changes.
# Args:
# data_array (list): The original forecast array.
# dst_change_detected (int): The DST change detected (positive for fall back,
# negative for spring forward).
# Returns:
# list: The adjusted forecast array.
# """
# arr = list(data_array) # Make a copy so the original is not modified
# if dst_change_detected != 0:
# hour_index = abs(dst_change_detected) - 1
# # Validate computed index to avoid IndexError
# if hour_index < 0 or hour_index >= len(arr):
# logger.warning(
# "[DST] Computed hour index %s out of range for array length %s"
# + " - skipping DST adjustment",
# hour_index,
# len(arr),
# )
# return arr
# if dst_change_detected > 0:
# # Fall back - repeat hour
# arr.insert(hour_index, arr[hour_index]) # duplicate hour
# logger.debug(
# "[DST] Adjusted forecast for fall back at hour %s",
# hour_index + 1,
# )
# else:
# # Spring forward - remove hour
# removed_value = arr.pop(hour_index)
# logger.debug(
# "[DST] Adjusted forecast for spring forward at hour %s (removed %s Wh)",
# hour_index + 1,
# removed_value,
# )
# return arr
def get_ems_data(dst_change_detected):
pv_prognose_wh = pv_interface.get_current_pv_forecast()
strompreis_euro_pro_wh = price_interface.get_current_prices()
einspeiseverguetung_euro_pro_wh = price_interface.get_current_feedin_prices()
gesamtlast = load_interface.get_load_profile(EOS_TGT_DURATION)
# if dst_change_detected != 0:
# pv_prognose_wh = adjust_forecast_array_for_dst(
# pv_prognose_wh, dst_change_detected
# )
# strompreis_euro_pro_wh = adjust_forecast_array_for_dst(
# strompreis_euro_pro_wh, dst_change_detected
# )
# einspeiseverguetung_euro_pro_wh = adjust_forecast_array_for_dst(
# einspeiseverguetung_euro_pro_wh, dst_change_detected
# )
# gesamtlast = adjust_forecast_array_for_dst(gesamtlast, dst_change_detected)
return {
"pv_prognose_wh": pv_prognose_wh,
"strompreis_euro_pro_wh": strompreis_euro_pro_wh,
"einspeiseverguetung_euro_pro_wh": einspeiseverguetung_euro_pro_wh,
"preis_euro_pro_wh_akku": config_manager.config["battery"][
"price_euro_per_wh_accu"
],
"gesamtlast": gesamtlast,
}
def get_pv_akku_data():
akku_object = {
"capacity_wh": config_manager.config["battery"]["capacity_wh"],
"charging_efficiency": config_manager.config["battery"][
"charge_efficiency"
],
"discharging_efficiency": config_manager.config["battery"][
"discharge_efficiency"
],
"max_charge_power_w": config_manager.config["battery"][
"max_charge_power_w"
],
"initial_soc_percentage": round(battery_interface.get_current_soc()),
"min_soc_percentage": config_manager.config["battery"][
"min_soc_percentage"
],
"max_soc_percentage": config_manager.config["battery"][
"max_soc_percentage"
],
}
if eos_interface.get_eos_version() == ">=2025-04-09":
akku_object = {"device_id": "battery1", **akku_object}
return akku_object
def get_wechselrichter_data():
wechselrichter_object = {
"max_power_wh": config_manager.config["inverter"]["max_pv_charge_rate"],
}
if eos_interface.get_eos_version() == ">=2025-04-09":
wechselrichter_object = {
"device_id": "inverter1",
**wechselrichter_object,
} # at top
wechselrichter_object["battery_id"] = "battery1" # at the bottom
return wechselrichter_object
def get_eauto_data():
eauto_object = {
"capacity_wh": 27000,
"charging_efficiency": 0.90,
"discharging_efficiency": 0.95,
"max_charge_power_w": 7360,
"initial_soc_percentage": 50,
"min_soc_percentage": 5,
"max_soc_percentage": 100,
}
if eos_interface.get_eos_version() == ">=2025-04-09":
eauto_object = {"device_id": "ev1", **eauto_object}
return eauto_object
def get_dishwasher_data():
consumption_wh = config_manager.config["load"].get(
"additional_load_1_consumption", 1
)
if not consumption_wh or consumption_wh == 0:
consumption_wh = 1
duration_h = config_manager.config["load"].get("additional_load_1_runtime", 1)
if not duration_h or duration_h == 0:
duration_h = 1
dishwasher_object = {
"consumption_wh": consumption_wh,
"duration_h": duration_h,
}
if eos_interface.get_eos_version() == ">=2025-04-09":
dishwasher_object = {"device_id": "additional_load_1", **dishwasher_object}
return dishwasher_object
dst_change_detected = get_dst_change_in_next_48(time_zone)
temperature_forecast = pv_interface.get_current_temp_forecast()
if dst_change_detected != 0:
logger.info(
"[Main] DST change detected: in %s hours there will be a shift with %s - please check"
+ " https://github.com/ohAnd/EOS_connect/issues/130#issuecomment-3444749335"
+ " for details.",
abs(dst_change_detected),
"1 hour plus" if dst_change_detected > 0 else "1 hour minus",
)
# temperature_forecast = adjust_forecast_array_for_dst(
# temperature_forecast, dst_change_detected
# )
payload = {
"ems": get_ems_data(dst_change_detected),
"pv_akku": get_pv_akku_data(),
"inverter": get_wechselrichter_data(),
"eauto": get_eauto_data(),
"dishwasher": get_dishwasher_data(),
"temperature_forecast": temperature_forecast,
"start_solution": eos_interface.get_last_start_solution(),
}
logger.debug(
"[Main] optimize request payload - startsolution: %s", payload["start_solution"]
)
return payload
last_control_data = {
"current_soc": None,
"ac_charge_demand": None,
"dc_charge_demand": None,
"discharge_allowed": None,
}
def setting_control_data(ac_charge_demand_rel, dc_charge_demand_rel, discharge_allowed):
"""
Process the optimized response from EOS and update the load interface.
Args:
ac_charge_demand_rel (float): The relative AC charge demand.
dc_charge_demand_rel (float): The relative DC charge demand.
discharge_allowed (bool): Whether discharge is allowed (True/False).
"""
# Safety check: Prevent AC charging if battery SoC exceeds maximum
current_soc = battery_interface.get_current_soc()
max_soc = config_manager.config["battery"]["max_soc_percentage"]
if (
last_control_data["current_soc"] is not None
and last_control_data["ac_charge_demand"] is not None
):
if (
current_soc >= max_soc
and ac_charge_demand_rel > 0
and last_control_data["current_soc"] != current_soc
and last_control_data["ac_charge_demand"] != ac_charge_demand_rel
):
logger.warning(
"[Main] EOS requested AC charging (%s) but battery SoC (%s%%)"
+ " at/above maximum (%s%%) - overriding to 0",
ac_charge_demand_rel,
current_soc,
max_soc,
)
ac_charge_demand_rel = 0 # Override EOS decision for safety
base_control.set_current_ac_charge_demand(ac_charge_demand_rel)
base_control.set_current_dc_charge_demand(dc_charge_demand_rel)
base_control.set_current_discharge_allowed(bool(discharge_allowed))
mqtt_interface.update_publish_topics(
{
"control/eos_ac_charge_demand": {
"value": base_control.get_current_ac_charge_demand()
},
"control/eos_dc_charge_demand": {
"value": base_control.get_current_dc_charge_demand()
},
"control/eos_discharge_allowed": {
"value": base_control.get_current_discharge_allowed()
},
}
)
# set the current battery state of charge
base_control.set_current_battery_soc(battery_interface.get_current_soc())
# getting the current charging state from evcc
base_control.set_current_evcc_charging_state(evcc_interface.get_charging_state())
base_control.set_current_evcc_charging_mode(evcc_interface.get_charging_mode())
last_control_data["current_soc"] = current_soc
last_control_data["ac_charge_demand"] = ac_charge_demand_rel
last_control_data["dc_charge_demand"] = dc_charge_demand_rel
last_control_data["discharge_allowed"] = discharge_allowed
class OptimizationScheduler:
"""
A scheduler class that manages the periodic execution of an optimization process
in a background thread. The class is responsible for starting, stopping, and
managing the lifecycle of the optimization service.
Attributes:
update_interval (int): The interval in seconds between optimization runs.
_update_thread_optimization_loop (threading.Thread): The background thread
running the optimization loop.
_stop_event (threading.Event): An event used to signal the thread to stop.
Methods:
__start_update_service_optimization_loop():
shutdown():
_update_state_loop():
__run_optimization_loop():
"""
def __init__(self, update_interval):
self.update_interval = update_interval
self.last_request_response = {
"request": json.dumps(
{
"status": "Awaiting first optimization run",
},
indent=4,
),
"response": json.dumps(
{
"status": "starting up",
"message": (
"The first request has been sent to EOS and is now waiting for "
"the completion of the first optimization run."
),
},
indent=4,
),
}
self.current_state = {
"request_state": None,
"last_request_timestamp": None,
# initialize with startup time stamp to avoid confusion in gui
"last_response_timestamp": datetime.now(time_zone).isoformat(),
"next_run": None,
}
self._update_thread_optimization_loop = None
self._stop_event = threading.Event()
self._last_avg_runtime = 120 # Initialize with a default value
self.__start_update_service_optimization_loop()
self._update_thread_control_loop = None
self._stop_event_control_loop = threading.Event()
self.__start_update_service_control_loop()
self._update_thread_data_loop = None
self._stop_event_data_loop = threading.Event()
self.__start_update_service_data_loop()
def get_last_request_response(self):
"""
Returns the last request response.
"""
return self.last_request_response
def get_current_state(self):
"""
Returns the current state of the optimization scheduler.
"""
return self.current_state
def __set_state_request(self):
"""
Sets the current state of the optimization scheduler.
"""
self.current_state["request_state"] = "request send"
self.current_state["last_request_timestamp"] = datetime.now(
time_zone
).isoformat()
def __set_state_response(self):
"""
Sets the current state of the optimization scheduler.
"""
self.current_state["request_state"] = "response received"
self.current_state["last_response_timestamp"] = datetime.now(
time_zone
).isoformat()
def __set_state_next_run(self, next_run_time):
"""
Sets the current state of the optimization scheduler.
"""
self.current_state["next_run"] = next_run_time
def __start_update_service_optimization_loop(self):
"""
Starts the background thread to periodically update the state.
"""
if (
self._update_thread_optimization_loop is None
or not self._update_thread_optimization_loop.is_alive()
):
self._stop_event.clear()
self._update_thread_optimization_loop = threading.Thread(
target=self.__update_state_optimization_loop, daemon=True
)
self._update_thread_optimization_loop.start()
logger.info("[OPTIMIZATION] Update service Optimization Run started.")
def __update_state_optimization_loop(self):
"""
The loop that runs in the background thread to update the state.
"""
while not self._stop_event.is_set():
try:
self.__run_optimization_loop()
# Calculate actual sleep time based on smart scheduling
loop_now = datetime.now(time_zone)
next_eval = eos_interface.calculate_next_run_time(
loop_now,
getattr(self, "_last_avg_runtime", 120), # Use last known runtime
self.update_interval,
)
actual_sleep_interval = max(10, (next_eval - loop_now).total_seconds())
self.__set_state_next_run(next_eval.astimezone(time_zone).isoformat())
mqtt_interface.update_publish_topics(
{
"optimization/last_run": {
"value": self.get_current_state()["last_response_timestamp"]
},
"optimization/next_run": {
"value": self.get_current_state()["next_run"]
},
}
)
minutes, seconds = divmod(actual_sleep_interval, 60)
logger.info(
"[Main] Next optimization at %s (based on average runtime of %.0f seconds)."
+ " Sleeping for %d min %.0f seconds\n",
next_eval.strftime("%H:%M:%S"),
getattr(self, "_last_avg_runtime", 120),
minutes,
seconds,
)
except (requests.exceptions.RequestException, ValueError, KeyError) as e:
logger.error("[OPTIMIZATION] Error while updating state: %s", e)
actual_sleep_interval = self.update_interval # Fallback on error
# Use the calculated sleep interval instead of fixed interval
while actual_sleep_interval > 0:
if self._stop_event.is_set():
return # Exit immediately if stop event is set
time.sleep(min(1, actual_sleep_interval)) # Sleep in 1-second chunks
actual_sleep_interval -= 1
# self.__start_update_service_optimization_loop()
def __run_optimization_loop(self):
"""
Executes the optimization process by creating an optimization request,
sending it to the EOS interface, processing the response, and scheduling
the next optimization run.
The method performs the following steps:
1. Logs the start of a new optimization run.
2. Creates an optimization request in JSON format and saves it to a file.
3. Sends the optimization request to the EOS interface and retrieves the response.
4. Adds a timestamp to the response and saves it to a file.
5. Extracts control data from the response and, if no error is detected,
applies the control settings and updates the control state.
6. Calculates the time for the next optimization run and logs the sleep duration.
Raises:
Any exceptions raised during file operations, JSON serialization,
or EOS interface communication will propagate to the caller.
Notes:
- The method assumes the presence of global variables or objects such as
`logger`, `base_path`, `eos_interface`, `config_manager`, and `time_zone`.
- The `config_manager.config` dictionary is expected to contain the
necessary configuration values for "eos.timeout" and "refresh_time".
"""
logger.info("[Main] start new run")
# update prices
# price_interface.update_prices(
# EOS_TGT_DURATION,
# datetime.now(time_zone).replace(hour=0, minute=0, second=0, microsecond=0),
# )
# create optimize request
json_optimize_input = create_optimize_request()
self.__set_state_request()
with open(
base_path + "/json/optimize_request.json", "w", encoding="utf-8"
) as file:
json.dump(json_optimize_input, file, indent=4)
mqtt_interface.update_publish_topics(
{"optimization/state": {"value": self.get_current_state()["request_state"]}}
)
optimized_response, avg_runtime = eos_interface.optimize(
json_optimize_input, config_manager.config["eos"]["timeout"]
)
# Store the runtime for use in sleep calculation
self._last_avg_runtime = avg_runtime
json_optimize_input["timestamp"] = datetime.now(time_zone).isoformat()
self.last_request_response["request"] = json.dumps(
json_optimize_input, indent=4
)
optimized_response["timestamp"] = datetime.now(time_zone).isoformat()
self.last_request_response["response"] = json.dumps(
optimized_response, indent=4
)
self.__set_state_response()
with open(
base_path + "/json/optimize_response.json", "w", encoding="utf-8"
) as file:
json.dump(optimized_response, file, indent=4)
# +++++++++
ac_charge_demand, dc_charge_demand, discharge_allowed, error = (
eos_interface.examine_response_to_control_data(optimized_response)
)
if error is not True:
setting_control_data(ac_charge_demand, dc_charge_demand, discharge_allowed)
# get recent evcc states
base_control.set_current_evcc_charging_state(
evcc_interface.get_charging_state()
)
base_control.set_current_evcc_charging_mode(
evcc_interface.get_charging_mode()
)
# change_control_state() # -> moved to __run_control_loop
def __start_update_service_control_loop(self):
"""
Starts the background thread to periodically update the state.
"""
if (
self._update_thread_control_loop is None
or not self._update_thread_control_loop.is_alive()
):
self._stop_event_control_loop.clear()
self._update_thread_control_loop = threading.Thread(
target=self.__update_state_loop_control_loop, daemon=True
)
self._update_thread_control_loop.start()
logger.info("[OPTIMIZATION] Update service Control started.")
def __update_state_loop_control_loop(self):
"""
The loop that runs in the background thread to update the state.
"""
while not self._stop_event_control_loop.is_set():
try:
self.__run_control_loop()
except (requests.exceptions.RequestException, ValueError, KeyError) as e:
logger.error("[OPTIMIZATION] Error while running control loop: %s", e)
# Break the sleep interval into smaller chunks to allow immediate shutdown
sleep_interval = 1
while sleep_interval > 0:
if self._stop_event_control_loop.is_set():
return # Exit immediately if stop event is set
time.sleep(min(1, sleep_interval)) # Sleep in 1-second chunks
sleep_interval -= 1
self.__start_update_service_control_loop()
def __run_control_loop(self):
current_hour = datetime.now(time_zone).hour
last_control_selected_entry = 0
if current_hour == eos_interface.get_last_control_data()[1]["hour"]:
last_control_selected_entry = 1
elif -1 == eos_interface.get_last_control_data()[0]["hour"]:
# logger.debug("[Main] check current tgt ctrl - still in startup - skip")
return
elif (
current_hour != eos_interface.get_last_control_data()[0]["hour"]
and current_hour != eos_interface.get_last_control_data()[1]["hour"]
):
logger.warning(
"[Main] check current tgt ctrl - wrong hour data for fast control - skip"
)
return
ac_charge_demand = eos_interface.get_last_control_data()[
last_control_selected_entry
]["ac_charge_demand"]
dc_charge_demand = eos_interface.get_last_control_data()[
last_control_selected_entry
]["dc_charge_demand"]
discharge_allowed = eos_interface.get_last_control_data()[
last_control_selected_entry
]["discharge_allowed"]
error = eos_interface.get_last_control_data()[last_control_selected_entry][
"error"
]
if (
ac_charge_demand is None
or dc_charge_demand is None
or discharge_allowed is None
):
logger.warning(
"[Main] check current tgt ctrl - missing data for fast control - skip"
)
return
if ac_charge_demand < 0 or dc_charge_demand < 0:
logger.warning(
"[Main] check current tgt ctrl - invalid data for fast control - skip"
)
return
if error is not True:
# logger.debug(
# "[Main] Optimization fast control loop - current state: %s (Num: %s) "+
# "-> ac_charge_demand: %s, dc_charge_demand: %s, discharge_allowed: %s",
# base_control.get_current_overall_state(),
# base_control.get_current_overall_state_number(),
# ac_charge_demand,
# dc_charge_demand,
# discharge_allowed,
# )
setting_control_data(ac_charge_demand, dc_charge_demand, discharge_allowed)
# get recent evcc states
base_control.set_current_evcc_charging_state(
evcc_interface.get_charging_state()
)
base_control.set_current_evcc_charging_mode(
evcc_interface.get_charging_mode()
)
change_control_state()
# logger.debug(
# "[Main] Optimization control loop - secondly check - current state: %s (Num: %s)",
# base_control.get_current_overall_state(),
# base_control.get_current_overall_state_number(),
# )
def __start_update_service_data_loop(self):
"""
Starts the background thread to periodically update the state.
"""
if (
self._update_thread_data_loop is None
or not self._update_thread_data_loop.is_alive()
):
self._stop_event_data_loop.clear()
self._update_thread_data_loop = threading.Thread(
target=self.__update_state_loop_data_loop, daemon=True
)
self._update_thread_data_loop.start()
logger.info("[OPTIMIZATION] Update service Data started.")
def __update_state_loop_data_loop(self):
"""
The loop that runs in the background thread to update the state.
"""
while not self._stop_event_data_loop.is_set():
try:
self.__run_data_loop()
except (requests.exceptions.RequestException, ValueError, KeyError) as e:
logger.error(
"[OPTIMIZATION] Error while running data control loop: %s", e
)
# Break the sleep interval into smaller chunks to allow immediate shutdown
sleep_interval = 15
while sleep_interval > 0:
if self._stop_event_data_loop.is_set():
return # Exit immediately if stop event is set
time.sleep(min(1, sleep_interval)) # Sleep in 1-second chunks
sleep_interval -= 1
self.__start_update_service_data_loop()
def __run_data_loop(self):
if inverter_type in ["fronius_gen24", "fronius_gen24_legacy"]:
inverter_interface.fetch_inverter_data()
mqtt_interface.update_publish_topics(
{
"inverter/special/temperature_inverter": {
"value": inverter_interface.get_inverter_current_data()[
"DEVICE_TEMPERATURE_AMBIENTEMEAN_F32"
]
},
"inverter/special/temperature_ac_module": {
"value": inverter_interface.get_inverter_current_data()[
"MODULE_TEMPERATURE_MEAN_01_F32"
]
},
"inverter/special/temperature_dc_module": {
"value": inverter_interface.get_inverter_current_data()[
"MODULE_TEMPERATURE_MEAN_03_F32"
]
},
"inverter/special/temperature_battery_module": {
"value": inverter_interface.get_inverter_current_data()[
"MODULE_TEMPERATURE_MEAN_04_F32"
]
},
"inverter/special/fan_control_01": {
"value": inverter_interface.get_inverter_current_data()[
"FANCONTROL_PERCENT_01_F32"
]
},
"inverter/special/fan_control_02": {
"value": inverter_interface.get_inverter_current_data()[
"FANCONTROL_PERCENT_02_F32"
]
},
}
)
# logger.debug(
# "[Main] Inverter data fetched - %s",
# inverter_interface.get_inverter_current_data(),
# )
def shutdown(self):
"""
Stops the background thread and shuts down the update service.
"""
if (
self._update_thread_optimization_loop
and self._update_thread_optimization_loop.is_alive()
):
self._stop_event.set()
self._update_thread_optimization_loop.join()
logger.info("[OPTIMIZATION] Update service Optimization Loop stopped.")
if (
self._update_thread_control_loop
and self._update_thread_control_loop.is_alive()
):
self._stop_event_control_loop.set()
self._update_thread_control_loop.join()
logger.info("[OPTIMIZATION] Update service Control Loop stopped.")
if self._update_thread_data_loop and self._update_thread_data_loop.is_alive():
self._stop_event_data_loop.set()
self._update_thread_data_loop.join()
logger.info("[OPTIMIZATION] Update service Data Loop stopped.")
optimization_scheduler = OptimizationScheduler(
config_manager.config["refresh_time"] * 60 # convert to seconds
)
def change_control_state():
"""
Adjusts the control state of the inverter based on the current overall state.
This function checks the current overall state of the inverter and performs
the corresponding action. The possible states and their actions are: