Skip to content
69 changes: 44 additions & 25 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::codec::HeaderOnlyBlock;
use crate::data_source::DataSourceTemplate;
use crate::data_source::UnresolvedDataSourceTemplate;
use crate::ingestor::PollingBlockIngestor;
use crate::json_block::EthereumJsonBlock;
use crate::network::EthereumNetworkAdapters;
use crate::polling_block_stream::PollingBlockStream;
use crate::runtime::runtime_adapter::eth_call_gas;
Expand Down Expand Up @@ -1076,22 +1077,32 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
.await?;

// First check if we have the ancestor in cache and can deserialize it.
// recent_blocks_cache can have full format {"block": {...}, "transaction_receipts": [...]}
// or light format (just block fields). We need full format with receipts for
// ancestor_block since it's used for trigger processing.
// The cached JSON can be in one of three formats:
// 1. Full RPC format: {"block": {...}, "transaction_receipts": [...]}
// 2. Shallow/header-only: {"timestamp": "...", "data": null} - only timestamp, no block data
// 3. Legacy direct: block fields at root level {hash, number, transactions, ...}
// We need full format with receipts for ancestor_block (used for trigger processing).
let block_ptr = match cached {
Some((json, ptr)) => {
if json.get("block").is_none() {
warn!(
let json_block = EthereumJsonBlock::new(json);
if json_block.is_shallow() {
trace!(
self.logger,
"Cached block #{} {} is shallow (header-only). Falling back to Firehose/RPC.",
ptr.number,
ptr.hash_hex(),
);
ptr
} else if json_block.is_legacy_format() {
trace!(
self.logger,
"Cached ancestor block #{} {} has light format without receipts. \
Falling back to Firehose/RPC.",
"Cached block #{} {} is legacy light format. Falling back to Firehose/RPC.",
ptr.number,
ptr.hash_hex(),
);
ptr
} else {
match json::from_value::<EthereumBlock>(json.clone()) {
match json_block.into_full_block() {
Ok(block) => {
return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
ethereum_block: block,
Expand Down Expand Up @@ -1169,24 +1180,32 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
ChainClient::Firehose(endpoints) => {
let chain_store = self.chain_store.cheap_clone();
// First try to get the block from the store
// See ancestor_block() for documentation of the 3 cached JSON formats.
if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await {
if let Some(cached_json) = blocks.first() {
// recent_blocks_cache can contain full format {"block": {...}, "transaction_receipts": [...]}
// or light format (just block fields). Extract block data for deserialization.
let inner = cached_json.get("block").unwrap_or(cached_json);
match json::from_value::<LightEthereumBlock>(inner.clone()) {
Ok(light_block) => {
return Ok(light_block.parent_ptr());
}
Err(e) => {
warn!(
self.logger,
"Failed to deserialize cached block #{} {}: {}. \
Falling back to Firehose.",
block.number,
block.hash_hex(),
e
);
if let Some(cached_json) = blocks.into_iter().next() {
let json_block = EthereumJsonBlock::new(cached_json);
if json_block.is_shallow() {
trace!(
self.logger,
"Cached block #{} {} is shallow. Falling back to Firehose.",
block.number,
block.hash_hex(),
);
} else {
match json_block.into_light_block() {
Ok(light_block) => {
return Ok(light_block.parent_ptr());
}
Err(e) => {
warn!(
self.logger,
"Failed to deserialize cached block #{} {}: {}. \
Falling back to Firehose.",
block.number,
block.hash_hex(),
e
);
}
}
}
}
Expand Down
24 changes: 11 additions & 13 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use graph::{
blockchain::{block_stream::BlockWithTriggers, BlockPtr, IngestorError},
prelude::{
anyhow::{self, anyhow, bail, ensure, Context},
debug, error, hex, info, retry, serde_json as json, trace, warn, BlockNumber, ChainStore,
CheapClone, DynTryFuture, Error, EthereumCallCache, Logger, TimeoutError,
debug, error, hex, info, retry, trace, warn, BlockNumber, ChainStore, CheapClone,
DynTryFuture, Error, EthereumCallCache, Logger, TimeoutError,
},
};
use itertools::Itertools;
Expand All @@ -65,6 +65,7 @@ use crate::adapter::EthereumRpcError;
use crate::adapter::ProviderStatus;
use crate::call_helper::interpret_eth_call_error;
use crate::chain::BlockFinality;
use crate::json_block::EthereumJsonBlock;
use crate::trigger::{LogPosition, LogRef};
use crate::Chain;
use crate::NodeCapabilities;
Expand Down Expand Up @@ -1614,25 +1615,22 @@ impl EthereumAdapterTrait for EthereumAdapter {
.unwrap_or_default()
.into_iter()
.filter_map(|value| {
// recent_blocks_cache can contain full format {"block": {...}, "transaction_receipts": [...]}
// or light format (just block fields). Extract block data for deserialization.
let inner = value.get("block").unwrap_or(&value);
json::from_value(inner.clone())
let json_block = EthereumJsonBlock::new(value);
if json_block.is_shallow() {
return None;
}
json_block
.into_light_block()
.map_err(|e| {
let block_num = inner.get("number").and_then(|n| n.as_str());
let block_hash = inner.get("hash").and_then(|h| h.as_str());
warn!(
&logger,
"Failed to deserialize cached block #{:?} {:?}: {}. \
Block will be re-fetched from RPC.",
block_num,
block_hash,
"Failed to deserialize cached block: {}. Block will be re-fetched from RPC.",
e
);
})
.ok()
})
.map(|b| Arc::new(LightEthereumBlock::new(b)))
.map(Arc::new)
.collect();

let missing_blocks = Vec::from_iter(
Expand Down
52 changes: 52 additions & 0 deletions chain/ethereum/src/json_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use graph::prelude::serde_json::{self as json, Value};
use graph::prelude::{EthereumBlock, LightEthereumBlock};

use crate::json_patch;

#[derive(Debug)]
pub struct EthereumJsonBlock(Value);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if we should call it just JsonBlock named it this because we already have a JsonBlock in chainstore and this one is very ethereum specific

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with either, but since we are in the ethereum crate, the Ethereum prefix isn't strictly necessary. But not a big deal either way.


impl EthereumJsonBlock {
pub fn new(value: Value) -> Self {
Self(value)
}

/// Returns true if this is a shallow/header-only block (no full block data).
pub fn is_shallow(&self) -> bool {
self.0.get("data") == Some(&Value::Null)
}

/// Returns true if this block is in the legacy format (direct block JSON
/// rather than wrapped in a `block` field).
pub fn is_legacy_format(&self) -> bool {
self.0.get("block").is_none()
}

/// Patches missing `type` fields in transactions and receipts.
/// Required for alloy compatibility with cached blocks from older graph-node versions.
pub fn patch(&mut self) {
if let Some(block) = self.0.get_mut("block") {
json_patch::patch_block_transactions(block);
}
if let Some(receipts) = self.0.get_mut("transaction_receipts") {
json_patch::patch_receipts(receipts);
}
}

/// Patches and deserializes into a full `EthereumBlock` with receipts.
pub fn into_full_block(mut self) -> Result<EthereumBlock, json::Error> {
self.patch();
json::from_value(self.0)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I suggested the type parameter T, I assumed we'd use this for different block forms. But since we need to have different methods for light and full blocks, I think we don't need T.


/// Extracts and patches the inner block, deserializing into a `LightEthereumBlock`.
pub fn into_light_block(mut self) -> Result<LightEthereumBlock, json::Error> {
let mut inner = self
.0
.as_object_mut()
.and_then(|obj| obj.remove("block"))
.unwrap_or(self.0);
json_patch::patch_block_transactions(&mut inner);
json::from_value(inner)
}
}
123 changes: 123 additions & 0 deletions chain/ethereum/src/json_patch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//! JSON patching utilities for Ethereum blocks and receipts.
//!
//! Some cached blocks are missing the transaction `type` field because
//! graph-node's rust-web3 fork didn't capture it. Alloy requires this field for
//! deserialization. These utilities patch the JSON to add `type: "0x0"` (legacy
//! transaction) where missing.
//!
//! Also used by `PatchingHttp` for chains that don't support EIP-2718 typed transactions.

use graph::prelude::serde_json::Value;

pub(crate) fn patch_type_field(obj: &mut Value) -> bool {
if let Value::Object(map) = obj {
if !map.contains_key("type") {
map.insert("type".to_string(), Value::String("0x0".to_string()));
return true;
}
}
false
}

pub(crate) fn patch_block_transactions(block: &mut Value) -> bool {
let Some(txs) = block.get_mut("transactions").and_then(|t| t.as_array_mut()) else {
return false;
};
let mut patched = false;
for tx in txs {
patched |= patch_type_field(tx);
}
patched
}

pub(crate) fn patch_receipts(result: &mut Value) -> bool {
match result {
Value::Object(_) => patch_type_field(result),
Value::Array(arr) => {
let mut patched = false;
for r in arr {
patched |= patch_type_field(r);
}
patched
}
_ => false,
}
}

#[cfg(test)]
mod tests {
use super::*;
use graph::prelude::serde_json::json;

#[test]
fn patch_type_field_adds_missing_type() {
let mut obj = json!({"status": "0x1", "gasUsed": "0x5208"});
assert!(patch_type_field(&mut obj));
assert_eq!(obj["type"], "0x0");
}

#[test]
fn patch_type_field_preserves_existing_type() {
let mut obj = json!({"status": "0x1", "type": "0x2"});
assert!(!patch_type_field(&mut obj));
assert_eq!(obj["type"], "0x2");
}

#[test]
fn patch_type_field_handles_non_object() {
let mut val = json!("not an object");
assert!(!patch_type_field(&mut val));
}

#[test]
fn patch_block_transactions_patches_all() {
let mut block = json!({
"hash": "0x123",
"transactions": [
{"hash": "0xabc", "nonce": "0x1"},
{"hash": "0xdef", "nonce": "0x2", "type": "0x2"},
{"hash": "0xghi", "nonce": "0x3"}
]
});
assert!(patch_block_transactions(&mut block));
assert_eq!(block["transactions"][0]["type"], "0x0");
assert_eq!(block["transactions"][1]["type"], "0x2");
assert_eq!(block["transactions"][2]["type"], "0x0");
}

#[test]
fn patch_block_transactions_handles_empty() {
let mut block = json!({"hash": "0x123", "transactions": []});
assert!(!patch_block_transactions(&mut block));
}

#[test]
fn patch_block_transactions_handles_missing_field() {
let mut block = json!({"hash": "0x123"});
assert!(!patch_block_transactions(&mut block));
}

#[test]
fn patch_receipts_single() {
let mut receipt = json!({"status": "0x1"});
assert!(patch_receipts(&mut receipt));
assert_eq!(receipt["type"], "0x0");
}

#[test]
fn patch_receipts_array() {
let mut receipts = json!([
{"status": "0x1"},
{"status": "0x1", "type": "0x2"}
]);
assert!(patch_receipts(&mut receipts));
assert_eq!(receipts[0]["type"], "0x0");
assert_eq!(receipts[1]["type"], "0x2");
}

#[test]
fn patch_receipts_handles_null() {
let mut val = Value::Null;
assert!(!patch_receipts(&mut val));
}
}
2 changes: 2 additions & 0 deletions chain/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mod data_source;
mod env;
mod ethereum_adapter;
mod ingestor;
mod json_block;
mod json_patch;
mod polling_block_stream;
pub mod runtime;
mod transport;
Expand Down
Loading