@@ -46,11 +46,17 @@ use tokio::{sync::Notify, task::JoinSet};
4646use tracing:: { debug, error, info, trace, warn} ;
4747
4848pub struct ChainFollower < DB > {
49+ /// Tasks
50+ tasks : Arc < Mutex < HashSet < SyncTask > > > ,
51+
52+ /// State machine
53+ state_machine : Arc < Mutex < SyncStateMachine < DB > > > ,
54+
4955 /// Syncing status of the chain
5056 pub sync_status : SyncStatus ,
5157
5258 /// manages retrieving and updates state objects
53- state_manager : Arc < StateManager < DB > > ,
59+ pub state_manager : Arc < StateManager < DB > > ,
5460
5561 /// Context to be able to send requests to P2P network
5662 pub network : SyncNetworkContext < DB > ,
@@ -93,17 +99,26 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
9399 ) -> Self {
94100 crate :: def_is_env_truthy!( cache_disabled, "FOREST_DISABLE_BAD_BLOCK_CACHE" ) ;
95101 let ( tipset_sender, tipset_receiver) = flume:: bounded ( 20 ) ;
102+ let tasks: Arc < Mutex < HashSet < SyncTask > > > = Arc :: new ( Mutex :: new ( HashSet :: default ( ) ) ) ;
103+ let bad_blocks = if cache_disabled ( ) {
104+ tracing:: warn!( "bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`" ) ;
105+ None
106+ } else {
107+ Some ( Default :: default ( ) )
108+ } ;
109+ let state_machine = Arc :: new ( Mutex :: new ( SyncStateMachine :: new (
110+ state_manager. chain_store ( ) . clone ( ) ,
111+ bad_blocks. clone ( ) ,
112+ stateless_mode,
113+ ) ) ) ;
96114 Self {
115+ tasks,
116+ state_machine,
97117 sync_status : Arc :: new ( RwLock :: new ( SyncStatusReport :: init ( ) ) ) ,
98118 state_manager,
99119 network,
100120 genesis,
101- bad_blocks : if cache_disabled ( ) {
102- tracing:: warn!( "bad block cache is disabled by `FOREST_DISABLE_BAD_BLOCK_CACHE`" ) ;
103- None
104- } else {
105- Some ( Default :: default ( ) )
106- } ,
121+ bad_blocks,
107122 net_handler,
108123 tipset_sender,
109124 tipset_receiver,
@@ -112,16 +127,37 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
112127 }
113128 }
114129
115- pub async fn run ( self ) -> anyhow:: Result < ( ) > {
130+ /// Reset inner states
131+ pub fn reset ( & self ) {
132+ let start = Instant :: now ( ) ;
133+ self . tasks . lock ( ) . clear ( ) ;
134+ self . state_manager
135+ . chain_store ( )
136+ . validated_blocks
137+ . lock ( )
138+ . clear ( ) ;
139+ self . state_machine . lock ( ) . tipsets . clear ( ) ;
140+ if let Some ( bad_blocks) = & self . bad_blocks {
141+ bad_blocks. clear ( ) ;
142+ }
143+ tracing:: info!(
144+ "chain follower reset, took {}" ,
145+ humantime:: format_duration( start. elapsed( ) )
146+ ) ;
147+ }
148+
149+ pub async fn run ( & self ) -> anyhow:: Result < ( ) > {
116150 chain_follower (
117- self . state_manager ,
118- self . bad_blocks ,
119- self . net_handler ,
120- self . tipset_receiver ,
121- self . network ,
122- self . mem_pool ,
123- self . sync_status ,
124- self . genesis ,
151+ & self . tasks ,
152+ & self . state_machine ,
153+ & self . state_manager ,
154+ self . bad_blocks . clone ( ) ,
155+ self . net_handler . clone ( ) ,
156+ self . tipset_receiver . clone ( ) ,
157+ & self . network ,
158+ & self . mem_pool ,
159+ & self . sync_status ,
160+ & self . genesis ,
125161 self . stateless_mode ,
126162 )
127163 . await
@@ -130,24 +166,21 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
130166
131167#[ allow( clippy:: too_many_arguments) ]
132168// We receive new full tipsets from the p2p swarm, and from miners that use Forest as their frontend.
133- pub async fn chain_follower < DB : Blockstore + Sync + Send + ' static > (
134- state_manager : Arc < StateManager < DB > > ,
169+ async fn chain_follower < DB : Blockstore + Sync + Send + ' static > (
170+ tasks : & Arc < Mutex < HashSet < SyncTask > > > ,
171+ state_machine : & Arc < Mutex < SyncStateMachine < DB > > > ,
172+ state_manager : & Arc < StateManager < DB > > ,
135173 bad_block_cache : Option < Arc < BadBlockCache > > ,
136174 network_rx : flume:: Receiver < NetworkEvent > ,
137175 tipset_receiver : flume:: Receiver < FullTipset > ,
138- network : SyncNetworkContext < DB > ,
139- mem_pool : Arc < MessagePool < Arc < ChainStore < DB > > > > ,
140- sync_status : SyncStatus ,
141- genesis : Tipset ,
176+ network : & SyncNetworkContext < DB > ,
177+ mem_pool : & Arc < MessagePool < Arc < ChainStore < DB > > > > ,
178+ sync_status : & SyncStatus ,
179+ genesis : & Tipset ,
142180 stateless_mode : bool ,
143181) -> anyhow:: Result < ( ) > {
144182 let state_changed = Arc :: new ( Notify :: new ( ) ) ;
145- let state_machine = Arc :: new ( Mutex :: new ( SyncStateMachine :: new (
146- state_manager. chain_store ( ) . clone ( ) ,
147- bad_block_cache. clone ( ) ,
148- stateless_mode,
149- ) ) ) ;
150- let tasks: Arc < Mutex < HashSet < SyncTask > > > = Arc :: new ( Mutex :: new ( HashSet :: default ( ) ) ) ;
183+
151184 let seen_block_cache = SeenBlockCache :: default ( ) ;
152185
153186 let mut set = JoinSet :: new ( ) ;
@@ -158,6 +191,8 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
158191 let state_changed = state_changed. shallow_clone ( ) ;
159192 let state_machine = state_machine. shallow_clone ( ) ;
160193 let network = network. shallow_clone ( ) ;
194+ let mem_pool = mem_pool. shallow_clone ( ) ;
195+ let genesis = genesis. shallow_clone ( ) ;
161196 let bad_block_cache = bad_block_cache. shallow_clone ( ) ;
162197 let seen_block_cache = seen_block_cache. shallow_clone ( ) ;
163198 async move {
@@ -244,11 +279,13 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
244279
245280 // When the state machine is updated, we need to update the sync status and spawn tasks
246281 set. spawn ( {
247- let state_manager = state_manager. clone ( ) ;
248- let state_machine = state_machine. clone ( ) ;
249- let state_changed = state_changed. clone ( ) ;
250- let tasks = tasks. clone ( ) ;
251- let bad_block_cache = bad_block_cache. clone ( ) ;
282+ let state_manager = state_manager. shallow_clone ( ) ;
283+ let state_machine = state_machine. shallow_clone ( ) ;
284+ let network = network. shallow_clone ( ) ;
285+ let sync_status = sync_status. shallow_clone ( ) ;
286+ let state_changed = state_changed. shallow_clone ( ) ;
287+ let tasks = tasks. shallow_clone ( ) ;
288+ let bad_block_cache = bad_block_cache. shallow_clone ( ) ;
252289 async move {
253290 loop {
254291 state_changed. notified ( ) . await ;
@@ -726,7 +763,7 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
726763
727764 fn mark_validated_tipset ( & mut self , tipset : FullTipset , is_proposed_head : bool ) {
728765 if !self . is_parent_validated ( & tipset) {
729- tracing:: error!( epoch = %tipset. epoch( ) , tsk = %tipset. key( ) , "Tipset must be validated") ;
766+ tracing:: error!( epoch = %tipset. epoch( ) , tsk = %tipset. key( ) , parent_state = %tipset . parent_state ( ) , "Parent tipset must be validated") ;
730767 return ;
731768 }
732769
0 commit comments