diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index 13a5bed1a1d..405b6f8a116 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -15,7 +15,6 @@ use graph::{ substreams::Modules, }; use graph_runtime_wasm::module::ToAscPtr; -use lazy_static::__Deref; use std::{collections::BTreeSet, sync::Arc}; use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges}; @@ -179,18 +178,6 @@ impl blockchain::TriggersAdapter for TriggersAdapter { } } -fn write_poi_event( - proof_of_indexing: &SharedProofOfIndexing, - poi_event: &ProofOfIndexingEvent, - causality_region: &str, - logger: &Logger, -) { - if let Some(proof_of_indexing) = proof_of_indexing { - let mut proof_of_indexing = proof_of_indexing.deref().borrow_mut(); - proof_of_indexing.write(logger, causality_region, poi_event); - } -} - pub struct TriggerProcessor { pub locator: DeploymentLocator, } @@ -226,8 +213,7 @@ where return Err(MappingError::Unknown(anyhow!("Detected UNSET entity operation, either a server error or there's a new type of operation and we're running an outdated protobuf"))); } ParsedChanges::Upsert { key, entity } => { - write_poi_event( - proof_of_indexing, + proof_of_indexing.write_event( &ProofOfIndexingEvent::SetEntity { entity_type: key.entity_type.typename(), id: &key.entity_id.to_string(), @@ -249,8 +235,7 @@ where let id = entity_key.entity_id.clone(); state.entity_cache.remove(entity_key); - write_poi_event( - proof_of_indexing, + proof_of_indexing.write_event( &ProofOfIndexingEvent::RemoveEntity { entity_type: entity_type.typename(), id: &id.to_string(), diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs index ef265978ede..3f35d570a7d 100644 --- a/core/src/subgraph/context/mod.rs +++ b/core/src/subgraph/context/mod.rs @@ -126,11 +126,7 @@ impl> IndexingContext { ) -> Result { let error_count = state.deterministic_errors.len(); - if let Some(proof_of_indexing) = proof_of_indexing { - proof_of_indexing - .borrow_mut() - .start_handler(causality_region); - } + proof_of_indexing.start_handler(causality_region); let start = Instant::now(); @@ -156,16 +152,12 @@ impl> IndexingContext { let elapsed = start.elapsed().as_secs_f64(); subgraph_metrics.observe_trigger_processing_duration(elapsed); - if let Some(proof_of_indexing) = proof_of_indexing { - if state.deterministic_errors.len() != error_count { - assert!(state.deterministic_errors.len() == error_count + 1); + if state.deterministic_errors.len() != error_count { + assert!(state.deterministic_errors.len() == error_count + 1); - // If a deterministic error has happened, write a new - // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing. - proof_of_indexing - .borrow_mut() - .write_deterministic_error(logger, causality_region); - } + // If a deterministic error has happened, write a new + // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing. + proof_of_indexing.write_deterministic_error(logger, causality_region); } Ok(state) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 380cf70f88d..922c7a4003c 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -3,7 +3,6 @@ use crate::subgraph::error::BlockProcessingError; use crate::subgraph::inputs::IndexingInputs; use crate::subgraph::state::IndexingState; use crate::subgraph::stream::new_block_stream; -use atomic_refcell::AtomicRefCell; use graph::blockchain::block_stream::{ BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, }; @@ -367,10 +366,8 @@ where debug!(logger, "Start processing block"; "triggers" => triggers.len()); - let proof_of_indexing = Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new( - block_ptr.number, - self.inputs.poi_version, - )))); + let proof_of_indexing = + SharedProofOfIndexing::new(block_ptr.number, self.inputs.poi_version); // Causality region for onchain triggers. let causality_region = PoICausalityRegion::from_network(&self.inputs.network); @@ -629,8 +626,7 @@ where return Err(BlockProcessingError::Canceled); } - if let Some(proof_of_indexing) = proof_of_indexing { - let proof_of_indexing = Arc::try_unwrap(proof_of_indexing).unwrap().into_inner(); + if let Some(proof_of_indexing) = proof_of_indexing.into_inner() { update_proof_of_indexing( proof_of_indexing, block.timestamp(), @@ -1156,7 +1152,7 @@ where // PoI ignores offchain events. // See also: poi-ignores-offchain - let proof_of_indexing = None; + let proof_of_indexing = SharedProofOfIndexing::ignored(); let causality_region = ""; let trigger = TriggerData::Offchain(trigger); @@ -1314,10 +1310,8 @@ where .deployment_head .set(block_ptr.number as f64); - let proof_of_indexing = Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new( - block_ptr.number, - self.inputs.poi_version, - )))); + let proof_of_indexing = + SharedProofOfIndexing::new(block_ptr.number, self.inputs.poi_version); // Causality region for onchain triggers. let causality_region = PoICausalityRegion::from_network(&self.inputs.network); @@ -1372,8 +1366,7 @@ where return Err(BlockProcessingError::Canceled.into()); } - if let Some(proof_of_indexing) = proof_of_indexing { - let proof_of_indexing = Arc::try_unwrap(proof_of_indexing).unwrap().into_inner(); + if let Some(proof_of_indexing) = proof_of_indexing.into_inner() { update_proof_of_indexing( proof_of_indexing, block_time, diff --git a/core/src/subgraph/trigger_processor.rs b/core/src/subgraph/trigger_processor.rs index 0057e9e1354..c3123e87268 100644 --- a/core/src/subgraph/trigger_processor.rs +++ b/core/src/subgraph/trigger_processor.rs @@ -39,11 +39,7 @@ where return Ok(state); } - if let Some(proof_of_indexing) = proof_of_indexing { - proof_of_indexing - .borrow_mut() - .start_handler(causality_region); - } + proof_of_indexing.start_handler(causality_region); for HostedTrigger { host, @@ -73,16 +69,12 @@ where } } - if let Some(proof_of_indexing) = proof_of_indexing { - if state.deterministic_errors.len() != error_count { - assert!(state.deterministic_errors.len() == error_count + 1); + if state.deterministic_errors.len() != error_count { + assert!(state.deterministic_errors.len() == error_count + 1); - // If a deterministic error has happened, write a new - // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing. - proof_of_indexing - .borrow_mut() - .write_deterministic_error(logger, causality_region); - } + // If a deterministic error has happened, write a new + // ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing. + proof_of_indexing.write_deterministic_error(logger, causality_region); } Ok(state) diff --git a/graph/src/components/subgraph/proof_of_indexing/mod.rs b/graph/src/components/subgraph/proof_of_indexing/mod.rs index c8dd8c13314..36eeabc22cd 100644 --- a/graph/src/components/subgraph/proof_of_indexing/mod.rs +++ b/graph/src/components/subgraph/proof_of_indexing/mod.rs @@ -3,11 +3,15 @@ mod online; mod reference; pub use event::ProofOfIndexingEvent; +use graph_derive::CheapClone; pub use online::{ProofOfIndexing, ProofOfIndexingFinisher}; pub use reference::PoICausalityRegion; use atomic_refcell::AtomicRefCell; -use std::sync::Arc; +use slog::Logger; +use std::{ops::Deref, sync::Arc}; + +use crate::prelude::BlockNumber; #[derive(Copy, Clone, Debug)] pub enum ProofOfIndexingVersion { @@ -22,15 +26,57 @@ pub enum ProofOfIndexingVersion { /// intentionally disallowed - PoI requires sequential access to the hash /// function within a given causality region even if ownership is shared across /// multiple mapping contexts. -/// -/// The Option<_> is because not all subgraphs support PoI until re-deployed. -/// Eventually this can be removed. -/// -/// This is not a great place to define this type, since the ProofOfIndexing -/// shouldn't "know" these details about wasmtime and subgraph re-deployments, -/// but the APIs that would make use of this are in graph/components so this -/// lives here for lack of a better choice. -pub type SharedProofOfIndexing = Option>>; +#[derive(Clone, CheapClone)] +pub struct SharedProofOfIndexing { + poi: Option>>, +} + +impl SharedProofOfIndexing { + pub fn new(block: BlockNumber, version: ProofOfIndexingVersion) -> Self { + SharedProofOfIndexing { + poi: Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new( + block, version, + )))), + } + } + + pub fn ignored() -> Self { + SharedProofOfIndexing { poi: None } + } + + pub fn write_event( + &self, + poi_event: &ProofOfIndexingEvent, + causality_region: &str, + logger: &Logger, + ) { + if let Some(poi) = &self.poi { + let mut poi = poi.deref().borrow_mut(); + poi.write(logger, causality_region, poi_event); + } + } + + pub fn start_handler(&self, causality_region: &str) { + if let Some(poi) = &self.poi { + let mut poi = poi.deref().borrow_mut(); + poi.start_handler(causality_region); + } + } + + pub fn write_deterministic_error(&self, logger: &Logger, causality_region: &str) { + if let Some(proof_of_indexing) = &self.poi { + proof_of_indexing + .deref() + .borrow_mut() + .write_deterministic_error(logger, causality_region); + } + } + + pub fn into_inner(self) -> Option { + self.poi + .map(|poi| Arc::try_unwrap(poi).unwrap().into_inner()) + } +} #[cfg(test)] mod tests { diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index 7641dd06d8b..461d4a08256 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -1,6 +1,7 @@ use ethabi::Contract; use graph::blockchain::BlockTime; use graph::components::store::DeploymentLocator; +use graph::components::subgraph::SharedProofOfIndexing; use graph::data::subgraph::*; use graph::data_source; use graph::data_source::common::MappingABI; @@ -127,7 +128,7 @@ pub fn mock_context( .unwrap(), Default::default(), ), - proof_of_indexing: None, + proof_of_indexing: SharedProofOfIndexing::ignored(), host_fns: Arc::new(Vec::new()), debug_fork: None, mapping_logger: Logger::root(slog::Discard, o!()), diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index bd1c8706c4a..12099c55b7e 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::ops::Deref; use std::str::FromStr; use std::time::{Duration, Instant}; @@ -33,18 +32,6 @@ use crate::{error::DeterminismLevel, module::IntoTrap}; use super::module::WasmInstanceData; -fn write_poi_event( - proof_of_indexing: &SharedProofOfIndexing, - poi_event: &ProofOfIndexingEvent, - causality_region: &str, - logger: &Logger, -) { - if let Some(proof_of_indexing) = proof_of_indexing { - let mut proof_of_indexing = proof_of_indexing.deref().borrow_mut(); - proof_of_indexing.write(logger, causality_region, poi_event); - } -} - impl IntoTrap for HostExportError { fn determinism_level(&self) -> DeterminismLevel { match self { @@ -336,8 +323,7 @@ impl HostExports { .map_err(|e| HostExportError::Deterministic(anyhow!(e)))?; let poi_section = stopwatch.start_section("host_export_store_set__proof_of_indexing"); - write_poi_event( - proof_of_indexing, + proof_of_indexing.write_event( &ProofOfIndexingEvent::SetEntity { entity_type: &key.entity_type.typename(), id: &key.entity_id.to_string(), @@ -369,8 +355,7 @@ impl HostExports { entity_id: String, gas: &GasCounter, ) -> Result<(), HostExportError> { - write_poi_event( - proof_of_indexing, + proof_of_indexing.write_event( &ProofOfIndexingEvent::RemoveEntity { entity_type: &entity_type, id: &entity_id,