Skip to content

Commit a1cd899

Browse files
committed
test(subscriber): add initial integration tests
The `console-subscriber` crate has no integration tests. There are some unit tests, but without very high coverage of features. Recently, we've found or fixed a few errors which probably could have been caught by a medium level of integration testing. However, testing `console-subscriber` isn't straight forward. It is effectively a tracing subscriber (or layer) on one end, and a gRPC server on the other end. This change adds enough of a testing framework to write some initial integration tests. Each test comprises 2 parts: - One or more "expcted tasks" - A future which will be driven to completion on a dedicated Tokio runtime. Behind the scenes, a console subscriber layer is created and it's server part is connected to a duplex stream. The client of the duplex stream then records incoming updates and reconstructs "actual tasks". The layer itself is set as the default subscriber for the duration of `block_on` which is used to drive the provided future to completioin. The expected tasks have a set of "matches", which is how we find the actual task that we want to validate against. Currently, the only value we match on is the task's name. The expected tasks also have a set of expectations. These are other fields on the actual task which are validated once a matching task is found. Currently, the two fields which can have expectations set on them are the `wakes` and `self_wakes` fields. So, to construct an expected task, which will match a task with the name `"my-task"` and then validate that the matched task gets woken once, the code would be: ```rust ExpectedTask::default() .match_name("my-task") .expect_wakes(1); ``` A future which passes this test could be: ```rust async { task::Builder::new() .name("my-task") .spawn(async { tokio::time::sleep(std::time::Duration::ZERO).await }) } ``` The full test would then look like: ```rust fn wakes_once() { let expected_task = ExpectedTask::default() .match_name("my-task") .expect_wakes(1); let future = async { task::Builder::new() .name("my-task") .spawn(async { tokio::time::sleep(std::time::Duration::ZERO).await }) }; assert_task(expected_task, future); } ``` The PR depends on 2 others: - #447 which fixes an error in the logic that determines whether a task is retained in the aggregator or not. - #451 which exposes the server parts and is necessary to allow us to connect the instrument server and client via a duplex channel. This change contains some initial tests for wakes and self wakes which would have caught the error fixed in #430. Additionally there are tests for the functionality of the testing framework itself.
1 parent 7c8e80a commit a1cd899

File tree

10 files changed

+1049
-16
lines changed

10 files changed

+1049
-16
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

console-subscriber/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ crossbeam-channel = "0.5"
5555

5656
[dev-dependencies]
5757
tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] }
58+
tower = "0.4"
5859
futures = "0.3"
5960

6061
[package.metadata.docs.rs]

console-subscriber/src/aggregator/id_data.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,18 @@ impl<T: Unsent> IdData<T> {
104104
if let Some(dropped_at) = stats.dropped_at() {
105105
let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default();
106106
let dirty = stats.is_unsent();
107-
let should_drop =
107+
let should_retain =
108108
// if there are any clients watching, retain all dirty tasks regardless of age
109109
(dirty && has_watchers)
110-
|| dropped_for > retention;
110+
|| dropped_for <= retention;
111111
tracing::trace!(
112112
stats.id = ?id,
113113
stats.dropped_at = ?dropped_at,
114114
stats.dropped_for = ?dropped_for,
115115
stats.dirty = dirty,
116-
should_drop,
116+
should_retain,
117117
);
118-
return !should_drop;
118+
return should_retain;
119119
}
120120

121121
true

console-subscriber/src/lib.rs

+113-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![doc = include_str!("../README.md")]
22
use console_api as proto;
3-
use proto::resources::resource;
3+
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
44
use serde::Serialize;
55
use std::{
66
cell::RefCell,
@@ -15,7 +15,10 @@ use std::{
1515
use thread_local::ThreadLocal;
1616
#[cfg(unix)]
1717
use tokio::net::UnixListener;
18-
use tokio::sync::{mpsc, oneshot};
18+
use tokio::{
19+
sync::{mpsc, oneshot},
20+
task::JoinHandle,
21+
};
1922
#[cfg(unix)]
2023
use tokio_stream::wrappers::UnixListenerStream;
2124
use tracing_core::{
@@ -933,18 +936,15 @@ impl Server {
933936
///
934937
/// [`tonic`]: https://docs.rs/tonic/
935938
pub async fn serve_with(
936-
mut self,
939+
self,
937940
mut builder: tonic::transport::Server,
938941
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
939-
let aggregate = self
940-
.aggregator
941-
.take()
942-
.expect("cannot start server multiple times");
943-
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
944942
let addr = self.addr.clone();
945-
let router = builder.add_service(
946-
proto::instrument::instrument_server::InstrumentServer::new(self),
947-
);
943+
let ServerParts {
944+
instrument_server: service,
945+
aggregator_handle: aggregate,
946+
} = self.into_parts();
947+
let router = builder.add_service(service);
948948
let res = match addr {
949949
ServerAddr::Tcp(addr) => {
950950
let serve = router.serve(addr);
@@ -957,9 +957,110 @@ impl Server {
957957
spawn_named(serve, "console::serve").await
958958
}
959959
};
960-
aggregate.abort();
960+
drop(aggregate);
961961
res?.map_err(Into::into)
962962
}
963+
964+
/// Returns the parts needed to spawn a gRPC server and keep the aggregation
965+
/// worker running.
966+
///
967+
/// Note that a server spawned in this way will overwrite any value set by
968+
/// [`Builder::server_addr`] as the user becomes responsible for defining
969+
/// the address when calling [`Router::serve`].
970+
///
971+
/// # Examples
972+
///
973+
/// The parts can be used to serve the instrument server together with
974+
/// other endpoints from the same gRPC server.
975+
///
976+
/// ```
977+
/// use console_subscriber::{ConsoleLayer, ServerParts};
978+
///
979+
/// # let runtime = tokio::runtime::Builder::new_current_thread()
980+
/// # .enable_all()
981+
/// # .build()
982+
/// # .unwrap();
983+
/// # runtime.block_on(async {
984+
/// let (console_layer, server) = ConsoleLayer::builder().build();
985+
/// let ServerParts {
986+
/// instrument_server,
987+
/// aggregator_handle,
988+
/// ..
989+
/// } = server.into_parts();
990+
///
991+
/// let router = tonic::transport::Server::builder()
992+
/// //.add_service(some_other_service)
993+
/// .add_service(instrument_server);
994+
/// let serve = router.serve(std::net::SocketAddr::new(
995+
/// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
996+
/// 6669,
997+
/// ));
998+
///
999+
/// // Finally, spawn the server.
1000+
/// tokio::spawn(serve);
1001+
/// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
1002+
/// # drop(console_layer);
1003+
/// # drop(aggregator_handle);
1004+
/// # });
1005+
/// ```
1006+
///
1007+
/// [`Router::serve`]: fn@tonic::transport::server::Router::serve
1008+
pub fn into_parts(mut self) -> ServerParts {
1009+
let aggregate = self
1010+
.aggregator
1011+
.take()
1012+
.expect("cannot start server multiple times");
1013+
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
1014+
1015+
let service = proto::instrument::instrument_server::InstrumentServer::new(self);
1016+
1017+
ServerParts {
1018+
instrument_server: service,
1019+
aggregator_handle: AggregatorHandle {
1020+
join_handle: aggregate,
1021+
},
1022+
}
1023+
}
1024+
}
1025+
1026+
/// Server Parts
1027+
///
1028+
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
1029+
/// further parts in the future, an as such is marked as `non_exhaustive`.
1030+
///
1031+
/// The `InstrumentServer<Server>` can be used to construct a router which
1032+
/// can be added to a [`tonic`] gRPC server.
1033+
///
1034+
/// The [`AggregatorHandle`] must be kept until after the server has been
1035+
/// shut down.
1036+
///
1037+
/// See the [`Server::into_parts`] documentation for usage.
1038+
#[non_exhaustive]
1039+
pub struct ServerParts {
1040+
/// The instrument server.
1041+
///
1042+
/// See the documentation for [`InstrumentServer`] for details.
1043+
pub instrument_server: InstrumentServer<Server>,
1044+
1045+
/// The aggregate handle.
1046+
///
1047+
/// See the documentation for [`AggregatorHandle`] for details.
1048+
pub aggregator_handle: AggregatorHandle,
1049+
}
1050+
1051+
/// Aggregator handle.
1052+
///
1053+
/// This object is returned from [`Server::into_parts`] and must be
1054+
/// kept as long as the `InstrumentServer<Server>` - which is also
1055+
/// returned - is in use.
1056+
pub struct AggregatorHandle {
1057+
join_handle: JoinHandle<()>,
1058+
}
1059+
1060+
impl Drop for AggregatorHandle {
1061+
fn drop(&mut self) {
1062+
self.join_handle.abort();
1063+
}
9631064
}
9641065

9651066
#[tonic::async_trait]

console-subscriber/tests/framework.rs

+184
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
//! Framework tests
2+
//!
3+
//! The tests in this module are here to verify the testing framework itself.
4+
//! As such, some of these tests may be repeated elsewhere (where we wish to
5+
//! actually test the functionality of `console-subscriber`) and others are
6+
//! negative tests that should panic.
7+
8+
use std::time::Duration;
9+
10+
use tokio::{task, time::sleep};
11+
12+
mod support;
13+
use support::{assert_task, assert_tasks, ExpectedTask, MAIN_TASK_NAME};
14+
15+
#[test]
16+
fn expect_present() {
17+
let expected_task = ExpectedTask::default()
18+
.match_default_name()
19+
.expect_present();
20+
21+
let future = async {
22+
sleep(Duration::ZERO).await;
23+
};
24+
25+
assert_task(expected_task, future);
26+
}
27+
28+
#[test]
29+
#[should_panic(expected = "Test failed: Task validation failed:
30+
- Task<name=main>: no expectations set, if you want to just expect that a matching task is present, use `expect_present()`
31+
")]
32+
fn fail_no_expectations() {
33+
let expected_task = ExpectedTask::default().match_default_name();
34+
35+
let future = async {
36+
sleep(Duration::ZERO).await;
37+
};
38+
39+
assert_task(expected_task, future);
40+
}
41+
42+
#[test]
43+
fn wakes() {
44+
let expected_task = ExpectedTask::default().match_default_name().expect_wakes(1);
45+
46+
let future = async {
47+
sleep(Duration::ZERO).await;
48+
};
49+
50+
assert_task(expected_task, future);
51+
}
52+
53+
#[test]
54+
#[should_panic(expected = "Test failed: Task validation failed:
55+
- Task<name=main>: expected `wakes` to be 5, but actual was 1
56+
")]
57+
fn fail_wakes() {
58+
let expected_task = ExpectedTask::default().match_default_name().expect_wakes(5);
59+
60+
let future = async {
61+
sleep(Duration::ZERO).await;
62+
};
63+
64+
assert_task(expected_task, future);
65+
}
66+
67+
#[test]
68+
fn self_wakes() {
69+
let expected_task = ExpectedTask::default()
70+
.match_default_name()
71+
.expect_self_wakes(1);
72+
73+
let future = async { task::yield_now().await };
74+
75+
assert_task(expected_task, future);
76+
}
77+
78+
#[test]
79+
#[should_panic(expected = "Test failed: Task validation failed:
80+
- Task<name=main>: expected `self_wakes` to be 1, but actual was 0
81+
")]
82+
fn fail_self_wake() {
83+
let expected_task = ExpectedTask::default()
84+
.match_default_name()
85+
.expect_self_wakes(1);
86+
87+
let future = async {
88+
sleep(Duration::ZERO).await;
89+
};
90+
91+
assert_task(expected_task, future);
92+
}
93+
94+
#[test]
95+
fn test_spawned_task() {
96+
let expected_task = ExpectedTask::default()
97+
.match_name("another-name".into())
98+
.expect_present();
99+
100+
let future = async {
101+
task::Builder::new()
102+
.name("another-name")
103+
.spawn(async { task::yield_now().await })
104+
};
105+
106+
assert_task(expected_task, future);
107+
}
108+
109+
#[test]
110+
#[should_panic(expected = "Test failed: Task validation failed:
111+
- Task<name=wrong-name>: no matching actual task was found
112+
")]
113+
fn fail_wrong_task_name() {
114+
let expected_task = ExpectedTask::default().match_name("wrong-name".into());
115+
116+
let future = async { task::yield_now().await };
117+
118+
assert_task(expected_task, future);
119+
}
120+
121+
#[test]
122+
fn multiple_tasks() {
123+
let expected_tasks = vec![
124+
ExpectedTask::default()
125+
.match_name("task-1".into())
126+
.expect_wakes(1),
127+
ExpectedTask::default()
128+
.match_name("task-2".into())
129+
.expect_wakes(1),
130+
];
131+
132+
let future = async {
133+
let task1 = task::Builder::new()
134+
.name("task-1")
135+
.spawn(async { task::yield_now().await })
136+
.unwrap();
137+
let task2 = task::Builder::new()
138+
.name("task-2")
139+
.spawn(async { task::yield_now().await })
140+
.unwrap();
141+
142+
tokio::try_join! {
143+
task1,
144+
task2,
145+
}
146+
.unwrap();
147+
};
148+
149+
assert_tasks(expected_tasks, future);
150+
}
151+
152+
#[test]
153+
#[should_panic(expected = "Test failed: Task validation failed:
154+
- Task<name=task-2>: expected `wakes` to be 2, but actual was 1
155+
")]
156+
fn fail_1_of_2_expected_tasks() {
157+
let expected_tasks = vec![
158+
ExpectedTask::default()
159+
.match_name("task-1".into())
160+
.expect_wakes(1),
161+
ExpectedTask::default()
162+
.match_name("task-2".into())
163+
.expect_wakes(2),
164+
];
165+
166+
let future = async {
167+
let task1 = task::Builder::new()
168+
.name("task-1")
169+
.spawn(async { task::yield_now().await })
170+
.unwrap();
171+
let task2 = task::Builder::new()
172+
.name("task-2")
173+
.spawn(async { task::yield_now().await })
174+
.unwrap();
175+
176+
tokio::try_join! {
177+
task1,
178+
task2,
179+
}
180+
.unwrap();
181+
};
182+
183+
assert_tasks(expected_tasks, future);
184+
}

0 commit comments

Comments
 (0)