From 9df4d7d392fd878157f2a5e11f7f3a0bd0277b79 Mon Sep 17 00:00:00 2001
From: Joe Isaacs
Date: Mon, 9 Feb 2026 16:03:57 +0000
Subject: [PATCH 1/6] feat[bench]: add more random access

Signed-off-by: Joe Isaacs
---
 Cargo.lock                                   |   2 +
 benchmarks/lance-bench/src/random_access.rs  |  48 ++++-
 benchmarks/random-access-bench/Cargo.toml    |   2 +
 benchmarks/random-access-bench/src/main.rs   | 211 ++++++++++++++++---
 encodings/runend/benches/run_end_compress.rs |   2 +-
 vortex-bench/src/conversions.rs              |  22 ++
 vortex-bench/src/datasets/feature_vectors.rs | 105 +++++++++
 vortex-bench/src/datasets/mod.rs             |   3 +
 vortex-bench/src/datasets/nested_lists.rs    | 103 +++++++++
 vortex-bench/src/datasets/nested_structs.rs  | 130 ++++++++++++
 vortex-bench/src/random_access/take.rs       |  17 +-
 11 files changed, 610 insertions(+), 35 deletions(-)
 create mode 100644 vortex-bench/src/datasets/feature_vectors.rs
 create mode 100644 vortex-bench/src/datasets/nested_lists.rs
 create mode 100644 vortex-bench/src/datasets/nested_structs.rs

diff --git a/Cargo.lock b/Cargo.lock
index d44bf39ab22..b7f68dd8611 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7923,6 +7923,8 @@ dependencies = [
  "clap",
  "indicatif",
  "lance-bench",
+ "rand 0.9.2",
+ "rand_distr 0.5.1",
  "tokio",
  "vortex",
  "vortex-bench",
diff --git a/benchmarks/lance-bench/src/random_access.rs b/benchmarks/lance-bench/src/random_access.rs
index 83ff5e7b314..d3ce1d9206d 100644
--- a/benchmarks/lance-bench/src/random_access.rs
+++ b/benchmarks/lance-bench/src/random_access.rs
@@ -12,14 +12,18 @@ use lance::dataset::WriteParams;
 use lance_encoding::version::LanceFileVersion;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use vortex_bench::Format;
+use vortex_bench::datasets::feature_vectors::feature_vectors_parquet;
+use vortex_bench::datasets::nested_lists::nested_lists_parquet;
+use vortex_bench::datasets::nested_structs::nested_structs_parquet;
 use vortex_bench::datasets::taxi_data::taxi_data_parquet;
 use vortex_bench::idempotent_async;
 use vortex_bench::random_access::RandomAccessor;
 
-pub async fn taxi_data_lance() -> anyhow::Result<PathBuf> {
-    idempotent_async("taxi/taxi.lance", |output_fname| async move {
-        let parquet_path = taxi_data_parquet().await?;
-
+/// Convert a parquet file to lance format.
+///
+/// Uses `idempotent_async` to skip conversion if the output already exists.
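+///
+/// Illustrative usage (a sketch; `taxi_data_parquet` is the real helper used
+/// below, and the relative output path follows the same convention):
+///
+/// ```ignore
+/// let parquet = taxi_data_parquet().await?;
+/// let lance = parquet_to_lance_file(parquet, "taxi/taxi.lance").await?;
+/// ```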
+async fn parquet_to_lance_file(parquet_path: PathBuf, lance_path: &str) -> anyhow::Result<PathBuf> {
+    idempotent_async(lance_path, |output_fname| async move {
         let file = File::open(&parquet_path)?;
         let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
         let reader = builder.build()?;
@@ -39,13 +43,45 @@ pub async fn taxi_data_lance() -> anyhow::Result<PathBuf> {
     .await
 }
 
+pub async fn taxi_data_lance() -> anyhow::Result<PathBuf> {
+    let parquet_path = taxi_data_parquet().await?;
+    parquet_to_lance_file(parquet_path, "taxi/taxi.lance").await
+}
+
+pub async fn feature_vectors_lance() -> anyhow::Result<PathBuf> {
+    let parquet_path = feature_vectors_parquet().await?;
+    parquet_to_lance_file(parquet_path, "feature_vectors/feature_vectors.lance").await
+}
+
+pub async fn nested_lists_lance() -> anyhow::Result<PathBuf> {
+    let parquet_path = nested_lists_parquet().await?;
+    parquet_to_lance_file(parquet_path, "nested_lists/nested_lists.lance").await
+}
+
+pub async fn nested_structs_lance() -> anyhow::Result<PathBuf> {
+    let parquet_path = nested_structs_parquet().await?;
+    parquet_to_lance_file(parquet_path, "nested_structs/nested_structs.lance").await
+}
+
 pub struct LanceRandomAccessor {
     path: PathBuf,
+    name: String,
 }
 
 impl LanceRandomAccessor {
     pub fn new(path: PathBuf) -> Self {
-        Self { path }
+        Self {
+            path,
+            name: "random-access/lance-tokio-local-disk".to_string(),
+        }
+    }
+
+    /// Create a new Lance random accessor with a custom name.
+    pub fn with_name(path: PathBuf, name: impl Into<String>) -> Self {
+        Self {
+            path,
+            name: name.into(),
+        }
     }
 }
 
@@ -56,7 +92,7 @@ impl RandomAccessor for LanceRandomAccessor {
     }
 
     fn name(&self) -> &str {
-        "random-access/lance-tokio-local-disk"
+        &self.name
     }
 
     fn path(&self) -> &PathBuf {
diff --git a/benchmarks/random-access-bench/Cargo.toml b/benchmarks/random-access-bench/Cargo.toml
index 6186090e3e4..2b5fad7b1e3 100644
--- a/benchmarks/random-access-bench/Cargo.toml
+++ b/benchmarks/random-access-bench/Cargo.toml
@@ -19,6 +19,8 @@ anyhow = { workspace = true }
 clap = { workspace = true, features = ["derive"] }
 indicatif = { workspace = true }
 lance-bench = { path = "../lance-bench", optional = true }
+rand = { workspace = true }
+rand_distr = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 vortex = { workspace = true }
 vortex-bench = { workspace = true }
diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs
index 37c8167de27..483e7c242c4 100644
--- a/benchmarks/random-access-bench/src/main.rs
+++ b/benchmarks/random-access-bench/src/main.rs
@@ -6,11 +6,20 @@ use std::time::Duration;
 use std::time::Instant;
 
 use clap::Parser;
+use clap::ValueEnum;
 use indicatif::ProgressBar;
+use rand::Rng;
+use rand::SeedableRng;
+use rand::rngs::StdRng;
+use rand_distr::Distribution;
+use rand_distr::Exp;
 use vortex_bench::BenchmarkOutput;
 use vortex_bench::Engine;
 use vortex_bench::Format;
 use vortex_bench::Target;
+use vortex_bench::datasets::feature_vectors::*;
+use vortex_bench::datasets::nested_lists::*;
+use vortex_bench::datasets::nested_structs::*;
 use vortex_bench::datasets::taxi_data::*;
 use vortex_bench::display::DisplayFormat;
 use vortex_bench::display::print_measurements_json;
@@ -22,6 +31,110 @@ use vortex_bench::random_access::VortexRandomAccessor;
 use vortex_bench::setup_logging_and_tracing;
 use vortex_bench::utils::constants::STORAGE_NVME;
 
+/// Which synthetic dataset to benchmark.
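+///
+/// Selected on the command line via `--dataset`, e.g. `--dataset nested-lists`
+/// (an illustrative invocation; the accepted names come from the `clap`
+/// attributes below).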
+#[derive(ValueEnum, Clone, Copy, Debug)] +enum DatasetArg { + #[clap(name = "taxi")] + Taxi, + #[clap(name = "feature-vectors")] + FeatureVectors, + #[clap(name = "nested-lists")] + NestedLists, + #[clap(name = "nested-structs")] + NestedStructs, +} + +impl DatasetArg { + fn name(&self) -> &'static str { + match self { + DatasetArg::Taxi => "taxi", + DatasetArg::FeatureVectors => "feature-vectors", + DatasetArg::NestedLists => "nested-lists", + DatasetArg::NestedStructs => "nested-structs", + } + } + + fn row_count(&self) -> u64 { + match self { + DatasetArg::Taxi => 3_339_715, + _ => 1_000_000, + } + } +} + +/// Access pattern for random access benchmarks. +#[derive(Clone, Copy, Debug)] +enum AccessPattern { + /// Multiple clusters of sequential indices scattered across the dataset, + /// simulating workloads with spatial locality (e.g. scanning nearby records). + Correlated, + /// Indices generated by a Poisson process (exponential inter-arrival times) + /// spread uniformly across the dataset, simulating random lookups with no locality. + Uniform, +} + +impl AccessPattern { + fn name(&self) -> &'static str { + match self { + AccessPattern::Correlated => "correlated", + AccessPattern::Uniform => "uniform", + } + } +} + +const ACCESS_PATTERNS: [AccessPattern; 2] = [AccessPattern::Correlated, AccessPattern::Uniform]; + +/// Number of clusters for the correlated pattern. +const NUM_CLUSTERS: usize = 5; + +/// Number of consecutive indices per cluster. +const CLUSTER_SIZE: usize = 20; + +/// Expected number of indices for the Poisson (uniform) pattern. +const POISSON_EXPECTED_COUNT: usize = 100; + +/// Generate indices for the given dataset and access pattern. +fn generate_indices(dataset: DatasetArg, pattern: AccessPattern) -> Vec { + let row_count = dataset.row_count(); + let mut rng = StdRng::seed_from_u64(42); + + match pattern { + AccessPattern::Correlated => { + // Pick random cluster starts, then emit CLUSTER_SIZE consecutive indices from each. + let mut indices = Vec::with_capacity(NUM_CLUSTERS * CLUSTER_SIZE); + for _ in 0..NUM_CLUSTERS { + let start = rng.random_range(0..row_count.saturating_sub(CLUSTER_SIZE as u64)); + for offset in 0..CLUSTER_SIZE as u64 { + indices.push(start + offset); + } + } + indices.sort_unstable(); + indices + } + AccessPattern::Uniform => { + // Poisson process: exponential inter-arrival times with rate chosen to yield + // ~POISSON_EXPECTED_COUNT indices across the dataset. + let rate = POISSON_EXPECTED_COUNT as f64 / row_count as f64; + // SAFETY: rate is always positive (POISSON_EXPECTED_COUNT > 0, row_count > 0). + #[allow(clippy::unwrap_used)] + let exp = Exp::new(rate).unwrap(); + let mut indices = Vec::with_capacity(POISSON_EXPECTED_COUNT); + let mut pos = 0.0_f64; + loop { + let gap: f64 = exp.sample(&mut rng); + pos += gap; + #[allow(clippy::cast_possible_truncation)] + let idx = pos as u64; + if idx >= row_count { + break; + } + indices.push(idx); + } + indices + } + } +} + #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { @@ -43,6 +156,9 @@ struct Args { display_format: DisplayFormat, #[arg(short)] output_path: Option, + /// Which dataset to benchmark random access on. + #[arg(long, value_enum, default_value_t = DatasetArg::Taxi)] + dataset: DatasetArg, } #[tokio::main] @@ -51,41 +167,82 @@ async fn main() -> anyhow::Result<()> { setup_logging_and_tracing(args.verbose, args.tracing)?; - // Row count of the dataset is 3,339,715. 
- let indices = vec![10u64, 11, 12, 13, 100_000, 3_000_000]; - run_random_access( + args.dataset, args.formats, args.time_limit, args.display_format, - indices, args.output_path, ) .await } -/// Create a random accessor for the given format using taxi data. -async fn get_accessor(format: Format) -> anyhow::Result> { +/// Create a random accessor for the given format and dataset. +async fn get_accessor( + dataset: DatasetArg, + format: Format, + pattern: AccessPattern, +) -> anyhow::Result> { + let ds_name = dataset.name(); + let pat_name = pattern.name(); + match format { Format::OnDiskVortex => { - let path = taxi_data_vortex().await?; - Ok(Box::new(VortexRandomAccessor::new(path))) + let path = match dataset { + DatasetArg::Taxi => taxi_data_vortex().await?, + DatasetArg::FeatureVectors => feature_vectors_vortex().await?, + DatasetArg::NestedLists => nested_lists_vortex().await?, + DatasetArg::NestedStructs => nested_structs_vortex().await?, + }; + Ok(Box::new(VortexRandomAccessor::with_name_and_format( + path, + format!("random-access/{ds_name}/{pat_name}/vortex-tokio-local-disk"), + Format::OnDiskVortex, + ))) } Format::VortexCompact => { - let path = taxi_data_vortex_compact().await?; - Ok(Box::new(VortexRandomAccessor::compact(path))) + let path = match dataset { + DatasetArg::Taxi => taxi_data_vortex_compact().await?, + DatasetArg::FeatureVectors => feature_vectors_vortex_compact().await?, + DatasetArg::NestedLists => nested_lists_vortex_compact().await?, + DatasetArg::NestedStructs => nested_structs_vortex_compact().await?, + }; + Ok(Box::new(VortexRandomAccessor::with_name_and_format( + path, + format!("random-access/{ds_name}/{pat_name}/vortex-compact-tokio-local-disk"), + Format::VortexCompact, + ))) } Format::Parquet => { - let path = taxi_data_parquet().await?; - Ok(Box::new(ParquetRandomAccessor::new(path))) + let path = match dataset { + DatasetArg::Taxi => taxi_data_parquet().await?, + DatasetArg::FeatureVectors => feature_vectors_parquet().await?, + DatasetArg::NestedLists => nested_lists_parquet().await?, + DatasetArg::NestedStructs => nested_structs_parquet().await?, + }; + Ok(Box::new(ParquetRandomAccessor::with_name( + path, + format!("random-access/{ds_name}/{pat_name}/parquet-tokio-local-disk"), + ))) } #[cfg(feature = "lance")] Format::Lance => { use lance_bench::random_access::LanceRandomAccessor; - use lance_bench::random_access::taxi_data_lance; - let path = taxi_data_lance().await?; - Ok(Box::new(LanceRandomAccessor::new(path))) + let path = match dataset { + DatasetArg::Taxi => lance_bench::random_access::taxi_data_lance().await?, + DatasetArg::FeatureVectors => { + lance_bench::random_access::feature_vectors_lance().await? + } + DatasetArg::NestedLists => lance_bench::random_access::nested_lists_lance().await?, + DatasetArg::NestedStructs => { + lance_bench::random_access::nested_structs_lance().await? 
+ } + }; + Ok(Box::new(LanceRandomAccessor::with_name( + path, + format!("random-access/{ds_name}/{pat_name}/lance-tokio-local-disk"), + ))) } _ => unimplemented!("Random access bench not implemented for {format}"), } @@ -140,26 +297,32 @@ fn format_to_engine(format: Format) -> Engine { const BENCHMARK_ID: &str = "random-access"; async fn run_random_access( + dataset: DatasetArg, formats: Vec, time_limit: u64, display_format: DisplayFormat, - indices: Vec, output_path: Option, ) -> anyhow::Result<()> { - let progress = ProgressBar::new(formats.len() as u64); + // Total work: formats x patterns + let total_steps = formats.len() * ACCESS_PATTERNS.len(); + let progress = ProgressBar::new(total_steps as u64); let mut targets = Vec::new(); let mut measurements = Vec::new(); - for format in formats { - let accessor = get_accessor(format).await?; - let measurement = - benchmark_random_access(accessor.as_ref(), &indices, time_limit, STORAGE_NVME).await?; + for format in &formats { + for pattern in &ACCESS_PATTERNS { + let accessor = get_accessor(dataset, *format, *pattern).await?; + let indices = generate_indices(dataset, *pattern); + let measurement = + benchmark_random_access(accessor.as_ref(), &indices, time_limit, STORAGE_NVME) + .await?; - targets.push(measurement.target); - measurements.push(measurement); + targets.push(measurement.target); + measurements.push(measurement); - progress.inc(1); + progress.inc(1); + } } progress.finish(); diff --git a/encodings/runend/benches/run_end_compress.rs b/encodings/runend/benches/run_end_compress.rs index aeb0d9b9c73..d8fc9aedeb5 100644 --- a/encodings/runend/benches/run_end_compress.rs +++ b/encodings/runend/benches/run_end_compress.rs @@ -97,5 +97,5 @@ fn take_indices(bencher: Bencher, (length, run_step): (usize, usize)) { bencher .with_inputs(|| (&source_array, &runend_array)) - .bench_refs(|(array, indices)| take(array.as_ref(), indices.as_ref()).unwrap()); + .bench_refs(|(array, indices)| array.as_ref().take(indices.to_array()).unwrap()); } diff --git a/vortex-bench/src/conversions.rs b/vortex-bench/src/conversions.rs index 761b720865d..c6f512d66e0 100644 --- a/vortex-bench/src/conversions.rs +++ b/vortex-bench/src/conversions.rs @@ -13,6 +13,7 @@ use sysinfo::System; use tokio::fs::File; use tokio::fs::OpenOptions; use tokio::fs::create_dir_all; +use tokio::io::AsyncWriteExt; use tracing::Instrument; use tracing::info; use tracing::trace; @@ -201,3 +202,24 @@ pub async fn convert_parquet_directory_to_vortex( Ok(()) } + +/// Convert a Parquet file to Vortex format with the specified compaction strategy. +/// +/// Uses `idempotent_async` to skip conversion if the output file already exists. 
+pub async fn write_parquet_as_vortex(
+    parquet_path: PathBuf,
+    vortex_path: &str,
+    compaction: CompactionStrategy,
+) -> anyhow::Result<PathBuf> {
+    idempotent_async(vortex_path, |output_fname| async move {
+        let mut output_file = File::create(&output_fname).await?;
+        let data = parquet_to_vortex_chunks(parquet_path).await?;
+        let write_options = compaction.apply_options(SESSION.write_options());
+        write_options
+            .write(&mut output_file, data.to_array_stream())
+            .await?;
+        output_file.flush().await?;
+        Ok(())
+    })
+    .await
+}
diff --git a/vortex-bench/src/datasets/feature_vectors.rs b/vortex-bench/src/datasets/feature_vectors.rs
new file mode 100644
index 00000000000..b260260f975
--- /dev/null
+++ b/vortex-bench/src/datasets/feature_vectors.rs
@@ -0,0 +1,105 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use anyhow::Result;
+use arrow_array::Int64Array;
+use arrow_array::RecordBatch;
+use arrow_array::builder::FixedSizeListBuilder;
+use arrow_array::builder::Float32Builder;
+use arrow_schema::DataType;
+use arrow_schema::Field;
+use arrow_schema::Schema;
+use parquet::arrow::ArrowWriter;
+use rand::Rng;
+use rand::SeedableRng;
+use rand::rngs::StdRng;
+
+use crate::CompactionStrategy;
+use crate::conversions::write_parquet_as_vortex;
+use crate::idempotent_async;
+
+/// Number of rows in the feature vectors dataset.
+pub const ROW_COUNT: usize = 1_000_000;
+
+/// Dimensionality of each feature vector.
+const VECTOR_DIM: i32 = 1024;
+
+/// Batch size for data generation.
+const BATCH_SIZE: usize = 100_000;
+
+/// Generate a synthetic feature vectors parquet file.
+///
+/// Schema: `id: Int64, embedding: FixedSizeList<Float32, 1024>`.
+/// This simulates a table of embedding vectors, common in ML workloads.
+pub async fn feature_vectors_parquet() -> Result<PathBuf> {
+    idempotent_async(
+        "feature_vectors/feature_vectors.parquet",
+        |temp_path| async move {
+            let schema = Arc::new(Schema::new(vec![
+                Field::new("id", DataType::Int64, false),
+                Field::new(
+                    "embedding",
+                    DataType::FixedSizeList(
+                        Arc::new(Field::new("item", DataType::Float32, true)),
+                        VECTOR_DIM,
+                    ),
+                    false,
+                ),
+            ]));
+
+            let file = std::fs::File::create(&temp_path)?;
+            let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;
+            let mut rng = StdRng::seed_from_u64(42);
+
+            for batch_start in (0..ROW_COUNT).step_by(BATCH_SIZE) {
+                let batch_len = BATCH_SIZE.min(ROW_COUNT - batch_start);
+
+                let ids = Int64Array::from_iter_values(
+                    (batch_start as i64)..((batch_start + batch_len) as i64),
+                );
+
+                let mut list_builder = FixedSizeListBuilder::new(Float32Builder::new(), VECTOR_DIM);
+                for _ in 0..batch_len {
+                    for _ in 0..VECTOR_DIM {
+                        list_builder.values().append_value(rng.random::<f32>());
+                    }
+                    list_builder.append(true);
+                }
+                let embedding = list_builder.finish();
+
+                let batch =
+                    RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(embedding)])?;
+                writer.write(&batch)?;
+            }
+
+            writer.close()?;
+            Ok(())
+        },
+    )
+    .await
+}
+
+/// Get the path to the feature vectors vortex file, converting from parquet if needed.
+pub async fn feature_vectors_vortex() -> Result<PathBuf> {
+    let parquet_path = feature_vectors_parquet().await?;
+    write_parquet_as_vortex(
+        parquet_path,
+        "feature_vectors/feature_vectors.vortex",
+        CompactionStrategy::Default,
+    )
+    .await
+}
+
+/// Get the path to the feature vectors compact vortex file, converting from parquet if needed.
+pub async fn feature_vectors_vortex_compact() -> Result<PathBuf> {
+    let parquet_path = feature_vectors_parquet().await?;
+    write_parquet_as_vortex(
+        parquet_path,
+        "feature_vectors/feature_vectors-compact.vortex",
+        CompactionStrategy::Compact,
+    )
+    .await
+}
diff --git a/vortex-bench/src/datasets/mod.rs b/vortex-bench/src/datasets/mod.rs
index 1b773938468..e7f53156500 100644
--- a/vortex-bench/src/datasets/mod.rs
+++ b/vortex-bench/src/datasets/mod.rs
@@ -12,6 +12,9 @@ use vortex::array::ArrayRef;
 use crate::clickbench::Flavor;
 
 pub mod data_downloads;
+pub mod feature_vectors;
+pub mod nested_lists;
+pub mod nested_structs;
 pub mod struct_list_of_ints;
 pub mod taxi_data;
 pub mod tpch_l_comment;
diff --git a/vortex-bench/src/datasets/nested_lists.rs b/vortex-bench/src/datasets/nested_lists.rs
new file mode 100644
index 00000000000..9306e04077e
--- /dev/null
+++ b/vortex-bench/src/datasets/nested_lists.rs
@@ -0,0 +1,103 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use anyhow::Result;
+use arrow_array::Int64Array;
+use arrow_array::RecordBatch;
+use arrow_array::builder::Int64Builder;
+use arrow_array::builder::ListBuilder;
+use arrow_schema::DataType;
+use arrow_schema::Field;
+use arrow_schema::Schema;
+use parquet::arrow::ArrowWriter;
+use rand::Rng;
+use rand::SeedableRng;
+use rand::rngs::StdRng;
+
+use crate::CompactionStrategy;
+use crate::conversions::write_parquet_as_vortex;
+use crate::idempotent_async;
+
+/// Number of rows in the nested lists dataset.
+pub const ROW_COUNT: usize = 1_000_000;
+
+/// Maximum number of elements in each list.
+const MAX_LIST_LEN: usize = 20;
+
+/// Batch size for data generation.
+const BATCH_SIZE: usize = 100_000;
+
+/// Generate a synthetic nested lists parquet file.
+///
+/// Schema: `id: Int64, values: List<Int64>`.
+/// Each row contains a variable-length list of 1 to 20 random integers.
+pub async fn nested_lists_parquet() -> Result<PathBuf> {
+    idempotent_async(
+        "nested_lists/nested_lists.parquet",
+        |temp_path| async move {
+            let schema = Arc::new(Schema::new(vec![
+                Field::new("id", DataType::Int64, false),
+                Field::new(
+                    "values",
+                    DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+                    false,
+                ),
+            ]));
+
+            let file = std::fs::File::create(&temp_path)?;
+            let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;
+            let mut rng = StdRng::seed_from_u64(42);
+
+            for batch_start in (0..ROW_COUNT).step_by(BATCH_SIZE) {
+                let batch_len = BATCH_SIZE.min(ROW_COUNT - batch_start);
+
+                let ids = Int64Array::from_iter_values(
+                    (batch_start as i64)..((batch_start + batch_len) as i64),
+                );
+
+                let mut list_builder = ListBuilder::new(Int64Builder::new());
+                for _ in 0..batch_len {
+                    let list_len = rng.random_range(1..=MAX_LIST_LEN);
+                    for _ in 0..list_len {
+                        list_builder.values().append_value(rng.random::<i64>());
+                    }
+                    list_builder.append(true);
+                }
+                let values = list_builder.finish();
+
+                let batch =
+                    RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(values)])?;
+                writer.write(&batch)?;
+            }
+
+            writer.close()?;
+            Ok(())
+        },
+    )
+    .await
+}
+
+/// Get the path to the nested lists vortex file, converting from parquet if needed.
+pub async fn nested_lists_vortex() -> Result<PathBuf> {
+    let parquet_path = nested_lists_parquet().await?;
+    write_parquet_as_vortex(
+        parquet_path,
+        "nested_lists/nested_lists.vortex",
+        CompactionStrategy::Default,
+    )
+    .await
+}
+
+/// Get the path to the nested lists compact vortex file, converting from parquet if needed.
+pub async fn nested_lists_vortex_compact() -> Result<PathBuf> {
+    let parquet_path = nested_lists_parquet().await?;
+    write_parquet_as_vortex(
+        parquet_path,
+        "nested_lists/nested_lists-compact.vortex",
+        CompactionStrategy::Compact,
+    )
+    .await
+}
diff --git a/vortex-bench/src/datasets/nested_structs.rs b/vortex-bench/src/datasets/nested_structs.rs
new file mode 100644
index 00000000000..c049cd945dd
--- /dev/null
+++ b/vortex-bench/src/datasets/nested_structs.rs
@@ -0,0 +1,130 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use anyhow::Result;
+use arrow_array::Float64Array;
+use arrow_array::Int64Array;
+use arrow_array::RecordBatch;
+use arrow_array::StructArray;
+use arrow_schema::DataType;
+use arrow_schema::Field;
+use arrow_schema::Fields;
+use arrow_schema::Schema;
+use parquet::arrow::ArrowWriter;
+use rand::Rng;
+use rand::SeedableRng;
+use rand::rngs::StdRng;
+
+use crate::CompactionStrategy;
+use crate::conversions::write_parquet_as_vortex;
+use crate::idempotent_async;
+
+/// Number of rows in the nested structs dataset.
+pub const ROW_COUNT: usize = 1_000_000;
+
+/// Batch size for data generation.
+const BATCH_SIZE: usize = 100_000;
+
+/// Generate a synthetic nested structs parquet file.
+///
+/// Schema:
+/// ```text
+/// id: Int64
+/// metadata: Struct {
+///     a: Int64,
+///     b: Float64,
+///     inner: Struct {
+///         x: Float64,
+///         y: Float64,
+///         z: Float64,
+///     }
+/// }
+/// ```
+pub async fn nested_structs_parquet() -> Result<PathBuf> {
+    idempotent_async(
+        "nested_structs/nested_structs.parquet",
+        |temp_path| async move {
+            let inner_fields = Fields::from(vec![
+                Field::new("x", DataType::Float64, false),
+                Field::new("y", DataType::Float64, false),
+                Field::new("z", DataType::Float64, false),
+            ]);
+            let outer_fields = Fields::from(vec![
+                Field::new("a", DataType::Int64, false),
+                Field::new("b", DataType::Float64, false),
+                Field::new("inner", DataType::Struct(inner_fields.clone()), false),
+            ]);
+            let schema = Arc::new(Schema::new(vec![
+                Field::new("id", DataType::Int64, false),
+                Field::new("metadata", DataType::Struct(outer_fields.clone()), false),
+            ]));
+
+            let file = std::fs::File::create(&temp_path)?;
+            let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;
+            let mut rng = StdRng::seed_from_u64(42);
+
+            for batch_start in (0..ROW_COUNT).step_by(BATCH_SIZE) {
+                let batch_len = BATCH_SIZE.min(ROW_COUNT - batch_start);
+
+                let ids = Int64Array::from_iter_values(
+                    (batch_start as i64)..((batch_start + batch_len) as i64),
+                );
+
+                let inner_x =
+                    Float64Array::from_iter_values((0..batch_len).map(|_| rng.random::<f64>()));
+                let inner_y =
+                    Float64Array::from_iter_values((0..batch_len).map(|_| rng.random::<f64>()));
+                let inner_z =
+                    Float64Array::from_iter_values((0..batch_len).map(|_| rng.random::<f64>()));
+                let inner = StructArray::try_new(
+                    inner_fields.clone(),
+                    vec![Arc::new(inner_x), Arc::new(inner_y), Arc::new(inner_z)],
+                    None,
+                )?;
+
+                let outer_a =
+                    Int64Array::from_iter_values((0..batch_len).map(|_| rng.random::<i64>()));
+                let outer_b =
+                    Float64Array::from_iter_values((0..batch_len).map(|_| rng.random::<f64>()));
+                let outer =
StructArray::try_new( + outer_fields.clone(), + vec![Arc::new(outer_a), Arc::new(outer_b), Arc::new(inner)], + None, + )?; + + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(outer)])?; + writer.write(&batch)?; + } + + writer.close()?; + Ok(()) + }, + ) + .await +} + +/// Get the path to the nested structs vortex file, converting from parquet if needed. +pub async fn nested_structs_vortex() -> Result { + let parquet_path = nested_structs_parquet().await?; + write_parquet_as_vortex( + parquet_path, + "nested_structs/nested_structs.vortex", + CompactionStrategy::Default, + ) + .await +} + +/// Get the path to the nested structs compact vortex file, converting from parquet if needed. +pub async fn nested_structs_vortex_compact() -> Result { + let parquet_path = nested_structs_parquet().await?; + write_parquet_as_vortex( + parquet_path, + "nested_structs/nested_structs-compact.vortex", + CompactionStrategy::Compact, + ) + .await +} diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs index 1f251c2d8db..3060f212753 100644 --- a/vortex-bench/src/random_access/take.rs +++ b/vortex-bench/src/random_access/take.rs @@ -57,6 +57,15 @@ impl VortexRandomAccessor { } } + /// Create a new Vortex random accessor with a custom name and format. + pub fn with_name_and_format(path: PathBuf, name: impl Into, format: Format) -> Self { + Self { + path, + name: name.into(), + format, + } + } + /// Create a new Vortex random accessor for compact format. pub fn compact(path: PathBuf) -> Self { Self { @@ -182,14 +191,14 @@ pub async fn take_parquet(path: &Path, indices: Vec) -> anyhow::Result Date: Mon, 9 Feb 2026 18:38:59 +0000 Subject: [PATCH 2/6] feat[bench]: add more random access Signed-off-by: Joe Isaacs --- Cargo.lock | 1 + benchmarks-website-v2/server.js | 20 +- benchmarks-website-v2/src/config.js | 2 + benchmarks/lance-bench/src/random_access.rs | 62 ++-- benchmarks/random-access-bench/Cargo.toml | 1 + benchmarks/random-access-bench/src/main.rs | 330 +++++++++++++------ encodings/runend/benches/run_end_compress.rs | 18 +- vortex-bench/src/datasets/feature_vectors.rs | 23 +- vortex-bench/src/datasets/nested_lists.rs | 23 +- vortex-bench/src/datasets/nested_structs.rs | 23 +- vortex-bench/src/datasets/taxi_data.rs | 3 + vortex-bench/src/random_access/mod.rs | 44 ++- vortex-bench/src/random_access/take.rs | 212 +++++++----- 13 files changed, 493 insertions(+), 269 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7f68dd8611..496fd3303b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7920,6 +7920,7 @@ name = "random-access-bench" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "clap", "indicatif", "lance-bench", diff --git a/benchmarks-website-v2/server.js b/benchmarks-website-v2/server.js index 040e0aea483..65312910c8c 100644 --- a/benchmarks-website-v2/server.js +++ b/benchmarks-website-v2/server.js @@ -38,6 +38,7 @@ const RENAME = { 'duckdb:duckdb': 'duckdb:duckdb', 'duckdb:vortex-compact': 'duckdb:vortex-compact', 'vortex-tokio-local-disk': 'vortex-nvme', + 'vortex-compact-tokio-local-disk': 'vortex-compact-nvme', 'lance-tokio-local-disk': 'lance-nvme', 'parquet-tokio-local-disk': 'parquet-nvme', 'lance': 'lance', @@ -242,9 +243,22 @@ async function refresh() { } if (!groups[group]) continue; - const [query, series] = b.name.split('/'); - const seriesName = rename(series || 'default'); - const chartName = formatQuery(query); + // Random access names have the form: random-access/{dataset}/{pattern}/{format} + // 
Historical random access names: random-access/{format} + // Other benchmarks use: {query}/{series} + let seriesName, chartName; + const parts = b.name.split('/'); + if (group === 'Random Access' && parts.length === 4) { + chartName = `${parts[1]}/${parts[2]}`.toUpperCase().replace(/[_-]/g, ' '); + seriesName = rename(parts[3] || 'default'); + } else if (group === 'Random Access' && parts.length === 2) { + // Legacy format: random-access/{format} → chart "RANDOM ACCESS", series from format + chartName = 'RANDOM ACCESS'; + seriesName = rename(parts[1] || 'default'); + } else { + seriesName = rename(parts[1] || 'default'); + chartName = formatQuery(parts[0]); + } if (chartName.includes('PARQUET-UNC')) continue; // Skip throughput metrics (keep only time/size) diff --git a/benchmarks-website-v2/src/config.js b/benchmarks-website-v2/src/config.js index 5ba9a90a597..e7c4c560813 100644 --- a/benchmarks-website-v2/src/config.js +++ b/benchmarks-website-v2/src/config.js @@ -4,6 +4,7 @@ export const BENCHMARK_CONFIGS = [ name: 'Random Access', renamedDatasets: { 'vortex-tokio-local-disk': 'vortex-nvme', + 'vortex-compact-tokio-local-disk': 'vortex-compact-nvme', 'lance-tokio-local-disk': 'lance-nvme', 'parquet-tokio-local-disk': 'parquet-nvme', }, @@ -323,6 +324,7 @@ export const ENGINE_LABELS = { // Series color map export const SERIES_COLOR_MAP = { 'vortex-nvme': '#19a508', + 'vortex-compact-nvme': '#15850a', 'parquet-nvme': '#ef7f1d', 'lance-nvme': '#3B82F6', 'datafusion:arrow': '#7a27b1', diff --git a/benchmarks/lance-bench/src/random_access.rs b/benchmarks/lance-bench/src/random_access.rs index d3ce1d9206d..dd0c6ba0349 100644 --- a/benchmarks/lance-bench/src/random_access.rs +++ b/benchmarks/lance-bench/src/random_access.rs @@ -12,12 +12,13 @@ use lance::dataset::WriteParams; use lance_encoding::version::LanceFileVersion; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use vortex_bench::Format; -use vortex_bench::datasets::feature_vectors::feature_vectors_parquet; -use vortex_bench::datasets::nested_lists::nested_lists_parquet; -use vortex_bench::datasets::nested_structs::nested_structs_parquet; -use vortex_bench::datasets::taxi_data::taxi_data_parquet; +use vortex_bench::datasets::feature_vectors; +use vortex_bench::datasets::nested_lists; +use vortex_bench::datasets::nested_structs; +use vortex_bench::datasets::taxi_data; use vortex_bench::idempotent_async; use vortex_bench::random_access::RandomAccessor; +use vortex_bench::random_access::data_path; /// Convert a parquet file to lance format. 
/// @@ -44,28 +45,44 @@ async fn parquet_to_lance_file(parquet_path: PathBuf, lance_path: &str) -> anyho } pub async fn taxi_data_lance() -> anyhow::Result { - let parquet_path = taxi_data_parquet().await?; - parquet_to_lance_file(parquet_path, "taxi/taxi.lance").await + let parquet_path = taxi_data::taxi_data_parquet().await?; + parquet_to_lance_file(parquet_path, &data_path(taxi_data::DATASET, Format::Lance)).await } pub async fn feature_vectors_lance() -> anyhow::Result { - let parquet_path = feature_vectors_parquet().await?; - parquet_to_lance_file(parquet_path, "feature_vectors/feature_vectors.lance").await + let parquet_path = feature_vectors::feature_vectors_parquet().await?; + parquet_to_lance_file( + parquet_path, + &data_path(feature_vectors::DATASET, Format::Lance), + ) + .await } pub async fn nested_lists_lance() -> anyhow::Result { - let parquet_path = nested_lists_parquet().await?; - parquet_to_lance_file(parquet_path, "nested_lists/nested_lists.lance").await + let parquet_path = nested_lists::nested_lists_parquet().await?; + parquet_to_lance_file( + parquet_path, + &data_path(nested_lists::DATASET, Format::Lance), + ) + .await } pub async fn nested_structs_lance() -> anyhow::Result { - let parquet_path = nested_structs_parquet().await?; - parquet_to_lance_file(parquet_path, "nested_structs/nested_structs.lance").await + let parquet_path = nested_structs::nested_structs_parquet().await?; + parquet_to_lance_file( + parquet_path, + &data_path(nested_structs::DATASET, Format::Lance), + ) + .await } +/// Random accessor for Lance format files. +/// +/// After `open()`, the dataset handle is stored and reused across `take()` calls. pub struct LanceRandomAccessor { path: PathBuf, name: String, + dataset: Option, } impl LanceRandomAccessor { @@ -73,6 +90,7 @@ impl LanceRandomAccessor { Self { path, name: "random-access/lance-tokio-local-disk".to_string(), + dataset: None, } } @@ -81,6 +99,7 @@ impl LanceRandomAccessor { Self { path, name: name.into(), + dataset: None, } } } @@ -95,19 +114,24 @@ impl RandomAccessor for LanceRandomAccessor { &self.name } - fn path(&self) -> &PathBuf { - &self.path - } - - async fn take(&self, indices: Vec) -> anyhow::Result { + async fn open(&mut self) -> anyhow::Result<()> { let dataset = Dataset::open( self.path .to_str() .ok_or_else(|| anyhow!("Invalid dataset path"))?, ) .await?; - let projection = ProjectionRequest::from_schema(dataset.schema().clone()); // All columns. 
- let result = dataset.take(indices.as_slice(), projection).await?; + self.dataset = Some(dataset); + Ok(()) + } + + async fn take(&self, indices: &[u64]) -> anyhow::Result { + let dataset = self + .dataset + .as_ref() + .ok_or_else(|| anyhow!("accessor not opened; call open() first"))?; + let projection = ProjectionRequest::from_schema(dataset.schema().clone()); + let result = dataset.take(indices, projection).await?; Ok(result.num_rows()) } } diff --git a/benchmarks/random-access-bench/Cargo.toml b/benchmarks/random-access-bench/Cargo.toml index 2b5fad7b1e3..b98c5f0a1a1 100644 --- a/benchmarks/random-access-bench/Cargo.toml +++ b/benchmarks/random-access-bench/Cargo.toml @@ -16,6 +16,7 @@ publish = false [dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } clap = { workspace = true, features = ["derive"] } indicatif = { workspace = true } lance-bench = { path = "../lance-bench", optional = true } diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs index 483e7c242c4..41d887ca9b6 100644 --- a/benchmarks/random-access-bench/src/main.rs +++ b/benchmarks/random-access-bench/src/main.rs @@ -1,10 +1,14 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::future::Future; use std::path::PathBuf; +use std::pin::Pin; use std::time::Duration; use std::time::Instant; +use anyhow::Result; +use async_trait::async_trait; use clap::Parser; use clap::ValueEnum; use indicatif::ProgressBar; @@ -25,43 +29,140 @@ use vortex_bench::display::DisplayFormat; use vortex_bench::display::print_measurements_json; use vortex_bench::display::render_table; use vortex_bench::measurements::TimingMeasurement; +use vortex_bench::random_access::BenchDataset; use vortex_bench::random_access::ParquetRandomAccessor; use vortex_bench::random_access::RandomAccessor; use vortex_bench::random_access::VortexRandomAccessor; use vortex_bench::setup_logging_and_tracing; use vortex_bench::utils::constants::STORAGE_NVME; -/// Which synthetic dataset to benchmark. -#[derive(ValueEnum, Clone, Copy, Debug)] -enum DatasetArg { - #[clap(name = "taxi")] - Taxi, - #[clap(name = "feature-vectors")] - FeatureVectors, - #[clap(name = "nested-lists")] - NestedLists, - #[clap(name = "nested-structs")] - NestedStructs, +// --------------------------------------------------------------------------- +// Dataset implementations +// --------------------------------------------------------------------------- + +/// Short format label used in benchmark measurement names. +fn format_label(format: Format) -> &'static str { + match format { + Format::OnDiskVortex => "vortex", + Format::VortexCompact => "vortex-compact", + Format::Parquet => "parquet", + Format::Lance => "lance", + other => unimplemented!("Random access bench not implemented for {other}"), + } } -impl DatasetArg { - fn name(&self) -> &'static str { - match self { - DatasetArg::Taxi => "taxi", - DatasetArg::FeatureVectors => "feature-vectors", - DatasetArg::NestedLists => "nested-lists", - DatasetArg::NestedStructs => "nested-structs", +/// Create a random accessor from a file path, dataset name, and format. +/// +/// This eliminates the repeated match-on-format boilerplate in each dataset. 
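+///
+/// For example (illustrative), `create_accessor(path, "taxi", Format::Parquet)`
+/// yields an accessor named `random-access/taxi/parquet-tokio-local-disk`.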
+fn create_accessor(path: PathBuf, dataset: &str, format: Format) -> Box<dyn RandomAccessor> {
+    let name = format!(
+        "random-access/{}/{}-tokio-local-disk",
+        dataset,
+        format_label(format)
+    );
+    match format {
+        Format::OnDiskVortex | Format::VortexCompact => Box::new(
+            VortexRandomAccessor::with_name_and_format(path, name, format),
+        ),
+        Format::Parquet => Box::new(ParquetRandomAccessor::with_name(path, name)),
+        #[cfg(feature = "lance")]
+        Format::Lance => {
+            use lance_bench::random_access::LanceRandomAccessor;
+            Box::new(LanceRandomAccessor::with_name(path, name))
+        }
+        other => unimplemented!("Random access bench not implemented for {other}"),
+    }
+}
+
+/// A function returning a boxed future that resolves to a file path.
+type PathFn = fn() -> Pin<Box<dyn Future<Output = Result<PathBuf>> + Send>>;
+
+/// Paths for a specific dataset, keyed by format.
+struct DatasetPaths {
+    name: &'static str,
+    row_count: u64,
+    parquet: PathFn,
+    vortex: PathFn,
+    vortex_compact: PathFn,
+    #[cfg(feature = "lance")]
+    lance: PathFn,
+}
+
+#[async_trait]
+impl BenchDataset for DatasetPaths {
+    fn name(&self) -> &str {
+        self.name
     }
 
     fn row_count(&self) -> u64 {
-        match self {
-            DatasetArg::Taxi => 3_339_715,
-            _ => 1_000_000,
-        }
+        self.row_count
+    }
+
+    async fn create(&self, format: Format) -> Result<Box<dyn RandomAccessor>> {
+        let path = match format {
+            Format::OnDiskVortex => (self.vortex)().await?,
+            Format::VortexCompact => (self.vortex_compact)().await?,
+            Format::Parquet => (self.parquet)().await?,
+            #[cfg(feature = "lance")]
+            Format::Lance => (self.lance)().await?,
+            other => unimplemented!("Random access bench not implemented for {other}"),
+        };
+        Ok(create_accessor(path, self.name, format))
+    }
+}
+
+fn taxi_dataset() -> DatasetPaths {
+    DatasetPaths {
+        name: "taxi",
+        row_count: 3_339_715,
+        parquet: || Box::pin(taxi_data_parquet()),
+        vortex: || Box::pin(taxi_data_vortex()),
+        vortex_compact: || Box::pin(taxi_data_vortex_compact()),
+        #[cfg(feature = "lance")]
+        lance: || Box::pin(lance_bench::random_access::taxi_data_lance()),
+    }
+}
+
+fn feature_vectors_dataset() -> DatasetPaths {
+    DatasetPaths {
+        name: "feature-vectors",
+        row_count: 1_000_000,
+        parquet: || Box::pin(feature_vectors_parquet()),
+        vortex: || Box::pin(feature_vectors_vortex()),
+        vortex_compact: || Box::pin(feature_vectors_vortex_compact()),
+        #[cfg(feature = "lance")]
+        lance: || Box::pin(lance_bench::random_access::feature_vectors_lance()),
+    }
+}
+
+fn nested_lists_dataset() -> DatasetPaths {
+    DatasetPaths {
+        name: "nested-lists",
+        row_count: 1_000_000,
+        parquet: || Box::pin(nested_lists_parquet()),
+        vortex: || Box::pin(nested_lists_vortex()),
+        vortex_compact: || Box::pin(nested_lists_vortex_compact()),
+        #[cfg(feature = "lance")]
+        lance: || Box::pin(lance_bench::random_access::nested_lists_lance()),
+    }
+}
+
+fn nested_structs_dataset() -> DatasetPaths {
+    DatasetPaths {
+        name: "nested-structs",
+        row_count: 1_000_000,
+        parquet: || Box::pin(nested_structs_parquet()),
+        vortex: || Box::pin(nested_structs_vortex()),
+        vortex_compact: || Box::pin(nested_structs_vortex_compact()),
+        #[cfg(feature = "lance")]
+        lance: || Box::pin(lance_bench::random_access::nested_structs_lance()),
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Access patterns
+// ---------------------------------------------------------------------------
+
 /// Access pattern for random access benchmarks.
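+/// As a rough sketch of the index shapes produced below (illustrative values):
+/// `Correlated` yields runs like `[5, 6, ..., 24, 91_000, 91_001, ...]`, while
+/// `Uniform` yields isolated indices like `[312, 9_877, 21_455, ...]`.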
#[derive(Clone, Copy, Debug)] enum AccessPattern { @@ -94,7 +195,7 @@ const CLUSTER_SIZE: usize = 20; const POISSON_EXPECTED_COUNT: usize = 100; /// Generate indices for the given dataset and access pattern. -fn generate_indices(dataset: DatasetArg, pattern: AccessPattern) -> Vec { +fn generate_indices(dataset: &dyn BenchDataset, pattern: AccessPattern) -> Vec { let row_count = dataset.row_count(); let mut rng = StdRng::seed_from_u64(42); @@ -135,6 +236,34 @@ fn generate_indices(dataset: DatasetArg, pattern: AccessPattern) -> Vec { } } +// --------------------------------------------------------------------------- +// CLI +// --------------------------------------------------------------------------- + +/// Which synthetic dataset to benchmark. +#[derive(ValueEnum, Clone, Copy, Debug)] +enum DatasetArg { + #[clap(name = "taxi")] + Taxi, + #[clap(name = "feature-vectors")] + FeatureVectors, + #[clap(name = "nested-lists")] + NestedLists, + #[clap(name = "nested-structs")] + NestedStructs, +} + +impl DatasetArg { + fn into_dataset(self) -> DatasetPaths { + match self { + DatasetArg::Taxi => taxi_dataset(), + DatasetArg::FeatureVectors => feature_vectors_dataset(), + DatasetArg::NestedLists => nested_lists_dataset(), + DatasetArg::NestedStructs => nested_structs_dataset(), + } + } +} + #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { @@ -162,13 +291,15 @@ struct Args { } #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<()> { let args = Args::parse(); setup_logging_and_tracing(args.verbose, args.tracing)?; + let dataset = args.dataset.into_dataset(); + run_random_access( - args.dataset, + &dataset, args.formats, args.time_limit, args.display_format, @@ -177,94 +308,27 @@ async fn main() -> anyhow::Result<()> { .await } -/// Create a random accessor for the given format and dataset. 
-async fn get_accessor( - dataset: DatasetArg, - format: Format, - pattern: AccessPattern, -) -> anyhow::Result> { - let ds_name = dataset.name(); - let pat_name = pattern.name(); +// --------------------------------------------------------------------------- +// Benchmark core +// --------------------------------------------------------------------------- - match format { - Format::OnDiskVortex => { - let path = match dataset { - DatasetArg::Taxi => taxi_data_vortex().await?, - DatasetArg::FeatureVectors => feature_vectors_vortex().await?, - DatasetArg::NestedLists => nested_lists_vortex().await?, - DatasetArg::NestedStructs => nested_structs_vortex().await?, - }; - Ok(Box::new(VortexRandomAccessor::with_name_and_format( - path, - format!("random-access/{ds_name}/{pat_name}/vortex-tokio-local-disk"), - Format::OnDiskVortex, - ))) - } - Format::VortexCompact => { - let path = match dataset { - DatasetArg::Taxi => taxi_data_vortex_compact().await?, - DatasetArg::FeatureVectors => feature_vectors_vortex_compact().await?, - DatasetArg::NestedLists => nested_lists_vortex_compact().await?, - DatasetArg::NestedStructs => nested_structs_vortex_compact().await?, - }; - Ok(Box::new(VortexRandomAccessor::with_name_and_format( - path, - format!("random-access/{ds_name}/{pat_name}/vortex-compact-tokio-local-disk"), - Format::VortexCompact, - ))) - } - Format::Parquet => { - let path = match dataset { - DatasetArg::Taxi => taxi_data_parquet().await?, - DatasetArg::FeatureVectors => feature_vectors_parquet().await?, - DatasetArg::NestedLists => nested_lists_parquet().await?, - DatasetArg::NestedStructs => nested_structs_parquet().await?, - }; - Ok(Box::new(ParquetRandomAccessor::with_name( - path, - format!("random-access/{ds_name}/{pat_name}/parquet-tokio-local-disk"), - ))) - } - #[cfg(feature = "lance")] - Format::Lance => { - use lance_bench::random_access::LanceRandomAccessor; - - let path = match dataset { - DatasetArg::Taxi => lance_bench::random_access::taxi_data_lance().await?, - DatasetArg::FeatureVectors => { - lance_bench::random_access::feature_vectors_lance().await? - } - DatasetArg::NestedLists => lance_bench::random_access::nested_lists_lance().await?, - DatasetArg::NestedStructs => { - lance_bench::random_access::nested_structs_lance().await? - } - }; - Ok(Box::new(LanceRandomAccessor::with_name( - path, - format!("random-access/{ds_name}/{pat_name}/lance-tokio-local-disk"), - ))) - } - _ => unimplemented!("Random access bench not implemented for {format}"), - } -} - -/// Run a random access benchmark for the given accessor. +/// Run a random access benchmark for the given accessor (already opened). /// /// Runs the take operation repeatedly until the time limit is reached, /// collecting timing for each run. 
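+///
+/// Only `take()` is timed; `open()` (file and metadata parsing) happens before
+/// this function is called, so setup cost is excluded from the runs.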
 async fn benchmark_random_access(
     accessor: &dyn RandomAccessor,
+    measurement_name: &str,
     indices: &[u64],
     time_limit_secs: u64,
     storage: &str,
-) -> anyhow::Result<TimingMeasurement> {
+) -> Result<TimingMeasurement> {
     let time_limit = Duration::from_secs(time_limit_secs);
     let overall_start = Instant::now();
     let mut runs = Vec::new();
 
     // Run at least once, then continue until time limit
     loop {
-        let indices = indices.to_vec();
         let start = Instant::now();
         let _row_count = accessor.take(indices).await?;
         runs.push(start.elapsed());
@@ -275,13 +339,31 @@ async fn benchmark_random_access(
     }
 
     Ok(TimingMeasurement {
-        name: accessor.name().to_string(),
+        name: measurement_name.to_string(),
         storage: storage.to_string(),
         target: Target::new(format_to_engine(accessor.format()), accessor.format()),
         runs,
     })
 }
 
+/// Build a measurement name for a benchmark run.
+///
+/// For taxi (legacy), the name is `random-access/{format}-tokio-local-disk` to preserve
+/// historical continuity with existing benchmark data.
+/// For other datasets, includes dataset and pattern:
+/// `random-access/{dataset}/{pattern}/{format}-tokio-local-disk`.
+fn measurement_name(dataset: &str, pattern: Option<AccessPattern>, format: Format) -> String {
+    match pattern {
+        Some(p) => format!(
+            "random-access/{}/{}/{}-tokio-local-disk",
+            dataset,
+            p.name(),
+            format_label(format)
+        ),
+        None => format!("random-access/{}-tokio-local-disk", format_label(format)),
+    }
+}
+
 /// Map format to the appropriate engine for random access benchmarks.
 fn format_to_engine(format: Format) -> Engine {
     match format {
@@ -296,31 +378,63 @@ fn format_to_engine(format: Format) -> Engine {
 /// The benchmark ID used for output path.
 const BENCHMARK_ID: &str = "random-access";
 
+/// Fixed indices used by the original taxi benchmark (preserved for historical continuity).
+const FIXED_TAXI_INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000];
+
 async fn run_random_access(
-    dataset: DatasetArg,
+    dataset: &dyn BenchDataset,
     formats: Vec<Format>,
     time_limit: u64,
     display_format: DisplayFormat,
     output_path: Option<PathBuf>,
-) -> anyhow::Result<()> {
-    // Total work: formats x patterns
-    let total_steps = formats.len() * ACCESS_PATTERNS.len();
+) -> Result<()> {
+    let is_legacy = dataset.name() == "taxi";
+
+    // Legacy datasets get an extra run per format (fixed indices, old-style name).
+    let legacy_extra = if is_legacy { formats.len() } else { 0 };
+    let total_steps = formats.len() * ACCESS_PATTERNS.len() + legacy_extra;
     let progress = ProgressBar::new(total_steps as u64);
 
     let mut targets = Vec::new();
     let mut measurements = Vec::new();
 
     for format in &formats {
+        if is_legacy {
+            // Taxi: also emit the old fixed-index benchmark for historical continuity.
+            let mut accessor = dataset.create(*format).await?;
+            accessor.open().await?;
+            let name = measurement_name(dataset.name(), None, *format);
+            let measurement = benchmark_random_access(
+                accessor.as_ref(),
+                &name,
+                &FIXED_TAXI_INDICES,
+                time_limit,
+                STORAGE_NVME,
+            )
+            .await?;
+
+            targets.push(measurement.target);
+            measurements.push(measurement);
+            progress.inc(1);
+        }
+
+        // All datasets: run each access pattern with 4-part names.
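+        // e.g. "random-access/nested-lists/uniform/parquet-tokio-local-disk" (illustrative).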
for pattern in &ACCESS_PATTERNS { - let accessor = get_accessor(dataset, *format, *pattern).await?; + let mut accessor = dataset.create(*format).await?; + accessor.open().await?; let indices = generate_indices(dataset, *pattern); - let measurement = - benchmark_random_access(accessor.as_ref(), &indices, time_limit, STORAGE_NVME) - .await?; + let name = measurement_name(dataset.name(), Some(*pattern), *format); + let measurement = benchmark_random_access( + accessor.as_ref(), + &name, + &indices, + time_limit, + STORAGE_NVME, + ) + .await?; targets.push(measurement.target); measurements.push(measurement); - progress.inc(1); } } diff --git a/encodings/runend/benches/run_end_compress.rs b/encodings/runend/benches/run_end_compress.rs index d8fc9aedeb5..41a0df35430 100644 --- a/encodings/runend/benches/run_end_compress.rs +++ b/encodings/runend/benches/run_end_compress.rs @@ -7,6 +7,8 @@ use divan::Bencher; use itertools::repeat_n; use vortex_array::Array; use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::compute::take; use vortex_array::compute::warm_up_vtables; @@ -96,6 +98,18 @@ fn take_indices(bencher: Bencher, (length, run_step): (usize, usize)) { .to_array(); bencher - .with_inputs(|| (&source_array, &runend_array)) - .bench_refs(|(array, indices)| array.as_ref().take(indices.to_array()).unwrap()); + .with_inputs(|| { + ( + &source_array, + &runend_array, + &mut LEGACY_SESSION.create_execution_ctx(), + ) + }) + .bench_refs(|(array, indices, ctx)| { + array + .as_ref() + .take(indices.to_array()) + .unwrap() + .execute::(ctx) + }); } diff --git a/vortex-bench/src/datasets/feature_vectors.rs b/vortex-bench/src/datasets/feature_vectors.rs index b260260f975..3b5e23f54aa 100644 --- a/vortex-bench/src/datasets/feature_vectors.rs +++ b/vortex-bench/src/datasets/feature_vectors.rs @@ -18,8 +18,13 @@ use rand::SeedableRng; use rand::rngs::StdRng; use crate::CompactionStrategy; +use crate::Format; use crate::conversions::write_parquet_as_vortex; use crate::idempotent_async; +use crate::random_access::data_path; + +/// Dataset identifier used for data path generation. +pub const DATASET: &str = "feature_vectors"; /// Number of rows in the feature vectors dataset. pub const ROW_COUNT: usize = 1_000_000; @@ -36,7 +41,7 @@ const BATCH_SIZE: usize = 100_000; /// This simulates a table of embedding vectors, common in ML workloads. pub async fn feature_vectors_parquet() -> Result { idempotent_async( - "feature_vectors/feature_vectors.parquet", + data_path(DATASET, Format::Parquet), |temp_path| async move { let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int64, false), @@ -85,21 +90,13 @@ pub async fn feature_vectors_parquet() -> Result { /// Get the path to the feature vectors vortex file, converting from parquet if needed. pub async fn feature_vectors_vortex() -> Result { let parquet_path = feature_vectors_parquet().await?; - write_parquet_as_vortex( - parquet_path, - "feature_vectors/feature_vectors.vortex", - CompactionStrategy::Default, - ) - .await + let path = data_path(DATASET, Format::OnDiskVortex); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Default).await } /// Get the path to the feature vectors compact vortex file, converting from parquet if needed. 
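+/// The output lands at the path produced by `data_path(DATASET, Format::VortexCompact)`,
+/// i.e. `random_access/feature_vectors/feature_vectors-compact.{ext}` (extension from
+/// `Format::ext()`).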
pub async fn feature_vectors_vortex_compact() -> Result { let parquet_path = feature_vectors_parquet().await?; - write_parquet_as_vortex( - parquet_path, - "feature_vectors/feature_vectors-compact.vortex", - CompactionStrategy::Compact, - ) - .await + let path = data_path(DATASET, Format::VortexCompact); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Compact).await } diff --git a/vortex-bench/src/datasets/nested_lists.rs b/vortex-bench/src/datasets/nested_lists.rs index 9306e04077e..e705886acec 100644 --- a/vortex-bench/src/datasets/nested_lists.rs +++ b/vortex-bench/src/datasets/nested_lists.rs @@ -18,8 +18,13 @@ use rand::SeedableRng; use rand::rngs::StdRng; use crate::CompactionStrategy; +use crate::Format; use crate::conversions::write_parquet_as_vortex; use crate::idempotent_async; +use crate::random_access::data_path; + +/// Dataset identifier used for data path generation. +pub const DATASET: &str = "nested_lists"; /// Number of rows in the nested lists dataset. pub const ROW_COUNT: usize = 1_000_000; @@ -36,7 +41,7 @@ const BATCH_SIZE: usize = 100_000; /// Each row contains a variable-length list of 1 to 20 random integers. pub async fn nested_lists_parquet() -> Result { idempotent_async( - "nested_lists/nested_lists.parquet", + data_path(DATASET, Format::Parquet), |temp_path| async move { let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int64, false), @@ -83,21 +88,13 @@ pub async fn nested_lists_parquet() -> Result { /// Get the path to the nested lists vortex file, converting from parquet if needed. pub async fn nested_lists_vortex() -> Result { let parquet_path = nested_lists_parquet().await?; - write_parquet_as_vortex( - parquet_path, - "nested_lists/nested_lists.vortex", - CompactionStrategy::Default, - ) - .await + let path = data_path(DATASET, Format::OnDiskVortex); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Default).await } /// Get the path to the nested lists compact vortex file, converting from parquet if needed. pub async fn nested_lists_vortex_compact() -> Result { let parquet_path = nested_lists_parquet().await?; - write_parquet_as_vortex( - parquet_path, - "nested_lists/nested_lists-compact.vortex", - CompactionStrategy::Compact, - ) - .await + let path = data_path(DATASET, Format::VortexCompact); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Compact).await } diff --git a/vortex-bench/src/datasets/nested_structs.rs b/vortex-bench/src/datasets/nested_structs.rs index c049cd945dd..74d7fd3c687 100644 --- a/vortex-bench/src/datasets/nested_structs.rs +++ b/vortex-bench/src/datasets/nested_structs.rs @@ -19,8 +19,13 @@ use rand::SeedableRng; use rand::rngs::StdRng; use crate::CompactionStrategy; +use crate::Format; use crate::conversions::write_parquet_as_vortex; use crate::idempotent_async; +use crate::random_access::data_path; + +/// Dataset identifier used for data path generation. +pub const DATASET: &str = "nested_structs"; /// Number of rows in the nested structs dataset. pub const ROW_COUNT: usize = 1_000_000; @@ -45,7 +50,7 @@ const BATCH_SIZE: usize = 100_000; /// ``` pub async fn nested_structs_parquet() -> Result { idempotent_async( - "nested_structs/nested_structs.parquet", + data_path(DATASET, Format::Parquet), |temp_path| async move { let inner_fields = Fields::from(vec![ Field::new("x", DataType::Float64, false), @@ -110,21 +115,13 @@ pub async fn nested_structs_parquet() -> Result { /// Get the path to the nested structs vortex file, converting from parquet if needed. 
pub async fn nested_structs_vortex() -> Result { let parquet_path = nested_structs_parquet().await?; - write_parquet_as_vortex( - parquet_path, - "nested_structs/nested_structs.vortex", - CompactionStrategy::Default, - ) - .await + let path = data_path(DATASET, Format::OnDiskVortex); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Default).await } /// Get the path to the nested structs compact vortex file, converting from parquet if needed. pub async fn nested_structs_vortex_compact() -> Result { let parquet_path = nested_structs_parquet().await?; - write_parquet_as_vortex( - parquet_path, - "nested_structs/nested_structs-compact.vortex", - CompactionStrategy::Compact, - ) - .await + let path = data_path(DATASET, Format::VortexCompact); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Compact).await } diff --git a/vortex-bench/src/datasets/taxi_data.rs b/vortex-bench/src/datasets/taxi_data.rs index 9367d9746aa..17396ebb497 100644 --- a/vortex-bench/src/datasets/taxi_data.rs +++ b/vortex-bench/src/datasets/taxi_data.rs @@ -20,6 +20,9 @@ use crate::datasets::Dataset; use crate::datasets::data_downloads::download_data; use crate::idempotent_async; +/// Dataset identifier used for data path generation. +pub const DATASET: &str = "taxi"; + pub struct TaxiData; #[async_trait] diff --git a/vortex-bench/src/random_access/mod.rs b/vortex-bench/src/random_access/mod.rs index 2193b76e673..5e42c02bc7c 100644 --- a/vortex-bench/src/random_access/mod.rs +++ b/vortex-bench/src/random_access/mod.rs @@ -1,8 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::path::PathBuf; - use anyhow::Result; use async_trait::async_trait; @@ -14,21 +12,49 @@ pub mod take; pub use take::ParquetRandomAccessor; pub use take::VortexRandomAccessor; +/// Generate the data path for a random-access benchmark dataset file. +/// +/// Returns a path like `random_access/{dataset}/{dataset}.{ext}` +/// (or `{dataset}-compact.{ext}` for [`Format::VortexCompact`]). +pub fn data_path(dataset: &str, format: Format) -> String { + let ext = format.ext(); + match format { + Format::VortexCompact => format!("random_access/{dataset}/{dataset}-compact.{ext}"), + _ => format!("random_access/{dataset}/{dataset}.{ext}"), + } +} + +/// Trait for a benchmark dataset that knows how to write files and create accessors. +#[async_trait] +pub trait BenchDataset: Send + Sync { + /// A descriptive name for this dataset (used in benchmark output and CLI). + fn name(&self) -> &str; + + /// The total number of rows in this dataset. + fn row_count(&self) -> u64; + + /// Create a format-specific random accessor for this dataset. + /// + /// This prepares the data file (writing it if necessary) and returns an + /// accessor that can be opened and used for random access benchmarks. + async fn create(&self, format: Format) -> Result>; +} + /// Trait for format-specific random access (take) operations. /// /// Implementations handle reading specific rows by index from a data source. -/// Each implementation wraps a prepared file path and knows how to read from it. +/// The lifecycle is: construct -> `open()` (parse metadata) -> `take()` (I/O). #[async_trait] pub trait RandomAccessor: Send + Sync { - /// The format this accessor handles. - fn format(&self) -> Format; - /// A descriptive name for this accessor (used in benchmark output). fn name(&self) -> &str; - /// The file path this accessor reads from. 
-    fn path(&self) -> &PathBuf;
+    /// The format this accessor handles.
+    fn format(&self) -> Format;
+
+    /// Open the file and parse metadata. This is not timed in benchmarks.
+    async fn open(&mut self) -> Result<()>;
 
     /// Take rows at the given indices, returning the number of rows read.
-    async fn take(&self, indices: Vec<u64>) -> Result<usize>;
+    async fn take(&self, indices: &[u64]) -> Result<usize>;
 }
diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs
index 3060f212753..5d0f1397120 100644
--- a/vortex-bench/src/random_access/take.rs
+++ b/vortex-bench/src/random_access/take.rs
@@ -1,12 +1,10 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
-use std::iter;
-use std::path::Path;
 use std::path::PathBuf;
 
+use anyhow::anyhow;
 use arrow_array::PrimitiveArray;
-use arrow_array::RecordBatch;
 use arrow_array::types::Int64Type;
 use arrow_select::concat::concat_batches;
 use arrow_select::take::take_record_batch;
@@ -18,13 +16,13 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::file::metadata::RowGroupMetaData;
 use stream::StreamExt;
 use vortex::array::Array;
-use vortex::array::ArrayRef;
 use vortex::array::Canonical;
 use vortex::array::IntoArray;
 use vortex::array::VortexSessionExecute;
 use vortex::array::stream::ArrayStreamExt;
 use vortex::buffer::Buffer;
 use vortex::file::OpenOptionsSessionExt;
+use vortex::file::VortexFile;
 use vortex::utils::aliases::hash_map::HashMap;
 
 use crate::Format;
@@ -32,10 +30,13 @@ use crate::SESSION;
 use crate::random_access::RandomAccessor;
 
 /// Random accessor for Vortex format files.
+///
+/// After `open()`, the file handle is stored and reused across `take()` calls.
 pub struct VortexRandomAccessor {
     path: PathBuf,
     name: String,
     format: Format,
+    file: Option<VortexFile>,
 }
 
 impl VortexRandomAccessor {
@@ -45,6 +46,7 @@
             path,
             name: "random-access/vortex-tokio-local-disk".to_string(),
             format: Format::OnDiskVortex,
+            file: None,
         }
     }
 
@@ -54,6 +56,7 @@
             path,
             name: name.into(),
             format: Format::OnDiskVortex,
+            file: None,
         }
     }
 
@@ -63,6 +66,7 @@
             path,
             name: name.into(),
             format,
+            file: None,
         }
     }
 
@@ -72,6 +76,7 @@
             path,
             name: "random-access/vortex-compact-tokio-local-disk".to_string(),
             format: Format::VortexCompact,
+            file: None,
         }
     }
 }
@@ -86,20 +91,49 @@
         &self.name
     }
 
-    fn path(&self) -> &PathBuf {
-        &self.path
+    async fn open(&mut self) -> anyhow::Result<()> {
+        let file = SESSION.open_options().open_path(&self.path).await?;
+        self.file = Some(file);
+        Ok(())
     }
 
-    async fn take(&self, indices: Vec<u64>) -> anyhow::Result<usize> {
-        let result = take_vortex(&self.path, indices.into()).await?;
-        Ok(result.len())
+    async fn take(&self, indices: &[u64]) -> anyhow::Result<usize> {
+        let file = self
+            .file
+            .as_ref()
+            .ok_or_else(|| anyhow!("accessor not opened; call open() first"))?;
+
+        let indices_buf: Buffer<u64> = Buffer::from(indices.to_vec());
+        let array = file
+            .scan()?
+            .with_row_indices(indices_buf)
+            .into_array_stream()?
+            .read_all()
+            .await?;
+
+        // We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es.
+        let mut ctx = SESSION.create_execution_ctx();
+        let canonical = array.execute::<Canonical>(&mut ctx)?.into_array();
+        Ok(canonical.len())
     }
 }
 
+/// Pre-computed Parquet metadata stored after `open()`.
+struct ParquetMetadata {
+    /// Cumulative row offsets per row group (length = num_row_groups + 1).
+    row_group_offsets: Vec<i64>,
+    /// Path to the Parquet file (for re-opening on each take).
+    path: PathBuf,
+}
+
 /// Random accessor for Parquet format files.
+///
+/// After `open()`, the file metadata and row group offsets are stored and
+/// reused to map indices to row groups in each `take()` call.
 pub struct ParquetRandomAccessor {
     path: PathBuf,
     name: String,
+    metadata: Option<ParquetMetadata>,
 }
 
 impl ParquetRandomAccessor {
@@ -108,6 +142,7 @@
         Self {
             path,
             name: "random-access/parquet-tokio-local-disk".to_string(),
+            metadata: None,
         }
     }
 
@@ -116,6 +151,7 @@
         Self {
             path,
             name: name.into(),
+            metadata: None,
         }
     }
 }
@@ -130,90 +166,88 @@
         &self.name
     }
 
-    fn path(&self) -> &PathBuf {
-        &self.path
-    }
+    async fn open(&mut self) -> anyhow::Result<()> {
+        let file = tokio::fs::File::open(&self.path).await?;
+        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            file,
+            ArrowReaderOptions::new().with_page_index(true),
+        )
+        .await?;
+
+        let row_group_offsets = std::iter::once(0)
+            .chain(
+                builder
+                    .metadata()
+                    .row_groups()
+                    .iter()
+                    .map(RowGroupMetaData::num_rows),
+            )
+            .scan(0i64, |acc, x| {
+                *acc += x;
+                Some(*acc)
+            })
+            .collect::<Vec<_>>();
+
+        self.metadata = Some(ParquetMetadata {
+            row_group_offsets,
+            path: self.path.clone(),
+        });
+
+        Ok(())
+    }
 
-    async fn take(&self, indices: Vec<u64>) -> anyhow::Result<usize> {
-        let result = take_parquet(&self.path, indices).await?;
-        Ok(result.num_rows())
+    async fn take(&self, indices: &[u64]) -> anyhow::Result<usize> {
+        let meta = self
+            .metadata
+            .as_ref()
+            .ok_or_else(|| anyhow!("accessor not opened; call open() first"))?;
+
+        // Map indices to row groups.
+        let mut row_groups = HashMap::new();
+        for &idx in indices {
+            let row_group_idx = meta
+                .row_group_offsets
+                .binary_search(&(idx as i64))
+                .unwrap_or_else(|e| e - 1);
+            row_groups
+                .entry(row_group_idx)
+                .or_insert_with(Vec::new)
+                .push((idx as i64) - meta.row_group_offsets[row_group_idx]);
+        }
+
+        let sorted_row_group_keys = row_groups.keys().copied().sorted().collect_vec();
+        let row_group_indices = sorted_row_group_keys
+            .iter()
+            .map(|i| row_groups[i].clone())
+            .collect_vec();
+
+        // Re-open the file for reading (Parquet builder consumes the file handle).
+        let file = tokio::fs::File::open(&meta.path).await?;
+        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            file,
+            ArrowReaderOptions::new().with_page_index(true),
+        )
+        .await?;
+
+        let reader = builder
+            .with_row_groups(sorted_row_group_keys)
+            // FIXME(ngates): our indices code assumes the batch size == the row group sizes
+            .with_batch_size(10_000_000)
+            .build()?;
+
+        let schema = reader.schema().clone();
+
+        let batches = reader
+            .enumerate()
+            .map(|(idx, batch)| {
+                let batch = batch.unwrap();
+                let indices = PrimitiveArray::<Int64Type>::from(row_group_indices[idx].clone());
+                take_record_batch(&batch, &indices).unwrap()
+            })
+            .collect::<Vec<_>>()
+            .await;
+
+        let result = concat_batches(&schema, &batches)?;
+        Ok(result.num_rows())
     }
 }
 
-async fn take_vortex(reader: impl AsRef<Path>, indices: Buffer<u64>) -> anyhow::Result<ArrayRef> {
-    let array = SESSION
-        .open_options()
-        .open_path(reader.as_ref())
-        .await?
-        .scan()?
-        .with_row_indices(indices)
-        .into_array_stream()?
-        .read_all()
-        .await?;
-
-    // We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es.
-    let mut ctx = SESSION.create_execution_ctx();
-    // TODO(joe): should we go to a vector.
-    Ok(array.execute::<Canonical>(&mut ctx)?.into_array())
-}
-
-pub async fn take_parquet(path: &Path, indices: Vec<u64>) -> anyhow::Result<RecordBatch> {
-    let file = tokio::fs::File::open(path).await?;
-
-    let builder = ParquetRecordBatchStreamBuilder::new_with_options(
-        file,
-        ArrowReaderOptions::new().with_page_index(true),
-    )
-    .await?;
-
-    // We figure out which row groups we need to read and a selection filter for each of them.
-    let mut row_groups = HashMap::new();
-    let row_group_offsets = iter::once(0)
-        .chain(
-            builder
-                .metadata()
-                .row_groups()
-                .iter()
-                .map(RowGroupMetaData::num_rows),
-        )
-        .scan(0i64, |acc, x| {
-            *acc += x;
-            Some(*acc)
-        })
-        .collect::<Vec<_>>();
-
-    for idx in indices {
-        let row_group_idx = row_group_offsets
-            .binary_search(&(idx as i64))
-            .unwrap_or_else(|e| e - 1);
-        row_groups
-            .entry(row_group_idx)
-            .or_insert_with(Vec::new)
-            .push((idx as i64) - row_group_offsets[row_group_idx]);
-    }
-
-    let sorted_row_group_keys = row_groups.keys().copied().sorted().collect_vec();
-    let row_group_indices = sorted_row_group_keys
-        .iter()
-        .map(|i| row_groups[i].clone())
-        .collect_vec();
-
-    let reader = builder
-        .with_row_groups(sorted_row_group_keys)
-        // FIXME(ngates): our indices code assumes the batch size == the row group sizes
-        .with_batch_size(10_000_000)
-        .build()?;
-
-    let schema = reader.schema().clone();
-
-    let batches = reader
-        .enumerate()
-        .map(|(idx, batch)| {
-            let batch = batch.unwrap();
-            let indices = PrimitiveArray::<Int64Type>::from(row_group_indices[idx].clone());
-            take_record_batch(&batch, &indices).unwrap()
-        })
-        .collect::<Vec<_>>()
-        .await;
-
-    Ok(concat_batches(&schema, &batches)?)
-}

From 8a54b1b8504c492e58c525cfea89321fe75ef81d Mon Sep 17 00:00:00 2001
From: Joe Isaacs
Date: Tue, 10 Feb 2026 11:18:30 +0000
Subject: [PATCH 3/6] feat[bench]: add more random access

Signed-off-by: Joe Isaacs
---
 benchmarks/random-access-bench/src/main.rs | 109 ++++++++++++---------
 1 file changed, 61 insertions(+), 48 deletions(-)

diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs
index 41d887ca9b6..8762c571161 100644
--- a/benchmarks/random-access-bench/src/main.rs
+++ b/benchmarks/random-access-bench/src/main.rs
@@ -285,9 +285,14 @@ struct Args {
     display_format: DisplayFormat,
     #[arg(short)]
     output_path: Option<PathBuf>,
-    /// Which dataset to benchmark random access on.
-    #[arg(long, value_enum, default_value_t = DatasetArg::Taxi)]
-    dataset: DatasetArg,
+    /// Which datasets to benchmark random access on.
+    #[arg(
+        long,
+        value_delimiter = ',',
+        value_enum,
+        default_values_t = vec![DatasetArg::Taxi, DatasetArg::FeatureVectors, DatasetArg::NestedLists, DatasetArg::NestedStructs]
+    )]
+    datasets: Vec<DatasetArg>,
 }
 
 #[tokio::main]
@@ -296,10 +301,14 @@ async fn main() -> Result<()> {
 
     setup_logging_and_tracing(args.verbose, args.tracing)?;
 
-    let dataset = args.dataset.into_dataset();
+    let datasets: Vec<DatasetPaths> = args
+        .datasets
+        .into_iter()
+        .map(|d| d.into_dataset())
+        .collect();
 
     run_random_access(
-        &dataset,
+        &datasets,
         args.formats,
         args.time_limit,
         args.display_format,
@@ -382,60 +391,64 @@ const BENCHMARK_ID: &str = "random-access";
 
 const FIXED_TAXI_INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000];
 
 async fn run_random_access(
-    dataset: &dyn BenchDataset,
+    datasets: &[DatasetPaths],
     formats: Vec<Format>,
     time_limit: u64,
     display_format: DisplayFormat,
     output_path: Option<PathBuf>,
 ) -> Result<()> {
-    let is_legacy = dataset.name() == "taxi";
-
-    // Legacy datasets get an extra run per format (fixed indices, old-style name).
-    let legacy_extra = if is_legacy { formats.len() } else { 0 };
-    let total_steps = formats.len() * ACCESS_PATTERNS.len() + legacy_extra;
+    let total_steps: usize = datasets
+        .iter()
+        .map(|d| {
+            let legacy_extra = if d.name() == "taxi" { formats.len() } else { 0 };
+            formats.len() * ACCESS_PATTERNS.len() + legacy_extra
+        })
+        .sum();
     let progress = ProgressBar::new(total_steps as u64);
 
     let mut targets = Vec::new();
     let mut measurements = Vec::new();
 
-    for format in &formats {
-        if is_legacy {
-            // Taxi: also emit the old fixed-index benchmark for historical continuity.
-            let mut accessor = dataset.create(*format).await?;
-            accessor.open().await?;
-            let name = measurement_name(dataset.name(), None, *format);
-            let measurement = benchmark_random_access(
-                accessor.as_ref(),
-                &name,
-                &FIXED_TAXI_INDICES,
-                time_limit,
-                STORAGE_NVME,
-            )
-            .await?;
-
-            targets.push(measurement.target);
-            measurements.push(measurement);
-            progress.inc(1);
-        }
+    for dataset in datasets {
+        for format in &formats {
+            if dataset.name() == "taxi" {
+                // Taxi: also emit the old fixed-index benchmark for historical continuity.
+                let mut accessor = dataset.create(*format).await?;
+                accessor.open().await?;
+                let name = measurement_name(dataset.name(), None, *format);
+                let measurement = benchmark_random_access(
+                    accessor.as_ref(),
+                    &name,
+                    &FIXED_TAXI_INDICES,
+                    time_limit,
+                    STORAGE_NVME,
+                )
+                .await?;
+
+                targets.push(measurement.target);
+                measurements.push(measurement);
+                progress.inc(1);
+            }
 
-        // All datasets: run each access pattern with 4-part names.
+            for pattern in &ACCESS_PATTERNS {
+                let mut accessor = dataset.create(*format).await?;
+                accessor.open().await?;
+                let indices = generate_indices(dataset, *pattern);
+                let name = measurement_name(dataset.name(), Some(*pattern), *format);
+                let measurement = benchmark_random_access(
+                    accessor.as_ref(),
+                    &name,
+                    &indices,
+                    time_limit,
+                    STORAGE_NVME,
+                )
+                .await?;
+
+                targets.push(measurement.target);
+                measurements.push(measurement);
+                progress.inc(1);
+            }
         }
     }

From c8ed43cacbef0dfcc1d02a23ed4f8b003f4d24ca Mon Sep 17 00:00:00 2001
From: Joe Isaacs
Date: Tue, 10 Feb 2026 11:30:44 +0000
Subject: [PATCH 4/6] feat[bench]: add more random access

Signed-off-by: Joe Isaacs
---
 benchmarks/random-access-bench/src/main.rs   | 2 --
 vortex-bench/src/datasets/feature_vectors.rs | 5 +++--
 vortex-bench/src/random_access/take.rs       | 8 +++++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs
index 8762c571161..d91bdd1745c 100644
--- a/benchmarks/random-access-bench/src/main.rs
+++ b/benchmarks/random-access-bench/src/main.rs
@@ -412,7 +412,6 @@ async fn run_random_access(
     for dataset in datasets {
         for format in &formats {
             if dataset.name() == "taxi" {
-                // Taxi: also emit the old fixed-index benchmark for historical continuity.
                 let mut accessor = dataset.create(*format).await?;
                 accessor.open().await?;
                 let name = measurement_name(dataset.name(), None, *format);
@@ -430,7 +429,6 @@ async fn run_random_access(
                 progress.inc(1);
             }
 
-            // All datasets: run each access pattern with 4-part names.
             for pattern in &ACCESS_PATTERNS {
                 let mut accessor = dataset.create(*format).await?;
                 accessor.open().await?;
diff --git a/vortex-bench/src/datasets/feature_vectors.rs b/vortex-bench/src/datasets/feature_vectors.rs
index 3b5e23f54aa..9f3f8b773d2 100644
--- a/vortex-bench/src/datasets/feature_vectors.rs
+++ b/vortex-bench/src/datasets/feature_vectors.rs
@@ -1,6 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
+use std::fs::File;
 use std::path::PathBuf;
 use std::sync::Arc;
 
@@ -37,7 +38,7 @@ const BATCH_SIZE: usize = 100_000;
 
 /// Generate a synthetic feature vectors parquet file.
 ///
-/// Schema: `id: Int64, embedding: FixedSizeList<Float32>`.
+/// Schema: `id: Int64, embedding: FixedSizeList<Float32, 1024>`.
 /// This simulates a table of embedding vectors, common in ML workloads.
 pub async fn feature_vectors_parquet() -> Result<PathBuf> {
     idempotent_async(
@@ -55,7 +56,7 @@ pub async fn feature_vectors_parquet() -> Result<PathBuf> {
                 ),
             ])));
 
-            let file = std::fs::File::create(&temp_path)?;
+            let file = File::create(&temp_path)?;
             let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;
 
             let mut rng = StdRng::seed_from_u64(42);
diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs
index 5d0f1397120..9eb5cd8fecc 100644
--- a/vortex-bench/src/random_access/take.rs
+++ b/vortex-bench/src/random_access/take.rs
@@ -1,6 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
+use std::iter::once;
 use std::path::PathBuf;
 
 use anyhow::anyhow;
@@ -15,6 +16,7 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::file::metadata::RowGroupMetaData;
 use stream::StreamExt;
+use tokio::fs::File;
 use vortex::array::Array;
 use vortex::array::Canonical;
 use vortex::array::IntoArray;
@@ -167,14 +169,14 @@ impl RandomAccessor for ParquetRandomAccessor {
     }
 
     async fn open(&mut self) -> anyhow::Result<()> {
-        let file = tokio::fs::File::open(&self.path).await?;
+        let file = File::open(&self.path).await?;
         let builder = ParquetRecordBatchStreamBuilder::new_with_options(
             file,
             ArrowReaderOptions::new().with_page_index(true),
         )
         .await?;
 
-        let row_group_offsets = std::iter::once(0)
+        let row_group_offsets = once(0)
             .chain(
                 builder
                     .metadata()
@@ -222,7 +224,7 @@ impl RandomAccessor for ParquetRandomAccessor {
             .collect_vec();
 
         // Re-open the file for reading (Parquet builder consumes the file handle).
-        let file = tokio::fs::File::open(&meta.path).await?;
+        let file = File::open(&meta.path).await?;
         let builder = ParquetRecordBatchStreamBuilder::new_with_options(
             file,
             ArrowReaderOptions::new().with_page_index(true),

From ed626bb4e403ba823531fbb943b55d7a77661df5 Mon Sep 17 00:00:00 2001
From: Joe Isaacs
Date: Tue, 10 Feb 2026 12:30:14 +0000
Subject: [PATCH 5/6] feat[bench]: add more random access

Signed-off-by: Joe Isaacs
---
 benchmarks/lance-bench/src/random_access.rs  |  48 ++---
 benchmarks/random-access-bench/src/main.rs   | 203 ++++++-------------
 vortex-bench/src/datasets/feature_vectors.rs |  24 +++
 vortex-bench/src/datasets/nested_lists.rs    |  24 +++
 vortex-bench/src/datasets/nested_structs.rs  |  24 +++
 vortex-bench/src/datasets/taxi_data.rs       |  25 +++
 vortex-bench/src/random_access/mod.rs        |  16 +-
 vortex-bench/src/random_access/take.rs       | 170 +++++-----------
 8 files changed, 227 insertions(+), 307 deletions(-)

diff --git a/benchmarks/lance-bench/src/random_access.rs b/benchmarks/lance-bench/src/random_access.rs
index dd0c6ba0349..53138552742 100644
--- a/benchmarks/lance-bench/src/random_access.rs
+++ b/benchmarks/lance-bench/src/random_access.rs
@@ -78,29 +78,24 @@ pub async fn nested_structs_lance() -> anyhow::Result<PathBuf> {
 
 /// Random accessor for Lance format files.
 ///
-/// After `open()`, the dataset handle is stored and reused across `take()` calls.
+/// The dataset handle is opened at construction time and reused across `take()` calls.
 pub struct LanceRandomAccessor {
-    path: PathBuf,
     name: String,
-    dataset: Option<Dataset>,
+    dataset: Dataset,
 }
 
 impl LanceRandomAccessor {
-    pub fn new(path: PathBuf) -> Self {
-        Self {
-            path,
-            name: "random-access/lance-tokio-local-disk".to_string(),
-            dataset: None,
-        }
-    }
-
-    /// Create a new Lance random accessor with a custom name.
-    pub fn with_name(path: PathBuf, name: impl Into<String>) -> Self {
-        Self {
-            path,
+    /// Open a Lance dataset and return a ready-to-use accessor.
+    pub async fn open(path: PathBuf, name: impl Into<String>) -> anyhow::Result<Self> {
+        let dataset = Dataset::open(
+            path.to_str()
+                .ok_or_else(|| anyhow!("Invalid dataset path"))?,
+        )
+        .await?;
+        Ok(Self {
             name: name.into(),
-            dataset: None,
-        }
+            dataset,
+        })
     }
 }
 
@@ -114,24 +109,9 @@ impl RandomAccessor for LanceRandomAccessor {
         &self.name
     }
 
-    async fn open(&mut self) -> anyhow::Result<()> {
-        let dataset = Dataset::open(
-            self.path
-                .to_str()
-                .ok_or_else(|| anyhow!("Invalid dataset path"))?,
-        )
-        .await?;
-        self.dataset = Some(dataset);
-        Ok(())
-    }
-
     async fn take(&self, indices: &[u64]) -> anyhow::Result<usize> {
-        let dataset = self
-            .dataset
-            .as_ref()
-            .ok_or_else(|| anyhow!("accessor not opened; call open() first"))?;
-        let projection = ProjectionRequest::from_schema(dataset.schema().clone());
-        let result = dataset.take(indices, projection).await?;
+        let projection = ProjectionRequest::from_schema(self.dataset.schema().clone());
+        let result = self.dataset.take(indices, projection).await?;
         Ok(result.num_rows())
     }
 }
diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs
index d91bdd1745c..e524fb2ad57 100644
--- a/benchmarks/random-access-bench/src/main.rs
+++ b/benchmarks/random-access-bench/src/main.rs
@@ -1,14 +1,11 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
-use std::future::Future;
 use std::path::PathBuf;
-use std::pin::Pin;
 use std::time::Duration;
 use std::time::Instant;
 
 use anyhow::Result;
-use async_trait::async_trait;
 use clap::Parser;
 use clap::ValueEnum;
 use indicatif::ProgressBar;
@@ -21,10 +18,10 @@ use vortex_bench::BenchmarkOutput;
 use vortex_bench::Engine;
 use vortex_bench::Format;
 use vortex_bench::Target;
-use vortex_bench::datasets::feature_vectors::*;
-use vortex_bench::datasets::nested_lists::*;
-use vortex_bench::datasets::nested_structs::*;
-use vortex_bench::datasets::taxi_data::*;
+use vortex_bench::datasets::feature_vectors::FeatureVectorsData;
+use vortex_bench::datasets::nested_lists::NestedListsData;
+use vortex_bench::datasets::nested_structs::NestedStructsData;
+use vortex_bench::datasets::taxi_data::TaxiData;
 use vortex_bench::display::DisplayFormat;
 use vortex_bench::display::print_measurements_json;
 use vortex_bench::display::render_table;
@@ -36,129 +33,6 @@ use vortex_bench::random_access::VortexRandomAccessor;
 use vortex_bench::setup_logging_and_tracing;
 use vortex_bench::utils::constants::STORAGE_NVME;
 
-// ---------------------------------------------------------------------------
-// Dataset implementations
-// ---------------------------------------------------------------------------
-
-/// Short format label used in benchmark measurement names.
-fn format_label(format: Format) -> &'static str {
-    match format {
-        Format::OnDiskVortex => "vortex",
-        Format::VortexCompact => "vortex-compact",
-        Format::Parquet => "parquet",
-        Format::Lance => "lance",
-        other => unimplemented!("Random access bench not implemented for {other}"),
-    }
-}
-
-/// Create a random accessor from a file path, dataset name, and format.
-///
-/// This eliminates the repeated match-on-format boilerplate in each dataset.
-fn create_accessor(path: PathBuf, dataset: &str, format: Format) -> Box<dyn RandomAccessor> {
-    let name = format!(
-        "random-access/{}/{}-tokio-local-disk",
-        dataset,
-        format_label(format)
-    );
-    match format {
-        Format::OnDiskVortex | Format::VortexCompact => Box::new(
-            VortexRandomAccessor::with_name_and_format(path, name, format),
-        ),
-        Format::Parquet => Box::new(ParquetRandomAccessor::with_name(path, name)),
-        #[cfg(feature = "lance")]
-        Format::Lance => {
-            use lance_bench::random_access::LanceRandomAccessor;
-            Box::new(LanceRandomAccessor::with_name(path, name))
-        }
-        other => unimplemented!("Random access bench not implemented for {other}"),
-    }
-}
-
-/// A function returning a boxed future that resolves to a file path.
-type PathFn = fn() -> Pin<Box<dyn Future<Output = Result<PathBuf>> + Send>>;
-
-/// Paths for a specific dataset, keyed by format.
-struct DatasetPaths {
-    name: &'static str,
-    row_count: u64,
-    parquet: PathFn,
-    vortex: PathFn,
-    vortex_compact: PathFn,
-    #[cfg(feature = "lance")]
-    lance: PathFn,
-}
-
-#[async_trait]
-impl BenchDataset for DatasetPaths {
-    fn name(&self) -> &str {
-        self.name
-    }
-
-    fn row_count(&self) -> u64 {
-        self.row_count
-    }
-
-    async fn create(&self, format: Format) -> Result<Box<dyn RandomAccessor>> {
-        let path = match format {
-            Format::OnDiskVortex => (self.vortex)().await?,
-            Format::VortexCompact => (self.vortex_compact)().await?,
-            Format::Parquet => (self.parquet)().await?,
-            #[cfg(feature = "lance")]
-            Format::Lance => (self.lance)().await?,
-            other => unimplemented!("Random access bench not implemented for {other}"),
-        };
-        Ok(create_accessor(path, self.name, format))
-    }
-}
-
-fn taxi_dataset() -> DatasetPaths {
-    DatasetPaths {
-        name: "taxi",
-        row_count: 3_339_715,
-        parquet: || Box::pin(taxi_data_parquet()),
-        vortex: || Box::pin(taxi_data_vortex()),
-        vortex_compact: || Box::pin(taxi_data_vortex_compact()),
-        #[cfg(feature = "lance")]
-        lance: || Box::pin(lance_bench::random_access::taxi_data_lance()),
-    }
-}
-
-fn feature_vectors_dataset() -> DatasetPaths {
-    DatasetPaths {
-        name: "feature-vectors",
-        row_count: 1_000_000,
-        parquet: || Box::pin(feature_vectors_parquet()),
-        vortex: || Box::pin(feature_vectors_vortex()),
-        vortex_compact: || Box::pin(feature_vectors_vortex_compact()),
-        #[cfg(feature = "lance")]
-        lance: || Box::pin(lance_bench::random_access::feature_vectors_lance()),
-    }
-}
-
-fn nested_lists_dataset() -> DatasetPaths {
-    DatasetPaths {
-        name: "nested-lists",
-        row_count: 1_000_000,
-        parquet: || Box::pin(nested_lists_parquet()),
-        vortex: || Box::pin(nested_lists_vortex()),
-        vortex_compact: || Box::pin(nested_lists_vortex_compact()),
-        #[cfg(feature = "lance")]
-        lance: || Box::pin(lance_bench::random_access::nested_lists_lance()),
-    }
-}
-
-fn nested_structs_dataset() -> DatasetPaths {
-    DatasetPaths {
-        name: "nested-structs",
-        row_count: 1_000_000,
-        parquet: || Box::pin(nested_structs_parquet()),
-        vortex: || Box::pin(nested_structs_vortex()),
-        vortex_compact: || Box::pin(nested_structs_vortex_compact()),
-        #[cfg(feature = "lance")]
-        lance: || Box::pin(lance_bench::random_access::nested_structs_lance()),
-    }
-}
-
 // ---------------------------------------------------------------------------
 // Access patterns
 // ---------------------------------------------------------------------------
@@ -254,12 +128,12 @@ enum DatasetArg {
 }
 
 impl DatasetArg {
-    fn into_dataset(self) -> DatasetPaths {
+    fn into_dataset(self) -> Box<dyn BenchDataset> {
         match self {
-            DatasetArg::Taxi => taxi_dataset(),
-            DatasetArg::FeatureVectors => feature_vectors_dataset(),
-            DatasetArg::NestedLists => nested_lists_dataset(),
-            DatasetArg::NestedStructs => nested_structs_dataset(),
+            DatasetArg::Taxi => Box::new(TaxiData),
+            DatasetArg::FeatureVectors => Box::new(FeatureVectorsData),
+            DatasetArg::NestedLists => Box::new(NestedListsData),
+            DatasetArg::NestedStructs => Box::new(NestedStructsData),
         }
     }
 }
@@ -301,7 +175,7 @@ async fn main() -> Result<()> {
 
     setup_logging_and_tracing(args.verbose, args.tracing)?;
 
-    let datasets: Vec<DatasetPaths> = args
+    let datasets: Vec<Box<dyn BenchDataset>> = args
        .datasets
        .into_iter()
        .map(|d| d.into_dataset())
        .collect();
@@ -362,14 +236,15 @@ async fn benchmark_random_access(
 /// For other datasets, includes dataset and pattern:
 /// `random-access/{dataset}/{pattern}/{format}-tokio-local-disk`.
 fn measurement_name(dataset: &str, pattern: Option<AccessPattern>, format: Format) -> String {
+    let fmt = format.ext();
     match pattern {
         Some(p) => format!(
             "random-access/{}/{}/{}-tokio-local-disk",
             dataset,
             p.name(),
-            format_label(format)
+            fmt
         ),
-        None => format!("random-access/{}-tokio-local-disk", format_label(format)),
+        None => format!("random-access/{}-tokio-local-disk", fmt),
     }
 }
 
@@ -384,6 +259,48 @@ fn format_to_engine(format: Format) -> Engine {
     }
 }
 
+/// Open a random accessor for any supported format.
+///
+/// For Vortex and Parquet, the path comes from [`BenchDataset::path`].
+/// For Lance (behind the `lance` feature), the path is resolved from lance-bench helpers.
+async fn open_accessor(
+    dataset: &dyn BenchDataset,
+    format: Format,
+) -> Result<Box<dyn RandomAccessor>> {
+    let name = format!(
+        "random-access/{}/{}-tokio-local-disk",
+        dataset.name(),
+        format.ext()
+    );
+    match format {
+        Format::OnDiskVortex | Format::VortexCompact => {
+            let path = dataset.path(format).await?;
+            Ok(Box::new(
+                VortexRandomAccessor::open(path, name, format).await?,
+            ))
+        }
+        Format::Parquet => {
+            let path = dataset.path(format).await?;
+            Ok(Box::new(ParquetRandomAccessor::open(path, name).await?))
+        }
+        #[cfg(feature = "lance")]
+        Format::Lance => {
+            use lance_bench::random_access;
+            let path = match dataset.name() {
+                "taxi" => random_access::taxi_data_lance().await?,
+                "feature-vectors" => random_access::feature_vectors_lance().await?,
+                "nested-lists" => random_access::nested_lists_lance().await?,
+                "nested-structs" => random_access::nested_structs_lance().await?,
+                other => anyhow::bail!("Unknown dataset for Lance: {other}"),
+            };
+            Ok(Box::new(
+                random_access::LanceRandomAccessor::open(path, name).await?,
+            ))
+        }
+        other => unimplemented!("open_accessor not implemented for {other}"),
+    }
+}
+
 /// The benchmark ID used for output path.
 const BENCHMARK_ID: &str = "random-access";
@@ -391,7 +308,7 @@ const BENCHMARK_ID: &str = "random-access";
 const FIXED_TAXI_INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000];
 
 async fn run_random_access(
-    datasets: &[DatasetPaths],
+    datasets: &[Box<dyn BenchDataset>],
     formats: Vec<Format>,
     time_limit: u64,
     display_format: DisplayFormat,
     output_path: Option<PathBuf>,
 ) -> Result<()> {
@@ -412,8 +329,7 @@ async fn run_random_access(
     for dataset in datasets {
         for format in &formats {
             if dataset.name() == "taxi" {
-                let mut accessor = dataset.create(*format).await?;
-                accessor.open().await?;
+                let accessor = open_accessor(dataset.as_ref(), *format).await?;
                 let name = measurement_name(dataset.name(), None, *format);
                 let measurement = benchmark_random_access(
                     accessor.as_ref(),
@@ -430,9 +346,8 @@ async fn run_random_access(
                 progress.inc(1);
             }
 
             for pattern in &ACCESS_PATTERNS {
-                let mut accessor = dataset.create(*format).await?;
-                accessor.open().await?;
-                let indices = generate_indices(dataset, *pattern);
+                let accessor = open_accessor(dataset.as_ref(), *format).await?;
+                let indices = generate_indices(dataset.as_ref(), *pattern);
                 let name = measurement_name(dataset.name(), Some(*pattern), *format);
                 let measurement = benchmark_random_access(
                     accessor.as_ref(),
diff --git a/vortex-bench/src/datasets/feature_vectors.rs b/vortex-bench/src/datasets/feature_vectors.rs
index 9f3f8b773d2..4928c0df326 100644
--- a/vortex-bench/src/datasets/feature_vectors.rs
+++ b/vortex-bench/src/datasets/feature_vectors.rs
@@ -13,6 +13,7 @@ use arrow_array::builder::Float32Builder;
 use arrow_schema::DataType;
 use arrow_schema::Field;
 use arrow_schema::Schema;
+use async_trait::async_trait;
 use parquet::arrow::ArrowWriter;
 use rand::Rng;
 use rand::SeedableRng;
@@ -22,6 +23,7 @@ use crate::CompactionStrategy;
 use crate::Format;
 use crate::conversions::write_parquet_as_vortex;
 use crate::idempotent_async;
+use crate::random_access::BenchDataset;
 use crate::random_access::data_path;
 
 /// Dataset identifier used for data path generation.
@@ -30,6 +32,28 @@ pub const DATASET: &str = "feature_vectors";
 /// Number of rows in the feature vectors dataset.
 pub const ROW_COUNT: usize = 1_000_000;
 
+pub struct FeatureVectorsData;
+
+#[async_trait]
+impl BenchDataset for FeatureVectorsData {
+    fn name(&self) -> &str {
+        "feature-vectors"
+    }
+
+    fn row_count(&self) -> u64 {
+        ROW_COUNT as u64
+    }
+
+    async fn path(&self, format: Format) -> Result<PathBuf> {
+        match format {
+            Format::OnDiskVortex => feature_vectors_vortex().await,
+            Format::VortexCompact => feature_vectors_vortex_compact().await,
+            Format::Parquet => feature_vectors_parquet().await,
+            other => unimplemented!("Random access bench not implemented for {other}"),
+        }
+    }
+}
+
 /// Dimensionality of each feature vector.
 const VECTOR_DIM: i32 = 1024;
 
diff --git a/vortex-bench/src/datasets/nested_lists.rs b/vortex-bench/src/datasets/nested_lists.rs
index e705886acec..5f0fbac80f5 100644
--- a/vortex-bench/src/datasets/nested_lists.rs
+++ b/vortex-bench/src/datasets/nested_lists.rs
@@ -12,6 +12,7 @@ use arrow_array::builder::ListBuilder;
 use arrow_schema::DataType;
 use arrow_schema::Field;
 use arrow_schema::Schema;
+use async_trait::async_trait;
 use parquet::arrow::ArrowWriter;
 use rand::Rng;
 use rand::SeedableRng;
@@ -21,6 +22,7 @@ use crate::CompactionStrategy;
 use crate::Format;
 use crate::conversions::write_parquet_as_vortex;
 use crate::idempotent_async;
+use crate::random_access::BenchDataset;
 use crate::random_access::data_path;
 
 /// Dataset identifier used for data path generation.
@@ -29,6 +31,28 @@ pub const DATASET: &str = "nested_lists";
 /// Number of rows in the nested lists dataset.
 pub const ROW_COUNT: usize = 1_000_000;
 
+pub struct NestedListsData;
+
+#[async_trait]
+impl BenchDataset for NestedListsData {
+    fn name(&self) -> &str {
+        "nested-lists"
+    }
+
+    fn row_count(&self) -> u64 {
+        ROW_COUNT as u64
+    }
+
+    async fn path(&self, format: Format) -> Result<PathBuf> {
+        match format {
+            Format::OnDiskVortex => nested_lists_vortex().await,
+            Format::VortexCompact => nested_lists_vortex_compact().await,
+            Format::Parquet => nested_lists_parquet().await,
+            other => unimplemented!("Random access bench not implemented for {other}"),
+        }
+    }
+}
+
 /// Maximum number of elements in each list.
 const MAX_LIST_LEN: usize = 20;
 
diff --git a/vortex-bench/src/datasets/nested_structs.rs b/vortex-bench/src/datasets/nested_structs.rs
index 74d7fd3c687..b3f362ed904 100644
--- a/vortex-bench/src/datasets/nested_structs.rs
+++ b/vortex-bench/src/datasets/nested_structs.rs
@@ -13,6 +13,7 @@ use arrow_schema::DataType;
 use arrow_schema::Field;
 use arrow_schema::Fields;
 use arrow_schema::Schema;
+use async_trait::async_trait;
 use parquet::arrow::ArrowWriter;
 use rand::Rng;
 use rand::SeedableRng;
@@ -22,6 +23,7 @@ use crate::CompactionStrategy;
 use crate::Format;
 use crate::conversions::write_parquet_as_vortex;
 use crate::idempotent_async;
+use crate::random_access::BenchDataset;
 use crate::random_access::data_path;
 
 /// Dataset identifier used for data path generation.
@@ -30,6 +32,28 @@ pub const DATASET: &str = "nested_structs";
 /// Number of rows in the nested structs dataset.
 pub const ROW_COUNT: usize = 1_000_000;
 
+pub struct NestedStructsData;
+
+#[async_trait]
+impl BenchDataset for NestedStructsData {
+    fn name(&self) -> &str {
+        "nested-structs"
+    }
+
+    fn row_count(&self) -> u64 {
+        ROW_COUNT as u64
+    }
+
+    async fn path(&self, format: Format) -> Result<PathBuf> {
+        match format {
+            Format::OnDiskVortex => nested_structs_vortex().await,
+            Format::VortexCompact => nested_structs_vortex_compact().await,
+            Format::Parquet => nested_structs_parquet().await,
+            other => unimplemented!("Random access bench not implemented for {other}"),
+        }
+    }
+}
+
 /// Batch size for data generation.
 const BATCH_SIZE: usize = 100_000;
 
diff --git a/vortex-bench/src/datasets/taxi_data.rs b/vortex-bench/src/datasets/taxi_data.rs
index 17396ebb497..133f7c55586 100644
--- a/vortex-bench/src/datasets/taxi_data.rs
+++ b/vortex-bench/src/datasets/taxi_data.rs
@@ -13,16 +13,21 @@ use vortex::file::OpenOptionsSessionExt;
 use vortex::file::WriteOptionsSessionExt;
 
 use crate::CompactionStrategy;
+use crate::Format;
 use crate::IdempotentPath;
 use crate::SESSION;
 use crate::conversions::parquet_to_vortex_chunks;
 use crate::datasets::Dataset;
 use crate::datasets::data_downloads::download_data;
 use crate::idempotent_async;
+use crate::random_access::BenchDataset;
 
 /// Dataset identifier used for data path generation.
 pub const DATASET: &str = "taxi";
 
+/// Total number of rows in the taxi dataset.
+pub const ROW_COUNT: u64 = 3_339_715;
+
 pub struct TaxiData;
 
 #[async_trait]
@@ -40,6 +45,26 @@ impl Dataset for TaxiData {
     }
 }
 
+#[async_trait]
+impl BenchDataset for TaxiData {
+    fn name(&self) -> &str {
+        "taxi"
+    }
+
+    fn row_count(&self) -> u64 {
+        ROW_COUNT
+    }
+
+    async fn path(&self, format: Format) -> Result<PathBuf> {
+        match format {
+            Format::OnDiskVortex => taxi_data_vortex().await,
+            Format::VortexCompact => taxi_data_vortex_compact().await,
+            Format::Parquet => taxi_data_parquet().await,
+            other => unimplemented!("Random access bench not implemented for {other}"),
+        }
+    }
+}
+
 pub async fn taxi_data_parquet() -> Result<PathBuf> {
     let taxi_parquet_fpath = "taxi/taxi.parquet".to_data_path();
     let taxi_data_url =
diff --git a/vortex-bench/src/random_access/mod.rs b/vortex-bench/src/random_access/mod.rs
index 5e42c02bc7c..5421ebb01b5 100644
--- a/vortex-bench/src/random_access/mod.rs
+++ b/vortex-bench/src/random_access/mod.rs
@@ -1,6 +1,8 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
+use std::path::PathBuf;
+
 use anyhow::Result;
 use async_trait::async_trait;
 
@@ -24,7 +26,7 @@
     }
 }
 
-/// Trait for a benchmark dataset that knows how to write files and create accessors.
+/// Trait for a benchmark dataset that knows how to prepare data files.
 #[async_trait]
 pub trait BenchDataset: Send + Sync {
     /// A descriptive name for this dataset (used in benchmark output and CLI).
@@ -33,17 +35,16 @@
     /// The total number of rows in this dataset.
     fn row_count(&self) -> u64;
 
-    /// Create a format-specific random accessor for this dataset.
+    /// Prepare the data file for the given format and return its path.
     ///
-    /// This prepares the data file (writing it if necessary) and returns an
-    /// accessor that can be opened and used for random access benchmarks.
-    async fn create(&self, format: Format) -> Result<Box<dyn RandomAccessor>>;
+    /// This writes the file if it doesn't already exist.
+    async fn path(&self, format: Format) -> Result<PathBuf>;
 }
 
 /// Trait for format-specific random access (take) operations.
 ///
 /// Implementations handle reading specific rows by index from a data source.
-/// The lifecycle is: construct -> `open()` (parse metadata) -> `take()` (I/O).
+/// Accessors are constructed in a ready-to-use state with metadata already parsed.
 #[async_trait]
 pub trait RandomAccessor: Send + Sync {
     /// A descriptive name for this accessor (used in benchmark output).
     fn name(&self) -> &str;
 
     /// The format this accessor handles.
     fn format(&self) -> Format;
 
-    /// Open the file and parse metadata. This is not timed in benchmarks.
-    async fn open(&mut self) -> Result<()>;
-
     /// Take rows at the given indices, returning the number of rows read.
     async fn take(&self, indices: &[u64]) -> Result<usize>;
 }
diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs
index 9eb5cd8fecc..df48cd075c0 100644
--- a/vortex-bench/src/random_access/take.rs
+++ b/vortex-bench/src/random_access/take.rs
@@ -4,7 +4,6 @@
 use std::iter::once;
 use std::path::PathBuf;
 
-use anyhow::anyhow;
 use arrow_array::PrimitiveArray;
 use arrow_array::types::Int64Type;
 use arrow_select::concat::concat_batches;
@@ -13,8 +12,8 @@ use async_trait::async_trait;
 use futures::stream;
 use itertools::Itertools;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::arrow_reader::ArrowReaderMetadata;
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
-use parquet::file::metadata::RowGroupMetaData;
 use stream::StreamExt;
 use tokio::fs::File;
 use vortex::array::Array;
@@ -33,53 +32,26 @@ use crate::random_access::RandomAccessor;
 
 /// Random accessor for Vortex format files.
 ///
-/// After `open()`, the file handle is stored and reused across `take()` calls.
+/// The file handle is opened at construction time and reused across `take()` calls.
 pub struct VortexRandomAccessor {
-    path: PathBuf,
     name: String,
     format: Format,
-    file: Option<VortexFile>,
+    file: VortexFile,
 }
 
 impl VortexRandomAccessor {
-    /// Create a new Vortex random accessor for the given file path.
-    pub fn new(path: PathBuf) -> Self {
-        Self {
-            path,
-            name: "random-access/vortex-tokio-local-disk".to_string(),
-            format: Format::OnDiskVortex,
-            file: None,
-        }
-    }
-
-    /// Create a new Vortex random accessor with a custom name.
-    pub fn with_name(path: PathBuf, name: impl Into<String>) -> Self {
-        Self {
-            path,
-            name: name.into(),
-            format: Format::OnDiskVortex,
-            file: None,
-        }
-    }
-
-    /// Create a new Vortex random accessor with a custom name and format.
-    pub fn with_name_and_format(path: PathBuf, name: impl Into<String>, format: Format) -> Self {
-        Self {
-            path,
+    /// Open a Vortex file and return a ready-to-use accessor.
+    pub async fn open(
+        path: impl AsRef<Path>,
+        name: impl Into<String>,
+        format: Format,
+    ) -> anyhow::Result<Self> {
+        let file = SESSION.open_options().open_path(path.as_ref()).await?;
+        Ok(Self {
             name: name.into(),
             format,
-            file: None,
-        }
-    }
-
-    /// Create a new Vortex random accessor for compact format.
-    pub fn compact(path: PathBuf) -> Self {
-        Self {
-            path,
-            name: "random-access/vortex-compact-tokio-local-disk".to_string(),
-            format: Format::VortexCompact,
-            file: None,
-        }
+            file,
+        })
     }
 }
 
@@ -93,20 +65,10 @@
         &self.name
     }
 
-    async fn open(&mut self) -> anyhow::Result<()> {
-        let file = SESSION.open_options().open_path(&self.path).await?;
-        self.file = Some(file);
-        Ok(())
-    }
-
     async fn take(&self, indices: &[u64]) -> anyhow::Result<usize> {
-        let file = self
-            .file
-            .as_ref()
-            .ok_or_else(|| anyhow!("accessor not opened; call open() first"))?;
-
         let indices_buf: Buffer<u64> = Buffer::from(indices.to_vec());
-        let array = file
+        let array = self
+            .file
             .scan()?
             .with_row_indices(indices_buf)
             .into_array_stream()?
@@ -120,69 +82,34 @@
     }
 }
 
-/// Pre-computed Parquet metadata stored after `open()`.
-struct ParquetMetadata {
-    /// Cumulative row offsets per row group (length = num_row_groups + 1).
-    row_group_offsets: Vec<i64>,
-    /// Path to the Parquet file (for re-opening on each take).
-    path: PathBuf,
-}
-
 /// Random accessor for Parquet format files.
 ///
-/// After `open()`, the file metadata and row group offsets are stored and
+/// Parquet footer and row group offsets are parsed at construction time and
 /// reused to map indices to row groups in each `take()` call.
 pub struct ParquetRandomAccessor {
-    path: PathBuf,
     name: String,
-    metadata: Option<ParquetMetadata>,
+    /// Cumulative row offsets per row group (length = num_row_groups + 1).
+    row_group_offsets: Vec<i64>,
+    /// Cached Arrow reader metadata (footer) to avoid re-parsing on each take.
+    arrow_metadata: ArrowReaderMetadata,
+    /// Path to the Parquet file (for re-opening on each take).
+    path: PathBuf,
 }
 
 impl ParquetRandomAccessor {
-    /// Create a new Parquet random accessor for the given file path.
-    pub fn new(path: PathBuf) -> Self {
-        Self {
-            path,
-            name: "random-access/parquet-tokio-local-disk".to_string(),
-            metadata: None,
-        }
-    }
-
-    /// Create a new Parquet random accessor with a custom name.
-    pub fn with_name(path: PathBuf, name: impl Into<String>) -> Self {
-        Self {
-            path,
-            name: name.into(),
-            metadata: None,
-        }
-    }
-}
-
-#[async_trait]
-impl RandomAccessor for ParquetRandomAccessor {
-    fn format(&self) -> Format {
-        Format::Parquet
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    async fn open(&mut self) -> anyhow::Result<()> {
-        let file = File::open(&self.path).await?;
-        let builder = ParquetRecordBatchStreamBuilder::new_with_options(
-            file,
-            ArrowReaderOptions::new().with_page_index(true),
-        )
-        .await?;
+    /// Open a Parquet file, parse the footer, and return a ready-to-use accessor.
+    pub async fn open(path: PathBuf, name: impl Into<String>) -> anyhow::Result<Self> {
+        let mut file = File::open(&path).await?;
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?;
 
         let row_group_offsets = once(0)
             .chain(
-                builder
+                arrow_metadata
                     .metadata()
                     .row_groups()
                     .iter()
-                    .map(RowGroupMetaData::num_rows),
+                    .map(|rg| rg.num_rows()),
             )
             .scan(0i64, |acc, x| {
                 *acc += x;
                 Some(*acc)
             })
             .collect::<Vec<_>>();
 
-        self.metadata = Some(ParquetMetadata {
+        Ok(Self {
+            name: name.into(),
             row_group_offsets,
-            path: self.path.clone(),
-        });
+            arrow_metadata,
+            path,
+        })
+    }
+}
 
-        Ok(())
+#[async_trait]
+impl RandomAccessor for ParquetRandomAccessor {
+    fn format(&self) -> Format {
+        Format::Parquet
     }
 
-    async fn take(&self, indices: &[u64]) -> anyhow::Result<usize> {
-        let meta = self
-            .metadata
-            .as_ref()
-            .ok_or_else(|| anyhow!("accessor not opened; call open() first"))?;
+    fn name(&self) -> &str {
+        &self.name
+    }
 
+    async fn take(&self, indices: &[u64]) -> anyhow::Result<usize> {
         // Map indices to row groups.
         let mut row_groups = HashMap::new();
         for &idx in indices {
-            let row_group_idx = meta
+            let row_group_idx = self
                 .row_group_offsets
                 .binary_search(&(idx as i64))
                 .unwrap_or_else(|e| e - 1);
             row_groups
                 .entry(row_group_idx)
                 .or_insert_with(Vec::new)
-                .push((idx as i64) - meta.row_group_offsets[row_group_idx]);
+                .push((idx as i64) - self.row_group_offsets[row_group_idx]);
         }
 
         let sorted_row_group_keys = row_groups.keys().copied().sorted().collect_vec();
         let row_group_indices = sorted_row_group_keys
             .iter()
             .map(|i| row_groups[i].clone())
             .collect_vec();
 
-        // Re-open the file for reading (Parquet builder consumes the file handle).
- let file = File::open(&meta.path).await?; - let builder = ParquetRecordBatchStreamBuilder::new_with_options( - file, - ArrowReaderOptions::new().with_page_index(true), - ) - .await?; + // Re-open the file but reuse cached metadata (avoids re-parsing the footer). + let file = File::open(&self.path).await?; + let builder = + ParquetRecordBatchStreamBuilder::new_with_metadata(file, self.arrow_metadata.clone()); let reader = builder .with_row_groups(sorted_row_group_keys) From 9335cada14788ea71c242f5e7b314645a6788df7 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Tue, 10 Feb 2026 14:08:53 +0000 Subject: [PATCH 6/6] fix Signed-off-by: Joe Isaacs --- benchmarks/random-access-bench/src/main.rs | 113 +++++++++++++++------ 1 file changed, 81 insertions(+), 32 deletions(-) diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs index e524fb2ad57..31aa4946baf 100644 --- a/benchmarks/random-access-bench/src/main.rs +++ b/benchmarks/random-access-bench/src/main.rs @@ -114,6 +114,20 @@ fn generate_indices(dataset: &dyn BenchDataset, pattern: AccessPattern) -> Vec, + /// Whether to reopen the file on each iteration, use a cached handle, or run both. + #[arg(long, value_enum, default_value_t = OpenMode::Both)] + open_mode: OpenMode, } #[tokio::main] @@ -185,6 +202,7 @@ async fn main() -> Result<()> { &datasets, args.formats, args.time_limit, + args.open_mode, args.display_format, args.output_path, ) @@ -195,22 +213,26 @@ async fn main() -> Result<()> { // Benchmark core // --------------------------------------------------------------------------- -/// Run a random access benchmark for the given accessor (already opened). +/// Run a random access benchmark. /// /// Runs the take operation repeatedly until the time limit is reached, -/// collecting timing for each run. +/// collecting timing for each run. When `reopen` is true, the accessor is +/// recreated from scratch before each iteration so that file metadata +/// parsing is included in the timing. 
 async fn benchmark_random_access(
-    accessor: &dyn RandomAccessor,
+    dataset: &dyn BenchDataset,
+    format: Format,
     measurement_name: &str,
     indices: &[u64],
     time_limit_secs: u64,
     storage: &str,
+    reopen: bool,
 ) -> Result<TimingMeasurement> {
     let time_limit = Duration::from_secs(time_limit_secs);
     let overall_start = Instant::now();
     let mut runs = Vec::new();
+    let mut accessor = open_accessor(dataset, format).await?;
 
-    // Run at least once, then continue until time limit
     loop {
         let start = Instant::now();
         let _row_count = accessor.take(indices).await?;
 
         if overall_start.elapsed() >= time_limit {
             break;
         }
+
+        if reopen {
+            accessor = open_accessor(dataset, format).await?;
+        }
     }
 
     Ok(TimingMeasurement {
         name: measurement_name.to_string(),
         storage: storage.to_string(),
-        target: Target::new(format_to_engine(accessor.format()), accessor.format()),
+        target: Target::new(format_to_engine(format), format),
         runs,
     })
 }
 
 async fn run_random_access(
     datasets: &[Box<dyn BenchDataset>],
     formats: Vec<Format>,
     time_limit: u64,
+    open_mode: OpenMode,
     display_format: DisplayFormat,
     output_path: Option<PathBuf>,
 ) -> Result<()> {
+    let reopen_variants: &[bool] = match open_mode {
+        OpenMode::Cached => &[false],
+        OpenMode::Reopen => &[true],
+        OpenMode::Both => &[false, true],
+    };
+
     let total_steps: usize = datasets
         .iter()
         .map(|d| {
             let legacy_extra = if d.name() == "taxi" { formats.len() } else { 0 };
-            formats.len() * ACCESS_PATTERNS.len() + legacy_extra
+            (formats.len() * ACCESS_PATTERNS.len() + legacy_extra) * reopen_variants.len()
         })
         .sum();
     let progress = ProgressBar::new(total_steps as u64);
 
     for dataset in datasets {
         for format in &formats {
             if dataset.name() == "taxi" {
-                let accessor = open_accessor(dataset.as_ref(), *format).await?;
                 let name = measurement_name(dataset.name(), None, *format);
-                let measurement = benchmark_random_access(
-                    accessor.as_ref(),
-                    &name,
-                    &FIXED_TAXI_INDICES,
-                    time_limit,
-                    STORAGE_NVME,
-                )
-                .await?;
-
-                targets.push(measurement.target);
-                measurements.push(measurement);
-                progress.inc(1);
+                for &reopen in reopen_variants {
+                    let bench_name = if reopen {
+                        format!("{name}-footer")
+                    } else {
+                        name.clone()
+                    };
+                    let measurement = benchmark_random_access(
+                        dataset.as_ref(),
+                        *format,
+                        &bench_name,
+                        &FIXED_TAXI_INDICES,
+                        time_limit,
+                        STORAGE_NVME,
+                        reopen,
+                    )
+                    .await?;
+
+                    targets.push(measurement.target);
+                    measurements.push(measurement);
+                    progress.inc(1);
+                }
             }
 
             for pattern in &ACCESS_PATTERNS {
-                let accessor = open_accessor(dataset.as_ref(), *format).await?;
                 let indices = generate_indices(dataset.as_ref(), *pattern);
                 let name = measurement_name(dataset.name(), Some(*pattern), *format);
-                let measurement = benchmark_random_access(
-                    accessor.as_ref(),
-                    &name,
-                    &indices,
-                    time_limit,
-                    STORAGE_NVME,
-                )
-                .await?;
-
-                targets.push(measurement.target);
-                measurements.push(measurement);
-                progress.inc(1);
+                for &reopen in reopen_variants {
+                    let bench_name = if reopen {
+                        format!("{name}-footer")
+                    } else {
+                        name.clone()
+                    };
+                    let measurement = benchmark_random_access(
+                        dataset.as_ref(),
+                        *format,
+                        &bench_name,
+                        &indices,
+                        time_limit,
+                        STORAGE_NVME,
+                        reopen,
+                    )
+                    .await?;
+
+                    targets.push(measurement.target);
+                    measurements.push(measurement);
+                    progress.inc(1);
+                }
             }
         }
     }
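
A minimal sketch of how the pieces introduced in this series compose, assuming the vortex-bench exports shown in the diffs above (`BenchDataset::path`, `VortexRandomAccessor::open`, `RandomAccessor::take`); the function name and the accessor label string here are illustrative only, not part of the patches:

    use anyhow::Result;
    use vortex_bench::Format;
    use vortex_bench::datasets::taxi_data::TaxiData;
    use vortex_bench::random_access::BenchDataset;
    use vortex_bench::random_access::RandomAccessor;
    use vortex_bench::random_access::VortexRandomAccessor;

    // Hypothetical driver mirroring what run_random_access does per
    // (dataset, format) pair.
    async fn sample_take() -> Result<usize> {
        // Prepare the data file; it is only written if it does not already exist.
        let path = TaxiData.path(Format::OnDiskVortex).await?;
        // Construction parses file metadata, so that cost stays outside the timed
        // loop unless the reopen variant is requested.
        let accessor =
            VortexRandomAccessor::open(path, "random-access/demo", Format::OnDiskVortex).await?;
        // The timed operation: random reads at fixed row indices.
        accessor.take(&[10, 11, 12, 13, 100_000, 3_000_000]).await
    }

With `--datasets` taking a comma-separated list and `--open-mode both` as the default, each dataset/format pair is measured twice: once against a cached accessor and once reopened before every iteration (the `-footer` measurement variants).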