From 396d8517d9efdc7b5af04e2850e174888955ba4c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 17 Sep 2024 09:38:22 -0500 Subject: [PATCH] feat(r): Add bindings to IPC writer (#608) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds a basic level of support for IPC writing in the R package. This is basically a thin wrapper around `ArrowIpcWriterWriteStream()` and could be more feature-rich like the Python version (that allows schemas and batches to be written individually). I also added a bit of code to handle interrupts (which should catch interrupts on read and write and wasn't handled before). ``` r library(nanoarrow) tf <- tempfile() nycflights13::flights |> write_nanoarrow(tf) (df <- tf |> read_nanoarrow() |> tibble::as_tibble()) #> # A tibble: 336,776 × 19 #> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time #> #> 1 2013 1 1 517 515 2 830 819 #> 2 2013 1 1 533 529 4 850 830 #> 3 2013 1 1 542 540 2 923 850 #> 4 2013 1 1 544 545 -1 1004 1022 #> 5 2013 1 1 554 600 -6 812 837 #> 6 2013 1 1 554 558 -4 740 728 #> 7 2013 1 1 555 600 -5 913 854 #> 8 2013 1 1 557 600 -3 709 723 #> 9 2013 1 1 557 600 -3 838 846 #> 10 2013 1 1 558 600 -2 753 745 #> # ℹ 336,766 more rows #> # ℹ 11 more variables: arr_delay , carrier , flight , #> # tailnum , origin , dest , air_time , distance , #> # hour , minute , time_hour identical(df, nycflights13::flights) #> [1] TRUE ``` Created on 2024-09-14 with [reprex v2.1.1](https://reprex.tidyverse.org) --- r/DESCRIPTION | 2 +- r/NAMESPACE | 3 + r/R/ipc.R | 72 +++++++++++++++-- r/man/read_nanoarrow.Rd | 18 +++-- r/src/init.c | 5 ++ r/src/ipc.c | 149 +++++++++++++++++++++++++++++++++++- r/src/util.c | 7 +- r/src/util.h | 1 + r/tests/testthat/test-ipc.R | 95 ++++++++++++++++++++++- 9 files changed, 331 insertions(+), 21 deletions(-) diff --git a/r/DESCRIPTION b/r/DESCRIPTION index d20eec34e..addb1381d 100644 --- a/r/DESCRIPTION +++ b/r/DESCRIPTION @@ -19,7 +19,7 @@ Description: Provides an 'R' interface to the 'nanoarrow' 'C' library and the License: Apache License (>= 2) Encoding: UTF-8 Roxygen: list(markdown = TRUE) -RoxygenNote: 7.2.3 +RoxygenNote: 7.3.2 URL: https://arrow.apache.org/nanoarrow/latest/r/, https://github.com/apache/arrow-nanoarrow BugReports: https://github.com/apache/arrow-nanoarrow/issues Suggests: diff --git a/r/NAMESPACE b/r/NAMESPACE index aa6e71a08..d9fa33037 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -127,6 +127,8 @@ S3method(str,nanoarrow_array_stream) S3method(str,nanoarrow_buffer) S3method(str,nanoarrow_schema) S3method(str,nanoarrow_vctr) +S3method(write_nanoarrow,character) +S3method(write_nanoarrow,connection) export(array_stream_set_finalizer) export(as_nanoarrow_array) export(as_nanoarrow_array_extension) @@ -210,6 +212,7 @@ export(read_nanoarrow) export(register_nanoarrow_extension) export(resolve_nanoarrow_extension) export(unregister_nanoarrow_extension) +export(write_nanoarrow) importFrom(utils,getFromNamespace) importFrom(utils,str) useDynLib(nanoarrow, .registration = TRUE) diff --git a/r/R/ipc.R b/r/R/ipc.R index 29471b0f5..14f49f927 100644 --- a/r/R/ipc.R +++ b/r/R/ipc.R @@ -15,17 +15,17 @@ # specific language governing permissions and limitations # under the License. -#' Read serialized streams of Arrow data +#' Read/write serialized streams of Arrow data #' -#' Reads connections, file paths, URLs, or raw vectors of serialized Arrow -#' data. Arrow documentation typically refers to this format as "Arrow IPC", -#' since its origin was as a means to transmit tables between processes +#' Reads/writes connections, file paths, URLs, or raw vectors from/to serialized +#' Arrow data. Arrow documentation typically refers to this format as "Arrow +#' IPC", since its origin was as a means to transmit tables between processes #' (e.g., multiple R sessions). This format can also be written to and read #' from files or URLs and is essentially a high performance equivalent of #' a CSV file that does a better job maintaining types. #' -#' The nanoarrow package does not currently have the ability to write serialized -#' IPC data: use [arrow::write_ipc_stream()] to write data from R, or use +#' The nanoarrow package implements an IPC writer; however, you can also +#' use [arrow::write_ipc_stream()] to write data from R, or use #' the equivalent writer from another Arrow implementation in Python, C++, #' Rust, JavaScript, Julia, C#, and beyond. #' @@ -35,6 +35,8 @@ #' @param x A `raw()` vector, connection, or file path from which to read #' binary data. Common extensions indicating compression (.gz, .bz2, .zip) #' are automatically uncompressed. +#' @param data An object to write as an Arrow IPC stream, converted using +#' [as_nanoarrow_array_stream()]. Notably, this includes a [data.frame()]. #' @param lazy By default, `read_nanoarrow()` will read and discard a copy of #' the reader's schema to ensure that invalid streams are discovered as #' soon as possible. Use `lazy = TRUE` to defer this check until the reader @@ -107,6 +109,42 @@ read_nanoarrow.connection <- function(x, ..., lazy = FALSE) { check_stream_if_requested(reader, lazy) } +#' @rdname read_nanoarrow +#' @export +write_nanoarrow <- function(data, x, ...) { + UseMethod("write_nanoarrow", x) +} + +#' @export +write_nanoarrow.connection <- function(data, x, ...) { + if (!isOpen(x)) { + open(x, "wb") + on.exit(close(x)) + } + + writer <- .Call(nanoarrow_c_ipc_writer_connection, x) + stream <- as_nanoarrow_array_stream(data) + on.exit(nanoarrow_pointer_release(stream), add = TRUE) + + .Call(nanoarrow_c_ipc_writer_write_stream, writer, stream) + invisible(data) +} + +#' @export +write_nanoarrow.character <- function(data, x, ...) { + if (length(x) != 1) { + stop(sprintf("Can't interpret character(%d) as file path", length(x))) + } + + con_type <- guess_connection_type(x) + if (con_type == "unz") { + stop("zip compression not supported for write_nanoarrow()") + } + + con <- do.call(con_type, list(x)) + write_nanoarrow(data, con) +} + #' @rdname read_nanoarrow #' @export example_ipc_stream <- function() { @@ -205,3 +243,25 @@ guess_zip_filename <- function(x) { files } + +# The C-level R_tryCatch() does not provide for handling interrupts (or +# I couldn't figure out how to make it work), so instead we provide wrappers +# around readBin() and writeBin() that convert interrupt conditions to errors +# (which the C code does know how to handle). +read_bin_wrapper <- function(con, what, n) { + withCallingHandlers( + readBin(con, what, n), + interrupt = function(e) { + stop("user interrupt") + } + ) +} + +write_bin_wrapper <- function(object, con) { + withCallingHandlers( + writeBin(object, con), + interrupt = function(e) { + stop("user interrupt") + } + ) +} diff --git a/r/man/read_nanoarrow.Rd b/r/man/read_nanoarrow.Rd index 87d300c8b..cf4d423eb 100644 --- a/r/man/read_nanoarrow.Rd +++ b/r/man/read_nanoarrow.Rd @@ -2,11 +2,14 @@ % Please edit documentation in R/ipc.R \name{read_nanoarrow} \alias{read_nanoarrow} +\alias{write_nanoarrow} \alias{example_ipc_stream} -\title{Read serialized streams of Arrow data} +\title{Read/write serialized streams of Arrow data} \usage{ read_nanoarrow(x, ..., lazy = FALSE) +write_nanoarrow(data, x, ...) + example_ipc_stream() } \arguments{ @@ -20,21 +23,24 @@ are automatically uncompressed.} the reader's schema to ensure that invalid streams are discovered as soon as possible. Use \code{lazy = TRUE} to defer this check until the reader is actually consumed.} + +\item{data}{An object to write as an Arrow IPC stream, converted using +\code{\link[=as_nanoarrow_array_stream]{as_nanoarrow_array_stream()}}. Notably, this includes a \code{\link[=data.frame]{data.frame()}}.} } \value{ A \link[=as_nanoarrow_array_stream]{nanoarrow_array_stream} } \description{ -Reads connections, file paths, URLs, or raw vectors of serialized Arrow -data. Arrow documentation typically refers to this format as "Arrow IPC", -since its origin was as a means to transmit tables between processes +Reads/writes connections, file paths, URLs, or raw vectors from/to serialized +Arrow data. Arrow documentation typically refers to this format as "Arrow +IPC", since its origin was as a means to transmit tables between processes (e.g., multiple R sessions). This format can also be written to and read from files or URLs and is essentially a high performance equivalent of a CSV file that does a better job maintaining types. } \details{ -The nanoarrow package does not currently have the ability to write serialized -IPC data: use \code{\link[arrow:write_ipc_stream]{arrow::write_ipc_stream()}} to write data from R, or use +The nanoarrow package implements an IPC writer; however, you can also +use \code{\link[arrow:write_ipc_stream]{arrow::write_ipc_stream()}} to write data from R, or use the equivalent writer from another Arrow implementation in Python, C++, Rust, JavaScript, Julia, C#, and beyond. diff --git a/r/src/init.c b/r/src/init.c index 69c943911..58fe9f839 100644 --- a/r/src/init.c +++ b/r/src/init.c @@ -58,6 +58,8 @@ extern SEXP nanoarrow_c_infer_ptype(SEXP schema_xptr); extern SEXP nanoarrow_c_convert_array(SEXP array_xptr, SEXP ptype_sexp); extern SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr); extern SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con); +extern SEXP nanoarrow_c_ipc_writer_connection(SEXP con); +extern SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP array_stream_xptr); extern SEXP nanoarrow_c_allocate_schema(void); extern SEXP nanoarrow_c_allocate_array(void); extern SEXP nanoarrow_c_allocate_array_stream(void); @@ -136,6 +138,9 @@ static const R_CallMethodDef CallEntries[] = { 1}, {"nanoarrow_c_ipc_array_reader_connection", (DL_FUNC)&nanoarrow_c_ipc_array_reader_connection, 1}, + {"nanoarrow_c_ipc_writer_connection", (DL_FUNC)&nanoarrow_c_ipc_writer_connection, 1}, + {"nanoarrow_c_ipc_writer_write_stream", (DL_FUNC)&nanoarrow_c_ipc_writer_write_stream, + 2}, {"nanoarrow_c_allocate_schema", (DL_FUNC)&nanoarrow_c_allocate_schema, 0}, {"nanoarrow_c_allocate_array", (DL_FUNC)&nanoarrow_c_allocate_array, 0}, {"nanoarrow_c_allocate_array_stream", (DL_FUNC)&nanoarrow_c_allocate_array_stream, 0}, diff --git a/r/src/ipc.c b/r/src/ipc.c index 3039c7e6e..396a25fc2 100644 --- a/r/src/ipc.c +++ b/r/src/ipc.c @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #define R_NO_REMAP #include #include @@ -48,6 +49,50 @@ static SEXP input_stream_owning_xptr(void) { return input_stream_xptr; } +static void finalize_output_stream_xptr(SEXP output_stream_xptr) { + struct ArrowIpcOutputStream* output_stream = + (struct ArrowIpcOutputStream*)R_ExternalPtrAddr(output_stream_xptr); + if (output_stream != NULL && output_stream->release != NULL) { + output_stream->release(output_stream); + } + + if (output_stream != NULL) { + ArrowFree(output_stream); + } +} + +static SEXP output_stream_owning_xptr(void) { + struct ArrowIpcOutputStream* output_stream = + (struct ArrowIpcOutputStream*)ArrowMalloc(sizeof(struct ArrowIpcOutputStream)); + output_stream->release = NULL; + SEXP output_stream_xptr = + PROTECT(R_MakeExternalPtr(output_stream, R_NilValue, R_NilValue)); + R_RegisterCFinalizer(output_stream_xptr, &finalize_output_stream_xptr); + UNPROTECT(1); + return output_stream_xptr; +} + +static void finalize_writer_xptr(SEXP writer_xptr) { + struct ArrowIpcWriter* writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr); + if (writer != NULL && writer->private_data != NULL) { + ArrowIpcWriterReset(writer); + } + + if (writer != NULL) { + ArrowFree(writer); + } +} + +static SEXP writer_owning_xptr(void) { + struct ArrowIpcWriter* writer = + (struct ArrowIpcWriter*)ArrowMalloc(sizeof(struct ArrowIpcWriter)); + writer->private_data = NULL; + SEXP writer_xptr = PROTECT(R_MakeExternalPtr(writer, R_NilValue, R_NilValue)); + R_RegisterCFinalizer(writer_xptr, &finalize_writer_xptr); + UNPROTECT(1); + return writer_xptr; +} + SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr) { struct ArrowBuffer* buffer = buffer_from_xptr(buffer_xptr); @@ -82,7 +127,7 @@ struct ConnectionInputStreamHandler { int return_code; }; -static SEXP handle_readbin_error(SEXP cond, void* hdata) { +static SEXP handle_readbin_writebin_error(SEXP cond, void* hdata) { struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata; SEXP fun = PROTECT(Rf_install("conditionMessage")); @@ -103,7 +148,7 @@ static SEXP call_readbin(void* hdata) { SEXP n = PROTECT(Rf_ScalarReal((double)data->buf_size_bytes)); SEXP call = PROTECT(Rf_lang4(nanoarrow_sym_readbin, data->con, nanoarrow_ptype_raw, n)); - SEXP result = PROTECT(Rf_eval(call, R_BaseEnv)); + SEXP result = PROTECT(Rf_eval(call, nanoarrow_ns_pkg)); R_xlen_t bytes_read = Rf_xlength(result); memcpy(data->buf, RAW(result), bytes_read); *(data->size_read_out) = bytes_read; @@ -112,6 +157,36 @@ static SEXP call_readbin(void* hdata) { return R_NilValue; } +static SEXP call_writebin(void* hdata) { + struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata; + + // Write 16MB chunks. This a balance between being small enough not to + // copy too much of the source unnecessarily and big enough to avoid + // unnecessary R evaluation overhead. + int64_t chunk_buffer_size = 16777216; + SEXP chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, chunk_buffer_size)); + SEXP call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con)); + while (data->buf_size_bytes > chunk_buffer_size) { + memcpy(RAW(chunk_buffer), data->buf, chunk_buffer_size); + Rf_eval(call, nanoarrow_ns_pkg); + data->buf_size_bytes -= chunk_buffer_size; + data->buf += chunk_buffer_size; + } + + UNPROTECT(2); + + // Write remaining bytes + if (data->buf_size_bytes > 0) { + chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, data->buf_size_bytes)); + call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con)); + memcpy(RAW(chunk_buffer), data->buf, data->buf_size_bytes); + Rf_eval(call, nanoarrow_ns_pkg); + UNPROTECT(2); + } + + return R_NilValue; +} + static ArrowErrorCode read_con_input_stream(struct ArrowIpcInputStream* stream, uint8_t* buf, int64_t buf_size_bytes, int64_t* size_read_out, @@ -129,7 +204,32 @@ static ArrowErrorCode read_con_input_stream(struct ArrowIpcInputStream* stream, data.error = error; data.return_code = NANOARROW_OK; - R_tryCatchError(&call_readbin, &data, &handle_readbin_error, &data); + R_tryCatchError(&call_readbin, &data, &handle_readbin_writebin_error, &data); + return data.return_code; +} + +static ArrowErrorCode write_con_output_stream(struct ArrowIpcOutputStream* stream, + const void* buf, int64_t buf_size_bytes, + int64_t* size_write_out, + struct ArrowError* error) { + if (!nanoarrow_is_main_thread()) { + ArrowErrorSet(error, "Can't read from R connection on a non-R thread"); + return EIO; + } + + struct ConnectionInputStreamHandler data; + data.con = (SEXP)stream->private_data; + data.buf = (void*)buf; + data.buf_size_bytes = buf_size_bytes; + data.size_read_out = NULL; + data.error = error; + data.return_code = NANOARROW_OK; + + R_tryCatchError(&call_writebin, &data, &handle_readbin_writebin_error, &data); + + // This implementation always blocks until all bytes have been written + *size_write_out = buf_size_bytes; + return data.return_code; } @@ -137,6 +237,10 @@ static void release_con_input_stream(struct ArrowIpcInputStream* stream) { nanoarrow_release_sexp((SEXP)stream->private_data); } +static void release_con_output_stream(struct ArrowIpcOutputStream* stream) { + nanoarrow_release_sexp((SEXP)stream->private_data); +} + SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) { SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr()); struct ArrowArrayStream* array_stream = @@ -153,9 +257,46 @@ SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) { int code = ArrowIpcArrayStreamReaderInit(array_stream, input_stream, NULL); if (code != NANOARROW_OK) { - Rf_error("ArrowIpcArrayStreamReaderInit() failed"); + Rf_error("ArrowIpcArrayStreamReaderInit() failed with errno %d", code); } UNPROTECT(2); return array_stream_xptr; } + +SEXP nanoarrow_c_ipc_writer_connection(SEXP con) { + SEXP output_stream_xptr = PROTECT(output_stream_owning_xptr()); + struct ArrowIpcOutputStream* output_stream = + (struct ArrowIpcOutputStream*)R_ExternalPtrAddr(output_stream_xptr); + + output_stream->write = &write_con_output_stream; + output_stream->release = &release_con_output_stream; + output_stream->private_data = (SEXP)con; + nanoarrow_preserve_sexp(con); + + SEXP writer_xptr = PROTECT(writer_owning_xptr()); + struct ArrowIpcWriter* writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr); + + int code = ArrowIpcWriterInit(writer, output_stream); + if (code != NANOARROW_OK) { + Rf_error("ArrowIpcWriterInit() failed with errno %d", code); + } + + UNPROTECT(2); + return writer_xptr; +} + +SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP array_stream_xptr) { + struct ArrowIpcWriter* writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr); + struct ArrowArrayStream* array_stream = + nanoarrow_array_stream_from_xptr(array_stream_xptr); + + struct ArrowError error; + ArrowErrorInit(&error); + int code = ArrowIpcWriterWriteArrayStream(writer, array_stream, &error); + if (code != NANOARROW_OK) { + Rf_error("ArrowIpcWriterWriteArrayStream() failed: %s", error.message); + } + + return R_NilValue; +} diff --git a/r/src/util.c b/r/src/util.c index 6d4035ba6..56d9d05a7 100644 --- a/r/src/util.c +++ b/r/src/util.c @@ -30,6 +30,7 @@ SEXP nanoarrow_cls_schema = NULL; SEXP nanoarrow_cls_array_stream = NULL; SEXP nanoarrow_cls_buffer = NULL; SEXP nanoarrow_sym_readbin = NULL; +SEXP nanoarrow_sym_writebin = NULL; SEXP nanoarrow_ptype_raw = NULL; void nanoarrow_init_cached_sexps(void) { @@ -42,7 +43,8 @@ void nanoarrow_init_cached_sexps(void) { nanoarrow_cls_schema = PROTECT(Rf_mkString("nanoarrow_schema")); nanoarrow_cls_array_stream = PROTECT(Rf_mkString("nanoarrow_array_stream")); nanoarrow_cls_buffer = PROTECT(Rf_mkString("nanoarrow_buffer")); - nanoarrow_sym_readbin = PROTECT(Rf_install("readBin")); + nanoarrow_sym_readbin = PROTECT(Rf_install("read_bin_wrapper")); + nanoarrow_sym_writebin = PROTECT(Rf_install("write_bin_wrapper")); nanoarrow_ptype_raw = PROTECT(Rf_allocVector(RAWSXP, 0)); R_PreserveObject(nanoarrow_ns_pkg); @@ -54,9 +56,10 @@ void nanoarrow_init_cached_sexps(void) { R_PreserveObject(nanoarrow_cls_array_stream); R_PreserveObject(nanoarrow_cls_buffer); R_PreserveObject(nanoarrow_sym_readbin); + R_PreserveObject(nanoarrow_sym_writebin); R_PreserveObject(nanoarrow_ptype_raw); - UNPROTECT(11); + UNPROTECT(12); } SEXP nanoarrow_c_preserved_count(void) { diff --git a/r/src/util.h b/r/src/util.h index d652330ed..23a9f2ed5 100644 --- a/r/src/util.h +++ b/r/src/util.h @@ -32,6 +32,7 @@ extern SEXP nanoarrow_cls_schema; extern SEXP nanoarrow_cls_array_stream; extern SEXP nanoarrow_cls_buffer; extern SEXP nanoarrow_sym_readbin; +extern SEXP nanoarrow_sym_writebin; extern SEXP nanoarrow_ptype_raw; void nanoarrow_init_cached_sexps(void); diff --git a/r/tests/testthat/test-ipc.R b/r/tests/testthat/test-ipc.R index 85adf0f04..3944de4b3 100644 --- a/r/tests/testthat/test-ipc.R +++ b/r/tests/testthat/test-ipc.R @@ -36,6 +36,20 @@ test_that("read_nanoarrow() works for open connections", { ) }) +test_that("write_nanoarrow() works for open connections", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- rawConnection(raw(), "wb") + on.exit(close(con)) + + write_nanoarrow(data.frame(), con) + expect_identical( + as.data.frame(read_nanoarrow(rawConnectionValue(con))), + data.frame() + ) +}) + test_that("read_nanoarrow() works for unopened connections", { tf <- tempfile() on.exit(unlink(tf)) @@ -56,6 +70,20 @@ test_that("read_nanoarrow() works for unopened connections", { ) }) +test_that("write_nanoarrow() works for unopened connections", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- file(tf) + # Don't close on exit, because we're supposed to do that + + write_nanoarrow(data.frame(), con) + expect_error( + close(con), + "invalid connection" + ) +}) + test_that("read_nanoarrow() works for file paths", { tf <- tempfile() on.exit(unlink(tf)) @@ -71,6 +99,15 @@ test_that("read_nanoarrow() works for file paths", { ) }) +test_that("write_nanoarrow() works for file paths", { + tf <- tempfile() + on.exit(unlink(tf)) + + df <- data.frame(letters = letters) + expect_identical(write_nanoarrow(df, tf), df) + expect_identical(as.data.frame(read_nanoarrow(tf)), df) +}) + test_that("read_nanoarrow() works for URLs", { tf <- tempfile() on.exit(unlink(tf)) @@ -86,6 +123,15 @@ test_that("read_nanoarrow() works for URLs", { ) }) +test_that("write_nanoarrow() works for URLs", { + tf <- tempfile() + on.exit(unlink(tf)) + + df <- data.frame(letters = letters) + expect_identical(write_nanoarrow(df, paste0("file://", tf)), df) + expect_identical(as.data.frame(read_nanoarrow(tf)), df) +}) + test_that("read_nanoarrow() works for compressed .gz file paths", { tf <- tempfile(fileext = ".gz") on.exit(unlink(tf)) @@ -101,6 +147,15 @@ test_that("read_nanoarrow() works for compressed .gz file paths", { ) }) +test_that("write_nanoarrow() works for compressed .gz file paths", { + tf <- tempfile(fileext = ".gz") + on.exit(unlink(tf)) + + df <- data.frame(letters = letters) + expect_identical(write_nanoarrow(df, tf), df) + expect_identical(as.data.frame(read_nanoarrow(tf)), df) +}) + test_that("read_nanoarrow() works for compressed .bz2 file paths", { tf <- tempfile(fileext = ".bz2") on.exit(unlink(tf)) @@ -116,6 +171,15 @@ test_that("read_nanoarrow() works for compressed .bz2 file paths", { ) }) +test_that("write_nanoarrow() works for compressed .bz2 file paths", { + tf <- tempfile(fileext = ".bz2") + on.exit(unlink(tf)) + + df <- data.frame(letters = letters) + expect_identical(write_nanoarrow(df, tf), df) + expect_identical(as.data.frame(read_nanoarrow(tf)), df) +}) + test_that("read_nanoarrow() works for compressed .zip file paths", { tf <- tempfile(fileext = ".zip") tdir <- tempfile() @@ -144,6 +208,17 @@ test_that("read_nanoarrow() works for compressed .zip file paths", { ) }) +test_that("write_nanoarrow() errors for compressed .zip file paths", { + tf <- tempfile(fileext = ".zip") + on.exit(unlink(tf)) + + df <- data.frame(letters = letters) + expect_error( + write_nanoarrow(df, tf), + "zip compression not supported" + ) +}) + test_that("read_nanoarrow() errors for compressed URL paths", { expect_error( read_nanoarrow("https://something.zip"), @@ -151,11 +226,15 @@ test_that("read_nanoarrow() errors for compressed URL paths", { ) }) -test_that("read_nanoarrow() errors for input with length != 1", { +test_that("read|write_nanoarrow() errors for input with length != 1", { expect_error( read_nanoarrow(character(0)), "Can't interpret character" ) + expect_error( + write_nanoarrow(data.frame(), character(0)), + "Can't interpret character" + ) }) test_that("read_nanoarrow() errors zip archives that contain files != 1", { @@ -187,7 +266,7 @@ test_that("read_nanoarrow() reports errors from readBin", { writeLines("this is not a binary file", tf) con <- file(tf, open = "r") - on.exit(close(con)) + on.exit(close(con), add = TRUE) expect_error( read_nanoarrow(con), @@ -195,6 +274,18 @@ test_that("read_nanoarrow() reports errors from readBin", { ) }) +test_that("write_nanoarrow() reports errors from writeBin", { + tf <- tempfile() + on.exit(unlink(tf)) + con <- file(tf, open = "w") + on.exit(close(con), add = TRUE) + + expect_error( + write_nanoarrow(data.frame(), con), + "R execution error" + ) +}) + test_that("read_nanoarrow() respects lazy argument", { expect_error( read_nanoarrow(raw(0), lazy = FALSE),