Skip to content

Commit 65c39a0

Browse files
hanabi1224Copilot
andauthored
refactor: optimize cache key type for resolve_to_key (#6961)
Co-authored-by: Copilot <copilot@github.com>
1 parent 605a179 commit 65c39a0

7 files changed

Lines changed: 82 additions & 62 deletions

File tree

src/message_pool/msgpool/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
1616
use crate::message::{MessageRead as _, SignedMessage};
1717
use crate::networks::ChainConfig;
1818
use crate::shim::{address::Address, crypto::Signature};
19+
use crate::state_manager::IdToAddressCache;
1920
use crate::utils::ShallowClone as _;
2021
use crate::utils::cache::SizeTrackingLruCache;
2122
use crate::utils::get_size::CidWrapper;
@@ -52,7 +53,7 @@ async fn republish_pending_messages<T>(
5253
cur_tipset: &SyncRwLock<Tipset>,
5354
republished: &SyncRwLock<HashSet<Cid>>,
5455
local_addrs: &SyncRwLock<Vec<Address>>,
55-
key_cache: &SizeTrackingLruCache<Address, Address>,
56+
key_cache: &IdToAddressCache,
5657
chain_config: &ChainConfig,
5758
) -> Result<(), Error>
5859
where
@@ -223,7 +224,7 @@ pub async fn head_change<T>(
223224
republished: &SyncRwLock<HashSet<Cid>>,
224225
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
225226
cur_tipset: &SyncRwLock<Tipset>,
226-
key_cache: &SizeTrackingLruCache<Address, Address>,
227+
key_cache: &IdToAddressCache,
227228
state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
228229
revert: Vec<Tipset>,
229230
apply: Vec<Tipset>,
@@ -326,7 +327,7 @@ where
326327

327328
pub(in crate::message_pool) struct MpoolCtx<'a, T> {
328329
pub api: &'a T,
329-
pub key_cache: &'a SizeTrackingLruCache<Address, Address>,
330+
pub key_cache: &'a IdToAddressCache,
330331
pub pending: &'a SyncRwLock<HashMap<Address, MsgSet>>,
331332
pub ts: &'a Tipset,
332333
}

src/message_pool/msgpool/msg_pool.rs

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use crate::shim::{
2323
econ::TokenAmount,
2424
gas::{Gas, price_list_by_network_version},
2525
};
26+
use crate::state_manager::IdToAddressCache;
2627
use crate::state_manager::utils::is_valid_for_sending;
28+
use crate::utils::ShallowClone as _;
2729
use crate::utils::cache::SizeTrackingLruCache;
2830
use crate::utils::get_size::CidWrapper;
2931
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
@@ -280,13 +282,13 @@ pub struct MessagePool<T> {
280282
/// Sender half to send messages to other components
281283
pub network_sender: flume::Sender<NetworkMessage>,
282284
/// A cache for BLS signature keyed by Cid
283-
pub bls_sig_cache: Arc<SizeTrackingLruCache<CidWrapper, Signature>>,
285+
pub bls_sig_cache: SizeTrackingLruCache<CidWrapper, Signature>,
284286
/// A cache for BLS signature keyed by Cid
285-
pub sig_val_cache: Arc<SizeTrackingLruCache<CidWrapper, ()>>,
286-
/// Cache for ID address to key address resolution.
287-
pub key_cache: Arc<SizeTrackingLruCache<Address, Address>>,
287+
pub sig_val_cache: SizeTrackingLruCache<CidWrapper, ()>,
288+
/// Cache for ID address ID to key address resolution.
289+
pub key_cache: IdToAddressCache,
288290
/// Cache for state nonce lookups keyed by (`TipsetKey`, `Address`).
289-
pub state_nonce_cache: Arc<SizeTrackingLruCache<StateNonceCacheKey, u64>>,
291+
pub state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64>,
290292
/// A set of republished messages identified by their Cid
291293
pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
292294
/// Acts as a signal to republish messages from the republished set of
@@ -303,25 +305,27 @@ pub struct MessagePool<T> {
303305
/// Non-ID addresses are returned unchanged.
304306
pub(in crate::message_pool) fn resolve_to_key<T: Provider>(
305307
api: &T,
306-
key_cache: &SizeTrackingLruCache<Address, Address>,
308+
key_cache: &IdToAddressCache,
307309
addr: &Address,
308310
cur_ts: &Tipset,
309311
) -> Result<Address, Error> {
310-
if addr.protocol() != Protocol::ID {
311-
return Ok(*addr);
312-
}
313-
if let Some(resolved) = key_cache.get_cloned(addr) {
312+
let id = addr.id().ok();
313+
if let Some(id) = &id
314+
&& let Some(resolved) = key_cache.get_cloned(id)
315+
{
314316
return Ok(resolved);
315317
}
316-
let resolved = api.resolve_to_key(addr, cur_ts)?;
317-
key_cache.push(*addr, resolved);
318+
let resolved = api.resolve_to_deterministic_address_at_finality(addr, cur_ts)?;
319+
if let Some(id) = id {
320+
key_cache.push(id, resolved);
321+
}
318322
Ok(resolved)
319323
}
320324

321325
/// Get the state nonce for an address, accounting for messages already included in `cur_ts`.
322326
pub(in crate::message_pool) fn get_state_sequence<T: Provider>(
323327
api: &T,
324-
key_cache: &SizeTrackingLruCache<Address, Address>,
328+
key_cache: &IdToAddressCache,
325329
state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
326330
addr: &Address,
327331
cur_ts: &Tipset,
@@ -371,7 +375,7 @@ where
371375
}
372376

373377
pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result<Address, Error> {
374-
resolve_to_key(self.api.as_ref(), self.key_cache.as_ref(), addr, cur_ts)
378+
resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts)
375379
}
376380

377381
/// Add a signed message to the pool and its address.
@@ -531,9 +535,9 @@ where
531535
let cur_ts = self.current_tipset();
532536
add_helper(
533537
self.api.as_ref(),
534-
self.bls_sig_cache.as_ref(),
538+
&self.bls_sig_cache,
535539
self.pending.as_ref(),
536-
self.key_cache.as_ref(),
540+
&self.key_cache,
537541
&cur_ts,
538542
msg,
539543
self.get_state_sequence(&from, &cur_ts)?,
@@ -570,8 +574,8 @@ where
570574
fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
571575
get_state_sequence(
572576
self.api.as_ref(),
573-
self.key_cache.as_ref(),
574-
self.state_nonce_cache.as_ref(),
577+
&self.key_cache,
578+
&self.state_nonce_cache,
575579
addr,
576580
cur_ts,
577581
)
@@ -640,7 +644,7 @@ where
640644

641645
msg_vec.append(smsgs.as_mut());
642646
for msg in umsg {
643-
let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?;
647+
let smsg = recover_sig(&self.bls_sig_cache, msg)?;
644648
msg_vec.push(smsg)
645649
}
646650
}
@@ -690,13 +694,13 @@ where
690694
{
691695
head_change(
692696
self.api.as_ref(),
693-
self.bls_sig_cache.as_ref(),
697+
&self.bls_sig_cache,
694698
self.repub_trigger.clone(),
695699
self.republished.as_ref(),
696700
self.pending.as_ref(),
697701
self.cur_tipset.as_ref(),
698-
self.key_cache.as_ref(),
699-
self.state_nonce_cache.as_ref(),
702+
&self.key_cache,
703+
&self.state_nonce_cache,
700704
revert,
701705
apply,
702706
)
@@ -722,22 +726,13 @@ where
722726
let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
723727
let pending = Arc::new(SyncRwLock::new(HashMap::new()));
724728
let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
725-
let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
726-
"bls_sig".into(),
727-
BLS_SIG_CACHE_SIZE,
728-
));
729-
let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
730-
"sig_val".into(),
731-
SIG_VAL_CACHE_SIZE,
732-
));
733-
let key_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
734-
"mpool_key".into(),
735-
KEY_CACHE_SIZE,
736-
));
737-
let state_nonce_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
738-
"state_nonce".into(),
739-
STATE_NONCE_CACHE_SIZE,
740-
));
729+
let bls_sig_cache =
730+
SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE);
731+
let sig_val_cache =
732+
SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE);
733+
let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE);
734+
let state_nonce_cache =
735+
SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE);
741736
let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
742737
let republished = Arc::new(SyncRwLock::new(HashSet::new()));
743738
let block_delay = chain_config.block_delay_secs;
@@ -765,11 +760,11 @@ where
765760
let mut head_changes_rx = mp.api.subscribe_head_changes();
766761

767762
let api = mp.api.clone();
768-
let bls_sig_cache = mp.bls_sig_cache.clone();
763+
let bls_sig_cache = mp.bls_sig_cache.shallow_clone();
769764
let pending = mp.pending.clone();
770765
let republished = mp.republished.clone();
771-
let key_cache = mp.key_cache.clone();
772-
let state_nonce_cache = mp.state_nonce_cache.clone();
766+
let key_cache = mp.key_cache.shallow_clone();
767+
let state_nonce_cache = mp.state_nonce_cache.shallow_clone();
773768

774769
let current_ts = mp.cur_tipset.clone();
775770
let repub_trigger = mp.repub_trigger.clone();
@@ -781,13 +776,13 @@ where
781776
Ok(HeadChanges { reverts, applies }) => {
782777
if let Err(e) = head_change(
783778
api.as_ref(),
784-
bls_sig_cache.as_ref(),
779+
&bls_sig_cache,
785780
repub_trigger.clone(),
786781
republished.as_ref(),
787782
pending.as_ref(),
788783
&current_ts,
789-
key_cache.as_ref(),
790-
state_nonce_cache.as_ref(),
784+
&key_cache,
785+
&state_nonce_cache,
791786
reverts,
792787
applies,
793788
)
@@ -811,7 +806,7 @@ where
811806
let cur_tipset = mp.cur_tipset.clone();
812807
let republished = mp.republished.clone();
813808
let local_addrs = mp.local_addrs.clone();
814-
let key_cache = mp.key_cache.clone();
809+
let key_cache = mp.key_cache.shallow_clone();
815810
let network_sender = Arc::new(mp.network_sender.clone());
816811
let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs);
817812
// Reacts to republishing requests
@@ -830,7 +825,7 @@ where
830825
cur_tipset.as_ref(),
831826
republished.as_ref(),
832827
local_addrs.as_ref(),
833-
key_cache.as_ref(),
828+
&key_cache,
834829
&chain_config,
835830
)
836831
.await
@@ -854,7 +849,7 @@ pub(in crate::message_pool) fn add_helper<T>(
854849
api: &T,
855850
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
856851
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
857-
key_cache: &SizeTrackingLruCache<Address, Address>,
852+
key_cache: &IdToAddressCache,
858853
cur_ts: &Tipset,
859854
msg: SignedMessage,
860855
sequence: u64,
@@ -1572,11 +1567,16 @@ mod tests {
15721567
.unwrap();
15731568

15741569
// f0300 exists in lookback state (root_a) → resolves successfully.
1575-
let result = Provider::resolve_to_key(&cs, &Address::new_id(300), &head).unwrap();
1570+
let result = Provider::resolve_to_deterministic_address_at_finality(
1571+
&cs,
1572+
&Address::new_id(300),
1573+
&head,
1574+
)
1575+
.unwrap();
15761576
assert_eq!(result, bls_a);
15771577

15781578
// f0400 exists only in head state (root_b), not in lookback → fails.
1579-
Provider::resolve_to_key(&cs, &Address::new_id(400), &head)
1579+
Provider::resolve_to_deterministic_address_at_finality(&cs, &Address::new_id(400), &head)
15801580
.expect_err("actor only in head state must not resolve via finality lookback");
15811581
}
15821582
}

src/message_pool/msgpool/provider.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,13 @@ pub trait Provider {
4848
fn load_tipset(&self, tsk: &TipsetKey) -> Result<Tipset, Error>;
4949
/// Computes the base fee
5050
fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error>;
51-
/// Resolve an address to its key form using the tipset's parent state.
52-
fn resolve_to_key(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error>;
51+
/// Similar to [`crate::state_manager::StateManager::resolve_to_deterministic_address`] but fails if the ID address being resolved isn't reorg-stable yet.
52+
/// It should not be used for consensus-critical subsystems.
53+
fn resolve_to_deterministic_address_at_finality(
54+
&self,
55+
addr: &Address,
56+
ts: &Tipset,
57+
) -> Result<Address, Error>;
5358
/// Return all messages included in the given tipset.
5459
fn messages_for_tipset(&self, ts: &Tipset) -> Result<Arc<Vec<ChainMessage>>, Error>;
5560
// Get max number of messages per actor in the pool
@@ -103,9 +108,11 @@ impl<DB: Blockstore> Provider for ChainStore<DB> {
103108
.map_err(|err| err.into())
104109
}
105110

106-
/// Resolves an address to its deterministic key form using the state at
107-
/// finality look-back, This ensures the resolved address is reorg-stable.
108-
fn resolve_to_key(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
111+
fn resolve_to_deterministic_address_at_finality(
112+
&self,
113+
addr: &Address,
114+
ts: &Tipset,
115+
) -> Result<Address, Error> {
109116
match addr.protocol() {
110117
BLS | Secp256k1 | Delegated => Ok(*addr),
111118
Actor => Err(Error::Other(
@@ -121,6 +128,7 @@ impl<DB: Blockstore> Provider for ChainStore<DB> {
121128
)
122129
.map_err(|e| Error::Other(e.to_string()))?
123130
} else {
131+
// Matches the logic at <https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/stmgr/stmgr.go#L361>
124132
ts.clone()
125133
};
126134

src/message_pool/msgpool/selection.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::message::{MessageRead as _, SignedMessage};
1313
use crate::message_pool::msg_chain::MsgChainNode;
1414
use crate::shim::crypto::SignatureType;
1515
use crate::shim::{address::Address, econ::TokenAmount};
16+
use crate::state_manager::IdToAddressCache;
1617
use ahash::{HashMap, HashMapExt};
1718
use anyhow::{Context, bail, ensure};
1819
use parking_lot::RwLock;
@@ -665,9 +666,9 @@ where
665666
// Run head change to do reorg detection
666667
run_head_change(
667668
self.api.as_ref(),
668-
self.bls_sig_cache.as_ref(),
669+
&self.bls_sig_cache,
669670
&self.pending,
670-
self.key_cache.as_ref(),
671+
&self.key_cache,
671672
cur_ts.clone(),
672673
ts.clone(),
673674
&mut result,
@@ -820,7 +821,7 @@ pub(in crate::message_pool) fn run_head_change<T>(
820821
api: &T,
821822
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
822823
pending: &RwLock<HashMap<Address, MsgSet>>,
823-
key_cache: &SizeTrackingLruCache<Address, Address>,
824+
key_cache: &IdToAddressCache,
824825
from: Tipset,
825826
to: Tipset,
826827
rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,

src/message_pool/msgpool/test_provider.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,11 @@ impl Provider for TestApi {
226226
Ok(TokenAmount::from_atto(100))
227227
}
228228

229-
fn resolve_to_key(&self, addr: &Address, _ts: &Tipset) -> Result<Address, Error> {
229+
fn resolve_to_deterministic_address_at_finality(
230+
&self,
231+
addr: &Address,
232+
_ts: &Tipset,
233+
) -> Result<Address, Error> {
230234
Ok(self.inner.lock().resolve_addr(addr))
231235
}
232236

src/shim/address.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::sync::{
1919
atomic::{AtomicU8, Ordering},
2020
};
2121

22+
pub type AddressId = u64;
23+
2224
/// Zero address used to avoid allowing it to be used for verification.
2325
/// This is intentionally disallowed because it is an edge case with Filecoin's BLS
2426
/// signature verification.

src/state_manager/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::shim::actors::init::{self, State};
3232
use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
3333
use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
3434
use crate::shim::actors::*;
35+
use crate::shim::address::AddressId;
3536
use crate::shim::crypto::{Signature, SignatureType};
3637
use crate::shim::{
3738
actors::{
@@ -87,6 +88,7 @@ use tracing::{error, info, instrument, warn};
8788
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
8889
const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
8990
pub const EVENTS_AMT_BITWIDTH: u32 = 5;
91+
pub type IdToAddressCache = SizeTrackingLruCache<AddressId, Address>;
9092

9193
/// Result of executing an individual chain message in a tipset.
9294
///
@@ -192,7 +194,7 @@ pub struct StateManager<DB> {
192194
cs: Arc<ChainStore<DB>>,
193195
/// This is a cache which indexes tipsets to their calculated state output (state root, receipt root).
194196
cache: TipsetStateCache<ExecutedTipset>,
195-
id_to_deterministic_address_cache: SizeTrackingLruCache<u64, Address>,
197+
id_to_deterministic_address_cache: IdToAddressCache,
196198
beacon: Arc<crate::beacon::BeaconSchedule>,
197199
engine: Arc<MultiEngine>,
198200
}
@@ -1788,6 +1790,8 @@ where
17881790
state.verified_client_data_cap(self.blockstore(), id)
17891791
}
17901792

1793+
/// Similar to [`StateTree::resolve_to_deterministic_addr`] but does not allow [`crate::shim::address::Protocol::Actor`] type of addresses.
1794+
/// Uses the [`Tipset`] `ts` to generate the VM state.
17911795
pub async fn resolve_to_deterministic_address(
17921796
self: &Arc<Self>,
17931797
address: Address,

0 commit comments

Comments
 (0)