Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/packages/api-public/src/runner_configs/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ async fn upsert_inner(
)
.await
{
tracing::warn!(?err, runner_name = ?path.runner_name, "failed to refresh runner config metadata");
tracing::warn!(?err, runner_name=?path.runner_name, "failed to refresh runner config metadata");
}
} else {
tracing::debug!("endpoint config unchanged, skipping metadata refresh");
Expand Down
6 changes: 1 addition & 5 deletions engine/packages/cache-purge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ use universalpubsub::NextOutput;

#[tracing::instrument(skip_all)]
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
tracing::info!("starting cache purge subscriber service");

// Subscribe to cache purge updates
let ups = pools.ups()?;
let mut sub = ups.subscribe(CACHE_PURGE_TOPIC).await?;

tracing::info!(subject = ?CACHE_PURGE_TOPIC, "subscribed to cache purge updates");
tracing::info!(subject=?CACHE_PURGE_TOPIC, "subscribed to cache purge updates");

// Get cache instance
let cache = rivet_cache::CacheInner::from_env(&config, pools)?;
Expand Down Expand Up @@ -42,7 +40,5 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
}
}

tracing::warn!("cache purge subscriber service stopped");

Ok(())
}
2 changes: 1 addition & 1 deletion engine/packages/engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main_inner() -> Result<()> {

// Load config
let config = rivet_config::Config::load(&cli.config).await?;
tracing::info!(config = ?*config, "loaded config");
tracing::info!(config=?*config, "loaded config");

// Initialize telemetry (does nothing if telemetry is disabled)
let _guard = rivet_telemetry::init(&config);
Expand Down
8 changes: 4 additions & 4 deletions engine/packages/epoxy/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
)
.collect::<FuturesUnordered<_>>()
.await;
tracing::info!(?quorum_size, len = ?responses.len(), ?quorum_type, "fanout quorum size");
tracing::debug!(?quorum_size, len = ?responses.len(), ?quorum_type, "fanout quorum size");

// Choose how many successful responses we need before considering a success
let target_responses = match quorum_type {
Expand Down Expand Up @@ -115,7 +115,7 @@ pub async fn send_message_to_address(
let to_replica_id = request.to_replica_id;

if from_replica_id == to_replica_id {
tracing::info!(
tracing::debug!(
to_replica = to_replica_id,
"sending message to replica directly"
);
Expand All @@ -126,7 +126,7 @@ pub async fn send_message_to_address(
let mut replica_url = url::Url::parse(&replica_url)?;
replica_url.set_path(&format!("/v{PROTOCOL_VERSION}/epoxy/message"));

tracing::info!(
tracing::debug!(
to_replica = to_replica_id,
%replica_url,
"sending message to replica via http"
Expand Down Expand Up @@ -183,7 +183,7 @@ pub async fn send_message_to_address(
let body = response.bytes().await?;
let response_body = versioned::Response::deserialize(&body)?;

tracing::info!(
tracing::debug!(
to_replica = to_replica_id,
"successfully sent message via http"
);
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/epoxy/src/ops/explicit_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn epoxy_explicit_prepare(
let replica_id = ctx.config().epoxy_replica_id();
let instance = &input.instance;

tracing::info!(
tracing::debug!(
?instance,
"starting explicit prepare for potentially failed replica"
);
Expand Down Expand Up @@ -317,7 +317,7 @@ async fn restart_phase1(
commands: commands.unwrap_or_else(|| vec![]), // Empty vec for no-op
};

tracing::info!(
tracing::debug!(
?instance,
commands_count = proposal.commands.len(),
"restarting phase1 with propose operation"
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/epoxy/src/replica/decide_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub fn decide_path(
pre_accept_oks: Vec<protocol::Payload>,
payload: &protocol::Payload,
) -> Result<protocol::Path> {
tracing::info!(instance=?payload.instance, "deciding path");
tracing::debug!(instance=?payload.instance, "deciding path");

let mut new_payload = payload.clone();
let mut path = protocol::Path::PathFast(protocol::PathFast {
Expand All @@ -29,7 +29,7 @@ pub fn decide_path(
// EPaxos Steps 11 (returns PathFast)
continue;
} else {
tracing::info!(?pre_accept_ok.deps, "received dissenting voice");
tracing::debug!(?pre_accept_ok.deps, "received dissenting voice");

// EPaxos Step 13
let new_deps = utils::union_deps(new_payload.deps, pre_accept_ok.deps);
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/epoxy/src/replica/lead_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub async fn lead_consensus(
replica_id: protocol::ReplicaId,
proposal: protocol::Proposal,
) -> Result<protocol::Payload> {
tracing::info!(?replica_id, "leading consensus");
tracing::debug!(?replica_id, "leading consensus");

// EPaxos Step 1
let instance_num_key = keys::replica::InstanceNumberKey;
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/epoxy/src/replica/message_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub async fn message_request(

let kind = match request.kind {
protocol::RequestKind::UpdateConfigRequest(req) => {
tracing::info!(
tracing::debug!(
epoch = ?req.config.epoch,
replica_count = req.config.replicas.len(),
"received configuration update request"
Expand Down Expand Up @@ -103,7 +103,7 @@ pub async fn message_request(
}
protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(req) => {
// Send signal to coordinator workflow
tracing::info!(
tracing::debug!(
?current_replica_id,
update_replica_id=?req.replica_id,
update_status=?req.status,
Expand All @@ -124,7 +124,7 @@ pub async fn message_request(
}
protocol::RequestKind::BeginLearningRequest(req) => {
// Send signal to replica workflow
tracing::info!(?current_replica_id, "received begin learning request");
tracing::debug!(?current_replica_id, "received begin learning request");

ctx.signal(crate::workflows::replica::BeginLearning {
config: req.config.clone().into(),
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/epoxy/src/replica/messages/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub async fn accept(
instance,
} = accept_req.payload;

tracing::info!(?replica_id, ?instance, "handling accept message");
tracing::debug!(?replica_id, ?instance, "handling accept message");

// Validate ballot
let current_ballot = ballot::get_ballot(tx, replica_id).await?;
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/epoxy/src/replica/messages/accepted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub async fn accepted(
instance,
} = payload;

tracing::info!(?replica_id, ?instance, "handling accepted message");
tracing::debug!(?replica_id, ?instance, "handling accepted message");

// Create accepted log entry
let current_ballot = ballot::get_ballot(tx, replica_id).await?;
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/epoxy/src/replica/messages/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub async fn commit(
instance,
} = commit_req.payload;

tracing::info!(?replica_id, ?instance, "handling commit message");
tracing::debug!(?replica_id, ?instance, "handling commit message");

// EPaxos Step 24
let current_ballot = ballot::get_ballot(tx, replica_id).await?;
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/epoxy/src/replica/messages/committed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub async fn committed(
instance,
} = payload;

tracing::info!(?replica_id, ?instance, "handling committed message");
tracing::debug!(?replica_id, ?instance, "handling committed message");

// EPaxos Step 21: Create committed log entry
let current_ballot = ballot::get_ballot(tx, replica_id).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub async fn download_instances(
replica_id: ReplicaId,
req: protocol::DownloadInstancesRequest,
) -> Result<Vec<protocol::DownloadInstancesEntry>> {
tracing::info!(?replica_id, "handling download instances message");
tracing::debug!(?replica_id, "handling download instances message");

let mut entries = Vec::new();
let subspace = keys::subspace(replica_id);
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/epoxy/src/replica/messages/pre_accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub async fn pre_accept(
replica_id: protocol::ReplicaId,
pre_accept_req: protocol::PreAcceptRequest,
) -> Result<protocol::PreAcceptResponse> {
tracing::info!(?replica_id, "handling pre-accept message");
tracing::debug!(?replica_id, "handling pre-accept message");

let protocol::Payload {
proposal,
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/epoxy/src/replica/messages/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub async fn prepare(
replica_id: protocol::ReplicaId,
prepare_req: protocol::PrepareRequest,
) -> Result<protocol::PrepareResponse> {
tracing::info!(?replica_id, "handling prepare message");
tracing::debug!(?replica_id, "handling prepare message");

let protocol::PrepareRequest { ballot, instance } = prepare_req;

Expand Down
22 changes: 11 additions & 11 deletions engine/packages/epoxy/src/workflows/coordinator/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub async fn check_config_changes(
ctx: &ActivityCtx,
_input: &CheckConfigChangesInput,
) -> Result<Option<ConfigChange>> {
tracing::info!("checking for config changes");
tracing::debug!("checking for config changes");

let state = ctx.state::<State>()?;

Expand Down Expand Up @@ -107,7 +107,7 @@ pub async fn check_config_changes(
.collect();

if new_replicas.is_empty() {
tracing::info!("no new replicas found");
tracing::debug!("no new replicas found");
return Ok(None);
}

Expand Down Expand Up @@ -144,7 +144,7 @@ pub async fn health_check_new_replicas(
return Ok(false);
}

tracing::info!(
tracing::debug!(
new_replicas = ?input.new_replicas,
"health checking new replicas"
);
Expand All @@ -154,7 +154,7 @@ pub async fn health_check_new_replicas(
let replica_id = replica.replica_id;

async move {
tracing::info!(?replica_id, "sending health check to replica");
tracing::debug!(?replica_id, "sending health check to replica");

let from_replica_id = ctx.config().epoxy_replica_id();
let request = protocol::Request {
Expand All @@ -171,7 +171,7 @@ pub async fn health_check_new_replicas(
.await
.with_context(|| format!("health check failed for replica {}", replica_id))?;

tracing::info!(?replica_id, "health check successful");
tracing::debug!(?replica_id, "health check successful");
Ok(())
}
});
Expand Down Expand Up @@ -199,7 +199,7 @@ pub async fn add_replicas_as_joining(
state.config.replicas.push(replica.clone().into());
}

tracing::info!("added {} replicas as joining", input.new_replicas.len());
tracing::debug!("added {} replicas as joining", input.new_replicas.len());

// IMPORTANT: Do not increment epoch at this stage, despite what the EPaxos paper recommends.
// See epoxy/README.md for more details.
Expand Down Expand Up @@ -231,7 +231,7 @@ pub async fn send_begin_learning(
let config = config.clone();

async move {
tracing::info!(?replica_id, "sending begin learning to replica");
tracing::debug!(?replica_id, "sending begin learning to replica");

let request = protocol::Request {
from_replica_id: ctx.config().epoxy_replica_id(),
Expand All @@ -244,7 +244,7 @@ pub async fn send_begin_learning(
crate::http_client::send_message(&ApiCtx::new_from_activity(&ctx)?, &config, request)
.await?;

tracing::info!(?replica_id, "begin learning sent successfully");
tracing::debug!(?replica_id, "begin learning sent successfully");
Ok(())
}
});
Expand All @@ -266,21 +266,21 @@ fn should_abort_reconfigure(
.iter()
.find(|x| x.datacenter_label as u64 == replica.replica_id)
else {
tracing::info!(
tracing::debug!(
"config changed during reconfigure (replica removed), aborting reconfigure"
);
return Ok(true);
};

if url::Url::parse(&replica.api_peer_url)? != current_dc.peer_url {
tracing::info!(
tracing::debug!(
"config changed during reconfigure (api_peer_url changed), aborting reconfigure"
);
return Ok(true);
}

if url::Url::parse(&replica.guard_url)? != current_dc.public_url {
tracing::info!(
tracing::debug!(
"config changed during reconfigure (guard_url changed), aborting reconfigure"
);
return Ok(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ pub async fn update_replica_status(
// Update status
replica_state.status = input.new_status.clone().into();

tracing::info!(
replica_id = ?input.replica_id,
new_status = ?input.new_status,
tracing::debug!(
replica_id=?input.replica_id,
new_status=?input.new_status,
"updated replica status"
);

Expand All @@ -103,7 +103,7 @@ pub async fn increment_epoch(ctx: &ActivityCtx, _input: &IncrementEpochInput) ->

state.config.epoch += 1;

tracing::info!(new_epoch = state.config.epoch, "incremented epoch");
tracing::debug!(new_epoch = state.config.epoch, "incremented epoch");

Ok(())
}
Expand All @@ -119,7 +119,7 @@ pub async fn update_replica_urls(ctx: &ActivityCtx, _input: &UpdateReplicaUrlsIn
for replica in state.config.replicas.iter_mut() {
let Some(dc) = ctx.config().dc_for_label(replica.replica_id as u16) else {
tracing::warn!(
replica_id = ?replica.replica_id,
replica_id=?replica.replica_id,
"datacenter not found for replica, skipping url update"
);
continue;
Expand All @@ -128,10 +128,10 @@ pub async fn update_replica_urls(ctx: &ActivityCtx, _input: &UpdateReplicaUrlsIn
replica.api_peer_url = dc.peer_url.to_string();
replica.guard_url = dc.public_url.to_string();

tracing::info!(
replica_id = ?replica.replica_id,
api_peer_url = ?dc.peer_url,
guard_url = ?dc.public_url,
tracing::debug!(
replica_id=?replica.replica_id,
api_peer_url=?dc.peer_url,
guard_url=?dc.public_url,
"updated replica urls"
);
}
Expand All @@ -156,7 +156,7 @@ pub async fn notify_all_replicas(

let config: protocol::ClusterConfig = state.config.clone().into();

tracing::info!(
tracing::debug!(
epoch = config.epoch,
replica_count = config.replicas.len(),
"notifying all replicas of config change"
Expand All @@ -180,7 +180,7 @@ pub async fn notify_all_replicas(
.await
.with_context(|| format!("failed to update config for replica {}", replica_id))?;

tracing::info!(?replica_id, "config update sent");
tracing::debug!(?replica_id, "config update sent");
Ok(())
}
});
Expand Down
Loading
Loading