Skip to content

Commit 406e5aa

Browse files
authored
test(wasmtest): enable compression feature for wasm build (#19860)
## Which issue does this PR close? - Refs #17509 (Switch from xz2 to liblzma to reduce duplicate dependencies) ## Rationale for this change Switching from `xz2` to `liblzma` enabled wasm builds (liblzma supports wasm), so we can now enable the `compression` feature in wasm tests. ## What changes are included in this PR? - Add the `compression` feature to the `datafusion` dependency in `datafusion/wasmtest/Cargo.toml`. ## Are these changes tested? - Verified by CI in this PR. ## Are there any user-facing changes? - No. (test configuration change only)
1 parent 774a7b5 commit 406e5aa

File tree

3 files changed

+62
-3
lines changed

3 files changed

+62
-3
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/wasmtest/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ chrono = { version = "0.4", features = ["wasmbind"] }
4747
# all the `std::fmt` and `std::panicking` infrastructure, so isn't great for
4848
# code size when deploying.
4949
console_error_panic_hook = { version = "0.1.1", optional = true }
50-
datafusion = { workspace = true, features = ["parquet", "sql"] }
50+
datafusion = { workspace = true, features = ["compression", "parquet", "sql"] }
5151
datafusion-common = { workspace = true }
5252
datafusion-execution = { workspace = true }
5353
datafusion-expr = { workspace = true }
@@ -59,6 +59,8 @@ getrandom = { version = "0.3", features = ["wasm_js"] }
5959
wasm-bindgen = "0.2.99"
6060

6161
[dev-dependencies]
62+
bytes = { workspace = true }
63+
futures = { workspace = true }
6264
object_store = { workspace = true }
6365
# needs to be compiled
6466
tokio = { workspace = true }

datafusion/wasmtest/src/lib.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,23 +80,27 @@ mod test {
8080
use std::sync::Arc;
8181

8282
use super::*;
83+
use bytes::Bytes;
84+
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
8385
use datafusion::{
8486
arrow::{
8587
array::{ArrayRef, Int32Array, RecordBatch, StringArray},
8688
datatypes::{DataType, Field, Schema},
8789
},
8890
datasource::MemTable,
8991
execution::context::SessionContext,
92+
prelude::CsvReadOptions,
9093
};
91-
use datafusion_common::test_util::batches_to_string;
94+
use datafusion_common::{DataFusionError, test_util::batches_to_string};
9295
use datafusion_execution::{
9396
config::SessionConfig,
9497
disk_manager::{DiskManagerBuilder, DiskManagerMode},
9598
runtime_env::RuntimeEnvBuilder,
9699
};
97100
use datafusion_physical_plan::collect;
98101
use datafusion_sql::parser::DFParser;
99-
use object_store::{ObjectStore, memory::InMemory, path::Path};
102+
use futures::{StreamExt, TryStreamExt, stream};
103+
use object_store::{ObjectStore, PutPayload, memory::InMemory, path::Path};
100104
use url::Url;
101105
use wasm_bindgen_test::wasm_bindgen_test;
102106

@@ -259,4 +263,55 @@ mod test {
259263
+----+-------+"
260264
);
261265
}
266+
267+
#[wasm_bindgen_test(unsupported = tokio::test)]
268+
async fn test_csv_read_xz_compressed() {
269+
let csv_data = "id,value\n1,a\n2,b\n3,c\n";
270+
let input = Bytes::from(csv_data.as_bytes().to_vec());
271+
let input_stream =
272+
stream::iter(vec![Ok::<Bytes, DataFusionError>(input)]).boxed();
273+
274+
let compressed_stream = FileCompressionType::XZ
275+
.convert_to_compress_stream(input_stream)
276+
.unwrap();
277+
let compressed_data: Vec<Bytes> = compressed_stream.try_collect().await.unwrap();
278+
279+
let store = InMemory::new();
280+
let path = Path::from("data.csv.xz");
281+
store
282+
.put(&path, PutPayload::from_iter(compressed_data))
283+
.await
284+
.unwrap();
285+
286+
let url = Url::parse("memory://").unwrap();
287+
let ctx = SessionContext::new();
288+
ctx.register_object_store(&url, Arc::new(store));
289+
290+
let csv_options = CsvReadOptions::new()
291+
.has_header(true)
292+
.file_compression_type(FileCompressionType::XZ)
293+
.file_extension("csv.xz");
294+
ctx.register_csv("compressed", "memory:///data.csv.xz", csv_options)
295+
.await
296+
.unwrap();
297+
298+
let result = ctx
299+
.sql("SELECT * FROM compressed")
300+
.await
301+
.unwrap()
302+
.collect()
303+
.await
304+
.unwrap();
305+
306+
assert_eq!(
307+
batches_to_string(&result),
308+
"+----+-------+\n\
309+
| id | value |\n\
310+
+----+-------+\n\
311+
| 1 | a |\n\
312+
| 2 | b |\n\
313+
| 3 | c |\n\
314+
+----+-------+"
315+
);
316+
}
262317
}

0 commit comments

Comments
 (0)