Stop pretending that there are subgraphs that don't support PoI #5863

Merged: 2 commits, Mar 7, 2025
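This PR drops the last traces of pre-PoI subgraphs. `WritableStore::supports_proof_of_indexing` and `Layout::supports_proof_of_indexing` are removed, and the `SharedProofOfIndexing` type alias (`Option<Arc<AtomicRefCell<ProofOfIndexing>>>`) becomes a real struct whose methods (`write_event`, `start_handler`, `write_deterministic_error`, `into_inner`) encapsulate the optionality. Call sites no longer pattern-match on an `Option`; a handle created with `SharedProofOfIndexing::ignored()` simply turns every write into a no-op, which is what offchain triggers use since PoI ignores them.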
19 changes: 2 additions & 17 deletions chain/substreams/src/trigger.rs
@@ -15,7 +15,6 @@ use graph::{
substreams::Modules,
};
use graph_runtime_wasm::module::ToAscPtr;
use lazy_static::__Deref;
use std::{collections::BTreeSet, sync::Arc};

use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges};
@@ -179,18 +178,6 @@ impl blockchain::TriggersAdapter<Chain> for TriggersAdapter {
}
}

fn write_poi_event(
proof_of_indexing: &SharedProofOfIndexing,
poi_event: &ProofOfIndexingEvent,
causality_region: &str,
logger: &Logger,
) {
if let Some(proof_of_indexing) = proof_of_indexing {
let mut proof_of_indexing = proof_of_indexing.deref().borrow_mut();
proof_of_indexing.write(logger, causality_region, poi_event);
}
}

pub struct TriggerProcessor {
pub locator: DeploymentLocator,
}
@@ -226,8 +213,7 @@ where
return Err(MappingError::Unknown(anyhow!("Detected UNSET entity operation, either a server error or there's a new type of operation and we're running an outdated protobuf")));
}
ParsedChanges::Upsert { key, entity } => {
write_poi_event(
proof_of_indexing,
proof_of_indexing.write_event(
&ProofOfIndexingEvent::SetEntity {
entity_type: key.entity_type.typename(),
id: &key.entity_id.to_string(),
@@ -249,8 +235,7 @@ where
let id = entity_key.entity_id.clone();
state.entity_cache.remove(entity_key);

write_poi_event(
proof_of_indexing,
proof_of_indexing.write_event(
&ProofOfIndexingEvent::RemoveEntity {
entity_type: entity_type.typename(),
id: &id.to_string(),
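An identical `write_poi_event` helper is deleted from `runtime/wasm/src/host_exports.rs` further down; both duplicates are replaced by a method on the shared handle. As a self-contained illustration of the pattern change, here is a minimal sketch with stand-in types (not the real graph-node API, which wraps the accumulator in `Arc<AtomicRefCell<_>>` and writes full `ProofOfIndexingEvent` values):

```rust
use std::cell::RefCell;

// Stand-in for the real ProofOfIndexing accumulator.
struct ProofOfIndexing(Vec<String>);

// Stand-in for the new SharedProofOfIndexing struct: the Option lives
// inside the handle instead of at every call site.
struct SharedProofOfIndexing {
    poi: Option<RefCell<ProofOfIndexing>>,
}

impl SharedProofOfIndexing {
    // Writing through a disabled handle is a silent no-op; this is the
    // check the deleted free function `write_poi_event` used to perform.
    fn write_event(&self, event: &str) {
        if let Some(poi) = &self.poi {
            poi.borrow_mut().0.push(event.to_owned());
        }
    }
}

fn main() {
    let enabled = SharedProofOfIndexing {
        poi: Some(RefCell::new(ProofOfIndexing(Vec::new()))),
    };
    let ignored = SharedProofOfIndexing { poi: None };
    enabled.write_event("SetEntity");
    ignored.write_event("RemoveEntity"); // no-op, no Option to unwrap at the call site
    assert_eq!(enabled.poi.unwrap().into_inner().0.len(), 1);
}
```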
20 changes: 6 additions & 14 deletions core/src/subgraph/context/mod.rs
@@ -126,11 +126,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
) -> Result<BlockState, MappingError> {
let error_count = state.deterministic_errors.len();

if let Some(proof_of_indexing) = proof_of_indexing {
proof_of_indexing
.borrow_mut()
.start_handler(causality_region);
}
proof_of_indexing.start_handler(causality_region);

let start = Instant::now();

@@ -156,16 +152,12 @@
let elapsed = start.elapsed().as_secs_f64();
subgraph_metrics.observe_trigger_processing_duration(elapsed);

if let Some(proof_of_indexing) = proof_of_indexing {
if state.deterministic_errors.len() != error_count {
assert!(state.deterministic_errors.len() == error_count + 1);
if state.deterministic_errors.len() != error_count {
assert!(state.deterministic_errors.len() == error_count + 1);

// If a deterministic error has happened, write a new
// ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
proof_of_indexing
.borrow_mut()
.write_deterministic_error(logger, causality_region);
}
// If a deterministic error has happened, write a new
// ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
proof_of_indexing.write_deterministic_error(logger, causality_region);
}

Ok(state)
29 changes: 7 additions & 22 deletions core/src/subgraph/runner.rs
@@ -3,7 +3,6 @@ use crate::subgraph::error::BlockProcessingError;
use crate::subgraph::inputs::IndexingInputs;
use crate::subgraph::state::IndexingState;
use crate::subgraph::stream::new_block_stream;
use atomic_refcell::AtomicRefCell;
use graph::blockchain::block_stream::{
BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor,
};
@@ -367,14 +366,8 @@ where
debug!(logger, "Start processing block";
"triggers" => triggers.len());

let proof_of_indexing = if self.inputs.store.supports_proof_of_indexing().await? {
Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new(
block_ptr.number,
self.inputs.poi_version,
))))
} else {
None
};
let proof_of_indexing =
SharedProofOfIndexing::new(block_ptr.number, self.inputs.poi_version);

// Causality region for onchain triggers.
let causality_region = PoICausalityRegion::from_network(&self.inputs.network);
@@ -633,8 +626,7 @@ where
return Err(BlockProcessingError::Canceled);
}

if let Some(proof_of_indexing) = proof_of_indexing {
let proof_of_indexing = Arc::try_unwrap(proof_of_indexing).unwrap().into_inner();
if let Some(proof_of_indexing) = proof_of_indexing.into_inner() {
update_proof_of_indexing(
proof_of_indexing,
block.timestamp(),
@@ -1160,7 +1152,7 @@ where

// PoI ignores offchain events.
// See also: poi-ignores-offchain
let proof_of_indexing = None;
let proof_of_indexing = SharedProofOfIndexing::ignored();
let causality_region = "";

let trigger = TriggerData::Offchain(trigger);
@@ -1318,14 +1310,8 @@ where
.deployment_head
.set(block_ptr.number as f64);

let proof_of_indexing = if self.inputs.store.supports_proof_of_indexing().await? {
Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new(
block_ptr.number,
self.inputs.poi_version,
))))
} else {
None
};
let proof_of_indexing =
SharedProofOfIndexing::new(block_ptr.number, self.inputs.poi_version);

// Causality region for onchain triggers.
let causality_region = PoICausalityRegion::from_network(&self.inputs.network);
@@ -1380,8 +1366,7 @@ where
return Err(BlockProcessingError::Canceled.into());
}

if let Some(proof_of_indexing) = proof_of_indexing {
let proof_of_indexing = Arc::try_unwrap(proof_of_indexing).unwrap().into_inner();
if let Some(proof_of_indexing) = proof_of_indexing.into_inner() {
update_proof_of_indexing(
proof_of_indexing,
block_time,
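The runner now follows one lifecycle regardless of whether the deployment's schema has a PoI table: build a handle per block, let handlers write through it, and recover the accumulator at the end. A hedged sketch of that flow, extending the stand-in types from the earlier sketch with `Rc` for shared ownership (the real code uses `Arc<AtomicRefCell<_>>`, `BlockNumber`, and a PoI version, all simplified away here):

```rust
use std::{cell::RefCell, rc::Rc};

#[derive(Debug)]
struct ProofOfIndexing {
    block: i32,
}

#[derive(Clone)]
struct SharedProofOfIndexing {
    poi: Option<Rc<RefCell<ProofOfIndexing>>>,
}

impl SharedProofOfIndexing {
    fn new(block: i32) -> Self {
        Self {
            poi: Some(Rc::new(RefCell::new(ProofOfIndexing { block }))),
        }
    }

    // Offchain triggers are excluded from PoI ("poi-ignores-offchain").
    fn ignored() -> Self {
        Self { poi: None }
    }

    // End of block: take the accumulator back out. Panics if a handler
    // still holds a clone, mirroring Arc::try_unwrap(..).unwrap() in the PR.
    fn into_inner(self) -> Option<ProofOfIndexing> {
        self.poi.map(|poi| Rc::try_unwrap(poi).unwrap().into_inner())
    }
}

fn main() {
    let poi = SharedProofOfIndexing::new(42);
    // ... process triggers; each handler gets a cheap clone of `poi` ...
    if let Some(poi) = poi.into_inner() {
        println!("update_proof_of_indexing for block {}", poi.block);
    }
    assert!(SharedProofOfIndexing::ignored().into_inner().is_none());
}
```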
20 changes: 6 additions & 14 deletions core/src/subgraph/trigger_processor.rs
@@ -39,11 +39,7 @@ where
return Ok(state);
}

if let Some(proof_of_indexing) = proof_of_indexing {
proof_of_indexing
.borrow_mut()
.start_handler(causality_region);
}
proof_of_indexing.start_handler(causality_region);

for HostedTrigger {
host,
@@ -73,16 +69,12 @@
}
}

if let Some(proof_of_indexing) = proof_of_indexing {
if state.deterministic_errors.len() != error_count {
assert!(state.deterministic_errors.len() == error_count + 1);
if state.deterministic_errors.len() != error_count {
assert!(state.deterministic_errors.len() == error_count + 1);

// If a deterministic error has happened, write a new
// ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
proof_of_indexing
.borrow_mut()
.write_deterministic_error(logger, causality_region);
}
// If a deterministic error has happened, write a new
// ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
proof_of_indexing.write_deterministic_error(logger, causality_region);
}

Ok(state)
2 changes: 0 additions & 2 deletions graph/src/components/store/traits.rs
@@ -365,8 +365,6 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker {
/// Set subgraph status to failed with the given error as the cause.
async fn fail_subgraph(&self, error: SubgraphError) -> Result<(), StoreError>;

async fn supports_proof_of_indexing(&self) -> Result<bool, StoreError>;

/// Transact the entity changes from a single block atomically into the store, and update the
/// subgraph block pointer to `block_ptr_to`, and update the firehose cursor to `firehose_cursor`
///
66 changes: 56 additions & 10 deletions graph/src/components/subgraph/proof_of_indexing/mod.rs
@@ -3,11 +3,15 @@ mod online;
mod reference;

pub use event::ProofOfIndexingEvent;
use graph_derive::CheapClone;
pub use online::{ProofOfIndexing, ProofOfIndexingFinisher};
pub use reference::PoICausalityRegion;

use atomic_refcell::AtomicRefCell;
use std::sync::Arc;
use slog::Logger;
use std::{ops::Deref, sync::Arc};

use crate::prelude::BlockNumber;

#[derive(Copy, Clone, Debug)]
pub enum ProofOfIndexingVersion {
@@ -22,15 +26,57 @@ pub enum ProofOfIndexingVersion {
/// intentionally disallowed - PoI requires sequential access to the hash
/// function within a given causality region even if ownership is shared across
/// multiple mapping contexts.
///
/// The Option<_> is because not all subgraphs support PoI until re-deployed.
/// Eventually this can be removed.
///
/// This is not a great place to define this type, since the ProofOfIndexing
/// shouldn't "know" these details about wasmtime and subgraph re-deployments,
/// but the APIs that would make use of this are in graph/components so this
/// lives here for lack of a better choice.
pub type SharedProofOfIndexing = Option<Arc<AtomicRefCell<ProofOfIndexing>>>;
#[derive(Clone, CheapClone)]
pub struct SharedProofOfIndexing {
poi: Option<Arc<AtomicRefCell<ProofOfIndexing>>>,
}

impl SharedProofOfIndexing {
pub fn new(block: BlockNumber, version: ProofOfIndexingVersion) -> Self {
SharedProofOfIndexing {
poi: Some(Arc::new(AtomicRefCell::new(ProofOfIndexing::new(
block, version,
)))),
}
}

pub fn ignored() -> Self {
SharedProofOfIndexing { poi: None }
}

pub fn write_event(
&self,
poi_event: &ProofOfIndexingEvent,
causality_region: &str,
logger: &Logger,
) {
if let Some(poi) = &self.poi {
let mut poi = poi.deref().borrow_mut();
poi.write(logger, causality_region, poi_event);
}
}

pub fn start_handler(&self, causality_region: &str) {
if let Some(poi) = &self.poi {
let mut poi = poi.deref().borrow_mut();
poi.start_handler(causality_region);
}
}

pub fn write_deterministic_error(&self, logger: &Logger, causality_region: &str) {
if let Some(proof_of_indexing) = &self.poi {
proof_of_indexing
.deref()
.borrow_mut()
.write_deterministic_error(logger, causality_region);
}
}

pub fn into_inner(self) -> Option<ProofOfIndexing> {
self.poi
.map(|poi| Arc::try_unwrap(poi).unwrap().into_inner())
}
}

#[cfg(test)]
mod tests {
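The representation is unchanged from the old alias: `Arc` lets multiple mapping contexts hold the handle, `AtomicRefCell::borrow_mut` enforces the sequential hash-function access the doc comment above calls out, and `#[derive(CheapClone)]` records that cloning the handle is only an `Arc` reference-count bump. What changes is that the `Option` is now private, so the "does this subgraph have PoI?" question is answered in exactly one place instead of at every call site.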
3 changes: 2 additions & 1 deletion runtime/test/src/common.rs
@@ -1,6 +1,7 @@
use ethabi::Contract;
use graph::blockchain::BlockTime;
use graph::components::store::DeploymentLocator;
use graph::components::subgraph::SharedProofOfIndexing;
use graph::data::subgraph::*;
use graph::data_source;
use graph::data_source::common::MappingABI;
@@ -127,7 +128,7 @@ pub fn mock_context(
.unwrap(),
Default::default(),
),
proof_of_indexing: None,
proof_of_indexing: SharedProofOfIndexing::ignored(),
host_fns: Arc::new(Vec::new()),
debug_fork: None,
mapping_logger: Logger::root(slog::Discard, o!()),
19 changes: 2 additions & 17 deletions runtime/wasm/src/host_exports.rs
@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::str::FromStr;
use std::time::{Duration, Instant};

@@ -33,18 +32,6 @@ use crate::{error::DeterminismLevel, module::IntoTrap};

use super::module::WasmInstanceData;

fn write_poi_event(
proof_of_indexing: &SharedProofOfIndexing,
poi_event: &ProofOfIndexingEvent,
causality_region: &str,
logger: &Logger,
) {
if let Some(proof_of_indexing) = proof_of_indexing {
let mut proof_of_indexing = proof_of_indexing.deref().borrow_mut();
proof_of_indexing.write(logger, causality_region, poi_event);
}
}

impl IntoTrap for HostExportError {
fn determinism_level(&self) -> DeterminismLevel {
match self {
@@ -336,8 +323,7 @@ impl HostExports {
.map_err(|e| HostExportError::Deterministic(anyhow!(e)))?;

let poi_section = stopwatch.start_section("host_export_store_set__proof_of_indexing");
write_poi_event(
proof_of_indexing,
proof_of_indexing.write_event(
&ProofOfIndexingEvent::SetEntity {
entity_type: &key.entity_type.typename(),
id: &key.entity_id.to_string(),
@@ -369,8 +355,7 @@
entity_id: String,
gas: &GasCounter,
) -> Result<(), HostExportError> {
write_poi_event(
proof_of_indexing,
proof_of_indexing.write_event(
&ProofOfIndexingEvent::RemoveEntity {
entity_type: &entity_type,
id: &entity_id,
18 changes: 0 additions & 18 deletions store/postgres/src/deployment_store.rs
@@ -916,20 +916,6 @@ impl DeploymentStore {
}
}

pub(crate) async fn supports_proof_of_indexing<'a>(
&self,
site: Arc<Site>,
) -> Result<bool, StoreError> {
let store = self.clone();
self.with_conn(move |conn, cancel| {
cancel.check_cancel()?;
let layout = store.layout(conn, site)?;
Ok(layout.supports_proof_of_indexing())
})
.await
.map_err(Into::into)
}

pub(crate) async fn get_proof_of_indexing(
&self,
site: Arc<Site>,
@@ -950,10 +936,6 @@

let layout = store.layout(conn, site.cheap_clone())?;

if !layout.supports_proof_of_indexing() {
return Ok(None);
}

conn.transaction::<_, CancelableError<anyhow::Error>, _>(move |conn| {
let mut block_ptr = block.cheap_clone();
let latest_block_ptr =
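With the layout check gone, `get_proof_of_indexing` no longer short-circuits to `Ok(None)` for deployments whose schema lacks a PoI table: every deployment is now assumed to have one.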
4 changes: 0 additions & 4 deletions store/postgres/src/relational.rs
@@ -375,10 +375,6 @@ impl Layout {
}
}

pub fn supports_proof_of_indexing(&self) -> bool {
self.tables.contains_key(&self.input_schema.poi_type())
}

pub fn create_relational_schema(
conn: &mut PgConnection,
site: Arc<Site>,