Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CongestionTracker: #21164

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::congestion_tracker::CongestionTracker;
use crate::consensus_adapter::ConsensusOverloadChecker;
use crate::execution_cache::ExecutionCacheTraitPointers;
use crate::execution_cache::TransactionCacheRead;
Expand Down Expand Up @@ -831,6 +832,8 @@ pub struct AuthorityState {

/// The chain identifier is derived from the digest of the genesis checkpoint.
chain_identifier: ChainIdentifier,

pub(crate) congestion_tracker: Arc<CongestionTracker>,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -2938,6 +2941,7 @@ impl AuthorityState {
overload_info: AuthorityOverloadInfo::default(),
validator_tx_finalizer,
chain_identifier,
congestion_tracker: Arc::new(CongestionTracker::new()),
});

// Start a task to execute ready certificates.
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,14 @@ async fn finalize_checkpoint(
debug!("finalizing checkpoint");
epoch_store.insert_finalized_transactions(tx_digests, checkpoint.sequence_number)?;

if state.is_fullnode(epoch_store) {
state.congestion_tracker.process_checkpoint_effects(
transaction_cache_reader,
&checkpoint,
&effects,
);
}
Comment on lines +1394 to +1400
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't care that this won't be called in-order right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want you can move this logic down into where we already have a checkpoint data loaded in order to eliminate redundant loads that'll happen here as well as down there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not worried about the out of order-ness - we already use the checkpoint timestamps to ensure we have recent data, and up-to-date FNs will naturally process checkpoints in order anyway. If the FN isn't up to date the data will be useless no matter what.


// TODO remove once we no longer need to support this table for read RPC
state
.get_checkpoint_cache()
Expand Down
334 changes: 334 additions & 0 deletions crates/sui-core/src/congestion_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use moka::ops::compute::Op;
use moka::sync::Cache;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use sui_types::base_types::ObjectID;
use sui_types::effects::{InputSharedObject, TransactionEffects, TransactionEffectsAPI};
use sui_types::execution_status::CongestedObjects;
use sui_types::messages_checkpoint::{CheckpointTimestamp, VerifiedCheckpoint};
use sui_types::transaction::{TransactionData, TransactionDataAPI};

use crate::execution_cache::TransactionCacheRead;

#[derive(Clone, Copy, Debug)]
pub struct CongestionInfo {
pub last_cancellation_time: CheckpointTimestamp,

pub highest_cancelled_gas_price: u64,

pub last_success_time: Option<CheckpointTimestamp>,
pub lowest_executed_gas_price: Option<u64>,
}

impl CongestionInfo {
/// Update the congestion info with the latest congestion info from a new checkpoint
fn update_for_new_checkpoint(&mut self, new: &CongestionInfo) {
// If there are more recent cancellations, we need to know the latest highest
// cancelled price
if new.last_cancellation_time > self.last_cancellation_time {
self.last_cancellation_time = new.last_cancellation_time;
self.highest_cancelled_gas_price = new.highest_cancelled_gas_price;
}
// If there are more recent successful transactions, we need to know the latest lowest
// executed price
if new.last_success_time > self.last_success_time {
self.last_success_time = new.last_success_time;
self.lowest_executed_gas_price = new.lowest_executed_gas_price;
}
}

fn update_for_cancellation(&mut self, now: CheckpointTimestamp, gas_price: u64) {
self.last_cancellation_time = now;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does now ever differ from self.last_cancellation_time? Seems this is only called in the scope of one checkpoint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can differ because there may not be any cancellations in the most recently processed checkpoint, in which case this wouldn't be called.

self.highest_cancelled_gas_price =
std::cmp::max(self.highest_cancelled_gas_price, gas_price);
}

fn update_for_success(&mut self, now: CheckpointTimestamp, gas_price: u64) {
self.last_success_time = Some(now);
self.lowest_executed_gas_price = Some(match self.lowest_executed_gas_price {
Some(current_min) => std::cmp::min(current_min, gas_price),
None => gas_price,
});
}
}

pub struct CongestionTracker {
pub congestion_clearing_prices: Cache<ObjectID, CongestionInfo>,
}

impl Default for CongestionTracker {
fn default() -> Self {
Self::new()
}
}

impl CongestionTracker {
pub fn new() -> Self {
Self {
congestion_clearing_prices: Cache::new(10_000),
}
}

pub fn process_checkpoint_effects(
&self,
transaction_cache_reader: &dyn TransactionCacheRead,
checkpoint: &VerifiedCheckpoint,
effects: &[TransactionEffects],
) {
let mut congestion_events = Vec::with_capacity(effects.len());
let mut cleared_events = Vec::with_capacity(effects.len());

for effect in effects {
let gas_price = transaction_cache_reader
.get_transaction_block(effect.transaction_digest())
.unwrap()
.transaction_data()
.gas_price();
if let Some(CongestedObjects(congested_objects)) =
effect.status().get_congested_objects()
{
congestion_events.push((gas_price, congested_objects.clone()));
} else {
cleared_events.push((
gas_price,
effect
.input_shared_objects()
.into_iter()
.filter_map(|object| match object {
InputSharedObject::Mutate((id, _, _)) => Some(id),
InputSharedObject::Cancelled(_, _)
| InputSharedObject::ReadOnly(_)
| InputSharedObject::ReadDeleted(_, _)
| InputSharedObject::MutateDeleted(_, _) => None,
halfprice marked this conversation as resolved.
Show resolved Hide resolved
})
.collect::<Vec<_>>(),
));
}
}
Comment on lines +94 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we only care if we don't have congested objects, not just that the txn succeeded (which is why this logic doesn't actually look at the success status)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

congested objects indicates that the tx was aborted due to congestion.


self.process_per_checkpoint_events(
checkpoint.timestamp_ms,
&congestion_events,
&cleared_events,
);
}

/// For all the mutable shared inputs, get the highest minimum clearing price (if any exists)
/// and the lowest maximum cancelled price.
pub fn get_suggested_gas_prices(&self, transaction: &TransactionData) -> Option<u64> {
self.get_suggested_gas_price_for_objects(
transaction
.shared_input_objects()
.into_iter()
.filter(|id| id.mutable)
.map(|id| id.id),
)
}
}

impl CongestionTracker {
fn process_per_checkpoint_events(
&self,
now: CheckpointTimestamp,
congestion_events: &[(u64, Vec<ObjectID>)],
cleared_events: &[(u64, Vec<ObjectID>)],
) {
let congestion_info_map =
self.compute_per_checkpoint_congestion_info(now, congestion_events, cleared_events);
self.process_checkpoint_congestion(congestion_info_map);
}

fn get_suggested_gas_price_for_objects(
&self,
objects: impl Iterator<Item = ObjectID>,
) -> Option<u64> {
let mut clearing_price = None;
for object_id in objects {
if let Some(info) = self.get_congestion_info(object_id) {
let clearing_price_for_object = match info
.last_success_time
.cmp(&Some(info.last_cancellation_time))
{
std::cmp::Ordering::Greater => {
// there were no cancellations in the most recent checkpoint,
// so the object is probably not congested any more
None
}
std::cmp::Ordering::Less => {
// there were no successes in the most recent checkpoint. This should be a rare case,
// but we know we will have to bid at least as much as the highest cancelled price.
Some(info.highest_cancelled_gas_price)
}
std::cmp::Ordering::Equal => {
// there were both successes and cancellations.
info.lowest_executed_gas_price
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can highest_cancelled_gas_price > lowest_executed_gas_price? If so, I wonder if this should be max(lowest_executed_gas_price, highest_cancelled_gas_price)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case the last success and last cancel time indicate that the events are from the same checkpoint. So no, I don't believe the highest cancelled could be higher than the lowest executed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I guess it could, because a transaction could have been cancelled because it was also using some other even more expensive object. But in this case lowest executed is still probably the right thing.

}
};
clearing_price = std::cmp::max(clearing_price, clearing_price_for_object);
}
}
clearing_price
}

fn compute_per_checkpoint_congestion_info(
&self,
now: CheckpointTimestamp,
congestion_events: &[(u64, Vec<ObjectID>)],
cleared_events: &[(u64, Vec<ObjectID>)],
) -> HashMap<ObjectID, CongestionInfo> {
let mut congestion_info_map: HashMap<ObjectID, CongestionInfo> = HashMap::new();

for (gas_price, objects) in congestion_events {
for object in objects {
match congestion_info_map.entry(*object) {
Entry::Occupied(entry) => {
entry.into_mut().update_for_cancellation(now, *gas_price);
}
Entry::Vacant(entry) => {
let info = CongestionInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this branch take self.get_congestion_info(*object) into consideration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so since that would be from the previous checkpoint. We consult it in the second loop to determine whether cleared events should be added, as explained in the comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If previous checkpoint is not considered, meaning all the CongestionInfo are just created in this loop, does "now" ever differ from self.last_cancellation_time when calling update_for_cancellation (from my other comment)? Or do I miss something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure I understand the question - I'm setting last_cancellation_time equal to now so no it would not differ?

last_cancellation_time: now,
highest_cancelled_gas_price: *gas_price,
last_success_time: None,
lowest_executed_gas_price: None,
};

entry.insert(info);
}
}
}
}

for (gas_price, objects) in cleared_events {
for object in objects {
// We only record clearing prices if the object has observed cancellations recently
match congestion_info_map.entry(*object) {
Entry::Occupied(entry) => {
entry.into_mut().update_for_success(now, *gas_price);
}
Entry::Vacant(entry) => {
if let Some(prev) = self.get_congestion_info(*object) {
let info = CongestionInfo {
last_cancellation_time: prev.last_cancellation_time,
highest_cancelled_gas_price: prev.highest_cancelled_gas_price,
last_success_time: Some(now),
lowest_executed_gas_price: Some(*gas_price),
};
entry.insert(info);
}
}
}
}
}

congestion_info_map
}

fn process_checkpoint_congestion(
&self,
congestion_info_map: HashMap<ObjectID, CongestionInfo>,
) {
for (object_id, info) in congestion_info_map {
self.congestion_clearing_prices
.entry(object_id)
.and_compute_with(|maybe_entry| {
if let Some(e) = maybe_entry {
let mut e = e.into_value();
e.update_for_new_checkpoint(&info);
Op::Put(e)
} else {
Op::Put(info)
}
});
}
}

fn get_congestion_info(&self, object_id: ObjectID) -> Option<CongestionInfo> {
self.congestion_clearing_prices.get(&object_id)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_process_events_new_congestion() {
let tracker = CongestionTracker::new();
let obj1 = ObjectID::random();
let obj2 = ObjectID::random();
let now = 1000;

tracker.process_per_checkpoint_events(now, &[(100, vec![obj1]), (200, vec![obj2])], &[]);

assert_eq!(
tracker.get_suggested_gas_price_for_objects(vec![obj1].into_iter()),
Some(100)
);
assert_eq!(
tracker.get_suggested_gas_price_for_objects(vec![obj2].into_iter()),
Some(200)
);
}

#[test]
fn test_process_events_congestion_then_success() {
let tracker = CongestionTracker::new();
let obj = ObjectID::random();

// Cancellations only, no successes. Highest cancelled price is used.
tracker.process_per_checkpoint_events(1000, &[(100, vec![obj]), (75, vec![obj])], &[]);
assert_eq!(
tracker.get_suggested_gas_price_for_objects(vec![obj].into_iter()),
Some(100)
);

// No cancellations in last checkpoint, so no congestion
tracker.process_per_checkpoint_events(2000, &[], &[(150, vec![obj])]);
assert_eq!(
tracker.get_suggested_gas_price_for_objects(vec![obj].into_iter()),
None,
);

// next checkpoint has cancellations and successes, so the lowest success price is used.
tracker.process_per_checkpoint_events(
3000,
&[(100, vec![obj])],
&[(175, vec![obj]), (125, vec![obj])],
);
assert_eq!(
tracker.get_suggested_gas_price_for_objects(vec![obj].into_iter()),
Some(125)
);
}

#[test]
fn test_get_suggested_gas_price_multiple_objects() {
let tracker = CongestionTracker::new();
let obj1 = ObjectID::random();
let obj2 = ObjectID::random();

// Process different congestion events
tracker.process_per_checkpoint_events(1000, &[(100, vec![obj1]), (200, vec![obj2])], &[]);

// Should suggest highest congestion price
assert_eq!(
tracker.get_suggested_gas_price_for_objects(vec![obj1, obj2].into_iter()),
Some(200)
);

// Process different congestion events
tracker.process_per_checkpoint_events(
2000,
&[(100, vec![obj1]), (200, vec![obj2])],
&[(100, vec![obj1]), (150, vec![obj2])],
);
// Should suggest the highest lowest success price
assert_eq!(
tracker.get_suggested_gas_price_for_objects(vec![obj1, obj2].into_iter()),
Some(150)
);
}
}
1 change: 1 addition & 0 deletions crates/sui-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod authority_aggregator;
pub mod authority_client;
pub mod authority_server;
pub mod checkpoints;
pub mod congestion_tracker;
pub mod consensus_adapter;
pub mod consensus_handler;
pub mod consensus_manager;
Expand Down
Loading
Loading