From 3056290782e6e71a66e18323cb8d1fa2808a87cf Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Sat, 24 Jan 2026 15:37:36 -0500 Subject: [PATCH 1/2] perf(index-node): make proofOfIndexing resolver async Remove synchronous `block_on` call in `resolve_proof_of_indexing` which was blocking tokio worker threads while waiting for database queries. Before this change, each POI query blocked an entire tokio worker thread. After this change, POI queries properly yield to the async runtime while waiting for database I/O, allowing the connection pool to be fully utilized. Co-Authored-By: Claude Opus 4.5 --- server/index-node/src/resolver.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index b8385866d33..3a64552dfe7 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -352,7 +352,10 @@ where )) } - fn resolve_proof_of_indexing(&self, field: &a::Field) -> Result { + async fn resolve_proof_of_indexing( + &self, + field: &a::Field, + ) -> Result { let deployment_id = field .get_required::("subgraph") .expect("Valid subgraphId required"); @@ -381,7 +384,7 @@ where let poi_fut = self .store .get_proof_of_indexing(&deployment_id, &indexer, block.clone()); - let poi = match graph::futures03::executor::block_on(poi_fut) { + let poi = match poi_fut.await { Ok(Some(poi)) => r::Value::String(format!("0x{}", hex::encode(poi))), Ok(None) => r::Value::Null, Err(e) => { @@ -791,7 +794,7 @@ where field.name.as_str(), scalar_type.name.as_str(), ) { - ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field), + ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field).await, ("Query", "blockData", "JSONObject") => self.resolve_block_data(field).await, ("Query", "blockHashFromNumber", "Bytes") => { self.resolve_block_hash_from_number(field).await From b8bd47d14e01fcfde6ab61573b8c6d1cfad7b781 Mon Sep 17 00:00:00 2001 
From: MoonBoi9001 Date: Thu, 5 Feb 2026 22:02:33 -0400 Subject: [PATCH 2/2] feat(index-node): add blockForPoi query for dispute investigation Adds a new index-node GraphQL query that searches a block range to find which block produced a given POI. This supports dispute investigation where an indexer may have submitted a POI for block N that actually corresponds to block X < N. The resolver fetches all poi2$ digest entries in a single DB call, batch fetches block hashes in 1M-block chunks, and runs ProofOfIndexingFinisher for each block until a match is found. Also adds network_for_deployment to the StatusStore trait and its implementation chain (SubgraphStore, Store) as a supporting method for resolving the chain store from a deployment hash. Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 1 + graph/src/components/store/traits.rs | 36 ++++ server/index-node/Cargo.toml | 1 + server/index-node/src/resolver.rs | 235 ++++++++++++++++++++++++- server/index-node/src/schema.graphql | 18 ++ store/postgres/src/deployment_store.rs | 66 ++++++- store/postgres/src/store.rs | 14 ++ store/postgres/src/subgraph_store.rs | 34 +++- 8 files changed, 402 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae0ce8bd040..979592cb510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3970,6 +3970,7 @@ dependencies = [ "graph-chain-ethereum", "graph-chain-near", "graph-graphql", + "rayon", ] [[package]] diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 658baa8be3e..3b4f906cae7 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -10,6 +10,7 @@ use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr}; use crate::components::metrics::stopwatch::StopwatchMetrics; use crate::components::network_provider::ChainName; use crate::components::server::index_node::VersionInfo; +use crate::components::subgraph::ProofOfIndexingVersion; use 
crate::components::subgraph::SubgraphVersionSwitchingMode; use crate::components::transaction_receipt; use crate::components::versions::ApiVersion; @@ -741,6 +742,28 @@ pub trait QueryStore: Send + Sync { fn deployment_id(&self) -> DeploymentId; } +/// A single POI digest entry from the `poi2$` table, representing the +/// accumulated digest for a causality region over a block range. +#[derive(Clone, Debug)] +pub struct PoiDigestEntry { + /// The causality region identifier (the entity id in poi2$) + pub id: Id, + /// The accumulated digest bytes + pub digest: Vec, + /// Start of the block range (inclusive) + pub start_block: BlockNumber, + /// End of the block range (exclusive, i32::MAX if open-ended) + pub end_block: BlockNumber, +} + +/// The full POI digest history for a deployment, containing all digest +/// entries and the POI version needed to compute proofs. +#[derive(Clone, Debug)] +pub struct PoiDigestHistory { + pub entries: Vec, + pub poi_version: ProofOfIndexingVersion, +} + /// A view of the store that can provide information about the indexing status /// of any subgraph and any deployment #[async_trait] @@ -790,6 +813,19 @@ pub trait StatusStore: Send + Sync + 'static { block_number: BlockNumber, fetch_block_ptr: &dyn BlockPtrForNumber, ) -> Result, StoreError>; + + /// Retrieve the full POI digest history for a deployment within a block + /// range. Returns all `poi2$` entries whose block ranges overlap the + /// given range, along with the deployment's `ProofOfIndexingVersion`. + /// Returns `None` if the deployment doesn't exist or has no POI data. 
+ async fn get_poi_digest_history( + &self, + subgraph_id: &DeploymentHash, + block_range: std::ops::Range, + ) -> Result, StoreError>; + + /// Get the network for a deployment + async fn network_for_deployment(&self, id: &DeploymentHash) -> Result; } #[async_trait] diff --git a/server/index-node/Cargo.toml b/server/index-node/Cargo.toml index 9672f657e4a..e1f7e1d021c 100644 --- a/server/index-node/Cargo.toml +++ b/server/index-node/Cargo.toml @@ -11,3 +11,4 @@ graph-graphql = { path = "../../graphql" } graph-chain-ethereum = { path = "../../chain/ethereum" } graph-chain-near = { path = "../../chain/near" } git-testament = "0.2.6" +rayon = "1" diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 3a64552dfe7..70973a9962a 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -1,6 +1,7 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use async_trait::async_trait; +use graph::components::subgraph::ProofOfIndexingFinisher; use graph::data::query::Trace; use graph::data::store::Id; use graph::prelude::alloy::primitives::Address; @@ -454,6 +455,237 @@ where Ok(r::Value::List(public_poi_results)) } + async fn resolve_block_for_poi( + &self, + field: &a::Field, + ) -> Result { + const CHUNK_SIZE: i32 = 1_000_000; + + let deployment_id = field + .get_required::("subgraph") + .expect("Valid subgraph required"); + let target_poi_hash = field + .get_required::("targetPoi") + .expect("Valid targetPoi required"); + let start_block = field + .get_required::("startBlock") + .expect("Valid startBlock required"); + let end_block = field + .get_required::("endBlock") + .expect("Valid endBlock required"); + + let indexer = Some( + field + .get_required::
("indexer") + .expect("Valid indexer required"), + ); + + if end_block <= start_block { + return Ok(r::Value::Null); + } + + let target_bytes: [u8; 32] = match target_poi_hash.as_slice().try_into() { + Ok(bytes) => bytes, + Err(_) => { + error!( + self.logger, + "Invalid targetPoi: expected 32 bytes"; + "got_bytes" => target_poi_hash.as_slice().len() + ); + return Ok(r::Value::Null); + } + }; + + // Resolve the network for this deployment + let network = match self.store.network_for_deployment(&deployment_id).await { + Ok(n) => n, + Err(e) => { + error!( + self.logger, + "Failed to resolve network for deployment"; + "subgraph" => &deployment_id, + "error" => format!("{:?}", e) + ); + return Ok(r::Value::Null); + } + }; + + // Fetch the full digest history for the block range + let history = match self + .store + .get_poi_digest_history(&deployment_id, start_block..end_block) + .await + { + Ok(Some(h)) => h, + Ok(None) => return Ok(r::Value::Null), + Err(e) => { + error!( + self.logger, + "Failed to fetch POI digest history"; + "subgraph" => &deployment_id, + "error" => format!("{:?}", e) + ); + return Ok(r::Value::Null); + } + }; + + let poi_version = history.poi_version; + + // Build a lookup structure: for each causality region id, a sorted + // vec of (start_block, end_block, digest) for binary search. 
+ let mut region_entries: HashMap)>> = + HashMap::new(); + for entry in history.entries { + region_entries.entry(entry.id).or_default().push(( + entry.start_block, + entry.end_block, + entry.digest, + )); + } + for entries in region_entries.values_mut() { + entries.sort_by_key(|(start, _, _)| *start); + } + + // Share across rayon threads + let region_entries = Arc::new(region_entries); + + let chain_store = match self.store.block_store().chain_store(&network).await { + Some(cs) => cs, + None => { + error!( + self.logger, + "Chain store not found for network"; + "network" => &network + ); + return Ok(r::Value::Null); + } + }; + + // Search backwards from end_block (the match is likely near the top). + // Pipeline: fetch the next chunk while computing POIs for the current one. + let mut chunk_end = end_block; + let chunk_start = std::cmp::max(chunk_end - CHUNK_SIZE, start_block); + + // Fetch first chunk + let block_numbers: Vec = (chunk_start..chunk_end).collect(); + let mut current_ptrs = match chain_store + .cheap_clone() + .block_ptrs_by_numbers(block_numbers) + .await + { + Ok(ptrs) => ptrs, + Err(e) => { + error!( + self.logger, + "Failed to fetch block hashes"; + "range" => format!("{}..{}", chunk_start, chunk_end), + "error" => format!("{:?}", e) + ); + return Ok(r::Value::Null); + } + }; + chunk_end = chunk_start; + + loop { + // Start prefetching the next chunk while we process the current one + let next_chunk_end = chunk_end; + let next_chunk_start = std::cmp::max(next_chunk_end - CHUNK_SIZE, start_block); + let prefetch = if next_chunk_start < next_chunk_end { + let cs = chain_store.cheap_clone(); + let numbers: Vec = (next_chunk_start..next_chunk_end).collect(); + Some(tokio::spawn(async move { + cs.block_ptrs_by_numbers(numbers).await + })) + } else { + None + }; + + // Collect blocks with unambiguous hashes for parallel search + let blocks_to_check: Vec<(BlockNumber, BlockHash)> = current_ptrs + .iter() + .filter_map(|(num, ptrs)| { + if ptrs.len() == 
1 { + Some((*num, ptrs[0].hash.clone())) + } else { + None + } + }) + .collect(); + + // Parallel POI computation across all cores via rayon + let re = region_entries.clone(); + let did = deployment_id.clone(); + let result = graph::spawn_blocking_allow_panic(move || { + use rayon::prelude::*; + blocks_to_check + .par_iter() + .find_map_any(|(block_num, block_hash)| { + let block_ptr = BlockPtr::new(block_hash.clone(), *block_num); + let mut finisher = + ProofOfIndexingFinisher::new(&block_ptr, &did, &indexer, poi_version); + + for (region_id, entries) in re.as_ref() { + let idx = entries.partition_point(|(start, _, _)| *start <= *block_num); + if idx == 0 { + continue; + } + let (start, end, ref digest) = entries[idx - 1]; + if *block_num >= start && *block_num < end { + finisher.add_causality_region(region_id, digest); + } + } + + let computed = finisher.finish(); + if computed == target_bytes { + Some((*block_num, block_hash.clone(), computed)) + } else { + None + } + }) + }) + .await + .map_err(|e| QueryExecutionError::Panic(e.to_string()))?; + + if let Some((block_num, block_hash, computed_poi)) = result { + // Found it - abort any in-flight prefetch + if let Some(handle) = prefetch { + handle.abort(); + } + return Ok(object! { + __typename: "PoiSearchResult", + deployment: deployment_id.to_string(), + block: object! { + hash: block_hash.hash_hex(), + number: block_num, + }, + proofOfIndexing: format!("0x{}", hex::encode(computed_poi)), + }); + } + + // Move to the next chunk + match prefetch { + Some(handle) => { + current_ptrs = handle + .await + .map_err(|e| QueryExecutionError::Panic(e.to_string()))? 
+ .map_err(|e| { + error!( + self.logger, + "Failed to fetch block hashes"; + "range" => format!("{}..{}", next_chunk_start, next_chunk_end), + "error" => format!("{:?}", e) + ); + QueryExecutionError::StoreError(e.into()) + })?; + chunk_end = next_chunk_start; + } + None => break, + } + } + + Ok(r::Value::Null) + } + async fn resolve_indexing_status_for_version( &self, field: &a::Field, @@ -858,6 +1090,7 @@ where // The top-level `subgraphVersions` field (None, "apiVersions") => self.resolve_api_versions(field), (None, "version") => self.version(), + (None, "blockForPoi") => self.resolve_block_for_poi(field).await, // Resolve fields of `Object` values (e.g. the `latestBlock` field of `EthereumBlock`) (value, _) => Ok(value.unwrap_or(r::Value::Null)), diff --git a/server/index-node/src/schema.graphql b/server/index-node/src/schema.graphql index 10475d75a0d..2fbe0a5e5dd 100644 --- a/server/index-node/src/schema.graphql +++ b/server/index-node/src/schema.graphql @@ -46,6 +46,18 @@ type Query { blockHash: Bytes! ): [CachedEthereumCall!] apiVersions(subgraphId: String!): [ApiVersion!]! + """ + Find the block number that produced a given proof of indexing. + Used for dispute investigation to verify which block an indexer + actually synced to when they submitted a POI. + """ + blockForPoi( + subgraph: String! + targetPoi: Bytes! + startBlock: Int! + endBlock: Int! + indexer: Bytes! + ): PoiSearchResult } type Version { @@ -203,6 +215,12 @@ type ProofOfIndexingResult { proofOfIndexing: Bytes } +type PoiSearchResult { + block: Block! + deployment: String! + proofOfIndexing: Bytes! 
+} + type ApiVersion { """ Version number in SemVer format diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 3703534979c..549c76150c8 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -8,6 +8,7 @@ use graph::anyhow::Context; use graph::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor}; use graph::blockchain::BlockTime; use graph::components::store::write::RowGroup; +use graph::components::store::PoiDigestHistory; use graph::components::store::{ Batch, DeploymentLocator, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest, PruningStrategy, QueryPermit, StoredDynamicDataSource, VersionStats, @@ -15,7 +16,7 @@ use graph::components::store::{ use graph::components::versions::VERSIONS; use graph::data::graphql::IntoValue; use graph::data::query::Trace; -use graph::data::store::{IdList, SqlQueryObject}; +use graph::data::store::{Id, IdList, SqlQueryObject}; use graph::data::subgraph::{status, SPEC_VERSION_0_0_6}; use graph::data_source::CausalityRegion; use graph::derive::CheapClone; @@ -1032,6 +1033,69 @@ impl DeploymentStore { Ok(Some(finisher.finish())) } + /// Retrieve all POI digest entries from the `poi2$` table whose block + /// ranges overlap the given `block_range`, along with the deployment's + /// POI version. Used by the `blockForPoi` resolver to reconstruct POIs + /// without per-block entity queries. 
+ pub(crate) async fn get_poi_digest_history( + &self, + site: Arc, + block_range: Range, + ) -> Result, StoreError> { + use diesel::sql_types::{Binary, Integer, Text}; + use graph::components::store::PoiDigestEntry; + + let info = self.subgraph_info(site.cheap_clone()).await?; + + #[derive(QueryableByName)] + struct DigestRow { + #[diesel(sql_type = Text)] + id: String, + #[diesel(sql_type = Binary)] + digest: Vec, + #[diesel(sql_type = Integer)] + start_block: i32, + #[diesel(sql_type = Integer)] + end_block: i32, + } + + let query = format!( + r#"SELECT id, digest, lower(block_range) as start_block, + coalesce(upper(block_range), 2147483647) as end_block + FROM "{}"."poi2$" + WHERE block_range && int4range($1, $2) + ORDER BY id, lower(block_range)"#, + site.namespace, + ); + + let mut conn = self.pool.get_permitted().await?; + let rows = diesel::sql_query(query) + .bind::(block_range.start) + .bind::(block_range.end) + .load::(&mut conn) + .await + .map_err(StoreError::from)?; + + if rows.is_empty() { + return Ok(None); + } + + let entries = rows + .into_iter() + .map(|row| PoiDigestEntry { + id: Id::String(row.id.into()), + digest: row.digest, + start_block: row.start_block, + end_block: row.end_block, + }) + .collect(); + + Ok(Some(PoiDigestHistory { + entries, + poi_version: info.poi_version, + })) + } + /// Get the entity matching `key` from the deployment `site`. 
Only /// consider entities as of the given `block` pub(crate) async fn get( diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index 4adec80ab5b..0056d9de6aa 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -171,6 +171,20 @@ impl StatusStore for Store { .await } + async fn get_poi_digest_history( + &self, + subgraph_id: &DeploymentHash, + block_range: std::ops::Range, + ) -> Result, StoreError> { + self.subgraph_store + .get_poi_digest_history(subgraph_id, block_range) + .await + } + + async fn network_for_deployment(&self, id: &DeploymentHash) -> Result { + self.subgraph_store.network_for_deployment(id).await + } + async fn query_permit(&self) -> QueryPermit { // Status queries go to the primary shard. self.block_store.query_permit_primary().await diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 478d21eba02..5b15b610210 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -19,7 +19,7 @@ use graph::{ server::index_node::VersionInfo, store::{ self, BlockPtrForNumber, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, - PruneReporter, PruneRequest, SubgraphFork, + PoiDigestHistory, PruneReporter, PruneRequest, SubgraphFork, }, }, data::query::QueryTarget, @@ -247,6 +247,21 @@ impl SubgraphStore { self.inner.get_proof_of_indexing(id, indexer, block).await } + pub(crate) async fn get_poi_digest_history( + &self, + id: &DeploymentHash, + block_range: std::ops::Range, + ) -> Result, StoreError> { + self.inner.get_poi_digest_history(id, block_range).await + } + + pub(crate) async fn network_for_deployment( + &self, + id: &DeploymentHash, + ) -> Result { + self.inner.network_for_deployment(id).await + } + pub(crate) async fn get_public_proof_of_indexing( &self, id: &DeploymentHash, @@ -1119,6 +1134,23 @@ impl Inner { store.get_proof_of_indexing(site, indexer, block).await } + pub(crate) async fn get_poi_digest_history( + &self, 
+ id: &DeploymentHash, + block_range: std::ops::Range, + ) -> Result, StoreError> { + let (store, site) = self.store(id).await?; + store.get_poi_digest_history(site, block_range).await + } + + pub(crate) async fn network_for_deployment( + &self, + id: &DeploymentHash, + ) -> Result { + let site = self.site(id).await?; + Ok(site.network.clone()) + } + pub(crate) async fn get_public_proof_of_indexing( &self, id: &DeploymentHash,