Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rust/log-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ edition = "2021"
name = "log_service"
path = "src/bin/log.rs"

[features]
faults = []

[dependencies]
arrow = { workspace = true }
async-trait = { workspace = true }
Expand Down
141 changes: 137 additions & 4 deletions rust/log-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@ use wal3::{
create_repl_factories, create_s3_factories,
interfaces::repl::ManifestManager as ReplManifestManager, interfaces::ManifestManagerFactory,
scan_from_manifest, Cursor, CursorName, CursorStore, CursorStoreOptions, CursorWitness,
Fragment, FragmentManagerFactory, GarbageCollectionOptions, Limits, LogPosition, LogReader,
LogReaderOptions, LogReaderTrait, LogWriter, LogWriterOptions, LogWriterTrait, Manifest,
ManifestAndWitness, MarkDirty as MarkDirtyTrait, ReplicatedFragmentOptions, Snapshot,
SnapshotCache, SnapshotPointer, StorageWrapper, INTRINSIC_CURSOR,
Fragment, FragmentManagerFactory, FragmentUploadFaultInjector, GarbageCollectionOptions,
Limits, LogPosition, LogReader, LogReaderOptions, LogReaderTrait, LogWriter, LogWriterOptions,
LogWriterTrait, Manifest, ManifestAndWitness, MarkDirty as MarkDirtyTrait,
ReplicatedFragmentOptions, Snapshot, SnapshotCache, SnapshotPointer, StorageWrapper,
INTRINSIC_CURSOR,
};
#[cfg(feature = "faults")]
use wal3::{
FaultInjectingFragmentManagerFactory, FragmentUploadFault, FRAGMENT_UPLOAD_FAULT_LABEL,
};

mod scrub;
Expand Down Expand Up @@ -117,6 +122,50 @@ const DEFAULT_CONFIG_PATH: &str = "./chroma_config.yaml";

const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH";

/// Fragment-upload fault injector for the log service, backed by a shared `FaultRegistry`.
///
/// Only compiled when the `faults` feature is enabled.
#[cfg(feature = "faults")]
#[derive(Clone)]
struct LogServiceFragmentUploadFaultInjector {
/// Registry that is asked for an action under `FRAGMENT_UPLOAD_FAULT_LABEL`.
faults: Arc<FaultRegistry>,
}

#[cfg(feature = "faults")]
impl LogServiceFragmentUploadFaultInjector {
    /// Builds an injector that reads its faults from the given shared registry.
    fn new(faults: Arc<FaultRegistry>) -> Self {
        Self { faults }
    }
}

#[cfg(feature = "faults")]
impl FragmentUploadFaultInjector for LogServiceFragmentUploadFaultInjector {
    /// Looks up the action registered under `FRAGMENT_UPLOAD_FAULT_LABEL` and translates it
    /// into the matching `FragmentUploadFault`.
    ///
    /// Returns `None` when the registry has no action for that label.
    fn fault_for_upload(&self) -> Option<FragmentUploadFault> {
        // No registered action means no fault for this upload.
        let action = self.faults.action_for_label(FRAGMENT_UPLOAD_FAULT_LABEL)?;
        // One-to-one mapping from the registry's action kinds onto wal3's fault kinds.
        Some(match action {
            chroma_faults::FaultActionKind::Unavailable => FragmentUploadFault::Unavailable,
            chroma_faults::FaultActionKind::Delay(delay) => FragmentUploadFault::Delay(delay),
        })
    }
}

/// `faults` build: wraps `fragment_manager_factory` in a
/// `FaultInjectingFragmentManagerFactory`, handing it the optional injector.
///
/// The non-`faults` build provides a function of the same name that returns the factory
/// unchanged, so call sites are identical in both configurations.
#[cfg(feature = "faults")]
fn maybe_wrap_fragment_manager_factory<F>(
    fragment_manager_factory: F,
    fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
) -> FaultInjectingFragmentManagerFactory<F> {
    let wrapped = FaultInjectingFragmentManagerFactory::new(
        fragment_manager_factory,
        fragment_upload_fault_injector,
    );
    wrapped
}

/// Non-`faults` build: returns `fragment_manager_factory` unchanged.
///
/// The injector argument is accepted only so that call sites match the `faults` build; it is
/// ignored here.
#[cfg(not(feature = "faults"))]
fn maybe_wrap_fragment_manager_factory<F>(
    fragment_manager_factory: F,
    _fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
) -> F {
    fragment_manager_factory
}

/// Cursor name with the value `"stable_prefix"`, constructed through the unchecked constructor.
// SAFETY(rescrv): There's a test that this produces a valid type.
static STABLE_PREFIX: CursorName = unsafe { CursorName::from_string_unchecked("stable_prefix") };

Expand Down Expand Up @@ -224,6 +273,7 @@ struct FactoryCreationContext<'a> {
collection_id: CollectionUuid,
prefix: String,
snapshot_cache: Arc<dyn SnapshotCache>,
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
}

impl<'a> FactoryCreationContext<'a> {
Expand All @@ -232,6 +282,7 @@ impl<'a> FactoryCreationContext<'a> {
topology_name: Option<&'a TopologyName>,
collection_id: CollectionUuid,
snapshot_cache: Arc<dyn SnapshotCache>,
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
) -> Self {
let prefix = collection_id.storage_prefix_for_log();
Self {
Expand All @@ -240,6 +291,7 @@ impl<'a> FactoryCreationContext<'a> {
collection_id,
prefix,
snapshot_cache,
fragment_upload_fault_injector,
}
}

Expand Down Expand Up @@ -390,6 +442,10 @@ impl<'a> FactoryCreationContext<'a> {
region_names,
self.collection_id.0,
);
let fragment_factory = maybe_wrap_fragment_manager_factory(
fragment_factory,
self.fragment_upload_fault_injector.as_ref().map(Arc::clone),
);
let fragment_publisher = fragment_factory.make_publisher().await?;
Ok(wal3::copy(reader, cursor, &fragment_publisher, manifest_factory, cmek).await?)
}
Expand Down Expand Up @@ -417,6 +473,10 @@ impl<'a> FactoryCreationContext<'a> {
Arc::new(()),
Arc::clone(&self.snapshot_cache),
);
let fragment_factory = maybe_wrap_fragment_manager_factory(
fragment_factory,
self.fragment_upload_fault_injector.as_ref().map(Arc::clone),
);
let fragment_publisher = fragment_factory.make_publisher().await?;
Ok(wal3::copy(reader, cursor, &fragment_publisher, manifest_factory, cmek).await?)
}
Expand Down Expand Up @@ -563,6 +623,7 @@ async fn get_log_from_handle<'a>(
prefix: &str,
mark_dirty: MarkDirty,
snapshot_cache: Arc<dyn SnapshotCache>,
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
cmek: Option<Cmek>,
) -> Result<LogRef<'a>, Error> {
let active = handle.active.lock().await;
Expand All @@ -577,6 +638,7 @@ async fn get_log_from_handle<'a>(
prefix,
mark_dirty,
snapshot_cache,
fragment_upload_fault_injector,
cmek,
)
.await
Expand All @@ -594,6 +656,7 @@ async fn get_log_from_handle_with_mutex_held<'a>(
prefix: &str,
mark_dirty: MarkDirty,
snapshot_cache: Arc<dyn SnapshotCache>,
fragment_upload_fault_injector: Option<Arc<dyn FragmentUploadFaultInjector>>,
cmek: Option<Cmek>,
) -> Result<LogRef<'a>, Error> {
if active.log.is_some() {
Expand Down Expand Up @@ -640,6 +703,10 @@ async fn get_log_from_handle_with_mutex_held<'a>(
region_names,
collection_id.0,
);
let fragment_publisher_factory = maybe_wrap_fragment_manager_factory(
fragment_publisher_factory,
fragment_upload_fault_injector.as_ref().map(Arc::clone),
);
let opened = LogWriter::open_or_initialize(
write_options.clone(),
"log writer",
Expand Down Expand Up @@ -690,6 +757,10 @@ async fn get_log_from_handle_with_mutex_held<'a>(
mark_dirty_arc,
snapshot_cache,
);
let fragment_publisher_factory = maybe_wrap_fragment_manager_factory(
fragment_publisher_factory,
fragment_upload_fault_injector.as_ref().map(Arc::clone),
);
let opened = LogWriter::open_or_initialize(
write_options.clone(),
"log writer",
Expand Down Expand Up @@ -1115,6 +1186,19 @@ impl LogServer {
.storage)
}

/// Returns the fragment-upload fault injector to hand to log factories.
///
/// With the `faults` feature enabled this is a fresh `LogServiceFragmentUploadFaultInjector`
/// sharing this server's fault registry; without the feature it is always `None`.
fn fragment_upload_fault_injector(&self) -> Option<Arc<dyn FragmentUploadFaultInjector>> {
    #[cfg(feature = "faults")]
    let injector: Option<Arc<dyn FragmentUploadFaultInjector>> = {
        let registry = Arc::clone(&self.faults);
        Some(Arc::new(LogServiceFragmentUploadFaultInjector::new(registry)))
    };
    #[cfg(not(feature = "faults"))]
    let injector: Option<Arc<dyn FragmentUploadFaultInjector>> = None;
    injector
}

fn snapshot_cache_for_collection(
&self,
collection_id: CollectionUuid,
Expand Down Expand Up @@ -1155,6 +1239,7 @@ impl LogServer {
topology_name,
collection_id,
snapshot_cache,
self.fragment_upload_fault_injector(),
);
ctx.make_log_reader(&self.config.writer, &self.config.reader)
.await
Expand Down Expand Up @@ -1309,6 +1394,7 @@ impl LogServer {
&storage_prefix,
mark_dirty,
snapshot_cache,
self.fragment_upload_fault_injector(),
None, // Offset updates don't use CMEK
)
.await
Expand Down Expand Up @@ -2089,6 +2175,7 @@ impl LogServer {
&prefix,
mark_dirty,
snapshot_cache,
self.fragment_upload_fault_injector(),
cmek,
)
.await
Expand Down Expand Up @@ -2541,6 +2628,7 @@ impl LogServer {
topology_name.as_ref(),
target_collection_id,
snapshot_cache,
self.fragment_upload_fault_injector(),
);
target_ctx
.fork_to_target(
Expand Down Expand Up @@ -2926,6 +3014,7 @@ impl LogServer {
&prefix,
mark_dirty,
snapshot_cache,
self.fragment_upload_fault_injector(),
None, // GC doesn't use CMEK
)
.await
Expand Down Expand Up @@ -5088,6 +5177,50 @@ mod tests {
}
}

/// Injects an `Unavailable` fault under `FRAGMENT_UPLOAD_FAULT_LABEL` and checks that
/// `push_logs` fails with `Code::Unavailable`, then clears all faults and checks that the same
/// request succeeds with a record count of one.
#[cfg(feature = "faults")]
#[tokio::test]
async fn fragment_upload_fault_injection_rejects_then_recovers() {
    let (ctor, dtor) = s3_setup_log_server();
    let log_server = ctor.await;
    let collection_id = CollectionUuid::new();

    // The request is moved into each `push_logs` call, so build a fresh one per attempt.
    let make_request = || {
        let record = OperationRecord {
            id: "fault-test".to_string(),
            embedding: None,
            encoding: None,
            metadata: None,
            document: None,
            operation: Operation::Delete,
        };
        PushLogsRequest {
            collection_id: collection_id.to_string(),
            records: vec![record
                .try_into()
                .expect("operation record should convert to proto")],
            cmek: None,
            database_name: "default_database".to_string(),
        }
    };

    // Phase 1: with the fault registered, the push must be rejected.
    let selector =
        chroma_faults::FaultSelectorKind::Label(FRAGMENT_UPLOAD_FAULT_LABEL.to_string());
    log_server
        .faults
        .inject(selector, chroma_faults::FaultActionKind::Unavailable);

    let rejected = log_server.push_logs(Request::new(make_request())).await;
    let err = rejected.expect_err("fault injection should reject fragment upload");
    assert_eq!(err.code(), Code::Unavailable);

    // Phase 2: once the faults are cleared, the same request must go through.
    log_server.faults.clear_all();

    let accepted = log_server.push_logs(Request::new(make_request())).await;
    let response = accepted.expect("write should succeed after clearing injected fault");
    assert_eq!(response.into_inner().record_count, 1);

    dtor.await;
}

async fn validate_log_on_server(
server: &LogServer,
db_name: &str,
Expand Down
Loading
Loading