Skip to content

Commit 7ca309a

Browse files
authored
feat: add multi-reply support for async mode (more/continues flags) (#141)
The `more` flag in requests and `continues` flag in responses were not working in async mode. This commit fixes the async code generation and client to properly support multi-reply sequences. Server-side changes (varlink_generator): - AsyncCall now tracks `continues`, `more`, `oneway` flags and collects multiple replies in a Vec - `wants_more()` returns the actual client request flag - `is_oneway()` returns the actual client request flag - `set_continues()` stores the value and applies it to replies - `reply_struct()` validates consistency and sets `continues: Some(true)` - Handler dispatch extracts request metadata and sends all replies Client-side changes (varlink/client_async.rs): - Added persistent `recv_buf` to AsyncMethodCall to handle multiple messages arriving in a single read (fixes buffering bug) - Added `continues()` method to check if more replies are expected - Rewrote `recv()` to properly parse messages from the buffer Also adds `examples/async_more/` demonstrating multi-reply functionality. 🤖 Generated with [Claude Code](https://claude.com/claude-code)
2 parents 57f51f0 + c767a9d commit 7ca309a

10 files changed

Lines changed: 929 additions & 90 deletions

File tree

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ members = [
1212
"examples/ping",
1313
"examples/async_ping",
1414
"examples/async_example",
15+
"examples/async_more",
1516
]
1617
resolver = "1"
1718

examples/async_more/Cargo.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "async_more"
3+
version = "1.0.0"
4+
authors = ["Harald Hoyer <harald@hoyer.xyz>"]
5+
build = "build.rs"
6+
edition = "2021"
7+
publish = false
8+
rust-version = "1.80.0"
9+
10+
[features]
11+
default = ["tokio"]
12+
tokio = []
13+
14+
[dependencies]
15+
anyhow = { workspace = true }
16+
async-trait = { workspace = true }
17+
varlink = { workspace = true, features = ["tokio"] }
18+
varlink_stdinterfaces = { workspace = true, features = ["tokio"] }
19+
serde = { workspace = true }
20+
serde_derive = { workspace = true }
21+
serde_json = { workspace = true }
22+
tokio = { workspace = true, features = ["full"] }
23+
24+
[build-dependencies]
25+
varlink_generator = { workspace = true }

examples/async_more/build.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
fn main() {
2+
varlink_generator::cargo_build_tosource_options(
3+
"src/org.example.more.varlink",
4+
true,
5+
&varlink_generator::GeneratorOptions {
6+
generate_async: true,
7+
..Default::default()
8+
},
9+
);
10+
}

examples/async_more/src/main.rs

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
//! Async Varlink "More" Example using Tokio
2+
//!
3+
//! This example demonstrates multi-reply functionality in async mode,
4+
//! using the `more` flag in requests and `continues` flag in responses.
5+
6+
use anyhow::Result;
7+
use async_trait::async_trait;
8+
use std::sync::Arc;
9+
use std::time::Duration;
10+
use varlink::{listen_async, AsyncVarlinkService, ListenAsyncConfig};
11+
12+
// Include the generated code
13+
mod org_example_more;
14+
use org_example_more::{State, VarlinkClientInterface, VarlinkInterface};
15+
16+
/// More service implementation using the generated async VarlinkInterface trait
17+
struct MoreService {
18+
sleep_duration: Duration,
19+
}
20+
21+
#[async_trait]
22+
impl VarlinkInterface for MoreService {
23+
async fn ping(
24+
&self,
25+
call: &mut dyn org_example_more::Call_Ping,
26+
ping: String,
27+
) -> varlink::Result<()> {
28+
println!("Server: Ping request with: '{}'", ping);
29+
call.reply(ping)
30+
}
31+
32+
async fn test_more(
33+
&self,
34+
call: &mut dyn org_example_more::Call_TestMore,
35+
n: i64,
36+
) -> varlink::Result<()> {
37+
println!("Server: TestMore request with n={}", n);
38+
39+
// Check if the client requested multiple replies
40+
if !call.wants_more() {
41+
println!("Server: Error - client did not request 'more'");
42+
return call.reply_test_more_error("called without more".into());
43+
}
44+
45+
if n == 0 {
46+
return call.reply_test_more_error("n == 0".into());
47+
}
48+
49+
// Indicate that more replies are coming
50+
call.set_continues(true);
51+
52+
// Send start state
53+
call.reply(State {
54+
start: Some(true),
55+
end: None,
56+
progress: None,
57+
})?;
58+
println!("Server: Sent start state");
59+
60+
// Send progress states
61+
for i in 0..n {
62+
tokio::time::sleep(self.sleep_duration).await;
63+
let progress = i * 100 / n;
64+
call.reply(State {
65+
progress: Some(progress),
66+
start: None,
67+
end: None,
68+
})?;
69+
println!("Server: Sent progress {}%", progress);
70+
}
71+
72+
// Send 100% progress
73+
call.reply(State {
74+
progress: Some(100),
75+
start: None,
76+
end: None,
77+
})?;
78+
println!("Server: Sent progress 100%");
79+
80+
// Final reply - no more continues
81+
call.set_continues(false);
82+
83+
call.reply(State {
84+
end: Some(true),
85+
progress: None,
86+
start: None,
87+
})?;
88+
println!("Server: Sent end state");
89+
90+
Ok(())
91+
}
92+
93+
async fn stop_serving(
94+
&self,
95+
call: &mut dyn org_example_more::Call_StopServing,
96+
) -> varlink::Result<()> {
97+
call.reply()?;
98+
Err(varlink::ErrorKind::ConnectionClosed.into())
99+
}
100+
}
101+
102+
/// Run a server with AsyncVarlinkService for introspection support
103+
async fn run_server(addr: &str, sleep_ms: u64) -> Result<()> {
104+
println!("Server: Listening on {} (with introspection)", addr);
105+
106+
let more_service = Arc::new(MoreService {
107+
sleep_duration: Duration::from_millis(sleep_ms),
108+
});
109+
let more_handler = Arc::new(org_example_more::new(more_service));
110+
111+
// Wrap with AsyncVarlinkService for introspection support
112+
let service = Arc::new(AsyncVarlinkService::new(
113+
"org.example",
114+
"Async More Example",
115+
"1.0",
116+
"https://github.com/varlink/rust",
117+
vec![more_handler],
118+
));
119+
120+
listen_async(
121+
service,
122+
format!("tcp:{}", addr),
123+
&ListenAsyncConfig::default(),
124+
)
125+
.await
126+
.map_err(|e| anyhow::anyhow!("Server error: {:?}", e))
127+
}
128+
129+
/// Async client that makes a TestMore request and receives multiple replies
130+
async fn run_client(addr: &str, n: i64) -> Result<()> {
131+
println!("Client: Connecting to {}", addr);
132+
133+
let connection = varlink::AsyncConnection::with_address(format!("tcp:{}", addr))
134+
.await
135+
.map_err(|e| anyhow::anyhow!("Connection error: {:?}", e))?;
136+
137+
let client = org_example_more::VarlinkClient::new(connection);
138+
139+
println!("Client: Sending TestMore request with n={}", n);
140+
141+
// Use .more() to indicate we want multiple replies
142+
let mut method_call = client.test_more(n);
143+
method_call
144+
.more()
145+
.await
146+
.map_err(|e| anyhow::anyhow!("More error: {:?}", e))?;
147+
148+
// Receive all replies
149+
loop {
150+
let reply = method_call
151+
.recv()
152+
.await
153+
.map_err(|e| anyhow::anyhow!("Recv error: {:?}", e))?;
154+
155+
let state = reply.state;
156+
match state {
157+
State {
158+
start: Some(true),
159+
end: None,
160+
progress: None,
161+
} => {
162+
println!("Client: --- Start ---");
163+
}
164+
State {
165+
start: None,
166+
end: Some(true),
167+
progress: None,
168+
} => {
169+
println!("Client: --- End ---");
170+
break; // This is the last reply
171+
}
172+
State {
173+
start: None,
174+
end: None,
175+
progress: Some(progress),
176+
} => {
177+
println!("Client: Progress: {}%", progress);
178+
}
179+
_ => {
180+
println!("Client: Got unknown state: {:?}", state);
181+
}
182+
}
183+
184+
// Check if there are more replies coming
185+
if !method_call.continues() {
186+
println!("Client: No more replies expected");
187+
break;
188+
}
189+
}
190+
191+
println!("Client: Done receiving replies");
192+
Ok(())
193+
}
194+
195+
#[tokio::main]
196+
async fn main() -> Result<()> {
197+
let addr = "127.0.0.1:9997";
198+
let n = 5; // Number of progress steps
199+
200+
// Spawn the server
201+
let server_handle = tokio::spawn(async move {
202+
if let Err(e) = run_server(addr, 100).await {
203+
eprintln!("Server error: {}", e);
204+
}
205+
});
206+
207+
// Give the server time to start
208+
tokio::time::sleep(Duration::from_millis(100)).await;
209+
210+
// Run the client
211+
println!("\n=== Running Async More Example ===\n");
212+
run_client(addr, n).await?;
213+
214+
// Clean shutdown
215+
server_handle.abort();
216+
217+
println!("\n=== Example Complete ===");
218+
219+
Ok(())
220+
}
221+
222+
#[cfg(test)]
223+
mod tests {
224+
use super::*;
225+
use std::sync::atomic::{AtomicBool, Ordering};
226+
227+
#[tokio::test]
228+
async fn test_async_more() {
229+
let addr = "127.0.0.1:9996";
230+
231+
// Create a stop flag for graceful shutdown
232+
let stop = Arc::new(AtomicBool::new(false));
233+
let stop_clone = Arc::clone(&stop);
234+
235+
// Spawn server
236+
let server_handle = tokio::spawn(async move {
237+
let more_service = Arc::new(MoreService {
238+
sleep_duration: Duration::from_millis(10),
239+
});
240+
let more_handler = Arc::new(org_example_more::new(more_service));
241+
242+
let service = Arc::new(AsyncVarlinkService::new(
243+
"org.example",
244+
"Async More Test",
245+
"1.0",
246+
"https://github.com/varlink/rust",
247+
vec![more_handler],
248+
));
249+
250+
let config = ListenAsyncConfig {
251+
idle_timeout: Duration::ZERO,
252+
stop_listening: Some(stop_clone),
253+
};
254+
listen_async(service, format!("tcp:{}", addr), &config)
255+
.await
256+
.ok();
257+
});
258+
259+
// Give server time to start
260+
tokio::time::sleep(Duration::from_millis(100)).await;
261+
262+
// Test multi-reply client
263+
let result = run_client(addr, 3).await;
264+
assert!(
265+
result.is_ok(),
266+
"Multi-reply client failed: {:?}",
267+
result.err()
268+
);
269+
270+
// Signal server to stop
271+
stop.store(true, Ordering::SeqCst);
272+
tokio::time::sleep(Duration::from_millis(200)).await;
273+
server_handle.abort();
274+
}
275+
276+
#[tokio::test]
277+
async fn test_wants_more_check() {
278+
let addr = "127.0.0.1:9995";
279+
280+
// Create a stop flag for graceful shutdown
281+
let stop = Arc::new(AtomicBool::new(false));
282+
let stop_clone = Arc::clone(&stop);
283+
284+
// Spawn server
285+
let server_handle = tokio::spawn(async move {
286+
let more_service = Arc::new(MoreService {
287+
sleep_duration: Duration::from_millis(10),
288+
});
289+
let more_handler = Arc::new(org_example_more::new(more_service));
290+
291+
let service = Arc::new(AsyncVarlinkService::new(
292+
"org.example",
293+
"Async More Test",
294+
"1.0",
295+
"https://github.com/varlink/rust",
296+
vec![more_handler],
297+
));
298+
299+
let config = ListenAsyncConfig {
300+
idle_timeout: Duration::ZERO,
301+
stop_listening: Some(stop_clone),
302+
};
303+
listen_async(service, format!("tcp:{}", addr), &config)
304+
.await
305+
.ok();
306+
});
307+
308+
// Give server time to start
309+
tokio::time::sleep(Duration::from_millis(100)).await;
310+
311+
// Try to call TestMore without .more() - should get an error
312+
let connection = varlink::AsyncConnection::with_address(format!("tcp:{}", addr))
313+
.await
314+
.expect("Connection failed");
315+
316+
let client = org_example_more::VarlinkClient::new(connection);
317+
318+
// Call without .more() - should return TestMoreError
319+
let result = client.test_more(5).call().await;
320+
assert!(result.is_err(), "Expected error when calling without more");
321+
322+
// Signal server to stop
323+
stop.store(true, Ordering::SeqCst);
324+
tokio::time::sleep(Duration::from_millis(200)).await;
325+
server_handle.abort();
326+
}
327+
}

0 commit comments

Comments
 (0)