diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 85103cc5498..6cc098651a2 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -46,6 +46,7 @@ use crate::codec::HeaderOnlyBlock; use crate::data_source::DataSourceTemplate; use crate::data_source::UnresolvedDataSourceTemplate; use crate::ingestor::PollingBlockIngestor; +use crate::json_block::EthereumJsonBlock; use crate::network::EthereumNetworkAdapters; use crate::polling_block_stream::PollingBlockStream; use crate::runtime::runtime_adapter::eth_call_gas; @@ -1076,22 +1077,32 @@ impl TriggersAdapterTrait for TriggersAdapter { .await?; // First check if we have the ancestor in cache and can deserialize it. - // recent_blocks_cache can have full format {"block": {...}, "transaction_receipts": [...]} - // or light format (just block fields). We need full format with receipts for - // ancestor_block since it's used for trigger processing. + // The cached JSON can be in one of three formats: + // 1. Full RPC format: {"block": {...}, "transaction_receipts": [...]} + // 2. Shallow/header-only: {"timestamp": "...", "data": null} - only timestamp, no block data + // 3. Legacy direct: block fields at root level {hash, number, transactions, ...} + // We need full format with receipts for ancestor_block (used for trigger processing). let block_ptr = match cached { Some((json, ptr)) => { - if json.get("block").is_none() { - warn!( + let json_block = EthereumJsonBlock::new(json); + if json_block.is_shallow() { + trace!( + self.logger, + "Cached block #{} {} is shallow (header-only). Falling back to Firehose/RPC.", + ptr.number, + ptr.hash_hex(), + ); + ptr + } else if json_block.is_legacy_format() { + trace!( self.logger, - "Cached ancestor block #{} {} has light format without receipts. \ - Falling back to Firehose/RPC.", + "Cached block #{} {} is legacy light format. Falling back to Firehose/RPC.", ptr.number, ptr.hash_hex(), ); ptr } else { - match json::from_value::(json.clone()) { + match json_block.into_full_block() { Ok(block) => { return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { ethereum_block: block, @@ -1169,24 +1180,32 @@ impl TriggersAdapterTrait for TriggersAdapter { ChainClient::Firehose(endpoints) => { let chain_store = self.chain_store.cheap_clone(); // First try to get the block from the store + // See ancestor_block() for documentation of the 3 cached JSON formats. if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await { - if let Some(cached_json) = blocks.first() { - // recent_blocks_cache can contain full format {"block": {...}, "transaction_receipts": [...]} - // or light format (just block fields). Extract block data for deserialization. - let inner = cached_json.get("block").unwrap_or(cached_json); - match json::from_value::(inner.clone()) { - Ok(light_block) => { - return Ok(light_block.parent_ptr()); - } - Err(e) => { - warn!( - self.logger, - "Failed to deserialize cached block #{} {}: {}. \ - Falling back to Firehose.", - block.number, - block.hash_hex(), - e - ); + if let Some(cached_json) = blocks.into_iter().next() { + let json_block = EthereumJsonBlock::new(cached_json); + if json_block.is_shallow() { + trace!( + self.logger, + "Cached block #{} {} is shallow. Falling back to Firehose.", + block.number, + block.hash_hex(), + ); + } else { + match json_block.into_light_block() { + Ok(light_block) => { + return Ok(light_block.parent_ptr()); + } + Err(e) => { + warn!( + self.logger, + "Failed to deserialize cached block #{} {}: {}. \ + Falling back to Firehose.", + block.number, + block.hash_hex(), + e + ); + } } } } diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 64affbeec0b..6a811cd134d 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -46,8 +46,8 @@ use graph::{ blockchain::{block_stream::BlockWithTriggers, BlockPtr, IngestorError}, prelude::{ anyhow::{self, anyhow, bail, ensure, Context}, - debug, error, hex, info, retry, serde_json as json, trace, warn, BlockNumber, ChainStore, - CheapClone, DynTryFuture, Error, EthereumCallCache, Logger, TimeoutError, + debug, error, hex, info, retry, trace, warn, BlockNumber, ChainStore, CheapClone, + DynTryFuture, Error, EthereumCallCache, Logger, TimeoutError, }, }; use itertools::Itertools; @@ -65,6 +65,7 @@ use crate::adapter::EthereumRpcError; use crate::adapter::ProviderStatus; use crate::call_helper::interpret_eth_call_error; use crate::chain::BlockFinality; +use crate::json_block::EthereumJsonBlock; use crate::trigger::{LogPosition, LogRef}; use crate::Chain; use crate::NodeCapabilities; @@ -1614,25 +1615,22 @@ impl EthereumAdapterTrait for EthereumAdapter { .unwrap_or_default() .into_iter() .filter_map(|value| { - // recent_blocks_cache can contain full format {"block": {...}, "transaction_receipts": [...]} - // or light format (just block fields). Extract block data for deserialization. - let inner = value.get("block").unwrap_or(&value); - json::from_value(inner.clone()) + let json_block = EthereumJsonBlock::new(value); + if json_block.is_shallow() { + return None; + } + json_block + .into_light_block() .map_err(|e| { - let block_num = inner.get("number").and_then(|n| n.as_str()); - let block_hash = inner.get("hash").and_then(|h| h.as_str()); warn!( &logger, - "Failed to deserialize cached block #{:?} {:?}: {}. \ - Block will be re-fetched from RPC.", - block_num, - block_hash, + "Failed to deserialize cached block: {}. Block will be re-fetched from RPC.", e ); }) .ok() }) - .map(|b| Arc::new(LightEthereumBlock::new(b))) + .map(Arc::new) .collect(); let missing_blocks = Vec::from_iter( diff --git a/chain/ethereum/src/json_block.rs b/chain/ethereum/src/json_block.rs new file mode 100644 index 00000000000..5525e7fb7a8 --- /dev/null +++ b/chain/ethereum/src/json_block.rs @@ -0,0 +1,52 @@ +use graph::prelude::serde_json::{self as json, Value}; +use graph::prelude::{EthereumBlock, LightEthereumBlock}; + +use crate::json_patch; + +#[derive(Debug)] +pub struct EthereumJsonBlock(Value); + +impl EthereumJsonBlock { + pub fn new(value: Value) -> Self { + Self(value) + } + + /// Returns true if this is a shallow/header-only block (no full block data). + pub fn is_shallow(&self) -> bool { + self.0.get("data") == Some(&Value::Null) + } + + /// Returns true if this block is in the legacy format (direct block JSON + /// rather than wrapped in a `block` field). + pub fn is_legacy_format(&self) -> bool { + self.0.get("block").is_none() + } + + /// Patches missing `type` fields in transactions and receipts. + /// Required for alloy compatibility with cached blocks from older graph-node versions. + pub fn patch(&mut self) { + if let Some(block) = self.0.get_mut("block") { + json_patch::patch_block_transactions(block); + } + if let Some(receipts) = self.0.get_mut("transaction_receipts") { + json_patch::patch_receipts(receipts); + } + } + + /// Patches and deserializes into a full `EthereumBlock` with receipts. + pub fn into_full_block(mut self) -> Result { + self.patch(); + json::from_value(self.0) + } + + /// Extracts and patches the inner block, deserializing into a `LightEthereumBlock`. + pub fn into_light_block(mut self) -> Result { + let mut inner = self + .0 + .as_object_mut() + .and_then(|obj| obj.remove("block")) + .unwrap_or(self.0); + json_patch::patch_block_transactions(&mut inner); + json::from_value(inner) + } +} diff --git a/chain/ethereum/src/json_patch.rs b/chain/ethereum/src/json_patch.rs new file mode 100644 index 00000000000..d6a46f79ceb --- /dev/null +++ b/chain/ethereum/src/json_patch.rs @@ -0,0 +1,123 @@ +//! JSON patching utilities for Ethereum blocks and receipts. +//! +//! Some cached blocks are missing the transaction `type` field because +//! graph-node's rust-web3 fork didn't capture it. Alloy requires this field for +//! deserialization. These utilities patch the JSON to add `type: "0x0"` (legacy +//! transaction) where missing. +//! +//! Also used by `PatchingHttp` for chains that don't support EIP-2718 typed transactions. + +use graph::prelude::serde_json::Value; + +pub(crate) fn patch_type_field(obj: &mut Value) -> bool { + if let Value::Object(map) = obj { + if !map.contains_key("type") { + map.insert("type".to_string(), Value::String("0x0".to_string())); + return true; + } + } + false +} + +pub(crate) fn patch_block_transactions(block: &mut Value) -> bool { + let Some(txs) = block.get_mut("transactions").and_then(|t| t.as_array_mut()) else { + return false; + }; + let mut patched = false; + for tx in txs { + patched |= patch_type_field(tx); + } + patched +} + +pub(crate) fn patch_receipts(result: &mut Value) -> bool { + match result { + Value::Object(_) => patch_type_field(result), + Value::Array(arr) => { + let mut patched = false; + for r in arr { + patched |= patch_type_field(r); + } + patched + } + _ => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use graph::prelude::serde_json::json; + + #[test] + fn patch_type_field_adds_missing_type() { + let mut obj = json!({"status": "0x1", "gasUsed": "0x5208"}); + assert!(patch_type_field(&mut obj)); + assert_eq!(obj["type"], "0x0"); + } + + #[test] + fn patch_type_field_preserves_existing_type() { + let mut obj = json!({"status": "0x1", "type": "0x2"}); + assert!(!patch_type_field(&mut obj)); + assert_eq!(obj["type"], "0x2"); + } + + #[test] + fn patch_type_field_handles_non_object() { + let mut val = json!("not an object"); + assert!(!patch_type_field(&mut val)); + } + + #[test] + fn patch_block_transactions_patches_all() { + let mut block = json!({ + "hash": "0x123", + "transactions": [ + {"hash": "0xabc", "nonce": "0x1"}, + {"hash": "0xdef", "nonce": "0x2", "type": "0x2"}, + {"hash": "0xghi", "nonce": "0x3"} + ] + }); + assert!(patch_block_transactions(&mut block)); + assert_eq!(block["transactions"][0]["type"], "0x0"); + assert_eq!(block["transactions"][1]["type"], "0x2"); + assert_eq!(block["transactions"][2]["type"], "0x0"); + } + + #[test] + fn patch_block_transactions_handles_empty() { + let mut block = json!({"hash": "0x123", "transactions": []}); + assert!(!patch_block_transactions(&mut block)); + } + + #[test] + fn patch_block_transactions_handles_missing_field() { + let mut block = json!({"hash": "0x123"}); + assert!(!patch_block_transactions(&mut block)); + } + + #[test] + fn patch_receipts_single() { + let mut receipt = json!({"status": "0x1"}); + assert!(patch_receipts(&mut receipt)); + assert_eq!(receipt["type"], "0x0"); + } + + #[test] + fn patch_receipts_array() { + let mut receipts = json!([ + {"status": "0x1"}, + {"status": "0x1", "type": "0x2"} + ]); + assert!(patch_receipts(&mut receipts)); + assert_eq!(receipts[0]["type"], "0x0"); + assert_eq!(receipts[1]["type"], "0x2"); + } + + #[test] + fn patch_receipts_handles_null() { + let mut val = Value::Null; + assert!(!patch_receipts(&mut val)); + } +} diff --git a/chain/ethereum/src/lib.rs b/chain/ethereum/src/lib.rs index 8850764d63b..2bbc53fa327 100644 --- a/chain/ethereum/src/lib.rs +++ b/chain/ethereum/src/lib.rs @@ -7,6 +7,8 @@ mod data_source; mod env; mod ethereum_adapter; mod ingestor; +mod json_block; +mod json_patch; mod polling_block_stream; pub mod runtime; mod transport; diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index 4923e9c7b82..6c85b20841d 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -1,3 +1,4 @@ +use crate::json_patch; use alloy::transports::{TransportError, TransportErrorKind, TransportFut}; use graph::components::network_provider::ProviderName; use graph::endpoint::{ConnectionType, EndpointMetrics, RequestLabels}; @@ -148,34 +149,10 @@ impl PatchingHttp { method == "eth_getTransactionReceipt" || method == "eth_getBlockReceipts" } - fn patch_receipt(receipt: &mut Value) -> bool { - if let Value::Object(obj) = receipt { - if !obj.contains_key("type") { - obj.insert("type".to_string(), Value::String("0x0".to_string())); - return true; - } - } - false - } - - fn patch_result(result: &mut Value) -> bool { - match result { - Value::Object(_) => Self::patch_receipt(result), - Value::Array(arr) => { - let mut patched = false; - for r in arr { - patched |= Self::patch_receipt(r); - } - patched - } - _ => false, - } - } - fn patch_rpc_response(response: &mut Value) -> bool { response .get_mut("result") - .map(Self::patch_result) + .map(json_patch::patch_receipts) .unwrap_or(false) } @@ -262,21 +239,6 @@ impl Service for PatchingHttp { #[cfg(test)] mod tests { use super::*; - use serde_json::json; - - #[test] - fn patch_receipt_adds_missing_type() { - let mut receipt = json!({"status": "0x1", "gasUsed": "0x5208"}); - assert!(PatchingHttp::patch_receipt(&mut receipt)); - assert_eq!(receipt["type"], "0x0"); - } - - #[test] - fn patch_receipt_skips_existing_type() { - let mut receipt = json!({"status": "0x1", "type": "0x2"}); - assert!(!PatchingHttp::patch_receipt(&mut receipt)); - assert_eq!(receipt["type"], "0x2"); - } #[test] fn patch_response_single() {