collector.py
#!/usr/bin/env python3
import requests, json, logging, time, psycopg2, signal, sys
from psycopg2.extras import execute_values

CHUNK_SIZE = 96      # number of samples to buffer before writing to the database
INTERVAL = 10        # fetch interval in seconds
LOG_LEVEL = logging.ERROR
URL = "http://inverter/solar_api/v1/GetPowerFlowRealtimeData.fcgi"

logging.basicConfig(level=LOG_LEVEL)
db_conn = psycopg2.connect("dbname='home' user='postgres'")
db_cursor = db_conn.cursor()
chunk = []  # in-memory buffer of samples awaiting insertion


def fetch():
    """Query the inverter's Solar API power-flow endpoint and buffer one sample."""
    resp = requests.get(url=URL).json()
    sample = {
        "time": int(time.time()),
        # P_PV is None when the inverter reports no production (e.g. at night)
        "prod": resp["Body"]["Data"]["Site"]["P_PV"] or 0,
        # P_Load is reported as a negative value; flip the sign before storing
        "load": round(resp["Body"]["Data"]["Site"]["P_Load"]) * -1,
        # "grid": round(resp['Body']['Data']['Site']['P_Grid'])
    }
    logging.debug("sample: %s", sample)
    chunk.append(sample)


def insert():
    """Bulk-insert all buffered samples into the electricity table."""
    columns = chunk[0].keys()
    values_list = [tuple(sample[c] for c in columns) for sample in chunk]
    execute_values(
        db_cursor,
        f'INSERT INTO electricity ({",".join(columns)}) VALUES %s',
        values_list,
    )
    db_conn.commit()
    logging.info("inserted %d samples", len(chunk))


def exit_handler(signal_received, frame):
    """Flush any buffered samples and close the connection before exiting."""
    logging.warning("Exiting on signal")
    if len(chunk) > 0:
        insert()
    db_conn.close()
    sys.exit(0)


if __name__ == "__main__":
    signal.signal(signal.SIGINT, exit_handler)
    while True:
        try:
            fetch()
        except Exception as e:
            logging.error("%s: Error fetching data: %s", __file__, e)
        if len(chunk) == CHUNK_SIZE:
            insert()
            chunk = []
        time.sleep(INTERVAL)
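
The script expects an existing electricity table. Below is a minimal setup sketch for that table: the table name and the column names (time, prod, load) come from collector.py above, but the script name, column types, and NOT NULL constraints are assumptions rather than part of the repository.

#!/usr/bin/env python3
# create_table.py -- hypothetical one-time setup helper; the schema below is an
# assumption inferred from the columns collector.py inserts.
import psycopg2

conn = psycopg2.connect("dbname='home' user='postgres'")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS electricity (
            time BIGINT NOT NULL,    -- unix timestamp of the sample
            prod INTEGER NOT NULL,   -- PV production (W)
            load INTEGER NOT NULL    -- household load (W)
        )
        """
    )
conn.close()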