From 94929725bdfcd5e4bf788db6c613900b121c12ed Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 4 Feb 2026 13:02:12 -0800 Subject: [PATCH] chain/ethereum: Make sure we log the provider when the EthereumAdapter logs --- chain/ethereum/src/ethereum_adapter.rs | 111 +++++++++++++++---------- 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 64affbeec0b..7009133a186 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -14,6 +14,7 @@ use graph::data::store::scalar; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::data::subgraph::API_VERSION_0_0_7; use graph::data_source::common::ContractCall; +use graph::derive::CheapClone; use graph::futures01::stream; use graph::futures01::Future; use graph::futures01::Stream; @@ -127,11 +128,34 @@ impl CheapClone for EthereumAdapter { } } +/// Logger with provider context. Internal methods should take this type +/// to ensure provider info is always included in logs. +#[derive(Clone, CheapClone)] +struct ProviderLogger(Logger); + +impl ProviderLogger { + fn new(logger: &Logger, provider: &str) -> Self { + Self(Logger::new(logger, o!("provider" => provider.to_string()))) + } +} + +impl std::ops::Deref for ProviderLogger { + type Target = Logger; + fn deref(&self) -> &Logger { + &self.0 + } +} + impl EthereumAdapter { pub fn is_call_only(&self) -> bool { self.call_only } + /// Returns a logger with provider context added + fn provider_logger(&self, logger: &Logger) -> ProviderLogger { + ProviderLogger::new(logger, &self.provider) + } + pub async fn new( logger: Logger, provider: String, @@ -178,7 +202,7 @@ impl EthereumAdapter { async fn traces( self, - logger: Logger, + logger: ProviderLogger, subgraph_metrics: Arc, from: BlockNumber, to: BlockNumber, @@ -219,7 +243,7 @@ impl EthereumAdapter { async fn execute_trace_filter_request( &self, - logger: Logger, + logger: ProviderLogger, subgraph_metrics: Arc, from: BlockNumber, to: BlockNumber, @@ -264,7 +288,7 @@ impl EthereumAdapter { fn log_trace_results( &self, - logger: &Logger, + logger: &ProviderLogger, from: BlockNumber, to: BlockNumber, trace_len: usize, @@ -288,7 +312,7 @@ impl EthereumAdapter { result: &Result, RpcError>, from: BlockNumber, to: BlockNumber, - logger: &Logger, + logger: &ProviderLogger, ) { self.metrics .observe_request(elapsed, "trace_filter", &self.provider); @@ -312,7 +336,7 @@ impl EthereumAdapter { block_hash: B256, supports_eip_1898: bool, call_only: bool, - logger: Logger, + logger: ProviderLogger, ) -> bool { // This is the lazy part. If the result is already in `supports_block_receipts`, we don't need // to check again. @@ -362,7 +386,7 @@ impl EthereumAdapter { /// Alloy-exclusive version of logs_with_sigs using alloy types and methods async fn logs_with_sigs( &self, - logger: Logger, + logger: ProviderLogger, subgraph_metrics: Arc, from: BlockNumber, to: BlockNumber, @@ -416,7 +440,7 @@ impl EthereumAdapter { fn trace_stream( self, - logger: &Logger, + logger: ProviderLogger, subgraph_metrics: Arc, from: BlockNumber, to: BlockNumber, @@ -447,7 +471,6 @@ impl EthereumAdapter { }; let eth = self; - let logger = logger.clone(); futures03::stream::iter(ranges.into_iter().map(move |(start, end)| { let eth = eth.clone(); @@ -473,7 +496,7 @@ impl EthereumAdapter { fn log_stream( &self, - logger: Logger, + logger: ProviderLogger, subgraph_metrics: Arc, from: BlockNumber, to: BlockNumber, @@ -575,12 +598,11 @@ impl EthereumAdapter { async fn code( &self, - logger: &Logger, + logger: ProviderLogger, address: Address, block_ptr: BlockPtr, ) -> Result { let alloy = self.alloy.clone(); - let logger = Logger::new(logger, o!("provider" => self.provider.clone())); let block_id = self.block_ptr_to_id(&block_ptr); let retry_log_message = format!("eth_getCode RPC call for block {}", block_ptr); @@ -606,12 +628,11 @@ impl EthereumAdapter { async fn balance( &self, - logger: &Logger, + logger: ProviderLogger, address: Address, block_ptr: BlockPtr, ) -> Result { let alloy = self.alloy.clone(); - let logger = Logger::new(logger, o!("provider" => self.provider.clone())); let block_id = self.block_ptr_to_id(&block_ptr); let retry_log_message = format!("eth_getBalance RPC call for block {}", block_ptr); @@ -637,13 +658,12 @@ impl EthereumAdapter { async fn call( &self, - logger: Logger, + logger: ProviderLogger, call_data: call::Request, block_ptr: BlockPtr, gas: Option, ) -> Result { let alloy = self.alloy.clone(); - let logger = Logger::new(&logger, o!("provider" => self.provider.clone())); let alloy_block_id = self.block_ptr_to_id(&block_ptr); let retry_log_message = format!("eth_call RPC call for block {}", block_ptr); @@ -680,7 +700,7 @@ impl EthereumAdapter { async fn call_and_cache( &self, - logger: &Logger, + logger: &ProviderLogger, call: &ContractCall, req: call::Request, cache: Arc, @@ -896,7 +916,7 @@ impl EthereumAdapter { log_filter: EthereumLogFilter, ) -> DynTryFuture<'static, Vec, Error> { let eth: Self = self.cheap_clone(); - let logger = logger.clone(); + let logger = self.provider_logger(logger); futures03::stream::iter(log_filter.eth_get_logs_filters().map(move |filter| { eth.cheap_clone().log_stream( @@ -922,6 +942,7 @@ impl EthereumAdapter { call_filter: &'a EthereumCallFilter, ) -> Box + Send + 'a> { let eth = self.clone(); + let logger = self.provider_logger(logger); let EthereumCallFilter { contract_addresses_function_signatures, @@ -1039,6 +1060,7 @@ impl EthereumAdapter { block_hash: alloy::primitives::B256, ) -> Result, Error> { let eth = self.clone(); + let logger = self.provider_logger(logger); let addresses = Vec::new(); let traces: Vec = eth .trace_stream( @@ -1326,7 +1348,7 @@ impl EthereumAdapterTrait for EthereumAdapter { block: AnyBlock, ) -> Result { let alloy = self.alloy.clone(); - let logger = logger.clone(); + let logger = self.provider_logger(logger); let block_hash = block.header.hash; // The early return is necessary for correctness, otherwise we'll @@ -1364,6 +1386,7 @@ impl EthereumAdapterTrait for EthereumAdapter { address: Address, block_ptr: BlockPtr, ) -> Result { + let logger = self.provider_logger(logger); debug!( logger, "eth_getBalance"; "address" => format!("{}", address), @@ -1378,6 +1401,7 @@ impl EthereumAdapterTrait for EthereumAdapter { address: Address, block_ptr: BlockPtr, ) -> Result { + let logger = self.provider_logger(logger); debug!( logger, "eth_getCode"; "address" => format!("{}", address), @@ -1457,7 +1481,7 @@ impl EthereumAdapterTrait for EthereumAdapter { cache: Arc, ) -> Result>, call::Source)>, ContractCallError> { fn as_req( - logger: &Logger, + logger: &ProviderLogger, call: &ContractCall, index: u32, ) -> Result { @@ -1498,7 +1522,7 @@ impl EthereumAdapterTrait for EthereumAdapter { } fn decode( - logger: &Logger, + logger: &ProviderLogger, resp: call::Response, call: &ContractCall, ) -> (Option>, call::Source) { @@ -1528,7 +1552,7 @@ impl EthereumAdapterTrait for EthereumAdapter { } } - fn log_call_error(logger: &Logger, e: &ContractCallError, call: &ContractCall) { + fn log_call_error(logger: &ProviderLogger, e: &ContractCallError, call: &ContractCall) { match e { ContractCallError::AlloyError(e) => error!(logger, "Ethereum node returned an error when calling function \"{}\" of contract \"{}\": {}", @@ -1542,6 +1566,8 @@ impl EthereumAdapterTrait for EthereumAdapter { } } + let logger = self.provider_logger(logger); + if calls.is_empty() { return Ok(Vec::new()); } @@ -1556,7 +1582,7 @@ impl EthereumAdapterTrait for EthereumAdapter { let reqs: Vec<_> = calls .iter() .enumerate() - .map(|(index, call)| as_req(logger, call, index as u32)) + .map(|(index, call)| as_req(&logger, call, index as u32)) .collect::>()?; let (mut resps, missing) = cache @@ -1567,12 +1593,13 @@ impl EthereumAdapterTrait for EthereumAdapter { let futs = missing.into_iter().map(|req| { let cache = cache.clone(); + let logger = logger.clone(); async move { let call = calls[req.index as usize]; - match self.call_and_cache(logger, call, req, cache.clone()).await { + match self.call_and_cache(&logger, call, req, cache.clone()).await { Ok(resp) => Ok(resp), Err(e) => { - log_call_error(logger, &e, call); + log_call_error(&logger, &e, call); Err(e) } } @@ -1590,7 +1617,7 @@ impl EthereumAdapterTrait for EthereumAdapter { .into_iter() .map(|res| { let call = &calls[res.req.index as usize]; - decode(logger, res, call) + decode(&logger, res, call) }) .collect(); @@ -1693,6 +1720,7 @@ pub(crate) async fn blocks_with_triggers( // while searching for a trigger type, the entire operation fails. let eth = adapter.clone(); let call_filter = EthereumCallFilter::from(&filter.block); + let logger = ProviderLogger::new(&logger, eth.provider()); // Scan the block range to find relevant triggers let trigger_futs: FuturesUnordered, anyhow::Error>>> = @@ -1708,8 +1736,9 @@ pub(crate) async fn blocks_with_triggers( // This is for `start` triggers which can be initialization handlers which needs to be run // before all other triggers if filter.block.trigger_every_block { + let logger = logger.clone(); let block_future = eth - .block_range_to_ptrs(logger.clone(), from, to) + .block_range_to_ptrs((*logger).clone(), from, to) .map(move |ptrs| { ptrs.into_iter() .flat_map(|ptr| { @@ -1725,7 +1754,7 @@ pub(crate) async fn blocks_with_triggers( trigger_futs.push(block_future) } else if !filter.block.polling_intervals.is_empty() { let block_futures_matching_once_filter = - eth.blocks_matching_polling_intervals(logger.clone(), from, to, &filter.block); + eth.blocks_matching_polling_intervals((*logger).clone(), from, to, &filter.block); trigger_futs.push(block_futures_matching_once_filter); } @@ -1733,7 +1762,7 @@ pub(crate) async fn blocks_with_triggers( if !filter.log.is_empty() { let logs_future = get_logs_and_transactions( ð, - &logger, + logger.clone(), subgraph_metrics.clone(), from, to, @@ -1798,7 +1827,7 @@ pub(crate) async fn blocks_with_triggers( let logger2 = logger.cheap_clone(); let blocks: Vec<_> = eth - .load_blocks(logger.cheap_clone(), chain_store.clone(), block_hashes) + .load_blocks((*logger).cheap_clone(), chain_store.clone(), block_hashes) .await? .into_iter() .map( @@ -2047,7 +2076,7 @@ async fn filter_call_triggers_from_unsuccessful_transactions( mut block: BlockWithTriggers, eth: &EthereumAdapter, chain_store: &Arc, - logger: &Logger, + logger: &ProviderLogger, ) -> anyhow::Result> { // Return early if there is no trigger data if block.trigger_data.is_empty() { @@ -2187,7 +2216,7 @@ async fn fetch_transaction_receipts_in_batch_with_retry( alloy: Arc, hashes: Vec, block_hash: B256, - logger: Logger, + logger: ProviderLogger, ) -> Result>, IngestorError> { let retry_log_message = format!( "batch eth_getTransactionReceipt RPC call for block {:?}", @@ -2213,7 +2242,7 @@ async fn fetch_transaction_receipts_in_batch( alloy: Arc, hashes: Vec, block_hash: B256, - logger: Logger, + logger: ProviderLogger, ) -> Result>, IngestorError> { // Use the batch method to get all receipts at once let receipts = batch_get_transaction_receipts(alloy, hashes.clone()) @@ -2303,7 +2332,7 @@ async fn fetch_receipts_with_retry( alloy: Arc, hashes: Vec, block_hash: B256, - logger: Logger, + logger: ProviderLogger, supports_block_receipts: bool, ) -> Result>, IngestorError> { if supports_block_receipts { @@ -2317,7 +2346,7 @@ async fn fetch_individual_receipts_with_retry( alloy: Arc, hashes: Vec, block_hash: B256, - logger: Logger, + logger: ProviderLogger, ) -> Result>, IngestorError> { if ENV_VARS.fetch_receipts_in_batches { return fetch_transaction_receipts_in_batch_with_retry(alloy, hashes, block_hash, logger) @@ -2348,10 +2377,9 @@ async fn fetch_block_receipts_with_retry( alloy: Arc, hashes: Vec, block_hash: B256, - logger: Logger, + logger: ProviderLogger, ) -> Result>, IngestorError> { use graph::prelude::alloy::rpc::types::BlockId; - let logger = logger.cheap_clone(); let retry_log_message = format!("eth_getBlockReceipts RPC call for block {:?}", block_hash); // Perform the retry operation @@ -2393,9 +2421,8 @@ async fn fetch_transaction_receipt_with_retry( alloy: Arc, transaction_hash: B256, block_hash: B256, - logger: Logger, + logger: ProviderLogger, ) -> Result, IngestorError> { - let logger = logger.cheap_clone(); let retry_log_message = format!( "eth_getTransactionReceipt RPC call for transaction {:?}", transaction_hash @@ -2421,7 +2448,7 @@ fn resolve_transaction_receipt( transaction_receipt: Option, transaction_hash: B256, block_hash: B256, - logger: Logger, + logger: ProviderLogger, ) -> Result { match transaction_receipt { // A receipt might be missing because the block was uncled, and the transaction never @@ -2475,7 +2502,7 @@ fn resolve_transaction_receipt( /// Retrieves logs and the associated transaction receipts, if required by the [`EthereumLogFilter`]. async fn get_logs_and_transactions( adapter: &Arc, - logger: &Logger, + logger: ProviderLogger, subgraph_metrics: Arc, from: BlockNumber, to: BlockNumber, @@ -2485,7 +2512,7 @@ async fn get_logs_and_transactions( // Obtain logs externally let logs = adapter .logs_in_block_range( - logger, + &logger, subgraph_metrics.cheap_clone(), from, to, @@ -2554,7 +2581,7 @@ async fn get_transaction_receipts_for_transaction_hashes( adapter: &EthereumAdapter, transaction_hashes_by_block: &HashMap>, subgraph_metrics: Arc, - logger: Logger, + logger: ProviderLogger, ) -> Result>, anyhow::Error> { use std::collections::hash_map::Entry::Vacant;