Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
560 changes: 550 additions & 10 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ members = [
"benchmarks/datafusion-bench",
"benchmarks/duckdb-bench",
"benchmarks/random-access-bench",
"vortex-sqllogictest",
]
exclude = ["java/testfiles", "wasm-test"]
resolver = "2"
Expand Down Expand Up @@ -134,6 +135,7 @@ datafusion-physical-expr-adapter = { version = "52" }
datafusion-physical-expr-common = { version = "52" }
datafusion-physical-plan = { version = "52" }
datafusion-pruning = { version = "52" }
datafusion-sqllogictest = { version = "52" }
dirs = "6.0.0"
divan = { package = "codspeed-divan-compat", version = "4.0.4" }
enum-iterator = "2.0.0"
Expand Down
10 changes: 8 additions & 2 deletions vortex-duckdb/src/duckdb/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,14 @@ mod tests {
.query("SELECT 1 as int_col, 'text' as str_col")
.unwrap();

assert_eq!(result.column_type(0), cpp::DUCKDB_TYPE::DUCKDB_TYPE_INTEGER);
assert_eq!(result.column_type(1), cpp::DUCKDB_TYPE::DUCKDB_TYPE_VARCHAR);
assert_eq!(
result.column_type(0).as_type_id(),
cpp::DUCKDB_TYPE::DUCKDB_TYPE_INTEGER
);
assert_eq!(
result.column_type(1).as_type_id(),
cpp::DUCKDB_TYPE::DUCKDB_TYPE_VARCHAR
);
}

#[test]
Expand Down
16 changes: 12 additions & 4 deletions vortex-duckdb/src/duckdb/logical_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ impl LogicalType {
Self::new(DUCKDB_TYPE::DUCKDB_TYPE_BLOB)
}

/// Creates a logical type for DuckDB's `BIGINT` type id.
pub fn int64() -> Self {
Self::new(DUCKDB_TYPE::DUCKDB_TYPE_BIGINT)
}

/// Creates a logical type for DuckDB's `UBIGINT` type id.
pub fn uint64() -> Self {
Self::new(DUCKDB_TYPE::DUCKDB_TYPE_UBIGINT)
}
Expand All @@ -146,10 +142,22 @@ impl LogicalType {
Self::new(DUCKDB_TYPE::DUCKDB_TYPE_INTEGER)
}

/// Creates a logical type for DuckDB's `BIGINT` type id.
pub fn int64() -> Self {
Self::new(DUCKDB_TYPE::DUCKDB_TYPE_BIGINT)
}

/// Creates a logical type for DuckDB's `BOOLEAN` type id.
pub fn bool() -> Self {
Self::new(DUCKDB_TYPE::DUCKDB_TYPE_BOOLEAN)
}

/// Creates a logical type for DuckDB's `FLOAT` type id.
pub fn float32() -> Self {
Self::new(DUCKDB_TYPE::DUCKDB_TYPE_FLOAT)
}

/// Creates a logical type for DuckDB's `DOUBLE` type id.
pub fn float64() -> Self {
Self::new(DUCKDB_TYPE::DUCKDB_TYPE_DOUBLE)
}

pub fn as_decimal(&self) -> (u8, u8) {
unsafe {
(
Expand Down
6 changes: 4 additions & 2 deletions vortex-duckdb/src/duckdb/query_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex::error::VortexResult;
use vortex::error::vortex_bail;
use vortex::error::vortex_err;

use crate::LogicalType;
use crate::cpp;
use crate::duckdb::DataChunk;
use crate::wrapper;
Expand Down Expand Up @@ -67,8 +68,9 @@ impl QueryResult {
}

/// Get the type of a column by index.
pub fn column_type(&self, col_idx: usize) -> cpp::DUCKDB_TYPE {
unsafe { cpp::duckdb_column_type(self.as_ptr(), col_idx as u64) }
pub fn column_type(&self, col_idx: usize) -> LogicalType {
let dtype = unsafe { cpp::duckdb_column_type(self.as_ptr(), col_idx as u64) };
LogicalType::new(dtype)
}
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-duckdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use crate::duckdb::LogicalType;
pub use crate::duckdb::Value;
use crate::scan::VortexTableFunction;

mod convert;
pub mod convert;
pub mod duckdb;
pub mod exporter;
mod scan;
Expand Down
37 changes: 37 additions & 0 deletions vortex-sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[package]
name = "vortex-sqllogictest"
authors = { workspace = true }
description = "Test runner for SQL integrations"
edition = { workspace = true }
homepage = { workspace = true }
include = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
publish = false
readme = "README.md"
repository = { workspace = true }
rust-version = { workspace = true }
version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] }
datafusion = { workspace = true }
datafusion-sqllogictest = { workspace = true }
futures.workspace = true
indicatif.workspace = true
sqllogictest = "0.28"
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
vortex = { workspace = true, features = ["tokio"] }
vortex-datafusion = { workspace = true }
vortex-duckdb = { workspace = true }

[lints]
workspace = true

[[test]]
harness = false
name = "sqllogictests"
path = "bin/sqllogictests-runner.rs"
79 changes: 79 additions & 0 deletions vortex-sqllogictest/bin/sqllogictests-runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use clap::Parser;
use datafusion::common::GetExt;
use datafusion::{
datasource::provider::DefaultTableFactory, execution::SessionStateBuilder,
prelude::SessionContext,
};
use datafusion_sqllogictest::DataFusion;
use futures::{StreamExt, TryStreamExt};
use indicatif::ProgressBar;
use sqllogictest::parse_file;
use vortex_datafusion::VortexFormatFactory;
use vortex_sqllogictest::{args::Args, utils::list_files};

/// Entry point of the custom (harness-less) test binary.
///
/// Builds a multi-threaded Tokio runtime with every driver enabled and drives
/// `run_all` to completion on it, forwarding its result as the process result.
fn main() -> anyhow::Result<()> {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;

    runtime.block_on(run_all())
}

/// Runs every file returned by `list_files("../slt")` through the sqllogictest runner
/// against a DataFusion session that has the Vortex file format registered.
///
/// Files are processed concurrently, at most `--test-threads` at a time. Each file gets
/// its own `SessionContext`, built inside the per-file task. Record failures are collected
/// as strings rather than aborting the file, so one run reports every failing record.
///
/// # Errors
///
/// Returns an error when the test files cannot be listed, when a file cannot be
/// canonicalized or parsed, or when at least one record failed (each failure is printed
/// to stderr first).
async fn run_all() -> anyhow::Result<()> {
    let args = Args::parse();
    println!("Args: {args:?}");

    if args.list {
        eprintln!("Ignoring `--list` which is unsupported by `sqlogictests-runner`");

        return Ok(());
    }

    let pb = ProgressBar::new_spinner();

    // With a limit of zero `buffer_unordered` would never pull a file from the stream,
    // so fall back to sequential execution instead of stalling.
    let concurrency = args.test_threads.max(1);

    // Streams are lazy: the pipeline only runs once it is awaited below.
    let per_file_errors = futures::stream::iter(list_files("../slt")?)
        .map(|path| {
            let pb = pb.clone();

            async move {
                let mut errors = vec![];
                let factory = Arc::new(VortexFormatFactory::new());
                let session_state_builder = SessionStateBuilder::new()
                    .with_default_features()
                    .with_table_factory(
                        factory.get_ext().to_uppercase(),
                        Arc::new(DefaultTableFactory::new()),
                    )
                    .with_file_formats(vec![factory]);

                let session = SessionContext::new_with_state(session_state_builder.build())
                    .enable_url_table();

                let mut runner = sqllogictest::Runner::new(|| async {
                    Ok(DataFusion::new(session.clone(), path.clone(), pb.clone()))
                });

                for record in parse_file(path.canonicalize()?)? {
                    // Keep going after a failing record so every failure in the file is reported.
                    if let Err(e) = runner.run_async(record).await {
                        errors.push(e.to_string());
                    }
                }

                anyhow::Ok(errors)
            }
        })
        .buffer_unordered(concurrency)
        .try_collect::<Vec<Vec<String>>>()
        .await?;

    let failures: Vec<String> = per_file_errors.into_iter().flatten().collect();
    if !failures.is_empty() {
        for failure in &failures {
            eprintln!("{failure}");
        }
        anyhow::bail!("{} sqllogictest record(s) failed", failures.len());
    }

    Ok(())
}
5 changes: 5 additions & 0 deletions vortex-sqllogictest/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/// Build script: adds the DuckDB library directory to the binary's rpath.
///
/// The directory is read from the `DEP_DUCKDB_LIB_DIR` environment variable and passed to
/// the linker through a `cargo:rustc-link-arg` directive.
///
/// # Panics
///
/// Panics with a descriptive message when `DEP_DUCKDB_LIB_DIR` is unset or not valid
/// Unicode, which fails the build.
fn main() {
    // Propagate DuckDB rpath from vortex-duckdb
    let duckdb_lib = std::env::var("DEP_DUCKDB_LIB_DIR").expect(
        "DEP_DUCKDB_LIB_DIR must be set to the DuckDB library directory \
         (expected to be propagated from the vortex-duckdb build)",
    );
    println!("cargo:rustc-link-arg=-Wl,-rpath,{duckdb_lib}");
}
2 changes: 2 additions & 0 deletions vortex-sqllogictest/slt/create.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
statement ok
CREATE TABLE foo AS VALUES(1,2),(2,3);
64 changes: 64 additions & 0 deletions vortex-sqllogictest/src/args.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use clap::Parser;

// Query engines that can be named on the command line via `Args::engine`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros pick up `///` doc
// comments as help text, so adding them would change the generated CLI output.
#[derive(clap::ValueEnum, Clone, Copy, Debug)]
pub enum Engine {
// Spelled `datafusion` on the command line.
#[clap(name = "datafusion")]
DataFusion,
// Spelled `duckdb` on the command line.
#[clap(name = "duckdb")]
DuckDB,
}

// Command-line arguments of the sqllogictest runner.
//
// Several flags exist only so the binary accepts the arguments normally passed to Rust's
// built-in test harness; their help strings mark them as IGNORED.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros pick up `///` doc
// comments as help/about text, so adding them would change the generated CLI output.
#[derive(Parser, Debug)]
pub struct Args {
// Engines to run against, as a comma-separated list (`-e` / `--engine`).
// `None` when the flag is not given.
#[arg(short, long, value_enum, value_delimiter = ',')]
pub engine: Option<Vec<Engine>>,
// Optional positional argument (no `short`/`long`), presumably a test-name filter
// mirroring the built-in harness — TODO confirm how it is meant to be applied.
#[arg(action)]
pub filter: Option<String>,

// Accepted and ignored: `--format <value>`.
#[clap(
long,
help = "IGNORED (for compatibility with built in rust test runner)"
)]
pub format: Option<String>,

// Accepted and ignored: `-Z <value>` / `--z-options <value>`.
#[clap(
short = 'Z',
long,
help = "IGNORED (for compatibility with built in rust test runner)"
)]
pub z_options: Option<String>,

// Accepted and ignored: `--show-output`.
#[clap(
long,
help = "IGNORED (for compatibility with built in rust test runner)"
)]
pub show_output: bool,

// `--list`: per its help text the runner quits immediately without listing anything.
#[clap(
long,
help = "Quits immediately, not listing anything (for compatibility with built-in rust test runner)"
)]
pub list: bool,

// Accepted and ignored: `--ignored`.
#[clap(
long,
help = "IGNORED (for compatibility with built-in rust test runner)"
)]
pub ignored: bool,

// Accepted and ignored: `--nocapture`.
#[clap(
long,
help = "IGNORED (for compatibility with built-in rust test runner)"
)]
pub nocapture: bool,

// `--test-threads <n>`: how many tests may run in parallel; defaults to 16.
#[clap(
long,
help = "Number of threads used for running tests in parallel",
default_value_t = 16
)]
pub test_threads: usize,
}
97 changes: 97 additions & 0 deletions vortex-sqllogictest/src/duckdb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;
use std::{process::Command, time::Duration};

use async_trait::async_trait;
use datafusion_sqllogictest::DFColumnType;
use sqllogictest::{DBOutput, runner::AsyncDB};

use vortex_duckdb::LogicalType;
use vortex_duckdb::duckdb::Connection;
use vortex_duckdb::duckdb::Database;
use vortex_duckdb::duckdb::{Config, DuckDBType};

use crate::error::TestError;

/// DuckDB connection bundled with the database handle it was presumably opened from,
/// so both can be shared behind a single `Arc`.
struct Inner {
// NOTE(review): Rust drops fields in declaration order, so `conn` is dropped before
// `db`. Presumably intentional (close the connection before the database) — confirm.
conn: Connection,
// Not read in the code visible here; presumably held only to keep the database alive
// for as long as `conn` is used — TODO confirm.
db: Database,
}

// SAFETY: NOTE(review): no justification is given in this file. These impls assert that a
// `Connection` and `Database` may be moved to and accessed from other threads; confirm this
// against the thread-safety guarantees of the wrapped DuckDB handles before relying on it.
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}

/// sqllogictest database driver backed by a DuckDB connection.
struct DuckDB {
// Shared connection/database pair; queries are issued through `inner.conn`.
inner: Arc<Inner>,
}

impl DuckDB {
    /// Maps a DuckDB logical type onto the coarse column categories used by the
    /// DataFusion sqllogictest harness.
    ///
    /// 32/64-bit signed and 64-bit unsigned integers become `Integer`, varchar becomes
    /// `Text`, bool becomes `Boolean`, 32/64-bit floats become `Float`; every other type
    /// id falls back to `Another`.
    fn normalize_column_type(dtype: LogicalType) -> DFColumnType {
        let type_id = dtype.as_type_id();
        // True when `candidate` carries the same DuckDB type id as the input.
        let same_as = |candidate: LogicalType| candidate.as_type_id() == type_id;

        if same_as(LogicalType::int32())
            || same_as(LogicalType::int64())
            || same_as(LogicalType::uint64())
        {
            return DFColumnType::Integer;
        }
        if same_as(LogicalType::varchar()) {
            return DFColumnType::Text;
        }
        if same_as(LogicalType::bool()) {
            return DFColumnType::Boolean;
        }
        if same_as(LogicalType::float32()) || same_as(LogicalType::float64()) {
            return DFColumnType::Float;
        }

        DFColumnType::Another
    }
}

#[async_trait]
impl AsyncDB for DuckDB {
    type Error = TestError;
    type ColumnType = DFColumnType;

    /// Executes `sql` on the DuckDB connection and converts the outcome into a
    /// sqllogictest [`DBOutput`].
    ///
    /// A result with neither columns nor rows is reported as a completed statement;
    /// anything else is reported as rows with one normalized column type per column.
    /// Query failures are wrapped in `TestError::Other` using the error's string form.
    async fn run(&mut self, sql: &str) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
        let result = self
            .inner
            .conn
            .query(sql)
            .map_err(|e| TestError::Other(e.to_string()))?;

        if result.column_count() == 0 && result.row_count() == 0 {
            return Ok(DBOutput::StatementComplete(0));
        }

        let types = (0..result.column_count() as usize)
            .map(|col_idx| Self::normalize_column_type(result.column_type(col_idx)))
            .collect();

        // NOTE(review): no row data is extracted from the result; `rows` is always empty.
        Ok(DBOutput::Rows {
            types,
            rows: Vec::default(),
        })
    }

    /// Nothing is released explicitly on shutdown.
    async fn shutdown(&mut self) {}

    /// Name under which this driver identifies itself to the runner.
    fn engine_name(&self) -> &str {
        "DuckDB"
    }

    /// Sleeps for `dur` using Tokio's timer.
    async fn sleep(dur: Duration) {
        tokio::time::sleep(dur).await;
    }

    /// Runs a system command on behalf of the runner.
    ///
    /// Uses `tokio::process::Command` so that waiting for the command's output is
    /// awaited instead of going through the synchronous `std::process::Command::output`.
    async fn run_command(command: Command) -> std::io::Result<std::process::Output> {
        let mut async_command = tokio::process::Command::from(command);
        async_command.output().await
    }
}
Loading
Loading