"""
SimpleMem - Efficient Lifelong Memory for LLM Agents
Main system class integrating all components
"""
from typing import List, Optional
from models.memory_entry import Dialogue, MemoryEntry
from utils.llm_client import LLMClient
from utils.embedding import EmbeddingModel
from database.vector_store import VectorStore
from core.memory_builder import MemoryBuilder
from core.hybrid_retriever import HybridRetriever
from core.answer_generator import AnswerGenerator
import config


class SimpleMemSystem:
    """
    SimpleMem Main System

    Three-stage pipeline:
    1. Semantic Structured Compression (Section 3.1): add_dialogue() -> MemoryBuilder -> VectorStore
    2. Online Semantic Synthesis (Section 3.2): intra-session consolidation during write
    3. Intent-Aware Retrieval Planning (Section 3.3): ask() -> HybridRetriever -> AnswerGenerator
    """
    def __init__(
        self,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        base_url: Optional[str] = None,
        db_path: Optional[str] = None,
        table_name: Optional[str] = None,
        clear_db: bool = False,
        enable_thinking: Optional[bool] = None,
        use_streaming: Optional[bool] = None,
        enable_planning: Optional[bool] = None,
        enable_reflection: Optional[bool] = None,
        max_reflection_rounds: Optional[int] = None,
        enable_parallel_processing: Optional[bool] = None,
        max_parallel_workers: Optional[int] = None,
        enable_parallel_retrieval: Optional[bool] = None,
        max_retrieval_workers: Optional[int] = None
    ):
        """
        Initialize the system.

        Args:
        - api_key: OpenAI API key
        - model: LLM model name
        - base_url: Custom OpenAI base URL (for compatible APIs)
        - db_path: Database path
        - table_name: Memory table name (for parallel processing)
        - clear_db: Whether to clear the existing database
        - enable_thinking: Enable deep thinking mode (for Qwen and compatible models)
        - use_streaming: Enable streaming responses
        - enable_planning: Enable multi-query planning for retrieval (None = use config default)
        - enable_reflection: Enable reflection-based additional retrieval (None = use config default)
        - max_reflection_rounds: Maximum number of reflection rounds (None = use config default)
        - enable_parallel_processing: Enable parallel processing for memory building (None = use config default)
        - max_parallel_workers: Maximum number of parallel workers for memory building (None = use config default)
        - enable_parallel_retrieval: Enable parallel processing for retrieval queries (None = use config default)
        - max_retrieval_workers: Maximum number of parallel workers for retrieval (None = use config default)
        """
print("=" * 60)
print("Initializing SimpleMem System")
print("=" * 60)
# Initialize core components
self.llm_client = LLMClient(
api_key=api_key,
model=model,
base_url=base_url,
enable_thinking=enable_thinking,
use_streaming=use_streaming
)
self.embedding_model = EmbeddingModel()
self.vector_store = VectorStore(
db_path=db_path,
embedding_model=self.embedding_model,
table_name=table_name
)
if clear_db:
print("\nClearing existing database...")
self.vector_store.clear()
# Initialize three major modules
self.memory_builder = MemoryBuilder(
llm_client=self.llm_client,
vector_store=self.vector_store,
enable_parallel_processing=enable_parallel_processing,
max_parallel_workers=max_parallel_workers
)
self.hybrid_retriever = HybridRetriever(
llm_client=self.llm_client,
vector_store=self.vector_store,
enable_planning=enable_planning,
enable_reflection=enable_reflection,
max_reflection_rounds=max_reflection_rounds,
enable_parallel_retrieval=enable_parallel_retrieval,
max_retrieval_workers=max_retrieval_workers
)
self.answer_generator = AnswerGenerator(
llm_client=self.llm_client
)
print("\nSystem initialization complete!")
print("=" * 60)
    def add_dialogue(self, speaker: str, content: str, timestamp: Optional[str] = None):
        """
        Add a single dialogue

        Args:
        - speaker: Speaker name
        - content: Dialogue content
        - timestamp: Timestamp (ISO 8601 format, e.g. "2025-11-15T14:30:00")
        """
        dialogue_id = self.memory_builder.processed_count + len(self.memory_builder.dialogue_buffer) + 1
        dialogue = Dialogue(
            dialogue_id=dialogue_id,
            speaker=speaker,
            content=content,
            timestamp=timestamp
        )
        self.memory_builder.add_dialogue(dialogue)

    def add_dialogues(self, dialogues: List[Dialogue]):
        """
        Batch add dialogues

        Args:
        - dialogues: List of dialogues
        """
        self.memory_builder.add_dialogues(dialogues)

    def finalize(self):
        """
        Finalize dialogue input and process any remaining buffered dialogues (safety check).
        Note: in parallel mode, remaining dialogues have already been processed.
        """
        self.memory_builder.process_remaining()
    def ask(self, question: str) -> str:
        """
        Ask a question - core Q&A interface

        Args:
        - question: User question

        Returns:
        - The generated answer
        """
        print("\n" + "=" * 60)
        print(f"Question: {question}")
        print("=" * 60)

        # Stage 3: Intent-Aware Retrieval Planning
        contexts = self.hybrid_retriever.retrieve(question)

        # Generate the answer from the retrieved context C_q
        answer = self.answer_generator.generate_answer(question, contexts)

        print("\nAnswer:")
        print(answer)
        print("=" * 60 + "\n")

        return answer

    def get_all_memories(self) -> List[MemoryEntry]:
        """
        Get all memory entries (for debugging)
        """
        return self.vector_store.get_all_entries()
    def print_memories(self):
        """
        Print all memory entries (for debugging)
        """
        memories = self.get_all_memories()

        print("\n" + "=" * 60)
        print(f"All Memory Entries ({len(memories)} total)")
        print("=" * 60)

        for i, memory in enumerate(memories, 1):
            print(f"\n[Entry {i}]")
            print(f"ID: {memory.entry_id}")
            print(f"Restatement: {memory.lossless_restatement}")
            if memory.timestamp:
                print(f"Time: {memory.timestamp}")
            if memory.location:
                print(f"Location: {memory.location}")
            if memory.persons:
                print(f"Persons: {', '.join(memory.persons)}")
            if memory.entities:
                print(f"Entities: {', '.join(memory.entities)}")
            if memory.topic:
                print(f"Topic: {memory.topic}")
            print(f"Keywords: {', '.join(memory.keywords)}")

        print("\n" + "=" * 60)


# Convenience function
def create_system(
    clear_db: bool = False,
    enable_planning: Optional[bool] = None,
    enable_reflection: Optional[bool] = None,
    max_reflection_rounds: Optional[int] = None,
    enable_parallel_processing: Optional[bool] = None,
    max_parallel_workers: Optional[int] = None,
    enable_parallel_retrieval: Optional[bool] = None,
    max_retrieval_workers: Optional[int] = None
) -> SimpleMemSystem:
    """
    Create a SimpleMem system instance (parameters left as None fall back to config.py defaults)
    """
    return SimpleMemSystem(
        clear_db=clear_db,
        enable_planning=enable_planning,
        enable_reflection=enable_reflection,
        max_reflection_rounds=max_reflection_rounds,
        enable_parallel_processing=enable_parallel_processing,
        max_parallel_workers=max_parallel_workers,
        enable_parallel_retrieval=enable_parallel_retrieval,
        max_retrieval_workers=max_retrieval_workers
    )
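
# Illustrative override of the config.py defaults at construction time; this flag
# combination is an example, not a recommended setting:
#
#     system = create_system(
#         clear_db=True,
#         enable_planning=True,       # multi-query retrieval planning
#         enable_reflection=False,    # skip reflection-based follow-up retrieval
#         max_parallel_workers=4,     # example worker count
#     )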


if __name__ == "__main__":
    # Quick test with Qwen3 integration
    print("🚀 Running SimpleMem Quick Test with Qwen3...")

    system = create_system(clear_db=True)
    print(f"📌 Using embedding model: {system.memory_builder.vector_store.embedding_model.model_name}")
    print(f"📌 Model type: {system.memory_builder.vector_store.embedding_model.model_type}")

    # Add some test dialogues
    system.add_dialogue("Alice", "Bob, let's meet at Starbucks tomorrow at 2pm to discuss the new product", "2025-11-15T14:30:00")
    system.add_dialogue("Bob", "Okay, I'll prepare the materials", "2025-11-15T14:31:00")
    system.add_dialogue("Alice", "Remember to bring the market research report from last time", "2025-11-15T14:32:00")

    # Finalize input
    system.finalize()

    # View memories
    system.print_memories()

    # Ask questions (with new features)
    print("\n🔍 Testing retrieval with planning and reflection...")
    system.ask("When will Alice and Bob meet?")

    print("\n🔍 Testing adversarial question (reflection disabled)...")
    question = "What is Alice's favorite food?"
    contexts = system.hybrid_retriever.retrieve(question, enable_reflection=False)
    answer = system.answer_generator.generate_answer(question, contexts)
    print(f"\nQuestion: {question}")
    print(f"Answer: {answer}")

    print("\n✅ Quick test completed!")
    print("\n💡 To run comprehensive tests: python test_qwen3_integration.py")