diff --git a/Cargo.lock b/Cargo.lock index 578056b75b5..091dff9418d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7830,9 +7830,12 @@ name = "random-access-bench" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "clap", "indicatif", "lance-bench", + "rand 0.9.2", + "rand_distr 0.5.1", "tokio", "vortex", "vortex-bench", diff --git a/benchmarks-website-v2/server.js b/benchmarks-website-v2/server.js index 896748dd22c..91d724f8520 100644 --- a/benchmarks-website-v2/server.js +++ b/benchmarks-website-v2/server.js @@ -271,9 +271,21 @@ async function refresh() { } if (!groups[group]) continue; - const [query, series] = b.name.split("/"); - const seriesName = rename(series || "default"); - const chartName = formatQuery(query); + // Random access names have the form: random-access/{dataset}/{pattern}/{format} + // Historical random access names: random-access/{format} + // Other benchmarks use: {query}/{series} + let seriesName, chartName; + const parts = b.name.split("/"); + if (group === "Random Access" && parts.length === 4) { + chartName = `${parts[1]}/${parts[2]}`.toUpperCase().replace(/[_-]/g, " "); + seriesName = rename(parts[3] || "default"); + } else if (group === "Random Access" && parts.length === 2) { + chartName = "RANDOM ACCESS"; + seriesName = rename(parts[1] || "default"); + } else { + seriesName = rename(parts[1] || "default"); + chartName = formatQuery(parts[0]); + } if (chartName.includes("PARQUET-UNC")) continue; // Skip throughput metrics (keep only time/size) @@ -286,7 +298,7 @@ async function refresh() { else unit = "ns"; } - const sortPos = query.match(/q(\d+)$/i)?.[1] + const sortPos = parts[0].match(/q(\d+)$/i)?.[1] ? parseInt(RegExp.$1, 10) : 0; const idx = commitIdx.get(commit.id); diff --git a/benchmarks-website-v2/src/config.js b/benchmarks-website-v2/src/config.js index 0caac21e4df..c6ce9060428 100644 --- a/benchmarks-website-v2/src/config.js +++ b/benchmarks-website-v2/src/config.js @@ -74,6 +74,7 @@ export const ENGINE_RENAMES = { "duckdb:duckdb": "duckdb:duckdb", "duckdb:vortex-compact": "duckdb:vortex-compact", "vortex-tokio-local-disk": "vortex-nvme", + "vortex-compact-tokio-local-disk": "vortex-compact-nvme", "lance-tokio-local-disk": "lance-nvme", "parquet-tokio-local-disk": "parquet-nvme", lance: "lance", @@ -89,6 +90,7 @@ const BESPOKE_CONFIGS = [ name: "Random Access", renamedDatasets: { "vortex-tokio-local-disk": "vortex-nvme", + "vortex-compact-tokio-local-disk": "vortex-compact-nvme", "lance-tokio-local-disk": "lance-nvme", "parquet-tokio-local-disk": "parquet-nvme", }, @@ -242,6 +244,7 @@ export const ENGINE_LABELS = { // Series color map export const SERIES_COLOR_MAP = { "vortex-nvme": "#19a508", + "vortex-compact-nvme": "#15850a", "parquet-nvme": "#ef7f1d", "lance-nvme": "#3B82F6", "datafusion:arrow": "#7a27b1", diff --git a/benchmarks/lance-bench/src/random_access.rs b/benchmarks/lance-bench/src/random_access.rs index 83ff5e7b314..53138552742 100644 --- a/benchmarks/lance-bench/src/random_access.rs +++ b/benchmarks/lance-bench/src/random_access.rs @@ -12,14 +12,19 @@ use lance::dataset::WriteParams; use lance_encoding::version::LanceFileVersion; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use vortex_bench::Format; -use vortex_bench::datasets::taxi_data::taxi_data_parquet; +use vortex_bench::datasets::feature_vectors; +use vortex_bench::datasets::nested_lists; +use vortex_bench::datasets::nested_structs; +use vortex_bench::datasets::taxi_data; use vortex_bench::idempotent_async; use 
vortex_bench::random_access::RandomAccessor; +use vortex_bench::random_access::data_path; -pub async fn taxi_data_lance() -> anyhow::Result { - idempotent_async("taxi/taxi.lance", |output_fname| async move { - let parquet_path = taxi_data_parquet().await?; - +/// Convert a parquet file to lance format. +/// +/// Uses `idempotent_async` to skip conversion if the output already exists. +async fn parquet_to_lance_file(parquet_path: PathBuf, lance_path: &str) -> anyhow::Result { + idempotent_async(lance_path, |output_fname| async move { let file = File::open(&parquet_path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; let reader = builder.build()?; @@ -39,13 +44,58 @@ pub async fn taxi_data_lance() -> anyhow::Result { .await } +pub async fn taxi_data_lance() -> anyhow::Result { + let parquet_path = taxi_data::taxi_data_parquet().await?; + parquet_to_lance_file(parquet_path, &data_path(taxi_data::DATASET, Format::Lance)).await +} + +pub async fn feature_vectors_lance() -> anyhow::Result { + let parquet_path = feature_vectors::feature_vectors_parquet().await?; + parquet_to_lance_file( + parquet_path, + &data_path(feature_vectors::DATASET, Format::Lance), + ) + .await +} + +pub async fn nested_lists_lance() -> anyhow::Result { + let parquet_path = nested_lists::nested_lists_parquet().await?; + parquet_to_lance_file( + parquet_path, + &data_path(nested_lists::DATASET, Format::Lance), + ) + .await +} + +pub async fn nested_structs_lance() -> anyhow::Result { + let parquet_path = nested_structs::nested_structs_parquet().await?; + parquet_to_lance_file( + parquet_path, + &data_path(nested_structs::DATASET, Format::Lance), + ) + .await +} + +/// Random accessor for Lance format files. +/// +/// The dataset handle is opened at construction time and reused across `take()` calls. pub struct LanceRandomAccessor { - path: PathBuf, + name: String, + dataset: Dataset, } impl LanceRandomAccessor { - pub fn new(path: PathBuf) -> Self { - Self { path } + /// Open a Lance dataset and return a ready-to-use accessor. + pub async fn open(path: PathBuf, name: impl Into) -> anyhow::Result { + let dataset = Dataset::open( + path.to_str() + .ok_or_else(|| anyhow!("Invalid dataset path"))?, + ) + .await?; + Ok(Self { + name: name.into(), + dataset, + }) } } @@ -56,22 +106,12 @@ impl RandomAccessor for LanceRandomAccessor { } fn name(&self) -> &str { - "random-access/lance-tokio-local-disk" - } - - fn path(&self) -> &PathBuf { - &self.path + &self.name } - async fn take(&self, indices: Vec) -> anyhow::Result { - let dataset = Dataset::open( - self.path - .to_str() - .ok_or_else(|| anyhow!("Invalid dataset path"))?, - ) - .await?; - let projection = ProjectionRequest::from_schema(dataset.schema().clone()); // All columns. 
- let result = dataset.take(indices.as_slice(), projection).await?; + async fn take(&self, indices: &[u64]) -> anyhow::Result { + let projection = ProjectionRequest::from_schema(self.dataset.schema().clone()); + let result = self.dataset.take(indices, projection).await?; Ok(result.num_rows()) } } diff --git a/benchmarks/random-access-bench/Cargo.toml b/benchmarks/random-access-bench/Cargo.toml index 6186090e3e4..b98c5f0a1a1 100644 --- a/benchmarks/random-access-bench/Cargo.toml +++ b/benchmarks/random-access-bench/Cargo.toml @@ -16,9 +16,12 @@ publish = false [dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } clap = { workspace = true, features = ["derive"] } indicatif = { workspace = true } lance-bench = { path = "../lance-bench", optional = true } +rand = { workspace = true } +rand_distr = { workspace = true } tokio = { workspace = true, features = ["full"] } vortex = { workspace = true } vortex-bench = { workspace = true } diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs index 37c8167de27..31aa4946baf 100644 --- a/benchmarks/random-access-bench/src/main.rs +++ b/benchmarks/random-access-bench/src/main.rs @@ -5,23 +5,153 @@ use std::path::PathBuf; use std::time::Duration; use std::time::Instant; +use anyhow::Result; use clap::Parser; +use clap::ValueEnum; use indicatif::ProgressBar; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; +use rand_distr::Distribution; +use rand_distr::Exp; use vortex_bench::BenchmarkOutput; use vortex_bench::Engine; use vortex_bench::Format; use vortex_bench::Target; -use vortex_bench::datasets::taxi_data::*; +use vortex_bench::datasets::feature_vectors::FeatureVectorsData; +use vortex_bench::datasets::nested_lists::NestedListsData; +use vortex_bench::datasets::nested_structs::NestedStructsData; +use vortex_bench::datasets::taxi_data::TaxiData; use vortex_bench::display::DisplayFormat; use vortex_bench::display::print_measurements_json; use vortex_bench::display::render_table; use vortex_bench::measurements::TimingMeasurement; +use vortex_bench::random_access::BenchDataset; use vortex_bench::random_access::ParquetRandomAccessor; use vortex_bench::random_access::RandomAccessor; use vortex_bench::random_access::VortexRandomAccessor; use vortex_bench::setup_logging_and_tracing; use vortex_bench::utils::constants::STORAGE_NVME; +// --------------------------------------------------------------------------- +// Access patterns +// --------------------------------------------------------------------------- + +/// Access pattern for random access benchmarks. +#[derive(Clone, Copy, Debug)] +enum AccessPattern { + /// Multiple clusters of sequential indices scattered across the dataset, + /// simulating workloads with spatial locality (e.g. scanning nearby records). + Correlated, + /// Indices generated by a Poisson process (exponential inter-arrival times) + /// spread uniformly across the dataset, simulating random lookups with no locality. + Uniform, +} + +impl AccessPattern { + fn name(&self) -> &'static str { + match self { + AccessPattern::Correlated => "correlated", + AccessPattern::Uniform => "uniform", + } + } +} + +const ACCESS_PATTERNS: [AccessPattern; 2] = [AccessPattern::Correlated, AccessPattern::Uniform]; + +/// Number of clusters for the correlated pattern. +const NUM_CLUSTERS: usize = 5; + +/// Number of consecutive indices per cluster. +const CLUSTER_SIZE: usize = 20; + +/// Expected number of indices for the Poisson (uniform) pattern. 
+const POISSON_EXPECTED_COUNT: usize = 100; + +/// Generate indices for the given dataset and access pattern. +fn generate_indices(dataset: &dyn BenchDataset, pattern: AccessPattern) -> Vec { + let row_count = dataset.row_count(); + let mut rng = StdRng::seed_from_u64(42); + + match pattern { + AccessPattern::Correlated => { + // Pick random cluster starts, then emit CLUSTER_SIZE consecutive indices from each. + let mut indices = Vec::with_capacity(NUM_CLUSTERS * CLUSTER_SIZE); + for _ in 0..NUM_CLUSTERS { + let start = rng.random_range(0..row_count.saturating_sub(CLUSTER_SIZE as u64)); + for offset in 0..CLUSTER_SIZE as u64 { + indices.push(start + offset); + } + } + indices.sort_unstable(); + indices + } + AccessPattern::Uniform => { + // Poisson process: exponential inter-arrival times with rate chosen to yield + // ~POISSON_EXPECTED_COUNT indices across the dataset. + let rate = POISSON_EXPECTED_COUNT as f64 / row_count as f64; + // SAFETY: rate is always positive (POISSON_EXPECTED_COUNT > 0, row_count > 0). + #[allow(clippy::unwrap_used)] + let exp = Exp::new(rate).unwrap(); + let mut indices = Vec::with_capacity(POISSON_EXPECTED_COUNT); + let mut pos = 0.0_f64; + loop { + let gap: f64 = exp.sample(&mut rng); + pos += gap; + #[allow(clippy::cast_possible_truncation)] + let idx = pos as u64; + if idx >= row_count { + break; + } + indices.push(idx); + } + indices + } + } +} + +// --------------------------------------------------------------------------- +// CLI +// --------------------------------------------------------------------------- + +/// Controls whether the file handle is reused or reopened each iteration. +#[derive(ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] +enum OpenMode { + /// Reuse the file handle across iterations (cached metadata). + #[clap(name = "cached")] + Cached, + /// Reopen the file each iteration (includes footer parsing). + #[clap(name = "reopen")] + Reopen, + /// Run both cached and reopen variants. + #[clap(name = "both")] + Both, +} + +/// Which synthetic dataset to benchmark. +#[derive(ValueEnum, Clone, Copy, Debug)] +enum DatasetArg { + #[clap(name = "taxi")] + Taxi, + #[clap(name = "feature-vectors")] + FeatureVectors, + #[clap(name = "nested-lists")] + NestedLists, + #[clap(name = "nested-structs")] + NestedStructs, +} + +impl DatasetArg { + fn into_dataset(self) -> Box { + match self { + DatasetArg::Taxi => Box::new(TaxiData), + DatasetArg::FeatureVectors => Box::new(FeatureVectorsData), + DatasetArg::NestedLists => Box::new(NestedListsData), + DatasetArg::NestedStructs => Box::new(NestedStructsData), + } + } +} + #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { @@ -43,71 +173,67 @@ struct Args { display_format: DisplayFormat, #[arg(short)] output_path: Option, + /// Which datasets to benchmark random access on. + #[arg( + long, + value_delimiter = ',', + value_enum, + default_values_t = vec![DatasetArg::Taxi, DatasetArg::FeatureVectors, DatasetArg::NestedLists, DatasetArg::NestedStructs] + )] + datasets: Vec, + /// Whether to reopen the file on each iteration, use a cached handle, or run both. + #[arg(long, value_enum, default_value_t = OpenMode::Both)] + open_mode: OpenMode, } #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<()> { let args = Args::parse(); setup_logging_and_tracing(args.verbose, args.tracing)?; - // Row count of the dataset is 3,339,715. 
- let indices = vec![10u64, 11, 12, 13, 100_000, 3_000_000]; + let datasets: Vec> = args + .datasets + .into_iter() + .map(|d| d.into_dataset()) + .collect(); run_random_access( + &datasets, args.formats, args.time_limit, + args.open_mode, args.display_format, - indices, args.output_path, ) .await } -/// Create a random accessor for the given format using taxi data. -async fn get_accessor(format: Format) -> anyhow::Result> { - match format { - Format::OnDiskVortex => { - let path = taxi_data_vortex().await?; - Ok(Box::new(VortexRandomAccessor::new(path))) - } - Format::VortexCompact => { - let path = taxi_data_vortex_compact().await?; - Ok(Box::new(VortexRandomAccessor::compact(path))) - } - Format::Parquet => { - let path = taxi_data_parquet().await?; - Ok(Box::new(ParquetRandomAccessor::new(path))) - } - #[cfg(feature = "lance")] - Format::Lance => { - use lance_bench::random_access::LanceRandomAccessor; - use lance_bench::random_access::taxi_data_lance; - - let path = taxi_data_lance().await?; - Ok(Box::new(LanceRandomAccessor::new(path))) - } - _ => unimplemented!("Random access bench not implemented for {format}"), - } -} +// --------------------------------------------------------------------------- +// Benchmark core +// --------------------------------------------------------------------------- -/// Run a random access benchmark for the given accessor. +/// Run a random access benchmark. /// /// Runs the take operation repeatedly until the time limit is reached, -/// collecting timing for each run. +/// collecting timing for each run. When `reopen` is true, the accessor is +/// recreated from scratch before each iteration so that file metadata +/// parsing is included in the timing. async fn benchmark_random_access( - accessor: &dyn RandomAccessor, + dataset: &dyn BenchDataset, + format: Format, + measurement_name: &str, indices: &[u64], time_limit_secs: u64, storage: &str, -) -> anyhow::Result { + reopen: bool, +) -> Result { let time_limit = Duration::from_secs(time_limit_secs); let overall_start = Instant::now(); let mut runs = Vec::new(); + let mut accessor = open_accessor(dataset, format).await?; - // Run at least once, then continue until time limit loop { - let indices = indices.to_vec(); let start = Instant::now(); let _row_count = accessor.take(indices).await?; runs.push(start.elapsed()); @@ -115,16 +241,39 @@ async fn benchmark_random_access( if overall_start.elapsed() >= time_limit { break; } + + if reopen { + accessor = open_accessor(dataset, format).await?; + } } Ok(TimingMeasurement { - name: accessor.name().to_string(), + name: measurement_name.to_string(), storage: storage.to_string(), - target: Target::new(format_to_engine(accessor.format()), accessor.format()), + target: Target::new(format_to_engine(format), format), runs, }) } +/// Build a measurement name for a benchmark run. +/// +/// For taxi (legacy), the name is `random-access/{format}-tokio-local-disk` to preserve +/// historical continuity with existing benchmark data. +/// For other datasets, includes dataset and pattern: +/// `random-access/{dataset}/{pattern}/{format}-tokio-local-disk`. +fn measurement_name(dataset: &str, pattern: Option, format: Format) -> String { + let fmt = format.ext(); + match pattern { + Some(p) => format!( + "random-access/{}/{}/{}-tokio-local-disk", + dataset, + p.name(), + fmt + ), + None => format!("random-access/{}-tokio-local-disk", fmt), + } +} + /// Map format to the appropriate engine for random access benchmarks. 
fn format_to_engine(format: Format) -> Engine { match format { @@ -136,30 +285,133 @@ fn format_to_engine(format: Format) -> Engine { } } +/// Open a random accessor for any supported format. +/// +/// For Vortex and Parquet, the path comes from [`BenchDataset::path`]. +/// For Lance (behind the `lance` feature), the path is resolved from lance-bench helpers. +async fn open_accessor( + dataset: &dyn BenchDataset, + format: Format, +) -> Result> { + let name = format!( + "random-access/{}/{}-tokio-local-disk", + dataset.name(), + format.ext() + ); + match format { + Format::OnDiskVortex | Format::VortexCompact => { + let path = dataset.path(format).await?; + Ok(Box::new( + VortexRandomAccessor::open(path, name, format).await?, + )) + } + Format::Parquet => { + let path = dataset.path(format).await?; + Ok(Box::new(ParquetRandomAccessor::open(path, name).await?)) + } + #[cfg(feature = "lance")] + Format::Lance => { + use lance_bench::random_access; + let path = match dataset.name() { + "taxi" => random_access::taxi_data_lance().await?, + "feature-vectors" => random_access::feature_vectors_lance().await?, + "nested-lists" => random_access::nested_lists_lance().await?, + "nested-structs" => random_access::nested_structs_lance().await?, + other => anyhow::bail!("Unknown dataset for Lance: {other}"), + }; + Ok(Box::new( + random_access::LanceRandomAccessor::open(path, name).await?, + )) + } + other => unimplemented!("open_accessor not implemented for {other}"), + } +} + /// The benchmark ID used for output path. const BENCHMARK_ID: &str = "random-access"; +/// Fixed indices used by the original taxi benchmark (preserved for historical continuity). +const FIXED_TAXI_INDICES: [u64; 6] = [10, 11, 12, 13, 100_000, 3_000_000]; + async fn run_random_access( + datasets: &[Box], formats: Vec, time_limit: u64, + open_mode: OpenMode, display_format: DisplayFormat, - indices: Vec, output_path: Option, -) -> anyhow::Result<()> { - let progress = ProgressBar::new(formats.len() as u64); +) -> Result<()> { + let reopen_variants: &[bool] = match open_mode { + OpenMode::Cached => &[false], + OpenMode::Reopen => &[true], + OpenMode::Both => &[false, true], + }; + + let total_steps: usize = datasets + .iter() + .map(|d| { + let legacy_extra = if d.name() == "taxi" { formats.len() } else { 0 }; + (formats.len() * ACCESS_PATTERNS.len() + legacy_extra) * reopen_variants.len() + }) + .sum(); + let progress = ProgressBar::new(total_steps as u64); let mut targets = Vec::new(); let mut measurements = Vec::new(); - for format in formats { - let accessor = get_accessor(format).await?; - let measurement = - benchmark_random_access(accessor.as_ref(), &indices, time_limit, STORAGE_NVME).await?; + for dataset in datasets { + for format in &formats { + if dataset.name() == "taxi" { + let name = measurement_name(dataset.name(), None, *format); + for &reopen in reopen_variants { + let bench_name = if reopen { + format!("{name}-footer") + } else { + name.clone() + }; + let measurement = benchmark_random_access( + dataset.as_ref(), + *format, + &bench_name, + &FIXED_TAXI_INDICES, + time_limit, + STORAGE_NVME, + reopen, + ) + .await?; + + targets.push(measurement.target); + measurements.push(measurement); + progress.inc(1); + } + } - targets.push(measurement.target); - measurements.push(measurement); + for pattern in &ACCESS_PATTERNS { + let indices = generate_indices(dataset.as_ref(), *pattern); + let name = measurement_name(dataset.name(), Some(*pattern), *format); + for &reopen in reopen_variants { + let bench_name = if reopen { + 
format!("{name}-footer") + } else { + name.clone() + }; + let measurement = benchmark_random_access( + dataset.as_ref(), + *format, + &bench_name, + &indices, + time_limit, + STORAGE_NVME, + reopen, + ) + .await?; - progress.inc(1); + targets.push(measurement.target); + measurements.push(measurement); + progress.inc(1); + } + } + } } progress.finish(); diff --git a/vortex-bench/src/conversions.rs b/vortex-bench/src/conversions.rs index 761b720865d..c6f512d66e0 100644 --- a/vortex-bench/src/conversions.rs +++ b/vortex-bench/src/conversions.rs @@ -13,6 +13,7 @@ use sysinfo::System; use tokio::fs::File; use tokio::fs::OpenOptions; use tokio::fs::create_dir_all; +use tokio::io::AsyncWriteExt; use tracing::Instrument; use tracing::info; use tracing::trace; @@ -201,3 +202,24 @@ pub async fn convert_parquet_directory_to_vortex( Ok(()) } + +/// Convert a Parquet file to Vortex format with the specified compaction strategy. +/// +/// Uses `idempotent_async` to skip conversion if the output file already exists. +pub async fn write_parquet_as_vortex( + parquet_path: PathBuf, + vortex_path: &str, + compaction: CompactionStrategy, +) -> anyhow::Result { + idempotent_async(vortex_path, |output_fname| async move { + let mut output_file = File::create(&output_fname).await?; + let data = parquet_to_vortex_chunks(parquet_path).await?; + let write_options = compaction.apply_options(SESSION.write_options()); + write_options + .write(&mut output_file, data.to_array_stream()) + .await?; + output_file.flush().await?; + Ok(()) + }) + .await +} diff --git a/vortex-bench/src/datasets/feature_vectors.rs b/vortex-bench/src/datasets/feature_vectors.rs new file mode 100644 index 00000000000..4928c0df326 --- /dev/null +++ b/vortex-bench/src/datasets/feature_vectors.rs @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::Result; +use arrow_array::Int64Array; +use arrow_array::RecordBatch; +use arrow_array::builder::FixedSizeListBuilder; +use arrow_array::builder::Float32Builder; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Schema; +use async_trait::async_trait; +use parquet::arrow::ArrowWriter; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; + +use crate::CompactionStrategy; +use crate::Format; +use crate::conversions::write_parquet_as_vortex; +use crate::idempotent_async; +use crate::random_access::BenchDataset; +use crate::random_access::data_path; + +/// Dataset identifier used for data path generation. +pub const DATASET: &str = "feature_vectors"; + +/// Number of rows in the feature vectors dataset. +pub const ROW_COUNT: usize = 1_000_000; + +pub struct FeatureVectorsData; + +#[async_trait] +impl BenchDataset for FeatureVectorsData { + fn name(&self) -> &str { + "feature-vectors" + } + + fn row_count(&self) -> u64 { + ROW_COUNT as u64 + } + + async fn path(&self, format: Format) -> Result { + match format { + Format::OnDiskVortex => feature_vectors_vortex().await, + Format::VortexCompact => feature_vectors_vortex_compact().await, + Format::Parquet => feature_vectors_parquet().await, + other => unimplemented!("Random access bench not implemented for {other}"), + } + } +} + +/// Dimensionality of each feature vector. +const VECTOR_DIM: i32 = 1024; + +/// Batch size for data generation. +const BATCH_SIZE: usize = 100_000; + +/// Generate a synthetic feature vectors parquet file. 
+/// +/// Schema: `id: Int64, embedding: FixedSizeList`. +/// This simulates a table of embedding vectors, common in ML workloads. +pub async fn feature_vectors_parquet() -> Result { + idempotent_async( + data_path(DATASET, Format::Parquet), + |temp_path| async move { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new( + "embedding", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, true)), + VECTOR_DIM, + ), + false, + ), + ])); + + let file = File::create(&temp_path)?; + let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?; + let mut rng = StdRng::seed_from_u64(42); + + for batch_start in (0..ROW_COUNT).step_by(BATCH_SIZE) { + let batch_len = BATCH_SIZE.min(ROW_COUNT - batch_start); + + let ids = Int64Array::from_iter_values( + (batch_start as i64)..((batch_start + batch_len) as i64), + ); + + let mut list_builder = FixedSizeListBuilder::new(Float32Builder::new(), VECTOR_DIM); + for _ in 0..batch_len { + for _ in 0..VECTOR_DIM { + list_builder.values().append_value(rng.random::()); + } + list_builder.append(true); + } + let embedding = list_builder.finish(); + + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(embedding)])?; + writer.write(&batch)?; + } + + writer.close()?; + Ok(()) + }, + ) + .await +} + +/// Get the path to the feature vectors vortex file, converting from parquet if needed. +pub async fn feature_vectors_vortex() -> Result { + let parquet_path = feature_vectors_parquet().await?; + let path = data_path(DATASET, Format::OnDiskVortex); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Default).await +} + +/// Get the path to the feature vectors compact vortex file, converting from parquet if needed. +pub async fn feature_vectors_vortex_compact() -> Result { + let parquet_path = feature_vectors_parquet().await?; + let path = data_path(DATASET, Format::VortexCompact); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Compact).await +} diff --git a/vortex-bench/src/datasets/mod.rs b/vortex-bench/src/datasets/mod.rs index 1b773938468..e7f53156500 100644 --- a/vortex-bench/src/datasets/mod.rs +++ b/vortex-bench/src/datasets/mod.rs @@ -12,6 +12,9 @@ use vortex::array::ArrayRef; use crate::clickbench::Flavor; pub mod data_downloads; +pub mod feature_vectors; +pub mod nested_lists; +pub mod nested_structs; pub mod struct_list_of_ints; pub mod taxi_data; pub mod tpch_l_comment; diff --git a/vortex-bench/src/datasets/nested_lists.rs b/vortex-bench/src/datasets/nested_lists.rs new file mode 100644 index 00000000000..5f0fbac80f5 --- /dev/null +++ b/vortex-bench/src/datasets/nested_lists.rs @@ -0,0 +1,124 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::Result; +use arrow_array::Int64Array; +use arrow_array::RecordBatch; +use arrow_array::builder::Int64Builder; +use arrow_array::builder::ListBuilder; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Schema; +use async_trait::async_trait; +use parquet::arrow::ArrowWriter; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; + +use crate::CompactionStrategy; +use crate::Format; +use crate::conversions::write_parquet_as_vortex; +use crate::idempotent_async; +use crate::random_access::BenchDataset; +use crate::random_access::data_path; + +/// Dataset identifier used for data path generation. 
+pub const DATASET: &str = "nested_lists"; + +/// Number of rows in the nested lists dataset. +pub const ROW_COUNT: usize = 1_000_000; + +pub struct NestedListsData; + +#[async_trait] +impl BenchDataset for NestedListsData { + fn name(&self) -> &str { + "nested-lists" + } + + fn row_count(&self) -> u64 { + ROW_COUNT as u64 + } + + async fn path(&self, format: Format) -> Result { + match format { + Format::OnDiskVortex => nested_lists_vortex().await, + Format::VortexCompact => nested_lists_vortex_compact().await, + Format::Parquet => nested_lists_parquet().await, + other => unimplemented!("Random access bench not implemented for {other}"), + } + } +} + +/// Maximum number of elements in each list. +const MAX_LIST_LEN: usize = 20; + +/// Batch size for data generation. +const BATCH_SIZE: usize = 100_000; + +/// Generate a synthetic nested lists parquet file. +/// +/// Schema: `id: Int64, values: List`. +/// Each row contains a variable-length list of 1 to 20 random integers. +pub async fn nested_lists_parquet() -> Result { + idempotent_async( + data_path(DATASET, Format::Parquet), + |temp_path| async move { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new( + "values", + DataType::List(Arc::new(Field::new("item", DataType::Int64, true))), + false, + ), + ])); + + let file = std::fs::File::create(&temp_path)?; + let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?; + let mut rng = StdRng::seed_from_u64(42); + + for batch_start in (0..ROW_COUNT).step_by(BATCH_SIZE) { + let batch_len = BATCH_SIZE.min(ROW_COUNT - batch_start); + + let ids = Int64Array::from_iter_values( + (batch_start as i64)..((batch_start + batch_len) as i64), + ); + + let mut list_builder = ListBuilder::new(Int64Builder::new()); + for _ in 0..batch_len { + let list_len = rng.random_range(1..=MAX_LIST_LEN); + for _ in 0..list_len { + list_builder.values().append_value(rng.random::()); + } + list_builder.append(true); + } + let values = list_builder.finish(); + + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(values)])?; + writer.write(&batch)?; + } + + writer.close()?; + Ok(()) + }, + ) + .await +} + +/// Get the path to the nested lists vortex file, converting from parquet if needed. +pub async fn nested_lists_vortex() -> Result { + let parquet_path = nested_lists_parquet().await?; + let path = data_path(DATASET, Format::OnDiskVortex); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Default).await +} + +/// Get the path to the nested lists compact vortex file, converting from parquet if needed. 
+pub async fn nested_lists_vortex_compact() -> Result { + let parquet_path = nested_lists_parquet().await?; + let path = data_path(DATASET, Format::VortexCompact); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Compact).await +} diff --git a/vortex-bench/src/datasets/nested_structs.rs b/vortex-bench/src/datasets/nested_structs.rs new file mode 100644 index 00000000000..b3f362ed904 --- /dev/null +++ b/vortex-bench/src/datasets/nested_structs.rs @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::Result; +use arrow_array::Float64Array; +use arrow_array::Int64Array; +use arrow_array::RecordBatch; +use arrow_array::StructArray; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Fields; +use arrow_schema::Schema; +use async_trait::async_trait; +use parquet::arrow::ArrowWriter; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; + +use crate::CompactionStrategy; +use crate::Format; +use crate::conversions::write_parquet_as_vortex; +use crate::idempotent_async; +use crate::random_access::BenchDataset; +use crate::random_access::data_path; + +/// Dataset identifier used for data path generation. +pub const DATASET: &str = "nested_structs"; + +/// Number of rows in the nested structs dataset. +pub const ROW_COUNT: usize = 1_000_000; + +pub struct NestedStructsData; + +#[async_trait] +impl BenchDataset for NestedStructsData { + fn name(&self) -> &str { + "nested-structs" + } + + fn row_count(&self) -> u64 { + ROW_COUNT as u64 + } + + async fn path(&self, format: Format) -> Result { + match format { + Format::OnDiskVortex => nested_structs_vortex().await, + Format::VortexCompact => nested_structs_vortex_compact().await, + Format::Parquet => nested_structs_parquet().await, + other => unimplemented!("Random access bench not implemented for {other}"), + } + } +} + +/// Batch size for data generation. +const BATCH_SIZE: usize = 100_000; + +/// Generate a synthetic nested structs parquet file. 
+/// +/// Schema: +/// ```text +/// id: Int64 +/// metadata: Struct { +/// a: Int64, +/// b: Float64, +/// inner: Struct { +/// x: Float64, +/// y: Float64, +/// z: Float64, +/// } +/// } +/// ``` +pub async fn nested_structs_parquet() -> Result { + idempotent_async( + data_path(DATASET, Format::Parquet), + |temp_path| async move { + let inner_fields = Fields::from(vec![ + Field::new("x", DataType::Float64, false), + Field::new("y", DataType::Float64, false), + Field::new("z", DataType::Float64, false), + ]); + let outer_fields = Fields::from(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Float64, false), + Field::new("inner", DataType::Struct(inner_fields.clone()), false), + ]); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("metadata", DataType::Struct(outer_fields.clone()), false), + ])); + + let file = std::fs::File::create(&temp_path)?; + let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?; + let mut rng = StdRng::seed_from_u64(42); + + for batch_start in (0..ROW_COUNT).step_by(BATCH_SIZE) { + let batch_len = BATCH_SIZE.min(ROW_COUNT - batch_start); + + let ids = Int64Array::from_iter_values( + (batch_start as i64)..((batch_start + batch_len) as i64), + ); + + let inner_x = + Float64Array::from_iter_values((0..batch_len).map(|_| rng.random::())); + let inner_y = + Float64Array::from_iter_values((0..batch_len).map(|_| rng.random::())); + let inner_z = + Float64Array::from_iter_values((0..batch_len).map(|_| rng.random::())); + let inner = StructArray::try_new( + inner_fields.clone(), + vec![Arc::new(inner_x), Arc::new(inner_y), Arc::new(inner_z)], + None, + )?; + + let outer_a = + Int64Array::from_iter_values((0..batch_len).map(|_| rng.random::())); + let outer_b = + Float64Array::from_iter_values((0..batch_len).map(|_| rng.random::())); + let outer = StructArray::try_new( + outer_fields.clone(), + vec![Arc::new(outer_a), Arc::new(outer_b), Arc::new(inner)], + None, + )?; + + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(outer)])?; + writer.write(&batch)?; + } + + writer.close()?; + Ok(()) + }, + ) + .await +} + +/// Get the path to the nested structs vortex file, converting from parquet if needed. +pub async fn nested_structs_vortex() -> Result { + let parquet_path = nested_structs_parquet().await?; + let path = data_path(DATASET, Format::OnDiskVortex); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Default).await +} + +/// Get the path to the nested structs compact vortex file, converting from parquet if needed. +pub async fn nested_structs_vortex_compact() -> Result { + let parquet_path = nested_structs_parquet().await?; + let path = data_path(DATASET, Format::VortexCompact); + write_parquet_as_vortex(parquet_path, &path, CompactionStrategy::Compact).await +} diff --git a/vortex-bench/src/datasets/taxi_data.rs b/vortex-bench/src/datasets/taxi_data.rs index 9367d9746aa..133f7c55586 100644 --- a/vortex-bench/src/datasets/taxi_data.rs +++ b/vortex-bench/src/datasets/taxi_data.rs @@ -13,12 +13,20 @@ use vortex::file::OpenOptionsSessionExt; use vortex::file::WriteOptionsSessionExt; use crate::CompactionStrategy; +use crate::Format; use crate::IdempotentPath; use crate::SESSION; use crate::conversions::parquet_to_vortex_chunks; use crate::datasets::Dataset; use crate::datasets::data_downloads::download_data; use crate::idempotent_async; +use crate::random_access::BenchDataset; + +/// Dataset identifier used for data path generation. 
+pub const DATASET: &str = "taxi"; + +/// Total number of rows in the taxi dataset. +pub const ROW_COUNT: u64 = 3_339_715; pub struct TaxiData; @@ -37,6 +45,26 @@ impl Dataset for TaxiData { } } +#[async_trait] +impl BenchDataset for TaxiData { + fn name(&self) -> &str { + "taxi" + } + + fn row_count(&self) -> u64 { + ROW_COUNT + } + + async fn path(&self, format: Format) -> Result { + match format { + Format::OnDiskVortex => taxi_data_vortex().await, + Format::VortexCompact => taxi_data_vortex_compact().await, + Format::Parquet => taxi_data_parquet().await, + other => unimplemented!("Random access bench not implemented for {other}"), + } + } +} + pub async fn taxi_data_parquet() -> Result { let taxi_parquet_fpath = "taxi/taxi.parquet".to_data_path(); let taxi_data_url = diff --git a/vortex-bench/src/random_access/mod.rs b/vortex-bench/src/random_access/mod.rs index 2193b76e673..5421ebb01b5 100644 --- a/vortex-bench/src/random_access/mod.rs +++ b/vortex-bench/src/random_access/mod.rs @@ -14,21 +14,45 @@ pub mod take; pub use take::ParquetRandomAccessor; pub use take::VortexRandomAccessor; +/// Generate the data path for a random-access benchmark dataset file. +/// +/// Returns a path like `random_access/{dataset}/{dataset}.{ext}` +/// (or `{dataset}-compact.{ext}` for [`Format::VortexCompact`]). +pub fn data_path(dataset: &str, format: Format) -> String { + let ext = format.ext(); + match format { + Format::VortexCompact => format!("random_access/{dataset}/{dataset}-compact.{ext}"), + _ => format!("random_access/{dataset}/{dataset}.{ext}"), + } +} + +/// Trait for a benchmark dataset that knows how to prepare data files. +#[async_trait] +pub trait BenchDataset: Send + Sync { + /// A descriptive name for this dataset (used in benchmark output and CLI). + fn name(&self) -> &str; + + /// The total number of rows in this dataset. + fn row_count(&self) -> u64; + + /// Prepare the data file for the given format and return its path. + /// + /// This writes the file if it doesn't already exist. + async fn path(&self, format: Format) -> Result; +} + /// Trait for format-specific random access (take) operations. /// /// Implementations handle reading specific rows by index from a data source. -/// Each implementation wraps a prepared file path and knows how to read from it. +/// Accessors are constructed in a ready-to-use state with metadata already parsed. #[async_trait] pub trait RandomAccessor: Send + Sync { - /// The format this accessor handles. - fn format(&self) -> Format; - /// A descriptive name for this accessor (used in benchmark output). fn name(&self) -> &str; - /// The file path this accessor reads from. - fn path(&self) -> &PathBuf; + /// The format this accessor handles. + fn format(&self) -> Format; /// Take rows at the given indices, returning the number of rows read. 
- async fn take(&self, indices: Vec) -> Result; + async fn take(&self, indices: &[u64]) -> Result; } diff --git a/vortex-bench/src/random_access/take.rs b/vortex-bench/src/random_access/take.rs index 1f251c2d8db..df48cd075c0 100644 --- a/vortex-bench/src/random_access/take.rs +++ b/vortex-bench/src/random_access/take.rs @@ -1,12 +1,10 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::iter; -use std::path::Path; +use std::iter::once; use std::path::PathBuf; use arrow_array::PrimitiveArray; -use arrow_array::RecordBatch; use arrow_array::types::Int64Type; use arrow_select::concat::concat_batches; use arrow_select::take::take_record_batch; @@ -14,17 +12,18 @@ use async_trait::async_trait; use futures::stream; use itertools::Itertools; use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::arrow_reader::ArrowReaderMetadata; use parquet::arrow::arrow_reader::ArrowReaderOptions; -use parquet::file::metadata::RowGroupMetaData; use stream::StreamExt; +use tokio::fs::File; use vortex::array::Array; -use vortex::array::ArrayRef; use vortex::array::Canonical; use vortex::array::IntoArray; use vortex::array::VortexSessionExecute; use vortex::array::stream::ArrayStreamExt; use vortex::buffer::Buffer; use vortex::file::OpenOptionsSessionExt; +use vortex::file::VortexFile; use vortex::utils::aliases::hash_map::HashMap; use crate::Format; @@ -32,38 +31,27 @@ use crate::SESSION; use crate::random_access::RandomAccessor; /// Random accessor for Vortex format files. +/// +/// The file handle is opened at construction time and reused across `take()` calls. pub struct VortexRandomAccessor { - path: PathBuf, name: String, format: Format, + file: VortexFile, } impl VortexRandomAccessor { - /// Create a new Vortex random accessor for the given file path. - pub fn new(path: PathBuf) -> Self { - Self { - path, - name: "random-access/vortex-tokio-local-disk".to_string(), - format: Format::OnDiskVortex, - } - } - - /// Create a new Vortex random accessor with a custom name. - pub fn with_name(path: PathBuf, name: impl Into) -> Self { - Self { - path, + /// Open a Vortex file and return a ready-to-use accessor. + pub async fn open( + path: impl AsRef, + name: impl Into, + format: Format, + ) -> anyhow::Result { + let file = SESSION.open_options().open_path(path.as_ref()).await?; + Ok(Self { name: name.into(), - format: Format::OnDiskVortex, - } - } - - /// Create a new Vortex random accessor for compact format. - pub fn compact(path: PathBuf) -> Self { - Self { - path, - name: "random-access/vortex-compact-tokio-local-disk".to_string(), - format: Format::VortexCompact, - } + format, + file, + }) } } @@ -77,37 +65,64 @@ impl RandomAccessor for VortexRandomAccessor { &self.name } - fn path(&self) -> &PathBuf { - &self.path - } - - async fn take(&self, indices: Vec) -> anyhow::Result { - let result = take_vortex(&self.path, indices.into()).await?; - Ok(result.len()) + async fn take(&self, indices: &[u64]) -> anyhow::Result { + let indices_buf: Buffer = Buffer::from(indices.to_vec()); + let array = self + .file + .scan()? + .with_row_indices(indices_buf) + .into_array_stream()? + .read_all() + .await?; + + // We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es. + let mut ctx = SESSION.create_execution_ctx(); + let canonical = array.execute::(&mut ctx)?.into_array(); + Ok(canonical.len()) } } /// Random accessor for Parquet format files. 
+/// +/// Parquet footer and row group offsets are parsed at construction time and +/// reused to map indices to row groups in each `take()` call. pub struct ParquetRandomAccessor { - path: PathBuf, name: String, + /// Cumulative row offsets per row group (length = num_row_groups + 1). + row_group_offsets: Vec, + /// Cached Arrow reader metadata (footer) to avoid re-parsing on each take. + arrow_metadata: ArrowReaderMetadata, + /// Path to the Parquet file (for re-opening on each take). + path: PathBuf, } impl ParquetRandomAccessor { - /// Create a new Parquet random accessor for the given file path. - pub fn new(path: PathBuf) -> Self { - Self { - path, - name: "random-access/parquet-tokio-local-disk".to_string(), - } - } - - /// Create a new Parquet random accessor with a custom name. - pub fn with_name(path: PathBuf, name: impl Into) -> Self { - Self { - path, + /// Open a Parquet file, parse the footer, and return a ready-to-use accessor. + pub async fn open(path: PathBuf, name: impl Into) -> anyhow::Result { + let mut file = File::open(&path).await?; + let options = ArrowReaderOptions::new().with_page_index(true); + let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?; + + let row_group_offsets = once(0) + .chain( + arrow_metadata + .metadata() + .row_groups() + .iter() + .map(|rg| rg.num_rows()), + ) + .scan(0i64, |acc, x| { + *acc += x; + Some(*acc) + }) + .collect::>(); + + Ok(Self { name: name.into(), - } + row_group_offsets, + arrow_metadata, + path, + }) } } @@ -121,90 +136,50 @@ impl RandomAccessor for ParquetRandomAccessor { &self.name } - fn path(&self) -> &PathBuf { - &self.path - } + async fn take(&self, indices: &[u64]) -> anyhow::Result { + // Map indices to row groups. + let mut row_groups = HashMap::new(); + for &idx in indices { + let row_group_idx = self + .row_group_offsets + .binary_search(&(idx as i64)) + .unwrap_or_else(|e| e - 1); + row_groups + .entry(row_group_idx) + .or_insert_with(Vec::new) + .push((idx as i64) - self.row_group_offsets[row_group_idx]); + } - async fn take(&self, indices: Vec) -> anyhow::Result { - let result = take_parquet(&self.path, indices).await?; + let sorted_row_group_keys = row_groups.keys().copied().sorted().collect_vec(); + let row_group_indices = sorted_row_group_keys + .iter() + .map(|i| row_groups[i].clone()) + .collect_vec(); + + // Re-open the file but reuse cached metadata (avoids re-parsing the footer). + let file = File::open(&self.path).await?; + let builder = + ParquetRecordBatchStreamBuilder::new_with_metadata(file, self.arrow_metadata.clone()); + + let reader = builder + .with_row_groups(sorted_row_group_keys) + // FIXME(ngates): our indices code assumes the batch size == the row group sizes + .with_batch_size(10_000_000) + .build()?; + + let schema = reader.schema().clone(); + + let batches = reader + .enumerate() + .map(|(idx, batch)| { + let batch = batch.unwrap(); + let indices = PrimitiveArray::::from(row_group_indices[idx].clone()); + take_record_batch(&batch, &indices).unwrap() + }) + .collect::>() + .await; + + let result = concat_batches(&schema, &batches)?; Ok(result.num_rows()) } } - -async fn take_vortex(reader: impl AsRef, indices: Buffer) -> anyhow::Result { - let array = SESSION - .open_options() - .open_path(reader.as_ref()) - .await? - .scan()? - .with_row_indices(indices) - .into_array_stream()? - .read_all() - .await?; - - // We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es. 
- let mut ctx = SESSION.create_execution_ctx(); - // TODO(joe): should we go to a vector. - Ok(array.execute::(&mut ctx)?.into_array()) -} - -pub async fn take_parquet(path: &Path, indices: Vec) -> anyhow::Result { - let file = tokio::fs::File::open(path).await?; - - let builder = ParquetRecordBatchStreamBuilder::new_with_options( - file, - ArrowReaderOptions::new().with_page_index(true), - ) - .await?; - - // We figure out which row groups we need to read and a selection filter for each of them. - let mut row_groups = HashMap::new(); - let row_group_offsets = iter::once(0) - .chain( - builder - .metadata() - .row_groups() - .iter() - .map(RowGroupMetaData::num_rows), - ) - .scan(0i64, |acc, x| { - *acc += x; - Some(*acc) - }) - .collect::>(); - - for idx in indices { - let row_group_idx = row_group_offsets - .binary_search(&(idx as i64)) - .unwrap_or_else(|e| e - 1); - row_groups - .entry(row_group_idx) - .or_insert_with(Vec::new) - .push((idx as i64) - row_group_offsets[row_group_idx]); - } - let row_group_indices = row_groups - .keys() - .sorted() - .map(|i| row_groups[i].clone()) - .collect_vec(); - - let reader = builder - .with_row_groups(row_groups.keys().copied().collect_vec()) - // FIXME(ngates): our indices code assumes the batch size == the row group sizes - .with_batch_size(10_000_000) - .build()?; - - let schema = reader.schema().clone(); - - let batches = reader - .enumerate() - .map(|(idx, batch)| { - let batch = batch.unwrap(); - let indices = PrimitiveArray::::from(row_group_indices[idx].clone()); - take_record_batch(&batch, &indices).unwrap() - }) - .collect::>() - .await; - - Ok(concat_batches(&schema, &batches)?) -}
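A note on the two access patterns added in `generate_indices` (benchmarks/random-access-bench/src/main.rs): the correlated pattern emits a few runs of consecutive row ids, while the uniform pattern accumulates exponential inter-arrival gaps, which is the standard way to realize a Poisson process over the row range. The standalone sketch below shows the same two generators in isolation; the constants in `main` reuse values from the patch (taxi row count 3,339,715, 5 clusters of 20, roughly 100 Poisson draws), but the function names are illustrative and not part of the patch.

```rust
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, Exp};

/// Illustrative only: draw roughly `expected` sorted indices in `0..row_count`
/// by accumulating exponential inter-arrival gaps (a Poisson process over rows).
fn poisson_indices(row_count: u64, expected: usize, seed: u64) -> Vec<u64> {
    let mut rng = StdRng::seed_from_u64(seed);
    let exp = Exp::new(expected as f64 / row_count as f64).expect("rate must be positive");
    let mut indices = Vec::with_capacity(expected);
    let mut pos = 0.0_f64;
    loop {
        pos += exp.sample(&mut rng);
        if pos as u64 >= row_count {
            break;
        }
        indices.push(pos as u64);
    }
    indices // already sorted: positions only ever increase
}

/// Illustrative only: a few runs of consecutive indices, modelling spatial locality.
/// Assumes `row_count > cluster_size`, as the patch does.
fn clustered_indices(row_count: u64, clusters: usize, cluster_size: u64, seed: u64) -> Vec<u64> {
    let mut rng = StdRng::seed_from_u64(seed);
    let mut indices = Vec::with_capacity(clusters * cluster_size as usize);
    for _ in 0..clusters {
        let start = rng.random_range(0..row_count - cluster_size);
        indices.extend(start..start + cluster_size);
    }
    indices.sort_unstable();
    indices
}

fn main() {
    // Values taken from the patch: taxi row count, 5 clusters of 20, ~100 Poisson draws.
    let uniform = poisson_indices(3_339_715, 100, 42);
    let correlated = clustered_indices(3_339_715, 5, 20, 42);
    println!("uniform: {} indices, correlated: {}", uniform.len(), correlated.len());
}
```

Because the inter-arrival gaps are always positive, the Poisson variant never needs an explicit sort, which is why the patch can push indices straight into the output vector.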
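The rewritten `ParquetRandomAccessor::take` maps each global row index to a row group and a group-local offset using the cached cumulative row counts plus a binary search. Below is a minimal self-contained sketch of just that mapping step, with made-up offsets and std's `HashMap` standing in for the crate's hash map alias.

```rust
use std::collections::HashMap;

/// Illustrative only. `offsets` holds cumulative row counts per row group, starting
/// at 0 (so its length is num_row_groups + 1). Returns, for every row group touched,
/// the offsets of the requested rows local to that group.
fn map_to_row_groups(offsets: &[i64], indices: &[u64]) -> HashMap<usize, Vec<i64>> {
    let mut by_group: HashMap<usize, Vec<i64>> = HashMap::new();
    for &idx in indices {
        // binary_search returns Err(insertion_point) for an index strictly inside a
        // group, so the containing group is insertion_point - 1; an exact Ok(i) hit
        // means the index is the first row of group i and the local offset is zero.
        let group = offsets
            .binary_search(&(idx as i64))
            .unwrap_or_else(|e| e - 1);
        by_group
            .entry(group)
            .or_default()
            .push(idx as i64 - offsets[group]);
    }
    by_group
}

fn main() {
    // Two row groups of 1000 rows each.
    let offsets = [0, 1000, 2000];
    let groups = map_to_row_groups(&offsets, &[10, 999, 1000, 1500]);
    assert_eq!(groups[&0], vec![10, 999]);
    assert_eq!(groups[&1], vec![0, 500]);
    println!("{groups:?}");
}
```

Caching the offsets and footer metadata at construction time is what lets the cached open mode skip footer parsing entirely, so each `take()` only pays for opening the file handle and reading the touched row groups.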
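Measurement names produced by the new `measurement_name` helper follow `random-access/{dataset}/{pattern}/{format}-tokio-local-disk`, which is exactly what the server.js change splits on to recover chart and series (four parts for the new dataset names, two for the legacy taxi name). A tiny sketch of that round trip, assuming the Vortex format extension string is "vortex":

```rust
/// Mirrors the naming scheme used by the benchmark's `measurement_name` helper.
fn measurement_name(dataset: &str, pattern: &str, format_ext: &str) -> String {
    format!("random-access/{dataset}/{pattern}/{format_ext}-tokio-local-disk")
}

fn main() {
    let name = measurement_name("feature-vectors", "uniform", "vortex");
    // The dashboard splits on '/': [group, dataset, pattern, series] for new-style
    // names, or [group, series] for the legacy taxi name.
    let parts: Vec<&str> = name.split('/').collect();
    assert_eq!(parts.len(), 4);
    assert_eq!(parts[1], "feature-vectors");
    assert_eq!(parts[2], "uniform");
    assert_eq!(parts[3], "vortex-tokio-local-disk");
}
```

Keeping the legacy two-part taxi name unchanged is what preserves continuity with the historical benchmark series already stored on the website.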