ai.py
from concurrent.futures import ThreadPoolExecutor

import tiktoken


def split_into_chunks(text, tokens, llm, overlap):
    """Split text into chunks of at most `tokens` tokens, overlapping by `overlap` tokens."""
    try:
        encoding = tiktoken.encoding_for_model(llm.model)
        tokenized_text = encoding.encode(text)
        chunks = []
        for i in range(0, len(tokenized_text), tokens - overlap):
            chunk = encoding.decode(tokenized_text[i : i + tokens])
            chunks.append(chunk)
    except Exception:
        # Fallback when no tokenizer is known for the model: approximate one
        # token as four characters and chunk by character count instead.
        # Stepping by (tokens - overlap) * 4 keeps the overlap consistent
        # with the token-based path above.
        chunks = []
        for i in range(0, len(text), (tokens - overlap) * 4):
            chunk = text[i : i + tokens * 4]
            chunks.append(chunk)
    return chunks
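
# A small worked sketch of the overlap (the numbers are illustrative, not
# defaults of this module): with tokens=100 and overlap=10, window starts
# advance by 90 tokens, so consecutive chunks share the 10 tokens around each
# boundary and a sentence cut at a boundary still appears whole in one chunk.
#
#     chunks = split_into_chunks(document_text, 100, llm, 10)
#     # chunks[0] and chunks[1] repeat roughly 10 tokens at the seam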


def chunk_responses(responses, tokens, llm):
    try:
        encoding = tiktoken.encoding_for_model(llm.model)
        chunked_responses = []
        current_chunk = ""
        current_tokens = 0
        for response in responses:
            tokenized_response = encoding.encode(response)
            new_tokens = current_tokens + len(tokenized_response)
            # If the new token count exceeds the limit, handle the current chunk
            if new_tokens > tokens:
                # If current chunk is empty or response alone exceeds limit, add response as standalone
                if current_tokens == 0 or len(tokenized_response) > tokens:
                    chunked_responses.append(response)
                else:
                    chunked_responses.append(current_chunk)
                    current_chunk = response
                    current_tokens = len(tokenized_response)
                continue
            # Add response to the current chunk
            current_chunk += "\n\n" + response if current_chunk else response
            current_tokens = new_tokens
        # Add remaining chunk if not empty
        if current_chunk:
            chunked_responses.append(current_chunk)
    except Exception:
        chunked_responses = []
        current_chunk = ""
        current_chars = 0
        for response in responses:
            new_chars = current_chars + len(response)
            # If the new char count exceeds the limit, handle the current chunk
            if new_chars > tokens * 4:
                # If current chunk is empty or response alone exceeds limit, add response as standalone
                if current_chars == 0 or len(response) > tokens * 4:
                    chunked_responses.append(response)
                else:
                    chunked_responses.append(current_chunk)
                    current_chunk = response
                    current_chars = len(response)
                continue
            # Add response to the current chunk
            current_chunk += "\n\n" + response if current_chunk else response
            current_chars = new_chars
        # Add remaining chunk if not empty
        if current_chunk:
            chunked_responses.append(current_chunk)
    return chunked_responses
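
# A minimal packing sketch, assuming (purely for illustration) one token per
# character: with tokens=10, the inputs below pack as
# ["aaaa\n\nbbbbb", "cccccccc"], since 4 + 5 = 9 fits the limit and adding
# 8 more would not, so the third response starts a new chunk.
#
#     chunk_responses(["aaaa", "bbbbb", "cccccccc"], 10, llm)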


def fast_llm(llm, system_message, user_message):
    """Run a single stateless chat turn, then restore the interpreter's prior state."""
    old_messages = llm.interpreter.messages
    old_system_message = llm.interpreter.system_message
    try:
        llm.interpreter.system_message = system_message
        llm.interpreter.messages = []
        response = llm.interpreter.chat(user_message)
    finally:
        llm.interpreter.messages = old_messages
        llm.interpreter.system_message = old_system_message
    return response[-1].get("content")


def query_map_chunks(chunks, llm, query):
    """(Map) Run the query against each chunk of text in parallel."""
    with ThreadPoolExecutor() as executor:
        responses = list(
            executor.map(lambda chunk: fast_llm(llm, query, chunk), chunks)
        )
    return responses


def query_reduce_chunks(responses, llm, chunk_size, query):
    """(Reduce) Re-pack and re-query the responses until a single one remains."""
    # Guard against empty input (e.g., empty text produced no chunks)
    if not responses:
        return ""
    while len(responses) > 1:
        chunks = chunk_responses(responses, chunk_size, llm)
        # Use multithreading to summarize each chunk simultaneously
        with ThreadPoolExecutor() as executor:
            summaries = list(
                executor.map(lambda chunk: fast_llm(llm, query, chunk), chunks)
            )
        responses = summaries
    return responses[0]
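
# Together these three helpers form a map-reduce pipeline (the call below is a
# sketch of the flow, not additional API): split_into_chunks fans the text out,
# query_map_chunks answers the query per chunk in parallel, and
# query_reduce_chunks repeatedly re-packs and re-queries the answers until one
# response remains.
#
#     chunks = split_into_chunks(text, 2000, llm, 50)
#     answers = query_map_chunks(chunks, llm, "Summarize this part.")
#     final = query_reduce_chunks(answers, llm, 2000, "Merge these summaries.")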


class Ai:
    def __init__(self, computer):
        self.computer = computer

    def chat(self, text, base64=None):
        messages = [
            {
                "role": "system",
                "type": "message",
                "content": "You are a helpful AI assistant.",
            },
            {"role": "user", "type": "message", "content": text},
        ]
        if base64:
            messages.append(
                {"role": "user", "type": "image", "format": "base64", "content": base64}
            )
        response = ""
        for chunk in self.computer.interpreter.llm.run(messages):
            if "content" in chunk:
                response += chunk.get("content")
        return response
        # Old way (unreachable after the return above; kept for reference)
        old_messages = self.computer.interpreter.llm.interpreter.messages
        old_system_message = self.computer.interpreter.llm.interpreter.system_message
        old_import_computer_api = self.computer.import_computer_api
        old_execution_instructions = (
            self.computer.interpreter.llm.execution_instructions
        )
        try:
            self.computer.interpreter.llm.interpreter.system_message = (
                "You are an AI assistant."
            )
            self.computer.interpreter.llm.interpreter.messages = []
            self.computer.import_computer_api = False
            self.computer.interpreter.llm.execution_instructions = ""
            response = self.computer.interpreter.llm.interpreter.chat(text)
        finally:
            self.computer.interpreter.llm.interpreter.messages = old_messages
            self.computer.interpreter.llm.interpreter.system_message = (
                old_system_message
            )
            self.computer.import_computer_api = old_import_computer_api
            self.computer.interpreter.llm.execution_instructions = (
                old_execution_instructions
            )
        return response[-1].get("content")

    def query(self, text, query, custom_reduce_query=None):
        if custom_reduce_query is None:
            custom_reduce_query = query

        chunk_size = 2000
        overlap = 50

        # Split the text into chunks
        chunks = split_into_chunks(
            text, chunk_size, self.computer.interpreter.llm, overlap
        )

        # (Map) Query each chunk
        responses = query_map_chunks(chunks, self.computer.interpreter.llm, query)

        # (Reduce) Compress the responses
        response = query_reduce_chunks(
            responses, self.computer.interpreter.llm, chunk_size, custom_reduce_query
        )

        return response

    def summarize(self, text):
        query = "You are a highly skilled AI trained in language comprehension and summarization. I would like you to read the following text and summarize it into a concise abstract paragraph. Aim to retain the most important points, providing a coherent and readable summary that could help a person understand the main points of the discussion without needing to read the entire text. Please avoid unnecessary details or tangential points."
        custom_reduce_query = "You are tasked with taking multiple summarized texts and merging them into one unified and concise summary. Maintain the core essence of the content and provide a clear and comprehensive summary that encapsulates all the main points from the individual summaries."
        return self.query(text, query, custom_reduce_query)
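
# A minimal usage sketch (assumes an Open Interpreter `computer` object wired
# to an LLM; the file path and query strings are illustrative, not part of
# this module):
#
#     from interpreter import interpreter
#
#     ai = Ai(interpreter.computer)
#     text = open("report.txt").read()
#     print(ai.summarize(text))
#     print(ai.query(text, "List every deadline mentioned in the text."))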