Skip to content

Commit 2612bc9

Browse files
committed
feat: parallel function calling
1 parent dc7ffee commit 2612bc9

File tree

6 files changed

+85
-143
lines changed

6 files changed

+85
-143
lines changed

anda_core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "anda_core"
33
description = "Core types and traits for Anda -- an AI agent framework built with Rust, powered by ICP and TEEs."
44
repository = "https://github.com/ldclabs/anda/tree/main/anda_core"
55
publish = true
6-
version = "0.9.10"
6+
version = "0.9.11"
77
edition.workspace = true
88
keywords.workspace = true
99
categories.workspace = true

anda_core/src/model.rs

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -836,70 +836,4 @@ mod tests {
836836
&serde_json::json!({"x": true})
837837
);
838838
}
839-
840-
#[test]
841-
fn test_function_definition_parameters_openapi_order() {
842-
let def = FunctionDefinition {
843-
name: "trigger_paywall".into(),
844-
description: "Trigger payment".into(),
845-
parameters: serde_json::json!({
846-
// Intentionally not in the preferred order.
847-
"properties": {
848-
"hook_text": {
849-
"description": "hook",
850-
"type": "string"
851-
},
852-
"reason": {
853-
"description": "reason",
854-
"type": "string"
855-
}
856-
},
857-
"description": "top",
858-
"required": ["reason", "hook_text"],
859-
"type": "object"
860-
}),
861-
strict: None,
862-
};
863-
864-
let s = serde_json::to_string(&def).unwrap();
865-
let start =
866-
s.find("\"parameters\":{").expect("parameters should exist") + "\"parameters\":{".len();
867-
let sub = &s[start..];
868-
let i_type = sub.find("\"type\"").unwrap();
869-
let i_props = sub.find("\"properties\"").unwrap();
870-
let i_req = sub.find("\"required\"").unwrap();
871-
let i_desc = sub.find("\"description\"").unwrap();
872-
assert!(i_type < i_props);
873-
assert!(i_props > i_desc);
874-
assert!(i_props < i_req);
875-
}
876-
877-
#[test]
878-
fn test_function_definition_nested_schema_order_is_deterministic() {
879-
let def = FunctionDefinition {
880-
name: "trigger_paywall".into(),
881-
description: "Trigger payment".into(),
882-
parameters: serde_json::json!({
883-
"type": "object",
884-
"properties": {
885-
"hook_text": {
886-
// Intentionally reverse order.
887-
"description": "hook",
888-
"type": "string"
889-
}
890-
},
891-
"required": ["hook_text"],
892-
}),
893-
strict: None,
894-
};
895-
896-
let s = serde_json::to_string(&def).unwrap();
897-
// Ensure nested property schema emits type before description.
898-
let needle = "\"hook_text\":{";
899-
let start = s.find(needle).unwrap() + needle.len();
900-
let sub = &s[start..];
901-
let i_type = sub.find("\"type\"").unwrap();
902-
let i_desc = sub.find("\"description\"").unwrap();
903-
assert!(i_type < i_desc);
904-
}
905839
}

anda_engine/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "anda_engine"
33
description = "Agents engine for Anda -- an AI agent framework built with Rust, powered by ICP and TEEs."
44
repository = "https://github.com/ldclabs/anda/tree/main/anda_engine"
55
publish = true
6-
version = "0.9.16"
6+
version = "0.9.17"
77
edition.workspace = true
88
keywords.workspace = true
99
categories.workspace = true

anda_engine/src/context/agent.rs

Lines changed: 81 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@
2626
//! agents or tools while maintaining access to the core functionality.
2727
2828
use anda_core::{
29-
AgentArgs, AgentContext, AgentInput, AgentOutput, AgentSet, BaseContext, BoxError, CacheExpiry,
30-
CacheFeatures, CacheStoreFeatures, CancellationToken, CanisterCaller, CompletionFeatures,
31-
CompletionRequest, ContentPart, Embedding, EmbeddingFeatures, FunctionDefinition, HttpFeatures,
32-
Json, KeysFeatures, Message, ObjectMeta, Path, PutMode, PutResult, RequestMeta, Resource,
33-
StateFeatures, StoreFeatures, ToolCall, ToolInput, ToolOutput, ToolSet, Usage,
29+
AgentArgs, AgentContext, AgentInput, AgentOutput, AgentSet, BaseContext, BoxError, BoxPinFut,
30+
CacheExpiry, CacheFeatures, CacheStoreFeatures, CancellationToken, CanisterCaller,
31+
CompletionFeatures, CompletionRequest, ContentPart, Embedding, EmbeddingFeatures,
32+
FunctionDefinition, HttpFeatures, Json, KeysFeatures, Message, ObjectMeta, Path, PutMode,
33+
PutResult, RequestMeta, Resource, StateFeatures, StoreFeatures, ToolCall, ToolInput,
34+
ToolOutput, ToolSet, Usage,
3435
};
3536
use bytes::Bytes;
3637
use candid::{CandidType, Principal, utils::ArgumentEncoder};
@@ -1032,8 +1033,9 @@ impl CompletionRunner {
10321033
self.chat_history.append(&mut output.chat_history);
10331034

10341035
// 自动执行工具/代理调用
1035-
let mut tool_calls_continue: Vec<ContentPart> = Vec::new();
1036-
for tool in output.tool_calls.iter_mut() {
1036+
let mut tool_call_futs: Vec<BoxPinFut<(Option<ToolCall>, Option<String>)>> = Vec::new();
1037+
let tool_calls = std::mem::take(&mut output.tool_calls);
1038+
for mut tool in tool_calls.into_iter() {
10371039
if self.ctx.cancellation_token().is_cancelled() {
10381040
return Err("operation cancelled".into());
10391041
}
@@ -1042,40 +1044,26 @@ impl CompletionRunner {
10421044
|| tool.name.starts_with("RT_")
10431045
|| tool.name.starts_with("rt_")
10441046
{
1045-
match self
1046-
.ctx
1047-
.tool_call(ToolInput {
1048-
name: tool.name.clone(),
1049-
args: tool.args.clone(),
1050-
resources: self
1051-
.ctx
1052-
.select_tool_resources(&tool.name, &mut self.resources)
1053-
.await,
1054-
meta: None,
1055-
})
1056-
.await
1057-
{
1058-
Ok((mut res, remote_id)) => {
1059-
self.usage.accumulate(&res.usage);
1060-
1061-
// We can not ignore some tool calls.
1062-
// GPT-5: An assistant message with 'tool_calls' must be followed by tool messages responding to each 'tool_call_id'.
1063-
tool_calls_continue.push(ContentPart::ToolOutput {
1064-
name: tool.name.clone(),
1065-
output: res.output.clone(),
1066-
call_id: tool.call_id.clone(),
1067-
remote_id,
1068-
});
1069-
1070-
self.artifacts.append(&mut res.artifacts);
1071-
tool.remote_id = remote_id;
1072-
tool.result = Some(res);
1073-
}
1074-
Err(err) => {
1075-
output.failed_reason = Some(err.to_string());
1076-
return Ok(Some(self.final_output(output)));
1047+
let ctx = self.ctx.clone();
1048+
let input = ToolInput {
1049+
name: tool.name.clone(),
1050+
args: tool.args.clone(),
1051+
resources: self
1052+
.ctx
1053+
.select_tool_resources(&tool.name, &mut self.resources)
1054+
.await,
1055+
meta: None,
1056+
};
1057+
tool_call_futs.push(Box::pin(async move {
1058+
match ctx.tool_call(input).await {
1059+
Ok((res, remote_id)) => {
1060+
tool.remote_id = remote_id;
1061+
tool.result = Some(res);
1062+
(Some(tool), None)
1063+
}
1064+
Err(err) => (None, Some(err.to_string())),
10771065
}
1078-
}
1066+
}));
10791067
} else if self.ctx.agents.contains(&tool.name)
10801068
|| tool.name.starts_with("LA_")
10811069
|| tool.name.starts_with("la_")
@@ -1093,52 +1081,73 @@ impl CompletionRunner {
10931081
return Ok(Some(self.final_output(output)));
10941082
}
10951083
};
1096-
match self
1097-
.ctx
1098-
.agent_run(AgentInput {
1099-
name: tool.name.clone(),
1100-
prompt: args.prompt,
1101-
resources: self
1102-
.ctx
1103-
.agents
1104-
.select_resources(&tool.name, &mut self.resources),
1105-
..Default::default()
1106-
})
1107-
.await
1108-
{
1109-
Ok((mut res, remote_id)) => {
1110-
self.usage.accumulate(&res.usage);
1111-
if res.failed_reason.is_some() {
1112-
output.failed_reason = res.failed_reason;
1113-
return Ok(Some(self.final_output(output)));
1084+
1085+
let ctx = self.ctx.clone();
1086+
let input = AgentInput {
1087+
name: tool.name.clone(),
1088+
prompt: args.prompt,
1089+
resources: self
1090+
.ctx
1091+
.select_agent_resources(&tool.name, &mut self.resources)
1092+
.await,
1093+
..Default::default()
1094+
};
1095+
tool_call_futs.push(Box::pin(async move {
1096+
match ctx.agent_run(input).await {
1097+
Ok((res, remote_id)) => {
1098+
tool.remote_id = remote_id;
1099+
tool.result = Some(ToolOutput {
1100+
output: res.content.clone().into(),
1101+
artifacts: res.artifacts,
1102+
usage: res.usage,
1103+
});
1104+
if let Some(reason) = res.failed_reason {
1105+
(Some(tool), Some(reason))
1106+
} else {
1107+
(Some(tool), None)
1108+
}
11141109
}
1110+
Err(err) => (None, Some(err.to_string())),
1111+
}
1112+
}));
1113+
}
1114+
}
11151115

1116-
// TODO: remote agent id
1116+
let mut tool_calls: Vec<ToolCall> = Vec::new();
1117+
let mut tool_calls_continue: Vec<ContentPart> = Vec::new();
1118+
let mut tool_call_errors: Vec<String> = Vec::new();
1119+
if !tool_call_futs.is_empty() {
1120+
let results = futures::future::join_all(tool_call_futs).await;
1121+
for (tool, err) in results {
1122+
if let Some(mut tool) = tool {
1123+
if let Some(res) = &mut tool.result {
1124+
self.usage.accumulate(&res.usage);
1125+
// We can not ignore some tool calls.
1126+
// GPT-5: An assistant message with 'tool_calls' must be followed by tool messages responding to each 'tool_call_id'.
11171127
tool_calls_continue.push(ContentPart::ToolOutput {
11181128
name: tool.name.clone(),
1119-
output: res.content.clone().into(),
1129+
output: res.output.clone(),
11201130
call_id: tool.call_id.clone(),
1121-
remote_id,
1131+
remote_id: tool.remote_id.clone(),
11221132
});
11231133

11241134
self.artifacts.append(&mut res.artifacts);
1125-
tool.result = Some(ToolOutput {
1126-
output: res.content.clone().into(),
1127-
artifacts: vec![],
1128-
usage: res.usage,
1129-
});
1130-
}
1131-
Err(err) => {
1132-
output.failed_reason = Some(err.to_string());
1133-
return Ok(Some(self.final_output(output)));
1135+
tool_calls.push(tool);
11341136
}
11351137
}
1138+
if let Some(err) = err {
1139+
tool_call_errors.push(err);
1140+
}
11361141
}
1137-
// 未知工具名,忽略
11381142
}
11391143

11401144
// 累计当前轮的 tool_calls
1141-
self.tool_calls.append(&mut output.tool_calls);
1145+
self.tool_calls.append(&mut tool_calls);
1146+
1147+
if !tool_call_errors.is_empty() {
1148+
output.failed_reason = Some(tool_call_errors.join("; "));
1149+
return Ok(Some(self.final_output(output)));
1150+
}
11421151

11431152
// 若无需继续,返回最终结果并结束
11441153
if tool_calls_continue.is_empty() {

anda_engine_server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "anda_engine_server"
33
description = "A http server to serve multiple Anda engines."
44
repository = "https://github.com/ldclabs/anda/tree/main/anda_engine_server"
55
publish = true
6-
version = "0.9.10"
6+
version = "0.9.11"
77
edition.workspace = true
88
keywords.workspace = true
99
categories.workspace = true

anda_engine_server/src/middleware.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@ use axum::{
66
response::{IntoResponse, Response},
77
};
88
use std::{future::Future, sync::Arc};
9-
use tower_http::compression::CompressionLayer;
109

1110
pub use crate::handler::AppState;
12-
11+
pub use tower_http::compression::CompressionLayer;
1312
pub type AppRouter = Router<AppState>;
1413

1514
/// Object-safe middleware trait for applying HTTP middleware to the server `Router`.

0 commit comments

Comments
 (0)