Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
371 changes: 205 additions & 166 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "2.5.0"
version = "2.5.1"
edition = "2024"
rust-version = "1.86"
authors = ["Sopaco"]
Expand Down
37 changes: 29 additions & 8 deletions cortex-mem-core/src/automation/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Semaphore};
use tracing::{info, warn};

/// 自动化配置
Expand All @@ -26,18 +26,21 @@ pub struct AutomationConfig {
pub auto_generate_layers_on_startup: bool,
/// 每N条消息触发一次L0/L1生成(0表示禁用)
pub generate_layers_every_n_messages: usize,
/// 最大并发 LLM 任务数(防止压垮 LLM API)
pub max_concurrent_llm_tasks: usize,
}

impl Default for AutomationConfig {
fn default() -> Self {
Self {
auto_index: true,
auto_extract: true,
index_on_message: false, // 默认不实时索引(性能考虑)
index_on_close: true, // 默认会话关闭时索引
index_on_message: false, // 默认不实时索引(性能考虑)
index_on_close: true, // 默认会话关闭时索引
index_batch_delay: 2,
auto_generate_layers_on_startup: false, // 默认关闭(避免启动时阻塞)
generate_layers_every_n_messages: 0, // 默认禁用(避免频繁LLM调用)
max_concurrent_llm_tasks: 3, // 默认最多3个并发LLM任务
}
}
}
Expand All @@ -46,8 +49,10 @@ impl Default for AutomationConfig {
pub struct AutomationManager {
indexer: Arc<AutoIndexer>,
extractor: Option<Arc<AutoExtractor>>,
layer_generator: Option<Arc<LayerGenerator>>, // 层级生成器
layer_generator: Option<Arc<LayerGenerator>>,
config: AutomationConfig,
/// 并发限制信号量
llm_semaphore: Arc<Semaphore>,
}

impl AutomationManager {
Expand All @@ -57,11 +62,13 @@ impl AutomationManager {
extractor: Option<Arc<AutoExtractor>>,
config: AutomationConfig,
) -> Self {
let llm_semaphore = Arc::new(Semaphore::new(config.max_concurrent_llm_tasks));
Self {
indexer,
extractor,
layer_generator: None, // 初始为 None,需要单独设置
layer_generator: None,
config,
llm_semaphore,
}
}

Expand All @@ -71,6 +78,11 @@ impl AutomationManager {
self
}

/// 获取并发限制信号量(供外部使用)
pub fn llm_semaphore(&self) -> Arc<Semaphore> {
self.llm_semaphore.clone()
}

/// 🎯 核心方法:启动自动化任务
pub async fn start(self, mut event_rx: mpsc::UnboundedReceiver<CortexEvent>) -> Result<()> {
info!("Starting AutomationManager with config: {:?}", self.config);
Expand All @@ -80,7 +92,10 @@ impl AutomationManager {
if let Some(ref generator) = self.layer_generator {
info!("启动时检查并生成缺失的 L0/L1 文件...");
let generator_clone = generator.clone();
let semaphore = self.llm_semaphore.clone();
tokio::spawn(async move {
// 获取信号量许可
let _permit = semaphore.acquire().await;
match generator_clone.ensure_all_layers().await {
Ok(stats) => {
info!(
Expand Down Expand Up @@ -168,13 +183,16 @@ impl AutomationManager {
count, session_id
);

// 异步生成L0/L1(避免阻塞
// 异步生成L0/L1(带并发限制
let generator_clone = generator.clone();
let indexer_clone = self.indexer.clone();
let session_id_clone = session_id.clone();
let auto_index = self.config.auto_index;
let semaphore = self.llm_semaphore.clone();

tokio::spawn(async move {
// 获取信号量许可(限制并发)
let _permit = semaphore.acquire().await;
let timeline_uri =
format!("cortex://session/{}/timeline", session_id_clone);

Expand Down Expand Up @@ -240,15 +258,18 @@ impl AutomationManager {
session_id
);

// 🔧 异步执行所有后处理任务,避免阻塞事件循环
// 异步执行所有后处理任务(带并发限制)
let extractor = self.extractor.clone();
let generator = self.layer_generator.clone();
let indexer = self.indexer.clone();
let auto_extract = self.config.auto_extract;
let auto_index = self.config.auto_index;
let session_id_clone = session_id.clone();
let semaphore = self.llm_semaphore.clone();

tokio::spawn(async move {
// 获取信号量许可(限制并发)
let _permit = semaphore.acquire().await;
let start = tokio::time::Instant::now();

// 1. 自动提取记忆(如果配置了且有extractor)
Expand Down Expand Up @@ -364,4 +385,4 @@ impl AutomationManager {
}
}
}
}
}
Loading
Loading