diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs
index 37c8167de27..28b9a8381ae 100644
--- a/benchmarks/random-access-bench/src/main.rs
+++ b/benchmarks/random-access-bench/src/main.rs
@@ -6,22 +6,39 @@
 use std::time::Duration;
 use std::time::Instant;
 
 use clap::Parser;
+use clap::ValueEnum;
 use indicatif::ProgressBar;
-use vortex_bench::BenchmarkOutput;
 use vortex_bench::Engine;
 use vortex_bench::Format;
 use vortex_bench::Target;
+use vortex_bench::create_output_writer;
+use vortex_bench::datasets::gharchive::gharchive_parquet;
+use vortex_bench::datasets::gharchive::gharchive_vortex;
 use vortex_bench::datasets::taxi_data::*;
 use vortex_bench::display::DisplayFormat;
 use vortex_bench::display::print_measurements_json;
 use vortex_bench::display::render_table;
 use vortex_bench::measurements::TimingMeasurement;
+use vortex_bench::random_access::FieldPath;
+use vortex_bench::random_access::ParquetProjectingAccessor;
 use vortex_bench::random_access::ParquetRandomAccessor;
+use vortex_bench::random_access::ProjectingRandomAccessor;
 use vortex_bench::random_access::RandomAccessor;
+use vortex_bench::random_access::VortexProjectingAccessor;
 use vortex_bench::random_access::VortexRandomAccessor;
 use vortex_bench::setup_logging_and_tracing;
 use vortex_bench::utils::constants::STORAGE_NVME;
 
+/// Available datasets for random access benchmarks.
+#[derive(Clone, Copy, Debug, Default, ValueEnum)]
+enum Dataset {
+    /// NYC Taxi trip data - flat schema with many columns.
+    #[default]
+    Taxi,
+    /// GitHub Archive event data - deeply nested schema with struct fields.
+    GhArchive,
+}
+
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
 struct Args {
@@ -32,6 +49,9 @@ struct Args {
         default_values_t = vec![Format::Parquet, Format::OnDiskVortex]
     )]
     formats: Vec<Format>,
+    /// Dataset to benchmark.
+    #[arg(long, value_enum, default_value_t = Dataset::Taxi)]
+    dataset: Dataset,
     /// Time limit in seconds for each benchmark target (e.g., 10 for 10 seconds).
     #[arg(long, default_value_t = 10)]
     time_limit: u64,
@@ -51,21 +71,40 @@ async fn main() -> anyhow::Result<()> {
 
     setup_logging_and_tracing(args.verbose, args.tracing)?;
 
-    // Row count of the dataset is 3,339,715.
-    let indices = vec![10u64, 11, 12, 13, 100_000, 3_000_000];
-
-    run_random_access(
-        args.formats,
-        args.time_limit,
-        args.display_format,
-        indices,
-        args.output_path,
-    )
-    .await
+    match args.dataset {
+        Dataset::Taxi => {
+            // Row count of the taxi dataset is 3,339,715.
+            let indices = vec![10u64, 11, 12, 13, 100_000, 3_000_000];
+            run_taxi_random_access(
+                args.formats,
+                args.time_limit,
+                args.display_format,
+                indices,
+                args.output_path,
+            )
+            .await
+        }
+        Dataset::GhArchive => {
+            // Run the gharchive benchmark with nested field projection.
+            // The field path is actor.login - a nested string field.
+            let field_path = vec!["actor".to_string(), "login".to_string()];
+            // Use smaller indices since gharchive may have fewer rows per row group.
+            let indices = vec![10u64, 11, 12, 13, 1_000, 10_000];
+            run_gharchive_random_access(
+                args.formats,
+                args.time_limit,
+                args.display_format,
+                indices,
+                field_path,
+                args.output_path,
+            )
+            .await
+        }
+    }
 }
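A quick note on the CLI surface this hunk adds: clap's `ValueEnum` derive kebab-cases variant names, so `Dataset::GhArchive` is selected with `--dataset gh-archive` (and `Taxi` with `--dataset taxi`). A minimal sketch of that mapping, separate from the diff and assuming clap 4's default value naming:

```rust
use clap::ValueEnum;

#[derive(Clone, Copy, Debug, PartialEq, ValueEnum)]
enum Dataset {
    Taxi,
    GhArchive,
}

fn main() {
    // ValueEnum::from_str parses the kebab-cased CLI value back to the variant.
    assert_eq!(
        Dataset::from_str("gh-archive", /* ignore_case */ true),
        Ok(Dataset::GhArchive)
    );
    // to_possible_value exposes the rendered CLI name for a variant.
    assert_eq!(
        Dataset::GhArchive.to_possible_value().unwrap().get_name(),
        "gh-archive"
    );
}
```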
 
 /// Create a random accessor for the given format using taxi data.
-async fn get_accessor(format: Format) -> anyhow::Result<Box<dyn RandomAccessor>> {
+async fn get_taxi_accessor(format: Format) -> anyhow::Result<Box<dyn RandomAccessor>> {
     match format {
         Format::OnDiskVortex => {
             let path = taxi_data_vortex().await?;
@@ -91,6 +130,28 @@ async fn get_accessor(format: Format) -> anyhow::Result<Box<dyn RandomAccessor>
     }
 }
 
+/// Create a projecting random accessor for the given format using gharchive data.
+async fn get_gharchive_accessor(
+    format: Format,
+) -> anyhow::Result<Box<dyn ProjectingRandomAccessor>> {
+    match format {
+        Format::OnDiskVortex => {
+            let path = gharchive_vortex().await?;
+            Ok(Box::new(VortexProjectingAccessor::new(path)))
+        }
+        Format::VortexCompact => {
+            // For now, use the same path as OnDiskVortex (compact is not yet implemented for gharchive).
+            let path = gharchive_vortex().await?;
+            Ok(Box::new(VortexProjectingAccessor::compact(path)))
+        }
+        Format::Parquet => {
+            let path = gharchive_parquet().await?;
+            Ok(Box::new(ParquetProjectingAccessor::new(path)))
+        }
+        _ => unimplemented!("Projected random access bench not implemented for {format}"),
+    }
+}
+
 /// Run a random access benchmark for the given accessor.
 ///
 /// Runs the take operation repeatedly until the time limit is reached,
@@ -125,6 +186,41 @@ async fn benchmark_random_access(
     })
 }
 
+/// Run a projected random access benchmark for the given accessor.
+///
+/// Runs the take operation with field projection repeatedly until the time limit is reached,
+/// collecting timing for each run.
+async fn benchmark_projected_random_access(
+    accessor: &dyn ProjectingRandomAccessor,
+    indices: &[u64],
+    field_path: &FieldPath,
+    time_limit_secs: u64,
+    storage: &str,
+) -> anyhow::Result<TimingMeasurement> {
+    let time_limit = Duration::from_secs(time_limit_secs);
+    let overall_start = Instant::now();
+    let mut runs = Vec::new();
+
+    // Run at least once, then continue until the time limit is reached.
+    loop {
+        let indices = indices.to_vec();
+        let start = Instant::now();
+        let _row_count = accessor.take_projected(indices, field_path).await?;
+        runs.push(start.elapsed());
+
+        if overall_start.elapsed() >= time_limit {
+            break;
+        }
+    }
+
+    Ok(TimingMeasurement {
+        name: accessor.name().to_string(),
+        storage: storage.to_string(),
+        target: Target::new(format_to_engine(accessor.format()), accessor.format()),
+        runs,
+    })
+}
+
 /// Map format to the appropriate engine for random access benchmarks.
 fn format_to_engine(format: Format) -> Engine {
     match format {
@@ -137,9 +233,10 @@ fn format_to_engine(format: Format) -> Engine {
 }
 
 /// The benchmark ID used for output path.
-const BENCHMARK_ID: &str = "random-access";
+const TAXI_BENCHMARK_ID: &str = "random-access-taxi";
+const GHARCHIVE_BENCHMARK_ID: &str = "random-access-gharchive";
 
-async fn run_random_access(
+async fn run_taxi_random_access(
     formats: Vec<Format>,
     time_limit: u64,
     display_format: DisplayFormat,
@@ -152,7 +249,7 @@
     let mut measurements = Vec::new();
 
     for format in formats {
-        let accessor = get_accessor(format).await?;
+        let accessor = get_taxi_accessor(format).await?;
 
         let measurement =
             benchmark_random_access(accessor.as_ref(), &indices, time_limit, STORAGE_NVME).await?;
@@ -164,8 +261,53 @@
     progress.finish();
 
-    let output = BenchmarkOutput::with_path(BENCHMARK_ID, output_path);
-    let mut writer = output.create_writer()?;
+    let mut writer = create_output_writer(&display_format, output_path, TAXI_BENCHMARK_ID)?;
+
+    match display_format {
+        DisplayFormat::Table => {
+            render_table(&mut writer, measurements, &targets)?;
+        }
+        DisplayFormat::GhJson => {
+            print_measurements_json(&mut writer, measurements)?;
+        }
+    }
+
+    Ok(())
+}
+
+async fn run_gharchive_random_access(
+    formats: Vec<Format>,
+    time_limit: u64,
+    display_format: DisplayFormat,
+    indices: Vec<u64>,
+    field_path: FieldPath,
+    output_path: Option<PathBuf>,
+) -> anyhow::Result<()> {
+    let progress = ProgressBar::new(formats.len() as u64);
+
+    let mut targets = Vec::new();
+    let mut measurements = Vec::new();
+
+    for format in formats {
+        let accessor = get_gharchive_accessor(format).await?;
+        let measurement = benchmark_projected_random_access(
+            accessor.as_ref(),
+            &indices,
+            &field_path,
+            time_limit,
+            STORAGE_NVME,
+        )
+        .await?;
+
+        targets.push(measurement.target);
+        measurements.push(measurement);
+
+        progress.inc(1);
+    }
+
+    progress.finish();
+
+    let mut writer = create_output_writer(&display_format, output_path, GHARCHIVE_BENCHMARK_ID)?;
 
     match display_format {
         DisplayFormat::Table => {
diff --git a/vortex-bench/src/datasets/gharchive.rs b/vortex-bench/src/datasets/gharchive.rs
new file mode 100644
index 00000000000..401789605a4
--- /dev/null
+++ b/vortex-bench/src/datasets/gharchive.rs
@@ -0,0 +1,192 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! GitHub Archive dataset for random access benchmarks.
+//!
+//! This module provides functions to generate and access the GitHub Archive dataset
+//! in both Parquet and Vortex formats. The dataset contains deeply nested event data,
+//! which is useful for benchmarking nested field access patterns.
+
+use std::path::PathBuf;
+use std::process::Command;
+
+use anyhow::Result;
+use async_trait::async_trait;
+use tokio::fs::File as TokioFile;
+use tokio::io::AsyncWriteExt;
+use tracing::info;
+use vortex::array::ArrayRef;
+use vortex::array::stream::ArrayStreamExt;
+use vortex::file::OpenOptionsSessionExt;
+use vortex::file::WriteOptionsSessionExt;
+
+use crate::CompactionStrategy;
+use crate::IdempotentPath;
+use crate::SESSION;
+use crate::conversions::parquet_to_vortex_chunks;
+use crate::datasets::Dataset;
+use crate::idempotent;
+use crate::idempotent_async;
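The module reads as a three-stage, disk-cached pipeline: raw hourly JSON is downloaded once, converted to Parquet once via DuckDB, and converted to Vortex once, with `idempotent`/`idempotent_async` skipping any stage whose output already exists. A hedged usage sketch of the functions defined below (the function names are real; the wrapper itself is illustrative):

```rust
// Illustrative wrapper: warms every stage of the gharchive dataset cache.
async fn warm_gharchive_caches() -> anyhow::Result<()> {
    let json = gharchive_json().await?; // 24 hourly .json.gz archives, one file
    let parquet = gharchive_parquet().await?; // JSON -> Parquet via DuckDB
    let vortex = gharchive_vortex().await?; // Parquet -> Vortex
    println!("json={json:?} parquet={parquet:?} vortex={vortex:?}");
    Ok(())
}
```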
+
+/// Template URL for the raw JSON dataset.
+fn raw_json_url(hour: usize) -> String {
+    assert!(hour <= 23);
+    format!("https://data.gharchive.org/2024-10-01-{hour}.json.gz")
+}
+
+pub struct GhArchiveData;
+
+#[async_trait]
+impl Dataset for GhArchiveData {
+    fn name(&self) -> &str {
+        "gharchive"
+    }
+
+    async fn to_vortex_array(&self) -> Result<ArrayRef> {
+        fetch_gharchive_data().await
+    }
+
+    async fn to_parquet_path(&self) -> Result<PathBuf> {
+        gharchive_parquet().await
+    }
+}
+
+/// Get the path to the compressed JSON data.
+fn gharchive_json_path() -> PathBuf {
+    "gharchive/json/events.json.gz".to_data_path()
+}
+
+/// Get the path to the Parquet file.
+fn gharchive_parquet_path() -> PathBuf {
+    "gharchive/parquet/events.parquet".to_data_path()
+}
+
+/// Download the GitHub Archive JSON data for all 24 hours of 2024-10-01.
+pub async fn gharchive_json() -> Result<PathBuf> {
+    idempotent_async(&gharchive_json_path(), |json_path| async move {
+        info!("Downloading GitHub Archive JSON source files");
+        let mut w = TokioFile::create(&json_path).await?;
+        let client = reqwest::Client::new();
+        for hour in 0..=23 {
+            let url = raw_json_url(hour);
+            info!("Downloading archive {url}");
+            let response = client
+                .get(url)
+                .send()
+                .await?
+                .error_for_status()
+                .map_err(|err| anyhow::anyhow!("error fetching gharchive data: {err}"))?;
+
+            let body = response.bytes().await?;
+
+            w.write_all(&body).await?;
+            w.flush().await?;
+        }
+
+        Ok(json_path)
+    })
+    .await
+}
+
+/// Get the path to the Parquet file, generating it from JSON if necessary.
+///
+/// This uses DuckDB to convert the JSON data to Parquet format.
+pub async fn gharchive_parquet() -> Result<PathBuf> {
+    let json = gharchive_json().await?;
+    let json_path_str = json.display().to_string();
+
+    idempotent(&gharchive_parquet_path(), move |parquet_path| {
+        let parquet_str = parquet_path.display().to_string();
+        info!(
+            "Converting GitHub Archive JSON to Parquet with DuckDB @ {}",
+            parquet_path.display()
+        );
+        let result = Command::new("duckdb")
+            .arg("-c")
+            .arg(format!(
+                "
+                CREATE TABLE events AS SELECT * FROM read_ndjson_auto('{json_path_str}', ignore_errors = true);
+                COPY events TO '{parquet_str}' (FORMAT parquet);
+                "
+            ))
+            .spawn()?
+            .wait()?;
+
+        if !result.success() {
+            anyhow::bail!("DuckDB subprocess failed converting JSON to Parquet");
+        }
+
+        Ok(())
+    })
+}
+
+/// Load the GitHub Archive data as a Vortex array.
+pub async fn fetch_gharchive_data() -> Result<ArrayRef> {
+    let vortex_data = gharchive_vortex().await?;
+    Ok(SESSION
+        .open_options()
+        .open_path(vortex_data)
+        .await?
+        .scan()?
+        .into_array_stream()?
+        .read_all()
+        .await?)
+}
+
+/// Get the path to the Vortex file, converting from Parquet if necessary.
+pub async fn gharchive_vortex() -> Result<PathBuf> {
+    idempotent_async(
+        "gharchive/vortex/events.vortex",
+        |output_fname| async move {
+            let buf = output_fname.to_path_buf();
+            let mut output_file = TokioFile::create(output_fname).await?;
+
+            let data = parquet_to_vortex_chunks(gharchive_parquet().await?).await?;
+
+            SESSION
+                .write_options()
+                .write(&mut output_file, data.to_array_stream())
+                .await?;
+            output_file.flush().await?;
+            Ok(buf)
+        },
+    )
+    .await
+}
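One detail worth calling out in `gharchive_json`: all 24 hourly `.json.gz` responses are appended to a single file. This works because gzip allows multiple members to be concatenated into one stream, so a multi-member-aware decoder reads the whole day back as one continuous NDJSON stream. A small sketch, assuming the `flate2` crate (which is not part of this diff):

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

use flate2::read::MultiGzDecoder;

// Counts NDJSON lines across all concatenated gzip members in the file.
fn count_events(path: &Path) -> std::io::Result<usize> {
    // MultiGzDecoder (unlike GzDecoder) keeps decoding past member boundaries.
    let decoder = MultiGzDecoder::new(File::open(path)?);
    Ok(BufReader::new(decoder).lines().map_while(Result::ok).count())
}
```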
+
+/// Get the path to a compact Vortex file, converting from Parquet if necessary.
+pub async fn gharchive_vortex_compact() -> Result<PathBuf> {
+    idempotent_async(
+        "gharchive/vortex/events-compact.vortex",
+        |output_fname| async move {
+            let buf = output_fname.to_path_buf();
+            let mut output_file = TokioFile::create(output_fname).await?;
+
+            let write_options = CompactionStrategy::Compact.apply_options(SESSION.write_options());
+
+            let data = parquet_to_vortex_chunks(gharchive_parquet().await?).await?;
+
+            write_options
+                .write(&mut output_file, data.to_array_stream())
+                .await?;
+
+            output_file.flush().await?;
+            Ok(buf)
+        },
+    )
+    .await
+}
+
+/// Deeply nested fields in the GitHub Archive dataset that are useful for benchmarking.
+///
+/// These fields represent common access patterns for nested data:
+/// - `payload.ref` - string field nested under the payload struct
+/// - `repo.name` - string field nested under the repo struct
+/// - `actor.login` - string field nested under the actor struct
+/// - `org.id` - integer field nested under the org struct
+pub const NESTED_FIELDS: &[(&str, &str)] = &[
+    ("payload", "ref"),
+    ("repo", "name"),
+    ("actor", "login"),
+    ("org", "id"),
+];
diff --git a/vortex-bench/src/datasets/mod.rs b/vortex-bench/src/datasets/mod.rs
index 1b773938468..233defcfd63 100644
--- a/vortex-bench/src/datasets/mod.rs
+++ b/vortex-bench/src/datasets/mod.rs
@@ -12,6 +12,7 @@ use vortex::array::ArrayRef;
 use crate::clickbench::Flavor;
 
 pub mod data_downloads;
+pub mod gharchive;
 pub mod struct_list_of_ints;
 pub mod taxi_data;
 pub mod tpch_l_comment;
diff --git a/vortex-bench/src/random_access/mod.rs b/vortex-bench/src/random_access/mod.rs
index 2193b76e673..26989ba72ff 100644
--- a/vortex-bench/src/random_access/mod.rs
+++ b/vortex-bench/src/random_access/mod.rs
@@ -11,7 +11,9 @@ use crate::Format;
 pub mod take;
 
 // Re-export implementations
+pub use take::ParquetProjectingAccessor;
 pub use take::ParquetRandomAccessor;
+pub use take::VortexProjectingAccessor;
 pub use take::VortexRandomAccessor;
 
 /// Trait for format-specific random access (take) operations.
@@ -32,3 +34,28 @@ pub trait RandomAccessor: Send + Sync {
     /// Take rows at the given indices, returning the number of rows read.
     async fn take(&self, indices: Vec<u64>) -> Result<usize>;
 }
+
+/// A field path for nested field access.
+///
+/// Represents a path to a nested field, e.g., `["payload", "ref"]` for `payload.ref`.
+pub type FieldPath = Vec<String>;
+
+/// Trait for format-specific random access with field projection.
+///
+/// Extends `RandomAccessor` to support selecting specific nested fields.
+#[async_trait]
+pub trait ProjectingRandomAccessor: Send + Sync {
+    /// The format this accessor handles.
+    fn format(&self) -> Format;
+
+    /// A descriptive name for this accessor (used in benchmark output).
+    fn name(&self) -> &str;
+
+    /// The file path this accessor reads from.
+    fn path(&self) -> &PathBuf;
+
+    /// Take rows at the given indices with a projection to a nested field.
+    ///
+    /// The field_path specifies the path to a nested field, e.g., `["payload", "ref"]`.
+    async fn take_projected(&self, indices: Vec<u64>, field_path: &FieldPath) -> Result<usize>;
+}
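Taken together, `FieldPath`, the `ProjectingRandomAccessor` trait, and the `NESTED_FIELDS` table compose directly. A hedged sketch of driving one accessor from a predefined (parent, child) pair, mirroring what `get_gharchive_accessor` and `benchmark_projected_random_access` do in the benchmark binary:

```rust
use vortex_bench::datasets::gharchive::{NESTED_FIELDS, gharchive_vortex};
use vortex_bench::random_access::{FieldPath, ProjectingRandomAccessor, VortexProjectingAccessor};

async fn take_one_nested_field() -> anyhow::Result<usize> {
    // Turn a (parent, child) pair such as ("payload", "ref") into a FieldPath.
    let (parent, child) = NESTED_FIELDS[0];
    let field_path: FieldPath = vec![parent.to_string(), child.to_string()];

    let accessor = VortexProjectingAccessor::new(gharchive_vortex().await?);
    // Returns the number of rows read for the projected field.
    accessor.take_projected(vec![10, 11, 12], &field_path).await
}
```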
diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs
index 1f251c2d8db..1543d0bae33 100644
--- a/vortex-bench/src/random_access/take.rs
+++ b/vortex-bench/src/random_access/take.rs
@@ -24,11 +24,16 @@
 use vortex::array::IntoArray;
 use vortex::array::VortexSessionExecute;
 use vortex::array::stream::ArrayStreamExt;
 use vortex::buffer::Buffer;
+use vortex::expr::Expression;
+use vortex::expr::get_item;
+use vortex::expr::root;
 use vortex::file::OpenOptionsSessionExt;
 use vortex::utils::aliases::hash_map::HashMap;
 
 use crate::Format;
 use crate::SESSION;
+use crate::random_access::FieldPath;
+use crate::random_access::ProjectingRandomAccessor;
 use crate::random_access::RandomAccessor;
 
 /// Random accessor for Vortex format files.
@@ -208,3 +213,297 @@ pub async fn take_parquet(path: &Path, indices: Vec<u64>) -> anyhow::Result<RecordBatch>
 
     Ok(concat_batches(&schema, &batches)?)
 }
+
+/// Random accessor for Vortex format files with field projection support.
+pub struct VortexProjectingAccessor {
+    path: PathBuf,
+    name: String,
+    format: Format,
+}
+
+impl VortexProjectingAccessor {
+    /// Create a new projecting Vortex accessor for the given file path.
+    pub fn new(path: PathBuf) -> Self {
+        Self {
+            path,
+            name: "random-access/vortex-projected-tokio-local-disk".to_string(),
+            format: Format::OnDiskVortex,
+        }
+    }
+
+    /// Create a new projecting Vortex accessor with a custom name.
+    pub fn with_name(path: PathBuf, name: impl Into<String>) -> Self {
+        Self {
+            path,
+            name: name.into(),
+            format: Format::OnDiskVortex,
+        }
+    }
+
+    /// Create a new projecting Vortex accessor for the compact format.
+    pub fn compact(path: PathBuf) -> Self {
+        Self {
+            path,
+            name: "random-access/vortex-compact-projected-tokio-local-disk".to_string(),
+            format: Format::VortexCompact,
+        }
+    }
+}
+
+#[async_trait]
+impl ProjectingRandomAccessor for VortexProjectingAccessor {
+    fn format(&self) -> Format {
+        self.format
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn path(&self) -> &PathBuf {
+        &self.path
+    }
+
+    async fn take_projected(
+        &self,
+        indices: Vec<u64>,
+        field_path: &FieldPath,
+    ) -> anyhow::Result<usize> {
+        let result = take_vortex_projected(&self.path, indices.into(), field_path).await?;
+        Ok(result.len())
+    }
+}
+
+/// Random accessor for Parquet format files with field projection support.
+pub struct ParquetProjectingAccessor {
+    path: PathBuf,
+    name: String,
+}
+
+impl ParquetProjectingAccessor {
+    /// Create a new projecting Parquet accessor for the given file path.
+    pub fn new(path: PathBuf) -> Self {
+        Self {
+            path,
+            name: "random-access/parquet-projected-tokio-local-disk".to_string(),
+        }
+    }
+
+    /// Create a new projecting Parquet accessor with a custom name.
+    pub fn with_name(path: PathBuf, name: impl Into<String>) -> Self {
+        Self {
+            path,
+            name: name.into(),
+        }
+    }
+}
+
+#[async_trait]
+impl ProjectingRandomAccessor for ParquetProjectingAccessor {
+    fn format(&self) -> Format {
+        Format::Parquet
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn path(&self) -> &PathBuf {
+        &self.path
+    }
+
+    async fn take_projected(
+        &self,
+        indices: Vec<u64>,
+        field_path: &FieldPath,
+    ) -> anyhow::Result<usize> {
+        let result = take_parquet_projected(&self.path, indices, field_path).await?;
+        Ok(result.num_rows())
+    }
+}
+
+/// Build a projection expression for a nested field path.
+fn build_projection_expr(field_path: &FieldPath) -> Expression {
+    let mut expr = root();
+    for field in field_path {
+        expr = get_item(field.as_str(), expr);
+    }
+    expr
+}
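For a concrete path, `build_projection_expr` produces one `get_item` wrapper per segment around `root()`, innermost first. Written out by hand for `["payload", "ref"]` (a sketch; only `get_item`, `root`, and `Expression` are taken from the diff's own imports):

```rust
use vortex::expr::{Expression, get_item, root};

// Equivalent to build_projection_expr(&vec!["payload".to_string(), "ref".to_string()]):
// start from root(), then wrap get_item for "payload", then for "ref".
fn payload_ref_projection() -> Expression {
    get_item("ref", get_item("payload", root()))
}
```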
+
+/// Take rows from a Vortex file with a projection to a nested field.
+async fn take_vortex_projected(
+    reader: impl AsRef<Path>,
+    indices: Buffer<u64>,
+    field_path: &FieldPath,
+) -> anyhow::Result<ArrayRef> {
+    let projection = build_projection_expr(field_path);
+
+    let array = SESSION
+        .open_options()
+        .open_path(reader.as_ref())
+        .await?
+        .scan()?
+        .with_projection(projection)
+        .with_row_indices(indices)
+        .into_array_stream()?
+        .read_all()
+        .await?;
+
+    // We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es.
+    let mut ctx = SESSION.create_execution_ctx();
+    Ok(array.execute::<Canonical>(&mut ctx)?.into_array())
+}
+
+/// Take rows from a Parquet file with a projection to a nested field.
+pub async fn take_parquet_projected(
+    path: &Path,
+    indices: Vec<u64>,
+    field_path: &FieldPath,
+) -> anyhow::Result<RecordBatch> {
+    let file = tokio::fs::File::open(path).await?;
+
+    let builder = ParquetRecordBatchStreamBuilder::new_with_options(
+        file,
+        ArrowReaderOptions::new().with_page_index(true),
+    )
+    .await?;
+
+    // Build a projection mask for the nested field.
+    // For Parquet, we need to find the leaf column indices that correspond to the nested path.
+    let parquet_schema = builder.parquet_schema();
+    let arrow_schema = builder.schema();
+
+    // Find the column indices for the projected field path.
+    let projection_mask = build_parquet_projection_mask(parquet_schema, arrow_schema, field_path)?;
+
+    // We figure out which row groups we need to read and a selection filter for each of them.
+    let mut row_groups = HashMap::new();
+    let row_group_offsets = iter::once(0)
+        .chain(
+            builder
+                .metadata()
+                .row_groups()
+                .iter()
+                .map(RowGroupMetaData::num_rows),
+        )
+        .scan(0i64, |acc, x| {
+            *acc += x;
+            Some(*acc)
+        })
+        .collect::<Vec<_>>();
+
+    for idx in indices {
+        let row_group_idx = row_group_offsets
+            .binary_search(&(idx as i64))
+            .unwrap_or_else(|e| e - 1);
+        row_groups
+            .entry(row_group_idx)
+            .or_insert_with(Vec::new)
+            .push((idx as i64) - row_group_offsets[row_group_idx]);
+    }
+    let row_group_indices = row_groups
+        .keys()
+        .sorted()
+        .map(|i| row_groups[i].clone())
+        .collect_vec();
+
+    let reader = builder
+        .with_projection(projection_mask)
+        // Sort the row group ids so they line up with the sorted `row_group_indices`.
+        .with_row_groups(row_groups.keys().copied().sorted().collect_vec())
+        // FIXME(ngates): our indices code assumes the batch size == the row group sizes
+        .with_batch_size(10_000_000)
+        .build()?;
+
+    let schema = reader.schema().clone();
+
+    let batches = reader
+        .enumerate()
+        .map(|(idx, batch)| {
+            let batch = batch.unwrap();
+            let indices = PrimitiveArray::<Int64Type>::from(row_group_indices[idx].clone());
+            take_record_batch(&batch, &indices).unwrap()
+        })
+        .collect::<Vec<_>>()
+        .await;
+
+    Ok(concat_batches(&schema, &batches)?)
+}
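The index bucketing above is worth a worked example. The offsets vector holds cumulative row counts with a leading zero, and `binary_search` returns `Ok(i)` when an index is exactly the first row of group `i`, or `Err(i)` with the insertion point, which is one past the owning group. A self-contained sketch of the same arithmetic:

```rust
// Maps global row indices to (row group, index within that group).
fn split_by_row_group(row_group_sizes: &[i64], indices: &[u64]) -> Vec<(usize, i64)> {
    // Sizes [100, 50, 200] become offsets [0, 100, 150, 350].
    let offsets: Vec<i64> = std::iter::once(0)
        .chain(row_group_sizes.iter().copied())
        .scan(0i64, |acc, n| {
            *acc += n;
            Some(*acc)
        })
        .collect();

    indices
        .iter()
        .map(|&idx| {
            // Ok(i): idx is the first row of group i. Err(i): idx is inside group i - 1.
            let group = offsets.binary_search(&(idx as i64)).unwrap_or_else(|e| e - 1);
            (group, idx as i64 - offsets[group])
        })
        .collect()
}

fn main() {
    let sizes = [100, 50, 200];
    // Row 120 is the 21st row of group 1; row 100 is the first row of group 1.
    assert_eq!(split_by_row_group(&sizes, &[120, 100]), vec![(1, 20), (1, 0)]);
}
```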
+
+/// Build a Parquet projection mask for a nested field path.
+fn build_parquet_projection_mask(
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+    arrow_schema: &arrow_schema::SchemaRef,
+    field_path: &FieldPath,
+) -> anyhow::Result<parquet::arrow::ProjectionMask> {
+    use parquet::arrow::ProjectionMask;
+
+    if field_path.is_empty() {
+        return Ok(ProjectionMask::all());
+    }
+
+    // Find the leaf column indices for the nested field.
+    let leaf_indices = find_parquet_leaf_indices(parquet_schema, arrow_schema, field_path)?;
+
+    Ok(ProjectionMask::leaves(parquet_schema, leaf_indices))
+}
+
+/// Find the leaf column indices in the Parquet schema for a nested field path.
+fn find_parquet_leaf_indices(
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+    arrow_schema: &arrow_schema::SchemaRef,
+    field_path: &FieldPath,
+) -> anyhow::Result<Vec<usize>> {
+    use arrow_schema::DataType;
+
+    // Navigate the Arrow schema to validate that the target field exists and that
+    // every intermediate segment is a struct.
+    let mut current_field: Option<&arrow_schema::Field> = None;
+
+    for (i, field_name) in field_path.iter().enumerate() {
+        if i == 0 {
+            // Look the first segment up in the root schema.
+            current_field = arrow_schema.field_with_name(field_name).ok();
+        } else if let Some(field) = current_field {
+            // Navigate into the nested struct.
+            match field.data_type() {
+                DataType::Struct(fields) => {
+                    current_field = fields
+                        .iter()
+                        .find(|f| f.name() == field_name)
+                        .map(|f| f.as_ref());
+                }
+                _ => {
+                    anyhow::bail!(
+                        "Cannot navigate into non-struct field '{}' at path position {}",
+                        field.name(),
+                        i
+                    );
+                }
+            }
+        }
+    }
+
+    // Now find all leaf columns under this field in the Parquet schema.
+    // We need to map from the Arrow field to Parquet column indices.
+    let parquet_path = field_path.join(".");
+
+    let mut leaf_indices = Vec::new();
+    for (idx, col) in parquet_schema.columns().iter().enumerate() {
+        let col_path = col.path().string();
+        if col_path.starts_with(&parquet_path) {
+            // Check that it is exactly the field or a dot-separated child of it.
+            if col_path == parquet_path || col_path.starts_with(&format!("{}.", parquet_path)) {
+                leaf_indices.push(idx);
+            }
+        }
+    }
+
+    if leaf_indices.is_empty() {
+        anyhow::bail!(
+            "Could not find field path '{}' in Parquet schema",
+            parquet_path
+        );
+    }
+
+    Ok(leaf_indices)
+}
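Finally, the leaf-matching predicate at the end of `find_parquet_leaf_indices` deserves a note: the `==` / `starts_with("{path}.")` pair accepts the column itself and its dot-separated descendants while rejecting siblings that merely share a string prefix. A small test-style sketch of just that rule (extracted for illustration; the column names are hypothetical):

```rust
// The matching rule used when scanning parquet_schema.columns().
fn matches_field_path(col_path: &str, parquet_path: &str) -> bool {
    col_path == parquet_path || col_path.starts_with(&format!("{parquet_path}."))
}

fn main() {
    // The leaf itself and its descendants match...
    assert!(matches_field_path("payload.ref", "payload.ref"));
    assert!(matches_field_path("payload.ref.list.element", "payload.ref"));
    // ...but a sibling sharing the prefix (e.g. payload.ref_type) does not.
    assert!(!matches_field_path("payload.ref_type", "payload.ref"));
}
```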