From c78190d6470a7b9b9369addf646637a4db24171e Mon Sep 17 00:00:00 2001 From: Sebastian Lorenz Date: Tue, 22 Jul 2025 19:52:39 +0200 Subject: [PATCH 1/3] node, chain: Add extensible compression support for RPC requests - Replace boolean compression_enabled with Compression enum (None, Gzip) - Support per-provider compression configuration via "compression" field - Add placeholders for future compression methods (Brotli, Deflate) - Update transport layer to handle compression enum with match statement - Add comprehensive unit tests for compression configuration parsing - Update example configuration and documentation Configuration examples: compression = "gzip" # Enable gzip compression compression = "none" # Disable compression (default) Addresses issue #5671 with future-extensible design. --- Cargo.lock | 14 +++++ chain/ethereum/src/network.rs | 6 +++ chain/ethereum/src/transport.rs | 14 +++-- graph/Cargo.toml | 2 +- graph/src/endpoint.rs | 11 ++++ node/resources/tests/full_config.toml | 1 + node/src/chain.rs | 4 +- node/src/config.rs | 73 +++++++++++++++++++++++++++ 8 files changed, 118 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7fb578e722..7a9cd556781 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1385,6 +1385,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddb939d66e4ae03cee6091612804ba446b12878410cfa17f785f4dd67d4014e8" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-graphql" version = "7.2.1" @@ -6566,6 +6579,7 @@ version = "0.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ + "async-compression", "base64 0.22.1", "bytes", "encoding_rs", diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index 882720e55f1..b1cdcdfc2b8 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -314,6 +314,7 @@ mod tests { use graph::components::network_provider::ProviderManager; use graph::components::network_provider::ProviderName; use graph::data::value::Word; + use graph::endpoint::Compression; use graph::http::HeaderMap; use graph::{ endpoint::EndpointMetrics, @@ -395,6 +396,7 @@ mod tests { metrics.clone(), "", false, + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -499,6 +501,7 @@ mod tests { metrics.clone(), "", false, + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -571,6 +574,7 @@ mod tests { metrics.clone(), "", false, + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -636,6 +640,7 @@ mod tests { metrics.clone(), "", false, + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -924,6 +929,7 @@ mod tests { endpoint_metrics.clone(), "", false, + Compression::None, ); Arc::new( diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index 4923e9c7b82..87b41f8d682 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -1,6 +1,6 @@ use alloy::transports::{TransportError, TransportErrorKind, TransportFut}; use graph::components::network_provider::ProviderName; -use graph::endpoint::{ConnectionType, EndpointMetrics, RequestLabels}; +use graph::endpoint::{Compression, ConnectionType, EndpointMetrics, RequestLabels}; use graph::prelude::alloy::rpc::json_rpc::{RequestPacket, ResponsePacket}; use graph::prelude::alloy::transports::{ipc::IpcConnect, ws::WsConnect}; use graph::prelude::*; @@ -49,11 +49,15 @@ impl Transport { metrics: Arc, provider: impl AsRef, no_eip2718: bool, + compression: Compression, ) -> Self { - let client = reqwest::Client::builder() - .default_headers(headers) - .build() - .expect("Failed to build HTTP client"); + let mut client_builder = reqwest::Client::builder().default_headers(headers); + + if matches!(compression, Compression::Gzip) { + client_builder = client_builder.gzip(true); + } + + let client = client_builder.build().expect("Failed to build HTTP client"); let patching_transport = PatchingHttp::new(client, rpc, no_eip2718); let metrics_transport = diff --git a/graph/Cargo.toml b/graph/Cargo.toml index f5346202dbc..c601764e394 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -26,7 +26,7 @@ diesel_derives = { workspace = true } chrono = "0.4.43" envconfig = { workspace = true } Inflector = "0.11.3" -reqwest = { version = "0.12.23", features = ["json", "stream", "multipart"] } +reqwest = { version = "0.12.23", features = ["json", "stream", "multipart", "gzip"] } ethabi = "17.2" hex = "0.4.3" http0 = { version = "0", package = "http" } diff --git a/graph/src/endpoint.rs b/graph/src/endpoint.rs index 1b563fb52f0..5a7b755f715 100644 --- a/graph/src/endpoint.rs +++ b/graph/src/endpoint.rs @@ -7,6 +7,7 @@ use std::{ }; use prometheus::IntCounterVec; +use serde::{Deserialize, Serialize}; use slog::{warn, Logger}; use crate::components::network_provider::ProviderName; @@ -17,6 +18,16 @@ use crate::{components::metrics::MetricsRegistry, data::value::Word}; /// avoid locking since we don't need to modify the entire struture. type ProviderCount = Arc>; +/// Compression methods for RPC transports +#[derive(Copy, Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub enum Compression { + #[default] + #[serde(rename = "none")] + None, + #[serde(rename = "gzip")] + Gzip, +} + /// This struct represents all the current labels except for the result /// which is added separately. If any new labels are necessary they should /// remain in the same order as added in [`EndpointMetrics::new`] diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 057e774d93e..360cf76d97f 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -48,6 +48,7 @@ shard = "primary" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, + { label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive"], compression = "gzip" }}, { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, ] diff --git a/node/src/chain.rs b/node/src/chain.rs index e417ad48e6f..8253689543a 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -210,7 +210,8 @@ pub async fn create_ethereum_networks_for_chain( logger, "Creating transport"; "url" => &web3.url, - "capabilities" => capabilities + "capabilities" => capabilities, + "compression" => ?web3.compression ); use crate::config::Transport::*; @@ -223,6 +224,7 @@ pub async fn create_ethereum_networks_for_chain( endpoint_metrics.cheap_clone(), &provider.label, no_eip2718, + web3.compression, ), Ipc => Transport::new_ipc(&web3.url).await, Ws => Transport::new_ws(&web3.url).await, diff --git a/node/src/config.rs b/node/src/config.rs index c06b5298ac0..d64c9207d7d 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -2,6 +2,7 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, components::network_provider::ChainName, + endpoint::Compression, env::ENV_VARS, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -516,6 +517,7 @@ impl ChainSection { features, headers: Default::default(), rules: vec![], + compression: Compression::None, }), }; let entry = chains.entry(name.to_string()).or_insert_with(|| Chain { @@ -694,6 +696,10 @@ pub struct Web3Provider { #[serde(default, rename = "match")] rules: Vec, + + /// Compression method for RPC requests and responses + #[serde(default)] + pub compression: Compression, } impl Web3Provider { @@ -891,6 +897,7 @@ impl<'de> Deserialize<'de> for Provider { .ok_or_else(|| serde::de::Error::missing_field("features"))?, headers: headers.unwrap_or_else(HeaderMap::new), rules: nodes, + compression: Compression::None, }), }; @@ -1203,6 +1210,7 @@ mod tests { Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, Web3Provider, }; use graph::blockchain::BlockchainKind; + use graph::endpoint::Compression; use graph::firehose::SubgraphLimit; use graph::http::{HeaderMap, HeaderValue}; use graph::prelude::regex::Regex; @@ -1291,6 +1299,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1317,6 +1326,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1378,6 +1388,7 @@ mod tests { features, headers, rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1403,6 +1414,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1613,6 +1625,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1625,6 +1638,66 @@ mod tests { assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled); } + #[test] + fn it_parses_web3_provider_with_compression() { + let actual = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "gzip" } + "#, + ) + .unwrap(); + + assert_eq!( + Provider { + label: "compressed".to_owned(), + details: ProviderDetails::Web3(Web3Provider { + transport: Transport::Rpc, + url: "http://localhost:8545".to_owned(), + features: { + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features + }, + headers: HeaderMap::new(), + rules: Vec::new(), + compression: Compression::Gzip, + }), + }, + actual + ); + } + + #[test] + fn it_parses_web3_provider_with_no_compression() { + let actual = toml::from_str( + r#" + label = "uncompressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "none" } + "#, + ) + .unwrap(); + + assert_eq!( + Provider { + label: "uncompressed".to_owned(), + details: ProviderDetails::Web3(Web3Provider { + transport: Transport::Rpc, + url: "http://localhost:8545".to_owned(), + features: { + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features + }, + headers: HeaderMap::new(), + rules: Vec::new(), + compression: Compression::None, + }), + }, + actual + ); + } + #[test] fn duplicated_labels_are_not_allowed_within_chain() { let mut actual = toml::from_str::( From 071912f2d6549a265acefbd87607a708682ec8d0 Mon Sep 17 00:00:00 2001 From: Sebastian Lorenz Date: Tue, 26 Aug 2025 13:45:09 +0200 Subject: [PATCH 2/3] graph: Fix ArweaveClient to disable gzip compression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When reqwest gzip feature is enabled globally, Client::default() enables automatic gzip compression which removes content-length headers from responses. ArweaveClient needs content-length to check file sizes, so explicitly disable gzip for Arweave requests. Fixes test: polling_monitor::ipfs_service::test::arweave_get 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- graph/src/components/link_resolver/arweave.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/graph/src/components/link_resolver/arweave.rs b/graph/src/components/link_resolver/arweave.rs index abd3d80a503..a9b66516dee 100644 --- a/graph/src/components/link_resolver/arweave.rs +++ b/graph/src/components/link_resolver/arweave.rs @@ -42,7 +42,7 @@ impl Default for ArweaveClient { Self { base_url: "https://arweave.net".parse().unwrap(), - client: Client::default(), + client: Client::builder().gzip(false).build().unwrap(), logger: Logger::root(slog::Discard, o!()), } } @@ -53,7 +53,7 @@ impl ArweaveClient { Self { base_url, logger, - client: Client::default(), + client: Client::builder().gzip(false).build().unwrap(), } } } From 8c7cc8b8610fc79d56129714e6d1adb716d7f277 Mon Sep 17 00:00:00 2001 From: DaMandal0rian Date: Sun, 1 Feb 2026 10:28:43 -0400 Subject: [PATCH 3/3] node, chain: Use features pattern for RPC compression configuration --- Cargo.lock | 37 ++++++ chain/ethereum/src/lib.rs | 2 +- chain/ethereum/src/network.rs | 6 +- chain/ethereum/src/transport.rs | 36 +++++- graph/Cargo.toml | 2 +- graph/src/endpoint.rs | 11 -- node/resources/tests/full_config.toml | 2 +- node/src/chain.rs | 5 +- node/src/config.rs | 159 +++++++++++++++++++------- 9 files changed, 195 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a9cd556781..1011044fad5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -1391,6 +1406,7 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddb939d66e4ae03cee6091612804ba446b12878410cfa17f785f4dd67d4014e8" dependencies = [ + "brotli", "flate2", "futures-core", "memchr", @@ -1870,6 +1886,27 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.4.0" diff --git a/chain/ethereum/src/lib.rs b/chain/ethereum/src/lib.rs index 8850764d63b..509d239bf01 100644 --- a/chain/ethereum/src/lib.rs +++ b/chain/ethereum/src/lib.rs @@ -14,7 +14,7 @@ mod transport; pub use self::capabilities::NodeCapabilities; pub use self::ethereum_adapter::EthereumAdapter; pub use self::runtime::RuntimeAdapter; -pub use self::transport::Transport; +pub use self::transport::{Compression, Transport}; pub use env::ENV_VARS; pub use buffered_call_cache::BufferedCallCache; diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index b1cdcdfc2b8..98240ddea07 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -314,7 +314,7 @@ mod tests { use graph::components::network_provider::ProviderManager; use graph::components::network_provider::ProviderName; use graph::data::value::Word; - use graph::endpoint::Compression; + use graph::http::HeaderMap; use graph::{ endpoint::EndpointMetrics, @@ -325,7 +325,9 @@ mod tests { }; use std::sync::Arc; - use crate::{EthereumAdapter, EthereumAdapterTrait, ProviderEthRpcMetrics, Transport}; + use crate::{ + Compression, EthereumAdapter, EthereumAdapterTrait, ProviderEthRpcMetrics, Transport, + }; use super::{EthereumNetworkAdapter, EthereumNetworkAdapters, NodeCapabilities}; diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index 87b41f8d682..3d96473e14c 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -1,6 +1,6 @@ use alloy::transports::{TransportError, TransportErrorKind, TransportFut}; use graph::components::network_provider::ProviderName; -use graph::endpoint::{Compression, ConnectionType, EndpointMetrics, RequestLabels}; +use graph::endpoint::{ConnectionType, EndpointMetrics, RequestLabels}; use graph::prelude::alloy::rpc::json_rpc::{RequestPacket, ResponsePacket}; use graph::prelude::alloy::transports::{ipc::IpcConnect, ws::WsConnect}; use graph::prelude::*; @@ -10,6 +10,27 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tower::Service; +/// Compression method for RPC requests. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum Compression { + #[default] + None, + Gzip, + Brotli, + Deflate, +} + +impl std::fmt::Display for Compression { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Compression::None => write!(f, "none"), + Compression::Gzip => write!(f, "gzip"), + Compression::Brotli => write!(f, "brotli"), + Compression::Deflate => write!(f, "deflate"), + } + } +} + /// Abstraction over different transport types for Alloy providers. #[derive(Clone, Debug)] pub enum Transport { @@ -53,8 +74,17 @@ impl Transport { ) -> Self { let mut client_builder = reqwest::Client::builder().default_headers(headers); - if matches!(compression, Compression::Gzip) { - client_builder = client_builder.gzip(true); + match compression { + Compression::None => {} + Compression::Gzip => { + client_builder = client_builder.gzip(true); + } + Compression::Brotli => { + client_builder = client_builder.brotli(true); + } + Compression::Deflate => { + client_builder = client_builder.deflate(true); + } } let client = client_builder.build().expect("Failed to build HTTP client"); diff --git a/graph/Cargo.toml b/graph/Cargo.toml index c601764e394..4934839cb57 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -26,7 +26,7 @@ diesel_derives = { workspace = true } chrono = "0.4.43" envconfig = { workspace = true } Inflector = "0.11.3" -reqwest = { version = "0.12.23", features = ["json", "stream", "multipart", "gzip"] } +reqwest = { version = "0.12.23", features = ["json", "stream", "multipart", "gzip", "brotli", "deflate"] } ethabi = "17.2" hex = "0.4.3" http0 = { version = "0", package = "http" } diff --git a/graph/src/endpoint.rs b/graph/src/endpoint.rs index 5a7b755f715..1b563fb52f0 100644 --- a/graph/src/endpoint.rs +++ b/graph/src/endpoint.rs @@ -7,7 +7,6 @@ use std::{ }; use prometheus::IntCounterVec; -use serde::{Deserialize, Serialize}; use slog::{warn, Logger}; use crate::components::network_provider::ProviderName; @@ -18,16 +17,6 @@ use crate::{components::metrics::MetricsRegistry, data::value::Word}; /// avoid locking since we don't need to modify the entire struture. type ProviderCount = Arc>; -/// Compression methods for RPC transports -#[derive(Copy, Clone, Debug, Default, Deserialize, Serialize, PartialEq)] -pub enum Compression { - #[default] - #[serde(rename = "none")] - None, - #[serde(rename = "gzip")] - Gzip, -} - /// This struct represents all the current labels except for the result /// which is added separately. If any new labels are necessary they should /// remain in the same order as added in [`EndpointMetrics::new`] diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 360cf76d97f..69cc0bbe02f 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -48,7 +48,7 @@ shard = "primary" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, - { label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive"], compression = "gzip" }}, + { label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive", "compression/gzip"] }}, { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, ] diff --git a/node/src/chain.rs b/node/src/chain.rs index 8253689543a..7d851918c70 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -206,12 +206,13 @@ pub async fn create_ethereum_networks_for_chain( } let logger = logger.new(o!("provider" => provider.label.clone())); + let compression = web3.compression(); info!( logger, "Creating transport"; "url" => &web3.url, "capabilities" => capabilities, - "compression" => ?web3.compression + "compression" => compression.to_string() ); use crate::config::Transport::*; @@ -224,7 +225,7 @@ pub async fn create_ethereum_networks_for_chain( endpoint_metrics.cheap_clone(), &provider.label, no_eip2718, - web3.compression, + compression, ), Ipc => Transport::new_ipc(&web3.url).await, Ws => Transport::new_ws(&web3.url).await, diff --git a/node/src/config.rs b/node/src/config.rs index d64c9207d7d..9751844535f 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -2,7 +2,6 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, components::network_provider::ChainName, - endpoint::Compression, env::ENV_VARS, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -18,7 +17,7 @@ use graph::{ }, }; use graph_chain_ethereum as ethereum; -use graph_chain_ethereum::NodeCapabilities; +use graph_chain_ethereum::{Compression, NodeCapabilities}; use graph_store_postgres::{DeploymentPlacer, Shard as ShardName, PRIMARY_SHARD}; use graph::http::{HeaderMap, Uri}; @@ -517,7 +516,6 @@ impl ChainSection { features, headers: Default::default(), rules: vec![], - compression: Compression::None, }), }; let entry = chains.entry(name.to_string()).or_insert_with(|| Chain { @@ -696,10 +694,6 @@ pub struct Web3Provider { #[serde(default, rename = "match")] rules: Vec, - - /// Compression method for RPC requests and responses - #[serde(default)] - pub compression: Compression, } impl Web3Provider { @@ -710,6 +704,18 @@ impl Web3Provider { } } + pub fn compression(&self) -> Compression { + if self.features.contains("compression/gzip") { + Compression::Gzip + } else if self.features.contains("compression/brotli") { + Compression::Brotli + } else if self.features.contains("compression/deflate") { + Compression::Deflate + } else { + Compression::None + } + } + pub fn limit_for(&self, node: &NodeId) -> SubgraphLimit { self.rules.limit_for(node) } @@ -721,7 +727,15 @@ impl Web3Provider { /// - `no_eip1898`: Provider doesn't support EIP-1898 (block parameter by hash/number object) /// - `no_eip2718`: Provider doesn't return the `type` field in transaction receipts. /// When set, receipts are patched to add `"type": "0x0"` for legacy transaction compatibility. -const PROVIDER_FEATURES: [&str; 4] = ["traces", "archive", "no_eip1898", "no_eip2718"]; +const PROVIDER_FEATURES: [&str; 7] = [ + "traces", + "archive", + "no_eip1898", + "no_eip2718", + "compression/gzip", + "compression/brotli", + "compression/deflate", +]; const DEFAULT_PROVIDER_FEATURES: [&str; 2] = ["traces", "archive"]; impl Provider { @@ -782,6 +796,18 @@ impl Provider { } } + let compression_count = web3 + .features + .iter() + .filter(|f| f.starts_with("compression/")) + .count(); + if compression_count > 1 { + return Err(anyhow!( + "at most one compression method allowed for provider {}", + self.label + )); + } + web3.url = shellexpand::env(&web3.url)?.into_owned(); let label = &self.label; @@ -897,7 +923,6 @@ impl<'de> Deserialize<'de> for Provider { .ok_or_else(|| serde::de::Error::missing_field("features"))?, headers: headers.unwrap_or_else(HeaderMap::new), rules: nodes, - compression: Compression::None, }), }; @@ -1210,11 +1235,11 @@ mod tests { Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, Web3Provider, }; use graph::blockchain::BlockchainKind; - use graph::endpoint::Compression; use graph::firehose::SubgraphLimit; use graph::http::{HeaderMap, HeaderValue}; use graph::prelude::regex::Regex; use graph::prelude::{toml, NodeId}; + use graph_chain_ethereum::Compression; use std::collections::BTreeSet; use std::fs::read_to_string; use std::path::{Path, PathBuf}; @@ -1299,7 +1324,6 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), - compression: Compression::None, }), }, actual @@ -1326,7 +1350,6 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), - compression: Compression::None, }), }, actual @@ -1388,7 +1411,6 @@ mod tests { features, headers, rules: Vec::new(), - compression: Compression::None, }), }, actual @@ -1414,7 +1436,6 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), - compression: Compression::None, }), }, actual @@ -1625,7 +1646,6 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), - compression: Compression::None, }), }, actual @@ -1639,62 +1659,113 @@ mod tests { } #[test] - fn it_parses_web3_provider_with_compression() { - let actual = toml::from_str( + fn it_parses_web3_provider_with_gzip_compression_feature() { + let actual: Provider = toml::from_str( r#" label = "compressed" - details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "gzip" } + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/gzip"] } "#, ) .unwrap(); + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features.insert("compression/gzip".to_string()); + assert_eq!( Provider { label: "compressed".to_owned(), details: ProviderDetails::Web3(Web3Provider { transport: Transport::Rpc, url: "http://localhost:8545".to_owned(), - features: { - let mut features = BTreeSet::new(); - features.insert("archive".to_string()); - features - }, + features, headers: HeaderMap::new(), rules: Vec::new(), - compression: Compression::Gzip, }), }, actual ); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Gzip); + } + _ => panic!("expected Web3 provider"), + } } #[test] - fn it_parses_web3_provider_with_no_compression() { - let actual = toml::from_str( + fn it_parses_web3_provider_with_brotli_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/brotli"] } + "#, + ) + .unwrap(); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Brotli); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_parses_web3_provider_with_deflate_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/deflate"] } + "#, + ) + .unwrap(); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Deflate); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_parses_web3_provider_without_compression_feature() { + let actual: Provider = toml::from_str( r#" label = "uncompressed" - details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "none" } + details = { type = "web3", url = "http://localhost:8545", features = ["archive"] } "#, ) .unwrap(); - assert_eq!( - Provider { - label: "uncompressed".to_owned(), - details: ProviderDetails::Web3(Web3Provider { - transport: Transport::Rpc, - url: "http://localhost:8545".to_owned(), - features: { - let mut features = BTreeSet::new(); - features.insert("archive".to_string()); - features - }, - headers: HeaderMap::new(), - rules: Vec::new(), - compression: Compression::None, - }), - }, - actual + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::None); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_rejects_multiple_compression_features() { + let mut actual: Provider = toml::from_str( + r#" + label = "multi-comp" + details = { type = "web3", url = "http://localhost:8545", features = ["compression/gzip", "compression/brotli"] } + "#, + ) + .unwrap(); + + let err = actual.validate(); + assert!(err.is_err()); + let err = err.unwrap_err(); + assert!( + err.to_string() + .contains("at most one compression method allowed"), + "result: {:?}", + err ); }