Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ jobs:
cargo run --features="starter-log" --example json_stdout
cargo run --features="starter-log" --example rolling_file
cargo run --features="starter-log" --example single_file
cargo run --features="starter-log,append-async" --example asynchronous
cargo run --features="starter-log,diagnostic-fastrace,layout-google-cloud-logging" --example google_cloud_logging
cargo run --features="starter-log,append-fastrace,diagnostic-fastrace" --example fastrace

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/Cargo.lock
/target
/logs
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ All notable changes to this project will be documented in this file.
### New features

* `PlainTextLayout` is added to support plain text format without any extra dependency.
* `Async` appender is added to support async logging with configurable buffer size and worker threads.
* `Trap` trait and a default `DefaultTrap` is added to support handling internal errors.

## [0.27.0] 2025-08-18

Expand Down
28 changes: 15 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
members = [
"core",
"logforth",

"appenders/*",
"bridges/*",
"diagnostics/*",
Expand All @@ -35,22 +34,25 @@ rust-version = "1.85.0"

[workspace.dependencies]
# Workspace dependencies
logforth-append-fastrace = { version = "0.1.0", path = "appenders/fastrace" }
logforth-append-file = { version = "0.1.0", path = "appenders/file" }
logforth-append-journald = { version = "0.1.0", path = "appenders/journald" }
logforth-append-opentelemetry = { version = "0.1.0", path = "appenders/opentelemetry" }
logforth-append-syslog = { version = "0.1.0", path = "appenders/syslog" }
logforth-bridge-log = { version = "0.1.0", path = "bridges/log" }
logforth-core = { version = "0.1.0", path = "core" }
logforth-diagnostic-fastrace = { version = "0.1.0", path = "diagnostics/fastrace" }
logforth-layout-google-cloud-logging = { version = "0.1.0", path = "layouts/google-cloud-logging" }
logforth-layout-json = { version = "0.1.0", path = "layouts/json" }
logforth-layout-logfmt = { version = "0.1.0", path = "layouts/logfmt" }
logforth-layout-text = { version = "0.1.0", path = "layouts/text" }
logforth-append-async = { version = "0.2.0", path = "appenders/async" }
logforth-append-fastrace = { version = "0.2.0", path = "appenders/fastrace" }
logforth-append-file = { version = "0.2.0", path = "appenders/file" }
logforth-append-journald = { version = "0.2.0", path = "appenders/journald" }
logforth-append-opentelemetry = { version = "0.2.0", path = "appenders/opentelemetry" }
logforth-append-syslog = { version = "0.2.0", path = "appenders/syslog" }
logforth-bridge-log = { version = "0.2.0", path = "bridges/log" }
logforth-core = { version = "0.2.0", path = "core" }
logforth-diagnostic-fastrace = { version = "0.2.0", path = "diagnostics/fastrace" }
logforth-layout-google-cloud-logging = { version = "0.2.0", path = "layouts/google-cloud-logging" }
logforth-layout-json = { version = "0.2.0", path = "layouts/json" }
logforth-layout-logfmt = { version = "0.2.0", path = "layouts/logfmt" }
logforth-layout-text = { version = "0.2.0", path = "layouts/text" }

# Crates.io dependencies
anyhow = { version = "1.0" }
arc-swap = { version = "1.7.1" }
colored = { version = "3.0" }
crossbeam-channel = { version = "0.5.15" }
fastrace = { version = "0.7" }
fasyslog = { version = "1.0.0" }
insta = { version = "1.43.2" }
Expand Down
40 changes: 40 additions & 0 deletions appenders/async/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2024 FastLabs Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[package]
name = "logforth-append-async"
version = "0.2.0"

description = "Asynchronous appender for Logforth."
keywords = ["logging", "log", "async"]

categories.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
arc-swap = { workspace = true }
crossbeam-channel = { workspace = true }
logforth-core = { workspace = true }

[lints]
workspace = true
5 changes: 5 additions & 0 deletions appenders/async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Async Appender

This appender is a remix of [spdlog-rs's AsyncPoolSink](https://docs.rs/spdlog-rs/*/spdlog/sink/struct.AsyncPoolSink.html), with several modifications to fit this crate's need:

* Instead of a thread pool, it uses a single background thread to drain the log queue.
176 changes: 176 additions & 0 deletions appenders/async/src/append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
use logforth_core::Trap;
use logforth_core::kv;
use logforth_core::kv::Visitor;
use logforth_core::record::Record;
use logforth_core::trap::DefaultTrap;

use crate::Overflow;
use crate::Task;
use crate::state::AsyncState;
use crate::worker::Worker;

/// A composable appender, logging and flushing asynchronously.
#[derive(Debug)]
pub struct Async {
appends: Arc<[Box<dyn Append>]>,
overflow: Overflow,
state: AsyncState,
trap: Arc<dyn Trap>,
}

impl Append for Async {
fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
let mut diagnostics = vec![];

let mut collector = DiagnosticCollector(&mut diagnostics);
for d in diags {
d.visit(&mut collector)?;
}

let overflow = self.overflow;
let task = Task::Log {
appends: self.appends.clone(),
record: Box::new(record.to_owned()),
diags: diagnostics,
};
self.state.send_task(task, overflow)
}

fn flush(&self) -> Result<(), Error> {
let overflow = self.overflow;
let task = Task::Flush {
appends: self.appends.clone(),
};
self.state.send_task(task, overflow)
}

fn exit(&self) -> Result<(), Error> {
// https://github.com/SpriteOvO/spdlog-rs/issues/64
//
// If the program is tearing down, this will be the final flush. `crossbeam`
// uses thread-local internally, which is not supported in `atexit` callback.
// This can be bypassed by flushing sinks directly on the current thread, but
// before we do that we have to destroy the thread pool to ensure that any
// pending log tasks are completed.
self.state.destroy();
for append in self.appends.iter() {
if let Err(err) = append.exit() {
self.trap.trap(&err);
}
}
Ok(())
}
}

/// A builder for configuring an async appender.
pub struct AsyncBuilder {
thread_name: String,
appends: Vec<Box<dyn Append>>,
buffered_lines_limit: Option<usize>,
trap: Arc<dyn Trap>,
overflow: Overflow,
}

impl AsyncBuilder {
/// Create a new async appender builder.
pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
AsyncBuilder {
thread_name: thread_name.into(),
appends: vec![],
buffered_lines_limit: None,
trap: Arc::new(DefaultTrap::default()),
overflow: Overflow::Block,
}
}

/// Set the buffer size of pending messages.
pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
self.buffered_lines_limit = buffered_lines_limit;
self
}

/// Set the overflow policy to block when the buffer is full.
pub fn overflow_block(mut self) -> Self {
self.overflow = Overflow::Block;
self
}

/// Set the overflow policy to drop incoming messages when the buffer is full.
pub fn overflow_drop_incoming(mut self) -> Self {
self.overflow = Overflow::DropIncoming;
self
}

/// Set the trap for this async appender.
pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
let trap = trap.into();
self.trap = trap.into();
self
}

/// Add an appender to this async appender.
pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
self.appends.push(append.into());
self
}

/// Build the async appender.
pub fn build(self) -> Async {
let Self {
thread_name,
appends,
buffered_lines_limit,
trap,
overflow,
} = self;

let appends = appends.into_boxed_slice().into();

let (sender, receiver) = match buffered_lines_limit {
Some(limit) => crossbeam_channel::bounded(limit),
None => crossbeam_channel::unbounded(),
};

let worker = Worker::new(receiver, trap.clone());
let thread_handle = std::thread::Builder::new()
.name(thread_name)
.spawn(move || worker.run())
.expect("failed to spawn async appender thread");
let state = AsyncState::new(sender, thread_handle);

Async {
appends,
overflow,
state,
trap,
}
}
}

struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);

impl<'a> Visitor for DiagnosticCollector<'a> {
fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> {
self.0.push((key.to_owned(), value.to_owned()));
Ok(())
}
}
49 changes: 49 additions & 0 deletions appenders/async/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A composable appender, logging and flushing asynchronously.

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

use std::sync::Arc;

use logforth_core::Append;
use logforth_core::kv;
use logforth_core::record::RecordOwned;

mod append;
mod state;
mod worker;

pub use self::append::Async;
pub use self::append::AsyncBuilder;

enum Task {
Log {
appends: Arc<[Box<dyn Append>]>,
record: Box<RecordOwned>,
diags: Vec<(kv::KeyOwned, kv::ValueOwned)>,
},
Flush {
appends: Arc<[Box<dyn Append>]>,
},
}

#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
enum Overflow {
/// Blocks until the channel is not full.
Block,
/// Drops the incoming operation.
DropIncoming,
}
Loading
Loading