From e5e0dca1d58c31f1eb0918501e4f7f5d0a402c3b Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 29 Sep 2025 16:42:01 -0400 Subject: [PATCH 1/3] test: add sqllogictest runner This change adds a sqllogictest runner + CLI which runs .slt files in the tests/sqllogictest directory. Right now, the runner uses a hardcoded distributed config (ex. `with_network_shuffle_tasks(2)` etc.) but can be extended in the future. Using sqllogictest will make it much easier to write tests (especially for `explain` and `explain (analyze)`). `explain (analyze)` tests will be added in a future commit as it is still being implemented. Also, this change deletes `tests/distributed_aggregation.rs` and moves the test cases to `.slt` files. Documentation: - https://sqlite.org/sqllogictest/doc/trunk/about.wiki - https://github.com/risinglightdb/sqllogictest-rs --- Cargo.lock | 290 ++++++++++++++++++++++++--- Cargo.toml | 8 + src/bin/logictest.rs | 106 ++++++++++ src/test_utils/mod.rs | 1 + src/test_utils/sqllogictest.rs | 154 ++++++++++++++ tests/distributed_aggregation.rs | 155 -------------- tests/sqllogictest/basic_queries.slt | 32 +++ tests/sqllogictest/explain.slt | 24 +++ 8 files changed, 591 insertions(+), 179 deletions(-) create mode 100644 src/bin/logictest.rs create mode 100644 src/test_utils/sqllogictest.rs delete mode 100644 tests/distributed_aggregation.rs create mode 100644 tests/sqllogictest/basic_queries.slt create mode 100644 tests/sqllogictest/explain.slt diff --git a/Cargo.lock b/Cargo.lock index 00881dd..69e4017 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -452,7 +452,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] @@ -703,12 +703,52 @@ dependencies = [ "ansi_term", "atty", "bitflags 1.3.2", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width 0.1.14", "vec_map", ] +[[package]] +name = "clap" +version = "4.5.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.1", +] + +[[package]] +name = "clap_derive" +version = "4.5.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.104", +] + +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + [[package]] name = "colorchoice" version = "1.0.4" @@ -883,7 +923,7 @@ dependencies = [ "flate2", "futures", "hex", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -918,7 +958,7 @@ dependencies = [ "datafusion-session", "datafusion-sql", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -1008,7 +1048,7 @@ dependencies = [ "flate2", "futures", "glob", - "itertools", + "itertools 0.14.0", "log", "object_store", "parquet", @@ -1095,7 +1135,7 @@ dependencies = [ "datafusion-session", "futures", "hex", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -1112,20 +1152,24 @@ dependencies = [ "arrow-flight", "async-trait", "chrono", + "clap 4.5.48", "dashmap", "datafusion", "datafusion-proto", "delegate", + "env_logger 0.10.2", "futures", "http", "hyper-util", "insta", - "itertools", + "itertools 0.14.0", "object_store", "parquet", "pin-project", "prost", "rand 0.8.5", + "regex", + "sqllogictest", "structopt", "tokio", "tokio-stream", @@ -1147,7 +1191,7 @@ dependencies = [ "datafusion", "datafusion-distributed", "datafusion-proto", - "env_logger", + "env_logger 0.11.8", "futures", "log", "parquet", @@ -1214,7 +1258,7 @@ dependencies = [ "arrow", "datafusion-common", "indexmap 2.10.0", - "itertools", + "itertools 0.14.0", "paste", ] @@ -1237,7 +1281,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-macros", "hex", - "itertools", + "itertools 0.14.0", "log", "md-5", "rand 0.9.2", @@ -1298,7 +1342,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-macros", "datafusion-physical-expr-common", - "itertools", + "itertools 0.14.0", "log", "paste", ] @@ -1371,7 +1415,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-physical-expr", "indexmap 2.10.0", - "itertools", + "itertools 0.14.0", "log", "recursive", "regex", @@ -1394,7 +1438,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap 2.10.0", - "itertools", + "itertools 0.14.0", "log", "paste", "petgraph", @@ -1411,7 +1455,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "itertools", + "itertools 0.14.0", ] [[package]] @@ -1429,7 +1473,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-pruning", - "itertools", + "itertools 0.14.0", "log", "recursive", ] @@ -1457,7 +1501,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap 2.10.0", - "itertools", + "itertools 0.14.0", "log", "parking_lot", "pin-project-lite", @@ -1505,7 +1549,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools", + "itertools 0.14.0", "log", ] @@ -1526,7 +1570,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-sql", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -1583,6 +1627,18 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "educe" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7bc049e1bd8cdeb31b68bbd586a9464ecf9f3944af3958a7a9d0f8b9799417" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "either" version = "1.15.0" @@ -1595,6 +1651,26 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" +[[package]] +name = "enum-ordinalize" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" +dependencies = [ + "enum-ordinalize-derive", +] + +[[package]] +name = "enum-ordinalize-derive" +version = "4.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -1605,6 +1681,19 @@ dependencies = [ "regex", ] +[[package]] +name = "env_logger" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "env_logger" version = "0.11.8" @@ -1634,6 +1723,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "escape8259" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5692dd7b5a1978a5aeb0ce83b7655c58ca8efdcb79d21036ea249da95afec2c6" + [[package]] name = "fastrand" version = "2.3.0" @@ -1688,6 +1783,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs-err" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a41f105fe1d5b6b34b2055e3dc59bb79b46b48b2040b9e6c7b4b5de097aa41" +dependencies = [ + "autocfg", +] + [[package]] name = "futures" version = "0.3.31" @@ -1890,6 +1994,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1899,6 +2009,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -2192,12 +2308,32 @@ dependencies = [ "libc", ] +[[package]] +name = "is-terminal" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" +dependencies = [ + "hermit-abi 0.5.2", + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -2345,6 +2481,18 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +[[package]] +name = "libtest-mimic" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc0bda45ed5b3a2904262c1bb91e526127aa70e7ef3758aba2ef93cf896b9b58" +dependencies = [ + "clap 4.5.48", + "escape8259", + "termcolor", + "threadpool", +] + [[package]] name = "libz-rs-sys" version = "0.5.1" @@ -2524,6 +2672,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi 0.5.2", + "libc", +] + [[package]] name = "object" version = "0.36.7" @@ -2545,10 +2703,10 @@ dependencies = [ "futures", "http", "humantime", - "itertools", + "itertools 0.14.0", "parking_lot", "percent-encoding", - "thiserror", + "thiserror 2.0.12", "tokio", "tracing", "url", @@ -2578,6 +2736,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "owo-colors" +version = "4.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c6901729fa79e91a0913333229e9ca5dc725089d1c363b2f4b4760709dc4a52" + [[package]] name = "parking_lot" version = "0.12.4" @@ -2801,7 +2965,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.104", @@ -3152,6 +3316,30 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "sqllogictest" +version = "0.20.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff40a700928567c7303adc6f86cd17bfade5fa8c7dce1256f4691eca66e3ca42" +dependencies = [ + "async-trait", + "educe", + "fs-err", + "futures", + "glob", + "humantime", + "itertools 0.13.0", + "libtest-mimic", + "md-5", + "owo-colors", + "regex", + "similar", + "subst", + "tempfile", + "thiserror 1.0.69", + "tracing", +] + [[package]] name = "sqlparser" version = "0.55.0" @@ -3205,13 +3393,19 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "structopt" version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" dependencies = [ - "clap", + "clap 2.34.0", "lazy_static", "structopt-derive", ] @@ -3222,13 +3416,23 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro-error", "proc-macro2", "quote", "syn 1.0.109", ] +[[package]] +name = "subst" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a9a86e5144f63c2d18334698269a8bfae6eece345c70b64821ea5b35054ec99" +dependencies = [ + "memchr", + "unicode-width 0.1.14", +] + [[package]] name = "subtle" version = "2.6.1" @@ -3287,6 +3491,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -3296,13 +3509,33 @@ dependencies = [ "unicode-width 0.1.14", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.12", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", ] [[package]] @@ -3316,6 +3549,15 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "thrift" version = "0.17.0" diff --git a/Cargo.toml b/Cargo.toml index 0381b34..12a50e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,10 @@ parquet = { version = "55.2.0", optional = true } arrow = { version = "55.2.0", optional = true } tokio-stream = { version = "0.1.17", optional = true } hyper-util = { version = "0.1.16", optional = true } +sqllogictest = { version = "0.20", optional = true } +regex = { version = "1.0", optional = true } +clap = { version = "4.0", features = ["derive"], optional = true } +env_logger = { version = "0.10", optional = true } pin-project = "1.1.10" [features] @@ -50,6 +54,10 @@ integration = [ "arrow", "tokio-stream", "hyper-util", + "sqllogictest", + "regex", + "clap", + "env_logger", ] tpch = ["integration"] diff --git a/src/bin/logictest.rs b/src/bin/logictest.rs new file mode 100644 index 0000000..2f2dd3c --- /dev/null +++ b/src/bin/logictest.rs @@ -0,0 +1,106 @@ +use clap::Parser; +use datafusion_distributed::test_utils::sqllogictest::DatafusionDistributedDB; +use sqllogictest::Runner; +use std::path::PathBuf; + +#[derive(Parser)] +#[command(name = "logictest")] +#[command( + about = "A SQLLogicTest runner for DataFusion Distributed. Docs: https://sqlite.org/sqllogictest/doc/trunk/about.wiki" +)] +struct Args { + /// Files or directories to run + #[arg(required = true)] + files: Vec, + + /// Update test files with actual output rather than verifying the existing output + #[arg(long = "override")] + override_mode: bool, + + /// Number of workers + #[arg(long, default_value = "3")] + num_workers: usize, +} + +async fn run( + paths: Vec, + runner: &mut Runner, + override_mode: bool, +) -> Result<(), Box> +where + D: sqllogictest::AsyncDB, + M: sqllogictest::MakeConnection, +{ + let mut queue = paths; + let mut idx = 0; + while idx < queue.len() { + let file_path = &queue[idx]; + idx += 1; + if !file_path.is_file() { + queue.extend( + expand_directory(file_path).await.unwrap_or_else(|_| { + panic!("Failed to expand directory: {}", file_path.display()) + }), + ); + continue; + } + let file_path_str = file_path.to_str().expect("Invalid file path"); + + let result = match override_mode { + true => { + runner + .update_test_file( + file_path_str, + " ", + sqllogictest::default_validator, + sqllogictest::default_column_validator, + ) + .await + } + + false => runner.run_file_async(file_path).await.map_err(|e| e.into()), + }; + match result { + Ok(_) => println!("🟢 Success: {}", file_path.display()), + Err(e) => { + eprintln!("{e}"); + return Err(format!("🔴 Failure: {}", file_path.display()).into()); + } + } + } + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + + let mut runner = + Runner::new( + move || async move { Ok(DatafusionDistributedDB::new(args.num_workers).await) }, + ); + + run(args.files, &mut runner, args.override_mode).await?; + + Ok(()) +} + +async fn expand_directory(dir_path: &PathBuf) -> Result, Box> { + let mut entries: Vec<_> = std::fs::read_dir(dir_path)? + .filter_map(|entry| entry.ok()) + .filter(|entry| { + entry + .path() + .extension() + .and_then(|ext| ext.to_str()) + .map(|ext| ext == "slt") + .unwrap_or(false) + }) + .map(|entry| entry.path()) + .collect(); + + // Sort entries for consistent order + entries.sort(); + + Ok(entries) +} diff --git a/src/test_utils/mod.rs b/src/test_utils/mod.rs index 140bd63..e91b00d 100644 --- a/src/test_utils/mod.rs +++ b/src/test_utils/mod.rs @@ -5,4 +5,5 @@ pub mod metrics; pub mod mock_exec; pub mod parquet; pub mod session_context; +pub mod sqllogictest; pub mod tpch; diff --git a/src/test_utils/sqllogictest.rs b/src/test_utils/sqllogictest.rs new file mode 100644 index 0000000..940cbcd --- /dev/null +++ b/src/test_utils/sqllogictest.rs @@ -0,0 +1,154 @@ +use crate::DefaultSessionBuilder; +use crate::DistributedPhysicalOptimizerRule; +use crate::test_utils::localhost::start_localhost_context; +use crate::test_utils::parquet::register_parquet_tables; +use async_trait::async_trait; +use datafusion::arrow::array::RecordBatch; +use datafusion::arrow::array::{ArrayRef, StringArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::util::display::array_value_to_string; +use datafusion::common::runtime::JoinSet; +use datafusion::error::DataFusionError; +use datafusion::execution::context::SessionContext; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::displayable; +use datafusion::physical_plan::execution_plan::collect; +use regex::Regex; +use sqllogictest::{AsyncDB, DBOutput, DefaultColumnType}; +use std::sync::Arc; + +pub struct DatafusionDistributedDB { + ctx: SessionContext, + _guard: JoinSet<()>, +} + +impl DatafusionDistributedDB { + pub async fn new(num_nodes: usize) -> Self { + let (ctx, _guard) = start_localhost_context(num_nodes, DefaultSessionBuilder).await; + register_parquet_tables(&ctx).await.unwrap(); + Self { ctx, _guard } + } + + pub fn optimize_distributed( + plan: Arc, + ) -> Result, DataFusionError> { + DistributedPhysicalOptimizerRule::default() + .with_network_shuffle_tasks(2) + .with_network_coalesce_tasks(2) + .optimize(plan, &Default::default()) + } + + /// Sanitize file paths in EXPLAIN output to make tests portable across machines + /// Ex. "Users/jay/code/datafusion-distributed/....../file-0000.parquet" -> "datafusion-distributed/....../file-0000.parquet" + fn sanitize_file_paths(plan_str: &str) -> String { + let re = Regex::new(r"(?m)(^|[\[,]\s*)(?:[^,\[\s]*/)*datafusion-distributed/").unwrap(); + re.replace_all(plan_str, "${1}datafusion-distributed/") + .to_string() + } + + fn convert_batches_to_output( + &self, + batches: Vec, + ) -> Result, datafusion::error::DataFusionError> { + if batches.is_empty() { + return Ok(DBOutput::Rows { + types: vec![], + rows: vec![], + }); + } + + let num_columns = batches[0].num_columns(); + let column_types = vec![DefaultColumnType::Text; num_columns]; // Everything as text + + let mut rows = Vec::new(); + for batch in batches { + for row_idx in 0..batch.num_rows() { + let mut row = Vec::new(); + for col_idx in 0..batch.num_columns() { + let column = batch.column(col_idx); + let value = array_value_to_string(column, row_idx) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + row.push(value); + } + rows.push(row); + } + } + + Ok(DBOutput::Rows { + types: column_types, + rows, + }) + } + + async fn handle_explain_analyze( + &mut self, + _sql: &str, + ) -> Result, datafusion::error::DataFusionError> { + unimplemented!(); + } + + async fn handle_explain( + &mut self, + sql: &str, + ) -> Result, datafusion::error::DataFusionError> { + let query = sql.trim_start_matches("EXPLAIN").trim(); + let df = self.ctx.sql(query).await?; + let physical_plan = df.create_physical_plan().await?; + + let physical_distributed = Self::optimize_distributed(physical_plan)?; + + let physical_distributed_str = displayable(physical_distributed.as_ref()) + .indent(true) + .to_string(); + + // Sanitize file paths to make tests portable across machines + let sanitized_str = Self::sanitize_file_paths(&physical_distributed_str); + + let lines: Vec = sanitized_str.lines().map(|s| s.to_string()).collect(); + let schema = Arc::new(Schema::new(vec![Field::new("plan", DataType::Utf8, false)])); + let batch = + RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(lines)) as ArrayRef])?; + + self.convert_batches_to_output(vec![batch]) + } +} + +#[async_trait] +impl AsyncDB for DatafusionDistributedDB { + type Error = datafusion::error::DataFusionError; + type ColumnType = DefaultColumnType; + + async fn run(&mut self, sql: &str) -> Result, Self::Error> { + let sql = sql.trim(); + + // Ignore DDL/DML + if sql.to_uppercase().starts_with("CREATE") + || sql.to_uppercase().starts_with("INSERT") + || sql.to_uppercase().starts_with("DROP") + { + return Ok(DBOutput::StatementComplete(0)); + } + + if sql.to_uppercase().starts_with("EXPLAIN ANALYZE") { + return self.handle_explain_analyze(sql).await; + } + + if sql.to_uppercase().starts_with("EXPLAIN") { + return self.handle_explain(sql).await; + } + + // Default: Execute SELECT statement + let df = self.ctx.sql(sql).await?; + let task_ctx = Arc::new(df.task_ctx()); + let plan = df.create_physical_plan().await?; + let distributed_plan = Self::optimize_distributed(plan)?; + let batches = collect(distributed_plan, task_ctx).await?; + + self.convert_batches_to_output(batches) + } + + fn engine_name(&self) -> &str { + "datafusion-distributed" + } +} diff --git a/tests/distributed_aggregation.rs b/tests/distributed_aggregation.rs deleted file mode 100644 index fa0f3b0..0000000 --- a/tests/distributed_aggregation.rs +++ /dev/null @@ -1,155 +0,0 @@ -#[cfg(all(feature = "integration", test))] -mod tests { - use datafusion::arrow::util::pretty::pretty_format_batches; - use datafusion::physical_optimizer::PhysicalOptimizerRule; - use datafusion::physical_plan::{displayable, execute_stream}; - use datafusion_distributed::test_utils::localhost::start_localhost_context; - use datafusion_distributed::test_utils::parquet::register_parquet_tables; - use datafusion_distributed::{ - DefaultSessionBuilder, DistributedPhysicalOptimizerRule, assert_snapshot, - }; - use futures::TryStreamExt; - use std::error::Error; - - #[tokio::test] - async fn distributed_aggregation() -> Result<(), Box> { - let (ctx, _guard) = start_localhost_context(3, DefaultSessionBuilder).await; - register_parquet_tables(&ctx).await?; - - let df = ctx - .sql(r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)"#) - .await?; - let physical = df.create_physical_plan().await?; - - let physical_str = displayable(physical.as_ref()).indent(true).to_string(); - - let physical_distributed = DistributedPhysicalOptimizerRule::default() - .with_network_shuffle_tasks(2) - .optimize(physical.clone(), &Default::default())?; - - let physical_distributed_str = displayable(physical_distributed.as_ref()) - .indent(true) - .to_string(); - - assert_snapshot!(physical_str, - @r" - ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday] - SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] - SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true] - ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))] - AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=3 - AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] - DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet - ", - ); - - assert_snapshot!(physical_distributed_str, - @r" - ┌───── Stage 2 Tasks: t0:[p0] - │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday] - │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] - │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true] - │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))] - │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] - │ CoalesceBatchesExec: target_batch_size=8192 - │ NetworkShuffleExec read_from=Stage 1, output_partitions=3, n_tasks=1, input_tasks=2 - └────────────────────────────────────────────────── - ┌───── Stage 1 Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] - │ RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=2 - │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] - │ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0] - │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet - └────────────────────────────────────────────────── - ", - ); - - let batches = pretty_format_batches( - &execute_stream(physical, ctx.task_ctx())? - .try_collect::>() - .await?, - )?; - - assert_snapshot!(batches, @r" - +----------+-----------+ - | count(*) | RainToday | - +----------+-----------+ - | 66 | Yes | - | 300 | No | - +----------+-----------+ - "); - - let batches_distributed = pretty_format_batches( - &execute_stream(physical_distributed, ctx.task_ctx())? - .try_collect::>() - .await?, - )?; - assert_snapshot!(batches_distributed, @r" - +----------+-----------+ - | count(*) | RainToday | - +----------+-----------+ - | 66 | Yes | - | 300 | No | - +----------+-----------+ - "); - - Ok(()) - } - - #[tokio::test] - async fn distributed_aggregation_head_node_partitioned() -> Result<(), Box> { - let (ctx, _guard) = start_localhost_context(6, DefaultSessionBuilder).await; - register_parquet_tables(&ctx).await?; - - let df = ctx - .sql(r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday""#) - .await?; - let physical = df.create_physical_plan().await?; - - let physical_str = displayable(physical.as_ref()).indent(true).to_string(); - - let physical_distributed = DistributedPhysicalOptimizerRule::default() - .with_network_shuffle_tasks(6) - .with_network_coalesce_tasks(6) - .optimize(physical.clone(), &Default::default())?; - - let physical_distributed_str = displayable(physical_distributed.as_ref()) - .indent(true) - .to_string(); - - assert_snapshot!(physical_str, - @r" - ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday] - AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=3 - AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] - DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet - ", - ); - - assert_snapshot!(physical_distributed_str, - @r" - ┌───── Stage 3 Tasks: t0:[p0] - │ CoalescePartitionsExec - │ NetworkCoalesceExec read_from=Stage 2, output_partitions=18, input_tasks=6 - └────────────────────────────────────────────────── - ┌───── Stage 2 Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] t2:[p0,p1,p2] t3:[p0,p1,p2] t4:[p0,p1,p2] t5:[p0,p1,p2] - │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday] - │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] - │ CoalesceBatchesExec: target_batch_size=8192 - │ NetworkShuffleExec read_from=Stage 1, output_partitions=3, n_tasks=6, input_tasks=3 - └────────────────────────────────────────────────── - ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] - │ RepartitionExec: partitioning=Hash([RainToday@0], 18), input_partitions=1 - │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] - │ PartitionIsolatorExec Tasks: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] - │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet - └────────────────────────────────────────────────── - ", - ); - - Ok(()) - } -} diff --git a/tests/sqllogictest/basic_queries.slt b/tests/sqllogictest/basic_queries.slt new file mode 100644 index 0000000..289fa82 --- /dev/null +++ b/tests/sqllogictest/basic_queries.slt @@ -0,0 +1,32 @@ +# Basic SELECT queries for DataFusion Distributed + +# Group By +query IT nosort +SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*) +---- +66 Yes +300 No + +# Count +query I +SELECT COUNT(*) FROM weather +---- +366 + +# Filtering +query I +SELECT COUNT(*) FROM weather WHERE "RainToday" = 'Yes' +---- +66 + +# Filtering +query I +SELECT COUNT(*) FROM weather WHERE "RainToday" = 'No' +---- +300 + +# Count +query I rowsort +SELECT COUNT(*) FROM flights_1m LIMIT 1 +---- +1000000 diff --git a/tests/sqllogictest/explain.slt b/tests/sqllogictest/explain.slt new file mode 100644 index 0000000..e83a6bb --- /dev/null +++ b/tests/sqllogictest/explain.slt @@ -0,0 +1,24 @@ +# EXPLAIN queries for DataFusion Distributed + +# This should produce the distributed physical plan with network shuffle stages +query T nosort +EXPLAIN SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*) +---- +┌───── Stage 3 Tasks: t0:[p0] +│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday] +│ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] +│ NetworkCoalesceExec read_from=Stage 2, output_partitions=6, input_tasks=2 +└────────────────────────────────────────────────── + ┌───── Stage 2 Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] + │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true] + │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))] + │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] + │ CoalesceBatchesExec: target_batch_size=8192 + │ NetworkShuffleExec read_from=Stage 1, output_partitions=3, n_tasks=2, input_tasks=2 + └────────────────────────────────────────────────── + ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5] t1:[p0,p1,p2,p3,p4,p5] + │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=2 + │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))] + │ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0] + │ DataSourceExec: file_groups={3 groups: [[datafusion-distributed/testdata/weather/result-000000.parquet], [datafusion-distributed/testdata/weather/result-000001.parquet], [datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet + └────────────────────────────────────────────────── From 81d62a02bc77f3be1c5e70eda06ce9cdc936ac0d Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 29 Sep 2025 16:57:54 -0400 Subject: [PATCH 2/3] ci: add sqllogictest files to github ci We now run any sqllogictest commands from tests/sqllogictest.sh in CI --- .github/workflows/ci.yml | 9 +++++++++ tests/sqllogictest.sh | 13 +++++++++++++ 2 files changed, 22 insertions(+) create mode 100755 tests/sqllogictest.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f93d292..52bdaaf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -40,6 +40,15 @@ jobs: - uses: ./.github/actions/setup - run: cargo test --features tpch --test tpch_validation_test + sqllogictest: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + lfs: true + - uses: ./.github/actions/setup + - run: tests/sqllogictest.sh + format-check: runs-on: ubuntu-latest steps: diff --git a/tests/sqllogictest.sh b/tests/sqllogictest.sh new file mode 100755 index 0000000..bc25780 --- /dev/null +++ b/tests/sqllogictest.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# SQLLogicTest runner script for DataFusion Distributed +# This script runs the sqllogictest CLI against our test files to verify +# that distributed query execution produces expected results. + +set -e # Exit on any error + +# Test basic queries (aggregations, filtering, etc.) +cargo run --features integration --bin logictest -- tests/sqllogictest/basic_queries.slt --num-workers 3 + +# Test EXPLAIN queries (distributed physical plans) +cargo run --features integration --bin logictest -- tests/sqllogictest/explain.slt --num-workers 3 From 0dd8d3ea831ee5ca2b1b09ec6cad8709ba297189 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 29 Sep 2025 20:53:46 -0400 Subject: [PATCH 3/3] misc: add .vscode dir to .gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index be0175d..bca5c43 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /.idea /target /benchmarks/data/ -testdata/tpch/data/ \ No newline at end of file +testdata/tpch/data/ +.vscode \ No newline at end of file