From 32a97c61bf2a46cc19a115a92bcf0fe95134d9e4 Mon Sep 17 00:00:00 2001 From: eitsupi <50911393+eitsupi@users.noreply.github.com> Date: Sat, 4 May 2024 12:14:33 +0900 Subject: [PATCH] feat: export/import DataFrame as raw vector (#1072) --- DESCRIPTION | 2 +- NEWS.md | 6 +++ R/dataframe__frame.R | 2 + R/extendr-wrappers.R | 6 ++- R/io_ipc.R | 62 +++++++++++++++++++++++-- man/DataFrame_to_raw_ipc.Rd | 51 ++++++++++++++++++++ man/IO_read_ipc.Rd | 14 +++++- man/IO_write_ipc.Rd | 5 ++ src/rust/Cargo.lock | 2 +- src/rust/Cargo.toml | 2 +- src/rust/src/rbackground.rs | 42 ++++++++++------- src/rust/src/rdataframe/mod.rs | 27 +++++++++++ src/rust/src/rdataframe/read_ipc.rs | 6 +-- tests/testthat/_snaps/after-wrappers.md | 45 +++++++++--------- tests/testthat/test-ipc.R | 18 +++++++ tools/lib-sums.tsv | 6 --- 16 files changed, 235 insertions(+), 61 deletions(-) create mode 100644 man/DataFrame_to_raw_ipc.Rd delete mode 100644 tools/lib-sums.tsv diff --git a/DESCRIPTION b/DESCRIPTION index c3734a2b8..542490ba7 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -118,5 +118,5 @@ Collate: 'zzz.R' Config/rextendr/version: 0.3.1 VignetteBuilder: knitr -Config/polars/LibVersion: 0.39.2 +Config/polars/LibVersion: 0.39.3 Config/polars/RustToolchainVersion: nightly-2024-04-15 diff --git a/NEWS.md b/NEWS.md index e08dd0d63..1b8c38a08 100644 --- a/NEWS.md +++ b/NEWS.md @@ -2,6 +2,12 @@ ## Polars R Package (development version) +### New features + +- `pl$read_ipc()` can read a raw vector of Apache Arrow IPC file (#1072). +- New method `$to_raw_ipc()` to serialize a DataFrame to a raw vector + of Apache Arrow IPC file format (#1072). + ## Polars R Package 0.16.3 ### New features diff --git a/R/dataframe__frame.R b/R/dataframe__frame.R index 0743cf430..449e746ac 100644 --- a/R/dataframe__frame.R +++ b/R/dataframe__frame.R @@ -1982,6 +1982,8 @@ DataFrame_write_csv = function( #' This functionality is considered **unstable**. #' It may be changed at any point without it being considered a breaking change. #' @rdname IO_write_ipc +#' @seealso +#' - [`$to_raw_ipc()`][DataFrame_to_raw_ipc] #' @examples #' dat = pl$DataFrame(mtcars) #' diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index 5361e84cd..2fe094416 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -118,8 +118,6 @@ test_rbackgroundhandler <- function(lambda, arg) .Call(wrap__test_rbackgroundhan test_rthreadhandle <- function() .Call(wrap__test_rthreadhandle) -test_serde_df <- function(df) .Call(wrap__test_serde_df, df) - internal_wrap_e <- function(robj, str_to_lit) .Call(wrap__internal_wrap_e, robj, str_to_lit) create_col <- function(name) .Call(wrap__create_col, name) @@ -230,6 +228,10 @@ RPolarsDataFrame$write_csv <- function(file, include_bom, include_header, separa RPolarsDataFrame$write_ipc <- function(file, compression, future) .Call(wrap__RPolarsDataFrame__write_ipc, self, file, compression, future) +RPolarsDataFrame$to_raw_ipc <- function(compression, future) .Call(wrap__RPolarsDataFrame__to_raw_ipc, self, compression, future) + +RPolarsDataFrame$from_raw_ipc <- function(bits, n_rows, row_name, row_index, memory_map) .Call(wrap__RPolarsDataFrame__from_raw_ipc, bits, n_rows, row_name, row_index, memory_map) + RPolarsDataFrame$write_parquet <- function(file, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit) .Call(wrap__RPolarsDataFrame__write_parquet, self, file, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit) RPolarsDataFrame$write_json <- function(file, pretty, row_oriented) .Call(wrap__RPolarsDataFrame__write_json, self, file, pretty, row_oriented) diff --git a/R/io_ipc.R b/R/io_ipc.R index 57b8e62e6..053a95351 100644 --- a/R/io_ipc.R +++ b/R/io_ipc.R @@ -54,6 +54,9 @@ pl_scan_ipc = function( #' #' @inherit pl_read_csv return #' @inheritParams pl_scan_ipc +#' @param source A single character or a raw vector of Apache Arrow IPC file. +#' You can use globbing with `*` to scan/read multiple files in the same directory +#' (see examples). #' @rdname IO_read_ipc #' @examplesIf requireNamespace("arrow", quietly = TRUE) && arrow::arrow_with_dataset() #' temp_dir = tempfile() @@ -73,6 +76,15 @@ pl_scan_ipc = function( #' pl$read_ipc( #' file.path(temp_dir, "**/*.arrow") #' ) +#' +#' # Read a raw vector +#' arrow::arrow_table( +#' foo = 1:5, +#' bar = 6:10, +#' ham = letters[1:5] +#' ) |> +#' arrow::write_to_raw(format = "file") |> +#' pl$read_ipc() pl_read_ipc = function( source, ..., @@ -82,9 +94,49 @@ pl_read_ipc = function( row_index_offset = 0L, rechunk = FALSE, cache = TRUE) { - .args = as.list(environment()) - result({ - do.call(pl$scan_ipc, .args)$collect() - }) |> - unwrap("in pl$read_ipc():") + uw = function(res) unwrap(res, "in pl$read_ipc():") + + if (isTRUE(is.raw(source))) { + .pr$DataFrame$from_raw_ipc( + source, + n_rows, + row_index_name, + row_index_offset, + memory_map + ) |> + uw() + } else { + .args = as.list(environment()) + result(do.call(pl$scan_ipc, .args)$collect()) |> + uw() + } +} + + +#' Write Arrow IPC data to a raw vector +#' +#' @inheritParams DataFrame_write_ipc +#' @return A raw vector +#' @seealso +#' - [`$write_ipc()`][DataFrame_write_ipc] +#' @examples +#' df = pl$DataFrame( +#' foo = 1:5, +#' bar = 6:10, +#' ham = letters[1:5] +#' ) +#' +#' raw_ipc = df$to_raw_ipc() +#' +#' pl$read_ipc(raw_ipc) +#' +#' if (require("arrow", quietly = TRUE)) { +#' arrow::read_ipc_file(raw_ipc, as_data_frame = FALSE) +#' } +DataFrame_to_raw_ipc = function( + compression = c("uncompressed", "zstd", "lz4"), + ..., + future = FALSE) { + .pr$DataFrame$to_raw_ipc(self, compression, future) |> + unwrap("in $to_raw_ipc():") } diff --git a/man/DataFrame_to_raw_ipc.Rd b/man/DataFrame_to_raw_ipc.Rd new file mode 100644 index 000000000..0dfbd8d3e --- /dev/null +++ b/man/DataFrame_to_raw_ipc.Rd @@ -0,0 +1,51 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/io_ipc.R +\name{DataFrame_to_raw_ipc} +\alias{DataFrame_to_raw_ipc} +\title{Write Arrow IPC data to a raw vector} +\usage{ +DataFrame_to_raw_ipc( + compression = c("uncompressed", "zstd", "lz4"), + ..., + future = FALSE +) +} +\arguments{ +\item{compression}{\code{NULL} or a character of the compression method, +\code{"uncompressed"} or "lz4" or "zstd". \code{NULL} is equivalent to \code{"uncompressed"}. +Choose "zstd" for good compression performance. Choose "lz4" +for fast compression/decompression.} + +\item{...}{Ignored.} + +\item{future}{Setting this to \code{TRUE} will write Polars' internal data structures that +might not be available by other Arrow implementations. +This functionality is considered \strong{unstable}. +It may be changed at any point without it being considered a breaking change.} +} +\value{ +A raw vector +} +\description{ +Write Arrow IPC data to a raw vector +} +\examples{ +df = pl$DataFrame( + foo = 1:5, + bar = 6:10, + ham = letters[1:5] +) + +raw_ipc = df$to_raw_ipc() + +pl$read_ipc(raw_ipc) + +if (require("arrow", quietly = TRUE)) { + arrow::read_ipc_file(raw_ipc, as_data_frame = FALSE) +} +} +\seealso{ +\itemize{ +\item \code{\link[=DataFrame_write_ipc]{$write_ipc()}} +} +} diff --git a/man/IO_read_ipc.Rd b/man/IO_read_ipc.Rd index 1b7c54a89..90175eb41 100644 --- a/man/IO_read_ipc.Rd +++ b/man/IO_read_ipc.Rd @@ -16,8 +16,9 @@ pl_read_ipc( ) } \arguments{ -\item{source}{Path to a file. You can use globbing with \code{*} to scan/read multiple -files in the same directory (see examples).} +\item{source}{A single character or a raw vector of Apache Arrow IPC file. +You can use globbing with \code{*} to scan/read multiple files in the same directory +(see examples).} \item{...}{Ignored.} @@ -63,5 +64,14 @@ list.files(temp_dir, recursive = TRUE) pl$read_ipc( file.path(temp_dir, "**/*.arrow") ) + +# Read a raw vector +arrow::arrow_table( + foo = 1:5, + bar = 6:10, + ham = letters[1:5] +) |> + arrow::write_to_raw(format = "file") |> + pl$read_ipc() \dontshow{\}) # examplesIf} } diff --git a/man/IO_write_ipc.Rd b/man/IO_write_ipc.Rd index 328e4fbf4..bae6f0d67 100644 --- a/man/IO_write_ipc.Rd +++ b/man/IO_write_ipc.Rd @@ -42,3 +42,8 @@ if (require("arrow", quietly = TRUE)) { arrow::read_ipc_file(destination, as_data_frame = FALSE) } } +\seealso{ +\itemize{ +\item \code{\link[=DataFrame_to_raw_ipc]{$to_raw_ipc()}} +} +} diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 711a797a2..a4e0f1eb8 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -2054,7 +2054,7 @@ dependencies = [ [[package]] name = "r-polars" -version = "0.39.2" +version = "0.39.3" dependencies = [ "either", "extendr-api", diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index 1844886ff..0ab104487 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "r-polars" -version = "0.39.2" +version = "0.39.3" edition = "2021" rust-version = "1.76.0" publish = false diff --git a/src/rust/src/rbackground.rs b/src/rust/src/rbackground.rs index fe7a6581c..5f9cf2307 100644 --- a/src/rust/src/rbackground.rs +++ b/src/rust/src/rbackground.rs @@ -11,7 +11,7 @@ use extendr_api::{ use flume::{bounded, Sender}; use ipc_channel::ipc; use once_cell::sync::Lazy; -use polars::prelude::Series as PSeries; +use polars::prelude as pl; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use std::thread; @@ -93,31 +93,45 @@ pub fn deserialize_robj(bits: Vec) -> RResult { .when("deserializing an R object") } -pub fn serialize_dataframe(dataframe: &mut polars::prelude::DataFrame) -> RResult> { +pub fn serialize_dataframe( + dataframe: &mut polars::prelude::DataFrame, + compression: Option, + future: bool, +) -> RResult> { use polars::io::SerWriter; let mut dump = Vec::new(); polars::io::ipc::IpcWriter::new(&mut dump) + .with_compression(compression) + .with_pl_flavor(future) .finish(dataframe) .map_err(polars_to_rpolars_err)?; Ok(dump) } -pub fn deserialize_dataframe(bits: &[u8]) -> RResult { +pub fn deserialize_dataframe( + bits: &[u8], + n_rows: Option, + row_index: Option, + memory_map: bool, +) -> RResult { use polars::io::SerReader; polars::io::ipc::IpcReader::new(std::io::Cursor::new(bits)) + .with_n_rows(n_rows) + .with_row_index(row_index) + .memory_mapped(memory_map) .finish() .map_err(polars_to_rpolars_err) } -pub fn serialize_series(series: PSeries) -> RResult> { - serialize_dataframe(&mut std::iter::once(series).collect()) +pub fn serialize_series(series: pl::Series) -> RResult> { + serialize_dataframe(&mut std::iter::once(series).collect(), None, true) } -pub fn deserialize_series(bits: &[u8]) -> RResult { - let tn = std::any::type_name::(); - deserialize_dataframe(bits)? +pub fn deserialize_series(bits: &[u8]) -> RResult { + let tn = std::any::type_name::(); + deserialize_dataframe(bits, None, None, true)? .get_columns() .split_first() .ok_or(RPolarsErr::new()) @@ -480,8 +494,8 @@ impl RBackgroundPool { pub fn rmap_series( &self, raw_func: Vec, - series: PSeries, - ) -> RResult RResult + '_> { + series: pl::Series, + ) -> RResult RResult + '_> { #[cfg(feature = "rpolars_debug_print")] dbg!("rmap_series"); let handler = self.lease()?; @@ -579,13 +593,6 @@ pub fn test_rthreadhandle() -> RPolarsRThreadHandle> { }) } -#[extendr] -pub fn test_serde_df(df: &RPolarsDataFrame) -> RResult { - let x = serialize_dataframe(&mut df.0.clone())?; - let df2 = deserialize_dataframe(x.as_slice())?; - Ok(RPolarsDataFrame(df2)) -} - extendr_module! { mod rbackground; impl RPolarsRThreadHandle>; @@ -595,5 +602,4 @@ extendr_module! { fn handle_background_request; fn test_rbackgroundhandler; fn test_rthreadhandle; - fn test_serde_df; } diff --git a/src/rust/src/rdataframe/mod.rs b/src/rust/src/rdataframe/mod.rs index 9c44732fa..3290f748d 100644 --- a/src/rust/src/rdataframe/mod.rs +++ b/src/rust/src/rdataframe/mod.rs @@ -526,6 +526,33 @@ impl RPolarsDataFrame { .map_err(polars_to_rpolars_err) } + pub fn to_raw_ipc(&self, compression: Robj, future: Robj) -> RResult> { + let compression = rdatatype::new_ipc_compression(compression)?; + let future = robj_to!(bool, future)?; + + crate::rbackground::serialize_dataframe(&mut self.0.clone(), compression, future) + } + + pub fn from_raw_ipc( + bits: Robj, + n_rows: Robj, + row_name: Robj, + row_index: Robj, + memory_map: Robj, + ) -> RResult { + let bits = robj_to!(Raw, bits)?; + let n_rows = robj_to!(Option, usize, n_rows)?; + let row_index = robj_to!(Option, String, row_name)? + .map(|name| { + robj_to!(u32, row_index).map(|offset| polars::io::RowIndex { name, offset }) + }) + .transpose()?; + let memory_map = robj_to!(bool, memory_map)?; + let df = crate::rbackground::deserialize_dataframe(&bits, n_rows, row_index, memory_map)?; + + Ok(RPolarsDataFrame(df)) + } + pub fn write_parquet( &self, file: Robj, diff --git a/src/rust/src/rdataframe/read_ipc.rs b/src/rust/src/rdataframe/read_ipc.rs index 0ab4f601d..0ab8fb784 100644 --- a/src/rust/src/rdataframe/read_ipc.rs +++ b/src/rust/src/rdataframe/read_ipc.rs @@ -1,4 +1,4 @@ -use crate::lazy::dataframe::RPolarsLazyFrame as RLazyFrame; +use crate::lazy::dataframe::RPolarsLazyFrame; use crate::robj_to; use crate::rpolarserr::RResult; use extendr_api::prelude::*; @@ -14,7 +14,7 @@ pub fn import_arrow_ipc( row_name: Robj, row_index: Robj, memory_map: Robj, -) -> RResult { +) -> RResult { let args = ScanArgsIpc { n_rows: robj_to!(Option, usize, n_rows)?, cache: robj_to!(bool, cache)?, @@ -27,7 +27,7 @@ pub fn import_arrow_ipc( }; let lf = LazyFrame::scan_ipc(robj_to!(String, path)?, args) .map_err(crate::rpolarserr::polars_to_rpolars_err)?; - Ok(RLazyFrame(lf)) + Ok(RPolarsLazyFrame(lf)) } extendr_module! { diff --git a/tests/testthat/_snaps/after-wrappers.md b/tests/testthat/_snaps/after-wrappers.md index 90695389c..6cb0462cb 100644 --- a/tests/testthat/_snaps/after-wrappers.md +++ b/tests/testthat/_snaps/after-wrappers.md @@ -91,10 +91,10 @@ [49] "select_seq" "shape" "shift" "shift_and_fill" [53] "slice" "sort" "sql" "std" [57] "sum" "tail" "to_data_frame" "to_list" - [61] "to_series" "to_struct" "transpose" "unique" - [65] "unnest" "var" "width" "with_columns" - [69] "with_columns_seq" "with_row_index" "write_csv" "write_ipc" - [73] "write_json" "write_ndjson" "write_parquet" + [61] "to_raw_ipc" "to_series" "to_struct" "transpose" + [65] "unique" "unnest" "var" "width" + [69] "with_columns" "with_columns_seq" "with_row_index" "write_csv" + [73] "write_ipc" "write_json" "write_ndjson" "write_parquet" --- @@ -107,24 +107,25 @@ [7] "dtype_strings" "dtypes" [9] "equals" "estimated_size" [11] "export_stream" "from_arrow_record_batches" - [13] "get_column" "get_columns" - [15] "lazy" "melt" - [17] "n_chunks" "new_with_capacity" - [19] "null_count" "partition_by" - [21] "pivot_expr" "print" - [23] "rechunk" "sample_frac" - [25] "sample_n" "schema" - [27] "select" "select_at_idx" - [29] "select_seq" "set_column_from_robj" - [31] "set_column_from_series" "set_column_names_mut" - [33] "shape" "to_list" - [35] "to_list_tag_structs" "to_list_unwind" - [37] "to_struct" "transpose" - [39] "unnest" "with_columns" - [41] "with_columns_seq" "with_row_index" - [43] "write_csv" "write_ipc" - [45] "write_json" "write_ndjson" - [47] "write_parquet" + [13] "from_raw_ipc" "get_column" + [15] "get_columns" "lazy" + [17] "melt" "n_chunks" + [19] "new_with_capacity" "null_count" + [21] "partition_by" "pivot_expr" + [23] "print" "rechunk" + [25] "sample_frac" "sample_n" + [27] "schema" "select" + [29] "select_at_idx" "select_seq" + [31] "set_column_from_robj" "set_column_from_series" + [33] "set_column_names_mut" "shape" + [35] "to_list" "to_list_tag_structs" + [37] "to_list_unwind" "to_raw_ipc" + [39] "to_struct" "transpose" + [41] "unnest" "with_columns" + [43] "with_columns_seq" "with_row_index" + [45] "write_csv" "write_ipc" + [47] "write_json" "write_ndjson" + [49] "write_parquet" # public and private methods of each class GroupBy diff --git a/tests/testthat/test-ipc.R b/tests/testthat/test-ipc.R index 04171decc..d2cf3d808 100644 --- a/tests/testthat/test-ipc.R +++ b/tests/testthat/test-ipc.R @@ -74,3 +74,21 @@ test_that("write_ipc returns the input data", { x = dat$write_ipc(tmpf) expect_identical(x$to_list(), dat$to_list()) }) + + +patrick::with_parameters_test_that("input/output DataFrame as raw vector", + { + df = as_polars_df(mtcars) + + raw_vec = df$to_raw_ipc( + compression = compression, + future = TRUE + ) + + expect_true( + df$equals(pl$read_ipc(raw_vec)) + ) + }, + compression = c("uncompressed", "lz4", "zstd"), + .test_name = compression +) diff --git a/tools/lib-sums.tsv b/tools/lib-sums.tsv deleted file mode 100644 index 7d3ceb982..000000000 --- a/tools/lib-sums.tsv +++ /dev/null @@ -1,6 +0,0 @@ -url sha256sum -https://github.com/pola-rs/r-polars/releases/download/lib-v0.39.2/libr_polars-0.39.2-aarch64-apple-darwin.tar.gz 514ec8af46cbc56988a95b8143f63a0f74afba59c3d8844aa9cf83d98b6d2e54 -https://github.com/pola-rs/r-polars/releases/download/lib-v0.39.2/libr_polars-0.39.2-aarch64-unknown-linux-gnu.tar.gz 8b8978339a87c485e93825ef27e4cd3f11c1f2e1f35cdaa0e917b7371f4638fc -https://github.com/pola-rs/r-polars/releases/download/lib-v0.39.2/libr_polars-0.39.2-x86_64-apple-darwin.tar.gz 4738714d14e8d8ea6efc7aea46757791ef0a0f6a27662a7b529fd348bcb18395 -https://github.com/pola-rs/r-polars/releases/download/lib-v0.39.2/libr_polars-0.39.2-x86_64-pc-windows-gnu.tar.gz 72381da18f7a9d65f6e0378ddcb3979af0ae4c465d948569150ec15e06bb7b2c -https://github.com/pola-rs/r-polars/releases/download/lib-v0.39.2/libr_polars-0.39.2-x86_64-unknown-linux-gnu.tar.gz fb5ccbf83e4e4f6c2ffcd17ab43e09112567a5e966900f5a21778f2d002587a3