Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 160 additions & 18 deletions benchmarks/random-access-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,39 @@ use std::time::Duration;
use std::time::Instant;

use clap::Parser;
use clap::ValueEnum;
use indicatif::ProgressBar;
use vortex_bench::BenchmarkOutput;
use vortex_bench::Engine;
use vortex_bench::Format;
use vortex_bench::Target;
use vortex_bench::create_output_writer;
use vortex_bench::datasets::gharchive::gharchive_parquet;
use vortex_bench::datasets::gharchive::gharchive_vortex;
use vortex_bench::datasets::taxi_data::*;
use vortex_bench::display::DisplayFormat;
use vortex_bench::display::print_measurements_json;
use vortex_bench::display::render_table;
use vortex_bench::measurements::TimingMeasurement;
use vortex_bench::random_access::FieldPath;
use vortex_bench::random_access::ParquetProjectingAccessor;
use vortex_bench::random_access::ParquetRandomAccessor;
use vortex_bench::random_access::ProjectingRandomAccessor;
use vortex_bench::random_access::RandomAccessor;
use vortex_bench::random_access::VortexProjectingAccessor;
use vortex_bench::random_access::VortexRandomAccessor;
use vortex_bench::setup_logging_and_tracing;
use vortex_bench::utils::constants::STORAGE_NVME;

/// Available datasets for random access benchmarks.
//
// NOTE: the `///` docs on the variants below double as the user-facing CLI
// help text through clap's `ValueEnum` derive — edit them with the rendered
// `--help` output in mind.
#[derive(Clone, Copy, Debug, Default, ValueEnum)]
enum Dataset {
/// NYC Taxi trip data - flat schema with many columns.
#[default]
Taxi,
/// GitHub Archive event data - deeply nested schema with struct fields.
GhArchive,
}

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
Expand All @@ -32,6 +49,9 @@ struct Args {
default_values_t = vec![Format::Parquet, Format::OnDiskVortex]
)]
formats: Vec<Format>,
/// Dataset to benchmark.
#[arg(long, value_enum, default_value_t = Dataset::Taxi)]
dataset: Dataset,
/// Time limit in seconds for each benchmark target (e.g., 10 for 10 seconds).
#[arg(long, default_value_t = 10)]
time_limit: u64,
Expand All @@ -51,21 +71,40 @@ async fn main() -> anyhow::Result<()> {

setup_logging_and_tracing(args.verbose, args.tracing)?;

// Row count of the dataset is 3,339,715.
let indices = vec![10u64, 11, 12, 13, 100_000, 3_000_000];

run_random_access(
args.formats,
args.time_limit,
args.display_format,
indices,
args.output_path,
)
.await
match args.dataset {
Dataset::Taxi => {
// Row count of the taxi dataset is 3,339,715.
let indices = vec![10u64, 11, 12, 13, 100_000, 3_000_000];
run_taxi_random_access(
args.formats,
args.time_limit,
args.display_format,
indices,
args.output_path,
)
.await
}
Dataset::GhArchive => {
// Run gharchive benchmark with nested field projection.
// The field path is actor.login - a deeply nested string field.
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says the field path is payload.ref, but the code actually benchmarks actor.login. Please update the comment (or change the field path) so benchmark output/intent is clear and doesn’t drift from the implementation.

Suggested change
// The field path is payload.ref - a deeply nested string field.
// The field path is actor.login - a deeply nested string field.

Copilot uses AI. Check for mistakes.
let field_path = vec!["actor".to_string(), "login".to_string()];
// Use smaller indices as gharchive may have fewer rows per row group.
let indices = vec![10u64, 11, 12, 13, 1_000, 10_000];
run_gharchive_random_access(
args.formats,
args.time_limit,
args.display_format,
indices,
field_path,
args.output_path,
)
.await
}
}
}

/// Create a random accessor for the given format using taxi data.
async fn get_accessor(format: Format) -> anyhow::Result<Box<dyn RandomAccessor>> {
async fn get_taxi_accessor(format: Format) -> anyhow::Result<Box<dyn RandomAccessor>> {
match format {
Format::OnDiskVortex => {
let path = taxi_data_vortex().await?;
Expand All @@ -91,6 +130,28 @@ async fn get_accessor(format: Format) -> anyhow::Result<Box<dyn RandomAccessor>>
}
}

/// Create a projecting random accessor for the given format using gharchive data.
///
/// # Errors
///
/// Returns an error for `Format::VortexCompact`: a compact gharchive dataset
/// is not yet available, and silently benchmarking the non-compact file under
/// a "compact" label would misreport results.
async fn get_gharchive_accessor(
    format: Format,
) -> anyhow::Result<Box<dyn ProjectingRandomAccessor>> {
    match format {
        Format::OnDiskVortex => {
            let path = gharchive_vortex().await?;
            Ok(Box::new(VortexProjectingAccessor::new(path)))
        }
        Format::VortexCompact => {
            // No compact variant of the gharchive dataset exists yet. Fail
            // loudly instead of falling back to the non-compact file with a
            // compact-labeled accessor, which would corrupt comparisons.
            anyhow::bail!("compact gharchive dataset is not yet available")
        }
        Format::Parquet => {
            let path = gharchive_parquet().await?;
            Ok(Box::new(ParquetProjectingAccessor::new(path)))
        }
        _ => unimplemented!("Projected random access bench not implemented for {format}"),
    }
}

/// Run a random access benchmark for the given accessor.
///
/// Runs the take operation repeatedly until the time limit is reached,
Expand Down Expand Up @@ -125,6 +186,41 @@ async fn benchmark_random_access(
})
}

/// Run a projected random access benchmark for the given accessor.
///
/// Repeatedly performs the projected take until `time_limit_secs` has
/// elapsed — always at least once — recording the duration of every run.
async fn benchmark_projected_random_access(
    accessor: &dyn ProjectingRandomAccessor,
    indices: &[u64],
    field_path: &FieldPath,
    time_limit_secs: u64,
    storage: &str,
) -> anyhow::Result<TimingMeasurement> {
    let deadline = Duration::from_secs(time_limit_secs);
    let bench_start = Instant::now();
    let mut run_times = Vec::new();

    // do/while shape: the body executes once before the deadline check, so
    // even a very short time limit yields at least one measurement.
    loop {
        // `take_projected` consumes its index vector, so hand it a fresh copy;
        // the copy happens before the timer starts and is excluded from timing.
        let batch = indices.to_vec();
        let run_start = Instant::now();
        let _row_count = accessor.take_projected(batch, field_path).await?;
        run_times.push(run_start.elapsed());

        if bench_start.elapsed() >= deadline {
            break;
        }
    }

    Ok(TimingMeasurement {
        name: accessor.name().to_string(),
        storage: storage.to_string(),
        target: Target::new(format_to_engine(accessor.format()), accessor.format()),
        runs: run_times,
    })
}

/// Map format to the appropriate engine for random access benchmarks.
fn format_to_engine(format: Format) -> Engine {
match format {
Expand All @@ -137,9 +233,10 @@ fn format_to_engine(format: Format) -> Engine {
}

/// The benchmark ID used for output path.
const BENCHMARK_ID: &str = "random-access";
const TAXI_BENCHMARK_ID: &str = "random-access-taxi";
const GHARCHIVE_BENCHMARK_ID: &str = "random-access-gharchive";

async fn run_random_access(
async fn run_taxi_random_access(
formats: Vec<Format>,
time_limit: u64,
display_format: DisplayFormat,
Expand All @@ -152,7 +249,7 @@ async fn run_random_access(
let mut measurements = Vec::new();

for format in formats {
let accessor = get_accessor(format).await?;
let accessor = get_taxi_accessor(format).await?;
let measurement =
benchmark_random_access(accessor.as_ref(), &indices, time_limit, STORAGE_NVME).await?;

Expand All @@ -164,8 +261,53 @@ async fn run_random_access(

progress.finish();

let output = BenchmarkOutput::with_path(BENCHMARK_ID, output_path);
let mut writer = output.create_writer()?;
let mut writer = create_output_writer(&display_format, output_path, TAXI_BENCHMARK_ID)?;

match display_format {
DisplayFormat::Table => {
render_table(&mut writer, measurements, &targets)?;
}
DisplayFormat::GhJson => {
print_measurements_json(&mut writer, measurements)?;
}
}

Ok(())
}

async fn run_gharchive_random_access(
formats: Vec<Format>,
time_limit: u64,
display_format: DisplayFormat,
indices: Vec<u64>,
field_path: FieldPath,
output_path: Option<PathBuf>,
) -> anyhow::Result<()> {
let progress = ProgressBar::new(formats.len() as u64);

let mut targets = Vec::new();
let mut measurements = Vec::new();

for format in formats {
let accessor = get_gharchive_accessor(format).await?;
let measurement = benchmark_projected_random_access(
accessor.as_ref(),
&indices,
&field_path,
time_limit,
STORAGE_NVME,
)
.await?;

targets.push(measurement.target);
measurements.push(measurement);

progress.inc(1);
}

progress.finish();

let mut writer = create_output_writer(&display_format, output_path, GHARCHIVE_BENCHMARK_ID)?;

match display_format {
DisplayFormat::Table => {
Expand Down
Loading
Loading