diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index b2e95c753f5..02b20c089e3 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -17,6 +17,7 @@ pub struct IndexingInputs { pub start_blocks: Vec, pub end_blocks: BTreeSet, pub stop_block: Option, + pub max_end_block: Option, pub store: Arc, pub debug_fork: Option>, pub triggers_adapter: Arc>, @@ -40,6 +41,7 @@ impl IndexingInputs { start_blocks, end_blocks, stop_block, + max_end_block, store: _, debug_fork, triggers_adapter, @@ -57,6 +59,7 @@ impl IndexingInputs { start_blocks: start_blocks.clone(), end_blocks: end_blocks.clone(), stop_block: stop_block.clone(), + max_end_block: max_end_block.clone(), store, debug_fork: debug_fork.clone(), triggers_adapter: triggers_adapter.clone(), diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index c98641539d9..996268da460 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -331,6 +331,18 @@ impl SubgraphInstanceManager { }) .collect(); + // We can set `max_end_block` to the maximum of `end_blocks` and stop the subgraph + // only when there are no dynamic data sources and no offchain data sources present. This is because: + // - Dynamic data sources do not have a defined `end_block`, so we can't determine + // when to stop processing them. + // - Offchain data sources might require processing beyond the end block of + // onchain data sources, so the subgraph needs to continue. + let max_end_block: Option = if data_sources.len() == end_blocks.len() { + end_blocks.iter().max().cloned() + } else { + None + }; + let templates = Arc::new(manifest.templates.clone()); // Obtain the debug fork from the subgraph store @@ -419,6 +431,7 @@ impl SubgraphInstanceManager { start_blocks, end_blocks, stop_block, + max_end_block, store, debug_fork, triggers_adapter, diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index cd341ce2f99..9b81c420ec2 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -197,6 +197,17 @@ where .unfail_deterministic_error(¤t_ptr, &parent_ptr) .await?; } + + // Stop subgraph when we reach maximum endblock. + if let Some(max_end_block) = self.inputs.max_end_block { + if max_end_block <= current_ptr.block_number() { + info!(self.logger, "Stopping subgraph as we reached maximum endBlock"; + "max_end_block" => max_end_block, + "current_block" => current_ptr.block_number()); + self.inputs.store.flush().await?; + return Ok(self); + } + } } loop { @@ -837,9 +848,21 @@ where } } - if let Some(stop_block) = &self.inputs.stop_block { - if block_ptr.number >= *stop_block { - info!(self.logger, "stop block reached for subgraph"); + if let Some(stop_block) = self.inputs.stop_block { + if block_ptr.number >= stop_block { + info!(self.logger, "Stop block reached for subgraph"); + return Ok(Action::Stop); + } + } + + if let Some(max_end_block) = self.inputs.max_end_block { + if block_ptr.number >= max_end_block { + info!( + self.logger, + "Stopping subgraph as maximum endBlock reached"; + "max_end_block" => max_end_block, + "current_block" => block_ptr.number + ); return Ok(Action::Stop); } } diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 25a923dd502..0daf4c33eda 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -541,6 +541,16 @@ pub enum BlockStreamEvent { ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor), } +impl BlockStreamEvent { + pub fn block_ptr(&self) -> BlockPtr { + match self { + BlockStreamEvent::Revert(ptr, _) => ptr.clone(), + BlockStreamEvent::ProcessBlock(block, _) => block.ptr(), + BlockStreamEvent::ProcessWasmBlock(ptr, _, _, _, _) => ptr.clone(), + } + } +} + impl Clone for BlockStreamEvent where C::TriggerData: Clone, diff --git a/tests/runner-tests/end-block/subgraph.yaml b/tests/runner-tests/end-block/subgraph.yaml index a20a593e8b8..76ed7ca3cd5 100644 --- a/tests/runner-tests/end-block/subgraph.yaml +++ b/tests/runner-tests/end-block/subgraph.yaml @@ -23,4 +23,24 @@ dataSources: eventHandlers: - event: TestEvent(string) handler: handleTestEvent + file: ./src/mapping.ts + # Datasource without endBlock to keep the subgraph running + - kind: ethereum/contract + name: Contract2 + network: test + source: + address: "0x0000000000000000000000000000000000000001" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.7 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + eventHandlers: + - event: TestEvent(string) + handler: handleTestEvent file: ./src/mapping.ts \ No newline at end of file