-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
123 lines (102 loc) · 4.61 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import websockets
import json
import pickle
from StatusCodes import StatusCode
from Communicator import Communicator
from async_database import AsyncDatabase
from response_parser import ResponseParser
import base64
class StatusCodesMapper:
@staticmethod
async def submit_user_response(data, db, websocket):
response = {
**data,
**{
"statusCode": StatusCode.REQUEST_SENT_TO_API,
}
}
await websocket.send(json.dumps(response))
await db.add_row(response["userInput"], StatusCode.REQUEST_SENT_TO_API, new_uuid=response["UUID"], date=response["Date"])
asyncio.create_task(process_user_input(response, websocket, db))
@staticmethod
async def send_for_execution(data, db, websocket):
row = await db.get_row_by_uuid(data["UUID"])
llm_message = pickle.loads(row["LLM Response"])
statusCode = StatusCode.SENT_FOR_EXECUTION
response = {
**data,
"statusCode": statusCode,
}
print(llm_message)
await websocket.send(json.dumps(response))
asyncio.create_task(process_user_input(data, websocket, db, llm_message, statusCode))
@staticmethod
async def saved_to_bookmarks(data, db, websocket):
await db.update_is_saved(data["UUID"], is_saved=True)
@staticmethod
async def remove_from_bookmarks(data, db, websocket):
await db.update_is_saved(data["UUID"], is_saved=False)
@staticmethod
async def delete_user_message(data, db, websocket):
await db.remove_row_by_uuid(data["UUID"])
@staticmethod
async def asked_all_saved(data, db, websocket):
await db.delete_unsaved_rows()
rows = await db.get_saved_rows()
new_rows = []
for row in rows:
# llm_response = base64.b64encode(row["LLM Response"]).decode('utf-8')
new_row = {
**data,
"UUID": str(row["uuid"]),
"statusCode": row["statuscode"],
"userInput": row["User Input"],
"StdErr": row["stderr"] if row["stderr"] else "",
"StdOut": row["stdout"] if row["stdout"] else "",
# "llmResponse": llm_response
"llmResponse": ResponseParser.get_func_class(ResponseParser.parse_response_object(pickle.loads(row["LLM Response"]))[0].func_name).get_exec_description() if not isinstance(pickle.loads(row["LLM Response"]), str) else pickle.loads(row["LLM Response"]),
"Date": row["date"].strftime("%Y-%m-%d %H:%M:%S")
}
new_rows.append(new_row)
print(new_rows)
await websocket.send(json.dumps(new_rows))
async def echo(websocket, path):
db = AsyncDatabase("postgres", "postgres", "postgres", port=5432)
await db.connect()
await db.create_table()
func_mapping = {
StatusCode.SUBMIT_USER_RESPONSE: StatusCodesMapper.submit_user_response,
StatusCode.EXECUTION_CONFIRMED: StatusCodesMapper.send_for_execution,
StatusCode.ASK_RERUN: StatusCodesMapper.send_for_execution,
StatusCode.SAVE_TO_BOOKMARKS: StatusCodesMapper.saved_to_bookmarks,
StatusCode.REMOVE_FROM_BOOKMARKS: StatusCodesMapper.remove_from_bookmarks,
StatusCode.ASK_ALL_SAVED: StatusCodesMapper.asked_all_saved,
StatusCode.DELETE_USER_MESSAGE: StatusCodesMapper.delete_user_message,
}
async for message in websocket:
data = json.loads(message)
print("Received: ", data)
statusCode = int(data["statusCode"])
assert statusCode in func_mapping.keys(), "Wrong StatusCode... :/"
submit_func = func_mapping[statusCode]
await submit_func(data, db, websocket)
async def process_user_input(data, websocket, db, llm_message=None, specified_status=None):
# Call the communicator's method to send input to LLM and get a response
response_uuid, llm_response, llm_description, response_status_code = await Communicator.async_swift_input(
data, websocket, db, llm_message, specified_status)
# Send the LLM response back to the client asynchronously
response = {**data,
**{
"statusCode": response_status_code,
"llmResponse": llm_description,
}
}
print("Sending: ", response)
await websocket.send(json.dumps(response))
await db.update_llm_response_and_status_code(response_uuid, llm_response, response_status_code)
async def main():
async with websockets.serve(echo, "localhost", 8765):
await asyncio.Future() # Run forever
if __name__ == "__main__":
asyncio.run(main())