-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
91 lines (68 loc) · 2.78 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import psycopg2
import uuid
class Database:
    """Thin wrapper around one psycopg2 connection and the ``data`` table.

    The constructor opens a connection, creates a cursor, and makes sure the
    ``data`` table exists. Each row holds a UUID primary key, the user's
    input, captured stdout/stderr, a status code, an ``is_saved`` flag and an
    LLM response.

    Every statement goes through :meth:`_execute`, which rolls the
    transaction back when the statement fails so the shared connection stays
    usable for later calls.

    The instance can be used as a context manager; leaving the ``with`` block
    closes the cursor and the connection.

    Attributes:
        conn: The open psycopg2 connection.
        cur: The single cursor used for every statement.
    """

    _CREATE_TABLE_SQL = '''
        CREATE TABLE IF NOT EXISTS data (
            UUID UUID PRIMARY KEY,
            "User Input" TEXT,
            StdOut TEXT,
            StdErr TEXT,
            StatusCode INTEGER,
            is_saved BOOLEAN,
            "LLM Response" TEXT
        )
    '''

    def __init__(self, dbname, user, password, host='localhost', port=5433):
        """Connect to PostgreSQL and create the ``data`` table if missing.

        Args:
            dbname: Name of the database to connect to.
            user: Database user name.
            password: Password for ``user``.
            host: Server host name.
            port: Server port.

        Raises:
            Exception: Whatever ``psycopg2.connect`` or the table creation
                raises. If table creation fails, the connection is closed
                before the error propagates.
        """
        self.conn = psycopg2.connect(
            dbname=dbname, user=user, password=password, host=host, port=port
        )
        self.cur = self.conn.cursor()
        try:
            self._execute(self._CREATE_TABLE_SQL, commit=True)
        except Exception:
            # Do not leak a half-initialised connection to the caller.
            self.close()
            raise

    def __enter__(self):
        """Return this instance so it can be bound by ``with ... as db``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the cursor and connection; exceptions are not suppressed."""
        self.close()

    @staticmethod
    def _as_uuid_param(value):
        """Return ``value`` in a form that can be bound as a query parameter.

        ``uuid.UUID`` objects are converted to their string form, matching
        how ``add_row`` stores keys. Anything else (including ``None`` and
        plain strings) is passed through untouched.
        """
        return str(value) if isinstance(value, uuid.UUID) else value

    def _execute(self, query, params=None, commit=False):
        """Run one statement on the shared cursor.

        Args:
            query: SQL text, using ``%s`` placeholders for parameters.
            params: Sequence of values for the placeholders, or ``None``.
            commit: Commit the transaction after the statement succeeds.

        Raises:
            Exception: The error raised by the cursor or the commit, after
                the transaction has been rolled back.
        """
        try:
            self.cur.execute(query, params)
            if commit:
                self.conn.commit()
        except Exception:
            # A failed statement leaves the transaction aborted; without a
            # rollback every later statement on this connection fails too.
            self.conn.rollback()
            raise

    def close(self):
        """Close the cursor and then the connection."""
        try:
            self.cur.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self.conn.close()

    def get_rows(self):
        """Return every row of ``data`` as a list of tuples."""
        self._execute('SELECT * FROM data')
        return self.cur.fetchall()

    def get_saved_rows(self):
        """Return the rows whose ``is_saved`` flag is true."""
        self._execute('SELECT * FROM data WHERE is_saved = TRUE')
        return self.cur.fetchall()

    def get_row_by_uuid(self, uuid):
        """Return the row with the given UUID, or ``None`` if there is none.

        Args:
            uuid: Row key, as a string or a ``uuid.UUID`` object.
        """
        self._execute(
            'SELECT * FROM data WHERE UUID = %s',
            (self._as_uuid_param(uuid),),
        )
        return self.cur.fetchone()

    def update_status_code(self, uuid, status_code):
        """Set ``StatusCode`` for the row with the given UUID and commit.

        Args:
            uuid: Row key, as a string or a ``uuid.UUID`` object.
            status_code: New integer status code.
        """
        self._execute(
            'UPDATE data SET StatusCode = %s WHERE UUID = %s',
            (status_code, self._as_uuid_param(uuid)),
            commit=True,
        )

    def update_stdout_stderr(self, uuid, stdout, stderr):
        """Set ``StdOut`` and ``StdErr`` for the given row and commit.

        Args:
            uuid: Row key, as a string or a ``uuid.UUID`` object.
            stdout: New stdout text.
            stderr: New stderr text.
        """
        self._execute(
            'UPDATE data SET StdOut = %s, StdErr = %s WHERE UUID = %s',
            (stdout, stderr, self._as_uuid_param(uuid)),
            commit=True,
        )

    def add_row(self, user_input, status_code, is_saved=False, llm_response=None):
        """Insert a new row under a freshly generated UUID and commit.

        ``StdOut`` and ``StdErr`` are not set here; fill them in later with
        :meth:`update_stdout_stderr`.

        Args:
            user_input: Text stored in the ``"User Input"`` column.
            status_code: Integer stored in ``StatusCode``.
            is_saved: Value of the ``is_saved`` flag.
            llm_response: Text stored in ``"LLM Response"``, or ``None``.

        Returns:
            The new row's UUID as a string, so the caller can address the
            row in later calls.
        """
        new_uuid = str(uuid.uuid4())
        self._execute(
            '''
            INSERT INTO data (UUID, "User Input", StatusCode, is_saved, "LLM Response")
            VALUES (%s, %s, %s, %s, %s)
            ''',
            (new_uuid, user_input, status_code, is_saved, llm_response),
            commit=True,
        )
        return new_uuid

    def update_is_saved(self, uuid, is_saved):
        """Set the ``is_saved`` flag for the given row and commit.

        Args:
            uuid: Row key, as a string or a ``uuid.UUID`` object.
            is_saved: New flag value.
        """
        self._execute(
            'UPDATE data SET is_saved = %s WHERE UUID = %s',
            (is_saved, self._as_uuid_param(uuid)),
            commit=True,
        )

    def delete_unsaved_rows(self):
        """Delete every row whose ``is_saved`` flag is false and commit."""
        self._execute('DELETE FROM data WHERE is_saved = FALSE', commit=True)

    def delete_all_rows(self):
        """Delete every row of ``data`` and commit."""
        self._execute('DELETE FROM data', commit=True)
# Demo / smoke run. These statements sit at module level, so they execute
# whenever the file is run or imported.
# NOTE(review): the credentials are hard-coded and two rows are inserted on
# every run — consider moving this under an ``if __name__ == "__main__":``
# guard once nothing else relies on the module-level ``db``.
db = Database(user='postgres', password='postgre_pass', dbname='postgres')

# Seed two saved example rows.
db.add_row(user_input="Example input", status_code=200, is_saved=True)
db.add_row(user_input="Another input", status_code=404, is_saved=True)

# Further operations that can be tried by hand (left disabled):
#   print(db.get_saved_rows())
#   row_uuid = db.get_saved_rows()[0][0]
#   print(db.get_row_by_uuid(row_uuid))
#   db.update_status_code(row_uuid, 500)
#   db.update_stdout_stderr(row_uuid, "New stdout", "New stderr")
#   db.update_is_saved(row_uuid, False)
#   db.delete_unsaved_rows()
#   db.delete_all_rows()

# Dump the whole table, one row tuple per line.
rows = db.get_rows()
for row in rows:
    print(row)