Skip to content

Commit e51ac5a

Browse files
hdshawkw
authored andcommitted
feat(subscriber) expose server parts (#451)
The `ConsoleLayer` builder provides the user with a console layer and a server, which is used to start the gRPC server. However, it may be desireable to expose the instrumentation server together with other services on the same Tonic router. This was requested explicitly in #449 428. Additionally, to add tests which make use of the instrumentation server (as part of improving test coverage for #450), more flexibility is needed than what is provided by the current API. Specifically we would like to connect a client and server via an in memory channel, rather than a TCP connection. This change adds an additional method to `console_subscriber::Server` called `into_parts` which allows the user to access the `InstrumentServer` directly. The `Aggregator` is also returned and must be set to run for at least as long as the instrument server. This allows the aggregator to be spawned wherever the user wishes. To facilitate the addition of functionality which would result in more "parts" in the future, `into_parts` returns a non-exhaustive struct, rather than a tuple of parts. Closes: #428
1 parent 15cef93 commit e51ac5a

File tree

2 files changed

+155
-14
lines changed

2 files changed

+155
-14
lines changed

console-subscriber/src/aggregator/mod.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@ mod shrink;
2222
use self::id_data::{IdData, Include};
2323
use self::shrink::{ShrinkMap, ShrinkVec};
2424

25-
pub(crate) struct Aggregator {
25+
/// Aggregates instrumentation traces and prepares state for the instrument
26+
/// server.
27+
///
28+
/// The `Aggregator` is responsible for receiving and organizing the
29+
/// instrumentated events and preparing the data to be served to a instrument
30+
/// client.
31+
pub struct Aggregator {
2632
/// Channel of incoming events emitted by `TaskLayer`s.
2733
events: mpsc::Receiver<Event>,
2834

@@ -157,7 +163,12 @@ impl Aggregator {
157163
}
158164
}
159165

160-
pub(crate) async fn run(mut self) {
166+
/// Runs the aggregator.
167+
///
168+
/// This method will start the aggregator loop and should run as long as
169+
/// the instrument server is running. If the instrument server stops,
170+
/// this future can be aborted.
171+
pub async fn run(mut self) {
161172
let mut publish = tokio::time::interval(self.publish_interval);
162173
loop {
163174
let should_send = tokio::select! {

console-subscriber/src/lib.rs

+142-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![doc = include_str!("../README.md")]
22
use console_api as proto;
3-
use proto::resources::resource;
3+
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
44
use serde::Serialize;
55
use std::{
66
cell::RefCell,
@@ -15,7 +15,10 @@ use std::{
1515
use thread_local::ThreadLocal;
1616
#[cfg(unix)]
1717
use tokio::net::UnixListener;
18-
use tokio::sync::{mpsc, oneshot};
18+
use tokio::{
19+
sync::{mpsc, oneshot},
20+
task::JoinHandle,
21+
};
1922
#[cfg(unix)]
2023
use tokio_stream::wrappers::UnixListenerStream;
2124
use tracing_core::{
@@ -39,7 +42,7 @@ mod stats;
3942
pub(crate) mod sync;
4043
mod visitors;
4144

42-
use aggregator::Aggregator;
45+
pub use aggregator::Aggregator;
4346
pub use builder::{Builder, ServerAddr};
4447
use callsites::Callsites;
4548
use record::Recorder;
@@ -933,18 +936,16 @@ impl Server {
933936
///
934937
/// [`tonic`]: https://docs.rs/tonic/
935938
pub async fn serve_with(
936-
mut self,
939+
self,
937940
mut builder: tonic::transport::Server,
938941
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
939-
let aggregate = self
940-
.aggregator
941-
.take()
942-
.expect("cannot start server multiple times");
943-
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
944942
let addr = self.addr.clone();
945-
let router = builder.add_service(
946-
proto::instrument::instrument_server::InstrumentServer::new(self),
947-
);
943+
let ServerParts {
944+
instrument_server,
945+
aggregator,
946+
} = self.into_parts();
947+
let aggregate = spawn_named(aggregator.run(), "console::aggregate");
948+
let router = builder.add_service(instrument_server);
948949
let res = match addr {
949950
ServerAddr::Tcp(addr) => {
950951
let serve = router.serve(addr);
@@ -960,6 +961,135 @@ impl Server {
960961
aggregate.abort();
961962
res?.map_err(Into::into)
962963
}
964+
965+
/// Returns the parts needed to spawn a gRPC server and the aggregator that
966+
/// supplies it.
967+
///
968+
/// Note that a server spawned in this way will disregard any value set by
969+
/// [`Builder::server_addr`], as the user becomes responsible for defining
970+
/// the address when calling [`Router::serve`].
971+
///
972+
/// Additionally, the user of this API must ensure that the [`Aggregator`]
973+
/// is running for as long as the gRPC server is. If the server stops
974+
/// running, the aggregator task can be aborted.
975+
///
976+
/// # Examples
977+
///
978+
/// The parts can be used to serve the instrument server together with
979+
/// other endpoints from the same gRPC server.
980+
///
981+
/// ```
982+
/// use console_subscriber::{ConsoleLayer, ServerParts};
983+
///
984+
/// # let runtime = tokio::runtime::Builder::new_current_thread()
985+
/// # .enable_all()
986+
/// # .build()
987+
/// # .unwrap();
988+
/// # runtime.block_on(async {
989+
/// let (console_layer, server) = ConsoleLayer::builder().build();
990+
/// let ServerParts {
991+
/// instrument_server,
992+
/// aggregator,
993+
/// ..
994+
/// } = server.into_parts();
995+
///
996+
/// let aggregator_handle = tokio::spawn(aggregator.run());
997+
/// let router = tonic::transport::Server::builder()
998+
/// //.add_service(some_other_service)
999+
/// .add_service(instrument_server);
1000+
/// let serve = router.serve(std::net::SocketAddr::new(
1001+
/// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
1002+
/// 6669,
1003+
/// ));
1004+
///
1005+
/// // Finally, spawn the server.
1006+
/// tokio::spawn(serve);
1007+
/// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
1008+
/// # drop(console_layer);
1009+
/// # let mut aggregator_handle = aggregator_handle;
1010+
/// # aggregator_handle.abort();
1011+
/// # });
1012+
/// ```
1013+
///
1014+
/// [`Router::serve`]: fn@tonic::transport::server::Router::serve
1015+
pub fn into_parts(mut self) -> ServerParts {
1016+
let aggregator = self
1017+
.aggregator
1018+
.take()
1019+
.expect("cannot start server multiple times");
1020+
1021+
let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);
1022+
1023+
ServerParts {
1024+
instrument_server,
1025+
aggregator,
1026+
}
1027+
}
1028+
}
1029+
1030+
/// Server Parts
1031+
///
1032+
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
1033+
/// further parts in the future, an as such is marked as `non_exhaustive`.
1034+
///
1035+
/// The `InstrumentServer<Server>` can be used to construct a router which
1036+
/// can be added to a [`tonic`] gRPC server.
1037+
///
1038+
/// The `aggregator` is a future which should be running as long as the server is.
1039+
/// Generally, this future should be spawned onto an appropriate runtime and then
1040+
/// aborted if the server gets shut down.
1041+
///
1042+
/// See the [`Server::into_parts`] documentation for usage.
1043+
#[non_exhaustive]
1044+
pub struct ServerParts {
1045+
/// The instrument server.
1046+
///
1047+
/// See the documentation for [`InstrumentServer`] for details.
1048+
pub instrument_server: InstrumentServer<Server>,
1049+
1050+
/// The aggregator.
1051+
///
1052+
/// Responsible for collecting and preparing traces for the instrument server
1053+
/// to send its clients.
1054+
///
1055+
/// The aggregator should be [`run`] when the instrument server is started.
1056+
/// If the server stops running for any reason, the aggregator task can be
1057+
/// aborted.
1058+
///
1059+
/// [`run`]: fn@crate::Aggregator::run
1060+
pub aggregator: Aggregator,
1061+
}
1062+
1063+
/// Aggregator handle.
1064+
///
1065+
/// This object is returned from [`Server::into_parts`]. It can be
1066+
/// used to abort the aggregator task.
1067+
///
1068+
/// The aggregator collects the traces that implement the async runtime
1069+
/// being observed and prepares them to be served by the gRPC server.
1070+
///
1071+
/// Normally, if the server, started with [`Server::serve`] or
1072+
/// [`Server::serve_with`] stops for any reason, the aggregator is aborted,
1073+
/// hoewver, if the server was started with the [`InstrumentServer`] returned
1074+
/// from [`Server::into_parts`], then it is the responsibility of the user
1075+
/// of the API to stop the aggregator task by calling [`abort`] on this
1076+
/// object.
1077+
///
1078+
/// [`abort`]: fn@crate::AggregatorHandle::abort
1079+
pub struct AggregatorHandle {
1080+
join_handle: JoinHandle<()>,
1081+
}
1082+
1083+
impl AggregatorHandle {
1084+
/// Aborts the task running this aggregator.
1085+
///
1086+
/// To avoid having a disconnected aggregator running forever, this
1087+
/// method should be called when the [`tonic::transport::Server`] started
1088+
/// with the [`InstrumentServer`] also returned from [`Server::into_parts`]
1089+
/// stops running.
1090+
pub fn abort(&mut self) {
1091+
self.join_handle.abort();
1092+
}
9631093
}
9641094

9651095
#[tonic::async_trait]

0 commit comments

Comments
 (0)