Skip to content
Merged
28 changes: 15 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
members = [
"core",
"logforth",

"appenders/*",
"bridges/*",
"diagnostics/*",
Expand All @@ -35,22 +34,25 @@ rust-version = "1.85.0"

[workspace.dependencies]
# Workspace dependencies
logforth-append-fastrace = { version = "0.1.0", path = "appenders/fastrace" }
logforth-append-file = { version = "0.1.0", path = "appenders/file" }
logforth-append-journald = { version = "0.1.0", path = "appenders/journald" }
logforth-append-opentelemetry = { version = "0.1.0", path = "appenders/opentelemetry" }
logforth-append-syslog = { version = "0.1.0", path = "appenders/syslog" }
logforth-bridge-log = { version = "0.1.0", path = "bridges/log" }
logforth-core = { version = "0.1.0", path = "core" }
logforth-diagnostic-fastrace = { version = "0.1.0", path = "diagnostics/fastrace" }
logforth-layout-google-cloud-logging = { version = "0.1.0", path = "layouts/google-cloud-logging" }
logforth-layout-json = { version = "0.1.0", path = "layouts/json" }
logforth-layout-logfmt = { version = "0.1.0", path = "layouts/logfmt" }
logforth-layout-text = { version = "0.1.0", path = "layouts/text" }
logforth-append-asynchronous = { version = "0.2.0", path = "appenders/asynchronous" }
logforth-append-fastrace = { version = "0.2.0", path = "appenders/fastrace" }
logforth-append-file = { version = "0.2.0", path = "appenders/file" }
logforth-append-journald = { version = "0.2.0", path = "appenders/journald" }
logforth-append-opentelemetry = { version = "0.2.0", path = "appenders/opentelemetry" }
logforth-append-syslog = { version = "0.2.0", path = "appenders/syslog" }
logforth-bridge-log = { version = "0.2.0", path = "bridges/log" }
logforth-core = { version = "0.2.0", path = "core" }
logforth-diagnostic-fastrace = { version = "0.2.0", path = "diagnostics/fastrace" }
logforth-layout-google-cloud-logging = { version = "0.2.0", path = "layouts/google-cloud-logging" }
logforth-layout-json = { version = "0.2.0", path = "layouts/json" }
logforth-layout-logfmt = { version = "0.2.0", path = "layouts/logfmt" }
logforth-layout-text = { version = "0.2.0", path = "layouts/text" }

# Crates.io dependencies
anyhow = { version = "1.0" }
arc-swap = { version = "1.7.1" }
colored = { version = "3.0" }
crossbeam-channel = { version = "0.5.15" }
fastrace = { version = "0.7" }
fasyslog = { version = "1.0.0" }
insta = { version = "1.43.2" }
Expand Down
40 changes: 40 additions & 0 deletions appenders/asynchronous/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2024 FastLabs Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[package]
name = "logforth-append-asynchronous"
version = "0.2.0"

description = "Asynchronous appender for Logforth."
keywords = ["logging", "log", "async"]

categories.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
arc-swap = { workspace = true }
crossbeam-channel = { workspace = true }
logforth-core = { workspace = true }

[lints]
workspace = true
5 changes: 5 additions & 0 deletions appenders/asynchronous/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Asynchronous Appender

This appender is a remix of [spdlog-rs's AsyncPoolSink](https://docs.rs/spdlog-rs/*/spdlog/sink/struct.AsyncPoolSink.html), with several modifications to fit this crate's need:

* Instead of a thread pool, it uses a single background thread to drain the log queue.
169 changes: 169 additions & 0 deletions appenders/asynchronous/src/append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
use logforth_core::ErrorSink;
use logforth_core::kv;
use logforth_core::kv::Visitor;
use logforth_core::record::Record;

use crate::Task;
use crate::state::AppendState;
use crate::worker::Worker;

/// A composable appender, logging and flushing asynchronously.
#[derive(Debug)]
pub struct Asynchronous {
appends: Arc<[Box<dyn Append>]>,
overflow: Overflow,
state: AppendState,
}

impl Append for Asynchronous {
fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
let mut diagnostics = vec![];

let mut collector = DiagnosticCollector(&mut diagnostics);
for d in diags {
d.visit(&mut collector)?;
}

let overflow = self.overflow;
let task = Task::Log {
appends: self.appends.clone(),
record: Box::new(record.to_owned()),
diags: diagnostics,
};
self.state.send_task(task, overflow)
}

fn flush(&self) -> Result<(), Error> {
let overflow = self.overflow;
let task = Task::Flush {
appends: self.appends.clone(),
};
self.state.send_task(task, overflow)
}
}

/// A builder for configuring an asynchronous appender.
pub struct AsyncBuilder {
thread_name: String,
appends: Vec<Box<dyn Append>>,
buffered_lines_limit: Option<usize>,
error_sink: Box<dyn ErrorSink>,
overflow: Overflow,
}

impl AsyncBuilder {
/// Create a new asynchronous appender builder.
pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
AsyncBuilder {
thread_name: thread_name.into(),
appends: vec![],
buffered_lines_limit: None,
error_sink: Box::new(PrintErrorSink),
overflow: Overflow::Block,
}
}

/// Set the buffer size of pending messages.
pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
self.buffered_lines_limit = buffered_lines_limit;
self
}

/// Set the overflow policy for this asynchronous appender.
pub fn overflow(mut self, overflow: Overflow) -> Self {
self.overflow = overflow;
self
}

/// Set the error sink for this asynchronous appender.
pub fn error_sink(mut self, error_sink: impl Into<Box<dyn ErrorSink>>) -> Self {
self.error_sink = error_sink.into();
self
}

/// Add an appender to this asynchronous appender.
pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
self.appends.push(append.into());
self
}

/// Build the asynchronous appender.
pub fn build(self) -> Asynchronous {
let Self {
thread_name,
appends,
buffered_lines_limit,
error_sink,
overflow,
} = self;

let appends = appends.into_boxed_slice().into();

let (sender, receiver) = match buffered_lines_limit {
Some(limit) => crossbeam_channel::bounded(limit),
None => crossbeam_channel::unbounded(),
};

let worker = Worker::new(receiver, error_sink);
let thread_handle = std::thread::Builder::new()
.name(thread_name)
.spawn(move || worker.run())
.expect("failed to spawn asynchronous appender thread");
let state = AppendState::new(sender, thread_handle);

Asynchronous {
appends,
overflow,
state,
}
}
}

/// Overflow policy for [`Asynchronous`].
///
/// When the channel is full, an incoming operation is handled according to the
/// specified policy.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
#[non_exhaustive]
pub enum Overflow {
/// Blocks until the channel is not full.
Block,
/// Drops the incoming operation.
DropIncoming,
}

struct PrintErrorSink;

impl ErrorSink for PrintErrorSink {
fn sink(&self, err: &Error) {
eprintln!("{err}");
}
}

struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);

impl<'a> Visitor for DiagnosticCollector<'a> {
fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> {
self.0.push((key.to_owned(), value.to_owned()));
Ok(())
}
}
42 changes: 42 additions & 0 deletions appenders/asynchronous/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A composable appender, logging and flushing asynchronously.

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

use std::sync::Arc;

use logforth_core::Append;
use logforth_core::kv;
use logforth_core::record::RecordOwned;

mod append;
mod state;
mod worker;

pub use self::append::AsyncBuilder;
pub use self::append::Asynchronous;
pub use self::append::Overflow;

enum Task {
Log {
appends: Arc<[Box<dyn Append>]>,
record: Box<RecordOwned>,
diags: Vec<(kv::KeyOwned, kv::ValueOwned)>,
},
Flush {
appends: Arc<[Box<dyn Append>]>,
},
}
86 changes: 86 additions & 0 deletions appenders/asynchronous/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::thread::JoinHandle;

use arc_swap::ArcSwapOption;
use crossbeam_channel::Sender;
use logforth_core::Error;

use crate::Task;
use crate::append::Overflow;

#[derive(Debug)]
pub(crate) struct AppendState(ArcSwapOption<State>);

#[derive(Debug)]
struct State {
sender: Sender<Task>,
handle: JoinHandle<()>,
}

impl AppendState {
pub(crate) fn new(sender: Sender<Task>, handle: JoinHandle<()>) -> Self {
let state = State { sender, handle };
Self(ArcSwapOption::from(Some(Arc::new(state))))
}

pub(crate) fn send_task(&self, task: Task, overflow: Overflow) -> Result<(), Error> {
let state = self.0.load();
// SAFETY: state is always Some before dropped.
let state = state.as_ref().unwrap();
let sender = &state.sender;

match overflow {
Overflow::Block => sender.send(task).map_err(|err| {
Error::new(match err.0 {
Task::Log { .. } => "failed to send log task to asynchronous appender",
Task::Flush { .. } => "failed to send flush task to asynchronous appender",
})
}),
Overflow::DropIncoming => match sender.try_send(task) {
Ok(()) => Ok(()),
Err(crossbeam_channel::TrySendError::Full(_)) => Ok(()),
Err(crossbeam_channel::TrySendError::Disconnected(task)) => {
Err(Error::new(match task {
Task::Log { .. } => "failed to send log task to asynchronous appender",
Task::Flush { .. } => "failed to send flush task to asynchronous appender",
}))
}
},
}
}

pub(crate) fn destroy(&self) {
if let Some(state) = self.0.swap(None) {
// SAFETY: state has always one strong count before swapped.
let State { sender, handle } = Arc::into_inner(state).unwrap();

// drop our sender, threads will break the loop after receiving and processing
drop(sender);

// wait for the thread to finish
handle
.join()
.expect("failed to join asynchronous appender thread");
}
}
}

impl Drop for AppendState {
fn drop(&mut self) {
self.destroy();
}
}
Loading
Loading