diff --git a/DESCRIPTION b/DESCRIPTION index 67067bd29..ddb100e19 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: nanonext Type: Package Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library -Version: 0.10.2.9006 +Version: 0.10.2.9007 Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library providing high-performance scalability protocols, a cross-platform standard for messaging and communications. Serves as a diff --git a/NAMESPACE b/NAMESPACE index 293a9a81f..5b6ddf3dc 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -61,6 +61,7 @@ export(nano) export(ncurl) export(ncurl_aio) export(ncurl_session) +export(next_config) export(nng_error) export(nng_version) export(opt) diff --git a/NEWS.md b/NEWS.md index 5f332c246..90f7e307c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,7 +1,8 @@ -# nanonext 0.10.2.9006 (development) +# nanonext 0.10.2.9007 (development) #### New Features +* `next_config()` enables native extensions when sending using mode 'next'. Registers hook functions for custom serialization and unserialization of reference objects (such as those accessed via an external pointer). * `.until()` contains revised behaviour for this synchronisation primitive that will be ported to `until()` in a future version. The function now returns FALSE instead of TRUE if the timeout has been reached. #### Updates diff --git a/R/aio.R b/R/aio.R index c3a41d140..8c6ccb4a1 100644 --- a/R/aio.R +++ b/R/aio.R @@ -41,6 +41,8 @@ #' #' Alternatively, to stop the async operation, use \code{\link{stop_aio}}. #' +#' @inheritSection send Send Modes +#' #' @examples #' pub <- socket("pub", dial = "inproc://nanonext") #' diff --git a/R/context.R b/R/context.R index 723119e1a..a35820c1b 100644 --- a/R/context.R +++ b/R/context.R @@ -101,21 +101,19 @@ close.nanoContext <- function(con, ...) invisible(.Call(rnng_ctx_close, con)) #' @param execute a function which takes the received (converted) data as its #' first argument. Can be an anonymous function of the form \code{function(x) do(x)}. #' Additional arguments can also be passed in through '...'. -#' @param send_mode [default 'serial'] one of 'serial' to send serialised R objects, -#' 'raw' to send atomic vectors of any type as a raw byte vector, or 'next' -#' to send in a new R-compatible serialisation format. Use 'serial' to ensure -#' perfect reproducibility within R, although 'raw' must be used when -#' interfacing with external applications which do not understand R -#' serialisation. Alternatively, for performance, specify an integer position -#' in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' etc. +#' @param send_mode [default 'serial'] one of 'serial' to send serialised R +#' objects, 'raw' to send atomic vectors of any type as a raw byte vector, +#' or 'next' (see 'Send Modes' section below). Alternatively, specify an +#' integer position in the vector of choices e.g. 1L for 'serial' or 2L for +#' 'raw' etc. #' @param recv_mode [default 'serial'] mode of vector to be received - one of #' 'serial', 'character', 'complex', 'double', 'integer', 'logical', #' 'numeric', 'raw', or 'string'. The default 'serial' means a serialised R #' object, for the other modes, the raw vector received will be converted -#' into the respective mode. Note that 'string' is defined here as a character -#' scalar and is a faster alternative to 'character' for receiving a single -#' string. Alternatively, for performance, specify an integer position in -#' the vector of choices e.g. 1L for 'serial', 2L for 'character' etc. +#' into the respective mode. 'string' is a faster alternative to 'character' +#' for receiving a length 1 character string. Alternatively, specify an +#' integer position in the vector of choices e.g. 1L for 'serial', 2L for +#' 'character' etc. #' @param timeout [default NULL] integer value in milliseconds or NULL, which #' applies a socket-specific default, usually the same as no timeout. Note #' that this applies to receiving the request. The total elapsed time would @@ -136,6 +134,8 @@ close.nanoContext <- function(con, ...) invisible(.Call(rnng_ctx_close, con)) #' to be distinguishable from a possible return value. \code{\link{is_nul_byte}} #' can be used to test for a nul byte. #' +#' @inheritSection send Send Modes +#' #' @examples #' req <- socket("req", listen = "tcp://127.0.0.1:6546") #' rep <- socket("rep", dial = "tcp://127.0.0.1:6546") @@ -206,6 +206,8 @@ reply <- function(context, #' is closed when all references to the returned 'recvAio' are removed and #' the object is garbage collected. #' +#' @inheritSection send Send Modes +#' #' @examples #' req <- socket("req", listen = "tcp://127.0.0.1:6546") #' rep <- socket("rep", dial = "tcp://127.0.0.1:6546") diff --git a/R/sendrecv.R b/R/sendrecv.R index 15a9309bc..53b57116e 100644 --- a/R/sendrecv.R +++ b/R/sendrecv.R @@ -24,17 +24,14 @@ #' @param data an object (a vector, if mode = 'raw'). #' @param mode [default 'serial'] one of 'serial' to send serialised R objects, #' 'raw' to send atomic vectors of any type as a raw byte vector, or 'next' -#' to send in a new R-compatible serialisation format. For Streams, 'raw' is -#' the only option and this argument is ignored. Use 'serial' to ensure -#' perfect reproducibility within R, although 'raw' must be used when -#' interfacing with external applications which do not understand R -#' serialisation. Alternatively, for performance, specify an integer position -#' in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' etc. +#' (see 'Send Modes' section below). Alternatively, specify an integer +#' position in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' +#' etc. For Streams, 'raw' is the only option and this argument is ignored. #' @param block [default NULL] which applies the connection default (see section -#' 'Blocking' below). Specify logical TRUE to block until successful or FALSE -#' to return immediately even if unsuccessful (e.g. if no connection is -#' available), or else an integer value specifying the maximum time to block -#' in milliseconds, after which the operation will time out. +#' 'Blocking' below). Specify logical TRUE to block until successful or +#' FALSE to return immediately even if unsuccessful (e.g. if no connection +#' is available), or else an integer value specifying the maximum time to +#' block in milliseconds, after which the operation will time out. #' #' @return Integer exit code (zero on success). #' @@ -51,9 +48,26 @@ #' #' For Streams: the default behaviour is blocking with \code{block = TRUE}. #' This will wait until the send has completed. Set a timeout to ensure that -#' the function returns under all scenarios. As the underlying implementation -#' uses an asynchronous send with a wait, it is recommended to set a positive -#' integer value for \code{block} rather than FALSE. +#' the function returns under all scenarios. As the underlying +#' implementation uses an asynchronous send with a wait, it is recommended +#' to set a positive integer value for \code{block} rather than FALSE. +#' +#' @section Send Modes: +#' +#' The default mode 'serial' sends serialised R objects to ensure perfect +#' reproducibility within R. When receiving, the corresponding mode 'serial' +#' should be used. +#' +#' Mode 'raw' sends atomic vectors of any type as a raw byte vector, and +#' must be used when interfacing with external applications or raw system +#' sockets, where R serialization is not in use. When receiving, the mode +#' corresponding to the vector sent should be used. +#' +#' Mode 'next' sends serialised R objects, with native extensions enabled by +#' \code{\link{next_config}}. This allows 'refhook' functions to be +#' registered for custom serialization and unserialization of reference +#' objects, such as those accessed via an external pointer. When receiving, +#' mode 'serial' should be used as 'next' sends are fully compatible. #' #' @seealso \code{\link{send_aio}} for asynchronous send. #' @examples @@ -90,11 +104,10 @@ send <- function(con, data, mode = c("serial", "raw", "next"), block = NULL) #' 'character', 'complex', 'double', 'integer', 'logical', 'numeric', 'raw', #' or 'string'. The default 'serial' means a serialised R object, for the #' other modes, the raw vector received will be converted into the respective -#' mode. Note that 'string' is a faster alternative to 'character' for -#' receiving a character vector of length 1. For Streams, 'serial' is not an -#' option and the default is 'character'. Alternatively, for performance, -#' specify an integer position in the vector of choices e.g. 1L for 'serial', -#' 2L for 'character' etc. +#' mode. 'string' is a faster alternative to 'character' for receiving a +#' length 1 character string. For Streams, 'serial' is not an option and the +#' default is 'character'. Alternatively, specify an integer position in the +#' vector of choices e.g. 1L for 'serial', 2L for 'character' etc. #' @param n [default 65536L] applicable to Streams only, the maximum number of #' bytes to receive. Can be an over-estimate, but note that a buffer of this #' size is reserved. diff --git a/R/utils.R b/R/utils.R index b95b324a2..d8dbd33e9 100644 --- a/R/utils.R +++ b/R/utils.R @@ -272,3 +272,49 @@ status_code <- function(x) .Call(rnng_status_code, x) #' @export #' strcat <- function(a, b) .Call(rnng_strcat, a, b) + +#' Configure Next Mode +#' +#' Configures 'next' mode by registering 'refhook' functions for serialization +#' and unserialization. This permits sending and receiving reference +#' objects, such as those accessed via an external pointer, between +#' different R sessions. +#' +#' @param inhook a function (for custom serialization). The signature for this +#' function must accept a list and return a raw vector, e.g. +#' \code{safetensors::safe_serialize}, or else NULL to reset. +#' @param outhook a function (for custom unserialization). The signature for +#' this function must accept a raw vector and return a list, e.g. +#' \code{safetensors::safe_load_file}, or else NULL to reset. +#' +#' @return Invisibly, a pairlist comprising the currently-registered 'next' +#' configuration. +#' +#' @details Calling this function without any arguments returns (invisibly) the +#' currently-registered 'next' configuration. +#' +#' Alternatively, calling this function with a configuration pairlist +#' previously returned by this function registers the supplied configuration. +#' +#' @section Refhook: +#' +#' The 'refhook' functions are a native feature of R's serialization +#' mechanism and apply to all non-system reference objects (external +#' pointers, weak references, and all environments other than namespace and +#' package environments and the Global Environment). +#' +#' @examples +#' cfg <- next_config(inhook = function(x) serialize(x, NULL), +#' outhook = unserialize) +#' cfg +#' +#' nul <- next_config(NULL, NULL) +#' print(next_config()) +#' +#' print(next_config(cfg)) +#' print(next_config(nul)) +#' +#' @export +#' +next_config <- function(inhook, outhook) + invisible(.Call(rnng_next_config, if (missing(inhook)) "" else inhook, if (missing(outhook)) "" else outhook)) diff --git a/man/next_config.Rd b/man/next_config.Rd new file mode 100644 index 000000000..82bf4bc01 --- /dev/null +++ b/man/next_config.Rd @@ -0,0 +1,55 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utils.R +\name{next_config} +\alias{next_config} +\title{Configure Next Mode} +\usage{ +next_config(inhook, outhook) +} +\arguments{ +\item{inhook}{a function (for custom serialization). The signature for this +function must accept a list and return a raw vector, e.g. +\code{safetensors::safe_serialize}, or else NULL to reset.} + +\item{outhook}{a function (for custom unserialization). The signature for +this function must accept a raw vector and return a list, e.g. +\code{safetensors::safe_load_file}, or else NULL to reset.} +} +\value{ +Invisibly, a pairlist comprising the currently-registered 'next' + configuration. +} +\description{ +Configures 'next' mode by registering 'refhook' functions for serialization + and unserialization. This permits sending and receiving reference + objects, such as those accessed via an external pointer, between + different R sessions. +} +\details{ +Calling this function without any arguments returns (invisibly) the + currently-registered 'next' configuration. + + Alternatively, calling this function with a configuration pairlist + previously returned by this function registers the supplied configuration. +} +\section{Refhook}{ + + + The 'refhook' functions are a native feature of R's serialization + mechanism and apply to all non-system reference objects (external + pointers, weak references, and all environments other than namespace and + package environments and the Global Environment). +} + +\examples{ +cfg <- next_config(inhook = function(x) serialize(x, NULL), + outhook = unserialize) +cfg + +nul <- next_config(NULL, NULL) +print(next_config()) + +print(next_config(cfg)) +print(next_config(nul)) + +} diff --git a/man/recv.Rd b/man/recv.Rd index 6dc470bd3..6f987888c 100644 --- a/man/recv.Rd +++ b/man/recv.Rd @@ -19,17 +19,16 @@ recv( 'character', 'complex', 'double', 'integer', 'logical', 'numeric', 'raw', or 'string'. The default 'serial' means a serialised R object, for the other modes, the raw vector received will be converted into the respective -mode. Note that 'string' is a faster alternative to 'character' for -receiving a character vector of length 1. For Streams, 'serial' is not an -option and the default is 'character'. Alternatively, for performance, -specify an integer position in the vector of choices e.g. 1L for 'serial', -2L for 'character' etc.} +mode. 'string' is a faster alternative to 'character' for receiving a +length 1 character string. For Streams, 'serial' is not an option and the +default is 'character'. Alternatively, specify an integer position in the +vector of choices e.g. 1L for 'serial', 2L for 'character' etc.} \item{block}{[default NULL] which applies the connection default (see section -'Blocking' below). Specify logical TRUE to block until successful or FALSE -to return immediately even if unsuccessful (e.g. if no connection is -available), or else an integer value specifying the maximum time to block -in milliseconds, after which the operation will time out.} +'Blocking' below). Specify logical TRUE to block until successful or +FALSE to return immediately even if unsuccessful (e.g. if no connection +is available), or else an integer value specifying the maximum time to +block in milliseconds, after which the operation will time out.} \item{n}{[default 65536L] applicable to Streams only, the maximum number of bytes to receive. Can be an over-estimate, but note that a buffer of this diff --git a/man/recv_aio.Rd b/man/recv_aio.Rd index 572fe4b33..b47774547 100644 --- a/man/recv_aio.Rd +++ b/man/recv_aio.Rd @@ -29,11 +29,10 @@ recv_aio_signal( 'character', 'complex', 'double', 'integer', 'logical', 'numeric', 'raw', or 'string'. The default 'serial' means a serialised R object, for the other modes, the raw vector received will be converted into the respective -mode. Note that 'string' is a faster alternative to 'character' for -receiving a character vector of length 1. For Streams, 'serial' is not an -option and the default is 'character'. Alternatively, for performance, -specify an integer position in the vector of choices e.g. 1L for 'serial', -2L for 'character' etc.} +mode. 'string' is a faster alternative to 'character' for receiving a +length 1 character string. For Streams, 'serial' is not an option and the +default is 'character'. Alternatively, specify an integer position in the +vector of choices e.g. 1L for 'serial', 2L for 'character' etc.} \item{timeout}{[default NULL] integer value in milliseconds or NULL, which applies a socket-specific default, usually the same as no timeout.} diff --git a/man/reply.Rd b/man/reply.Rd index 9d924da32..149a792b4 100644 --- a/man/reply.Rd +++ b/man/reply.Rd @@ -25,18 +25,16 @@ Additional arguments can also be passed in through '...'.} 'serial', 'character', 'complex', 'double', 'integer', 'logical', 'numeric', 'raw', or 'string'. The default 'serial' means a serialised R object, for the other modes, the raw vector received will be converted -into the respective mode. Note that 'string' is defined here as a character -scalar and is a faster alternative to 'character' for receiving a single -string. Alternatively, for performance, specify an integer position in -the vector of choices e.g. 1L for 'serial', 2L for 'character' etc.} - -\item{send_mode}{[default 'serial'] one of 'serial' to send serialised R objects, -'raw' to send atomic vectors of any type as a raw byte vector, or 'next' -to send in a new R-compatible serialisation format. Use 'serial' to ensure -perfect reproducibility within R, although 'raw' must be used when -interfacing with external applications which do not understand R -serialisation. Alternatively, for performance, specify an integer position -in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' etc.} +into the respective mode. 'string' is a faster alternative to 'character' +for receiving a length 1 character string. Alternatively, specify an +integer position in the vector of choices e.g. 1L for 'serial', 2L for +'character' etc.} + +\item{send_mode}{[default 'serial'] one of 'serial' to send serialised R +objects, 'raw' to send atomic vectors of any type as a raw byte vector, +or 'next' (see 'Send Modes' section below). Alternatively, specify an +integer position in the vector of choices e.g. 1L for 'serial' or 2L for +'raw' etc.} \item{timeout}{[default NULL] integer value in milliseconds or NULL, which applies a socket-specific default, usually the same as no timeout. Note @@ -66,6 +64,25 @@ Receive will block while awaiting a message to arrive and is usually to be distinguishable from a possible return value. \code{\link{is_nul_byte}} can be used to test for a nul byte. } +\section{Send Modes}{ + + + The default mode 'serial' sends serialised R objects to ensure perfect + reproducibility within R. When receiving, the corresponding mode 'serial' + should be used. + + Mode 'raw' sends atomic vectors of any type as a raw byte vector, and + must be used when interfacing with external applications or raw system + sockets, where R serialization is not in use. When receiving, the mode + corresponding to the vector sent should be used. + + Mode 'next' sends serialised R objects, with native extensions enabled by + \code{\link{next_config}}. This allows 'refhook' functions to be + registered for custom serialization and unserialization of reference + objects, such as those accessed via an external pointer. When receiving, + mode 'serial' should be used as 'next' sends are fully compatible. +} + \examples{ req <- socket("req", listen = "tcp://127.0.0.1:6546") rep <- socket("rep", dial = "tcp://127.0.0.1:6546") diff --git a/man/request.Rd b/man/request.Rd index 044e23382..0a32de36c 100644 --- a/man/request.Rd +++ b/man/request.Rd @@ -29,22 +29,20 @@ request_signal( \item{data}{an object (if send_mode = 'raw', a vector).} -\item{send_mode}{[default 'serial'] one of 'serial' to send serialised R objects, -'raw' to send atomic vectors of any type as a raw byte vector, or 'next' -to send in a new R-compatible serialisation format. Use 'serial' to ensure -perfect reproducibility within R, although 'raw' must be used when -interfacing with external applications which do not understand R -serialisation. Alternatively, for performance, specify an integer position -in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' etc.} +\item{send_mode}{[default 'serial'] one of 'serial' to send serialised R +objects, 'raw' to send atomic vectors of any type as a raw byte vector, +or 'next' (see 'Send Modes' section below). Alternatively, specify an +integer position in the vector of choices e.g. 1L for 'serial' or 2L for +'raw' etc.} \item{recv_mode}{[default 'serial'] mode of vector to be received - one of 'serial', 'character', 'complex', 'double', 'integer', 'logical', 'numeric', 'raw', or 'string'. The default 'serial' means a serialised R object, for the other modes, the raw vector received will be converted -into the respective mode. Note that 'string' is defined here as a character -scalar and is a faster alternative to 'character' for receiving a single -string. Alternatively, for performance, specify an integer position in -the vector of choices e.g. 1L for 'serial', 2L for 'character' etc.} +into the respective mode. 'string' is a faster alternative to 'character' +for receiving a length 1 character string. Alternatively, specify an +integer position in the vector of choices e.g. 1L for 'serial', 2L for +'character' etc.} \item{timeout}{[default NULL] integer value in milliseconds or NULL, which applies a socket-specific default, usually the same as no timeout.} @@ -88,6 +86,25 @@ Sending the request and receiving the result are both performed async, by 1. This happens asynchronously and independently of the R execution thread. } +\section{Send Modes}{ + + + The default mode 'serial' sends serialised R objects to ensure perfect + reproducibility within R. When receiving, the corresponding mode 'serial' + should be used. + + Mode 'raw' sends atomic vectors of any type as a raw byte vector, and + must be used when interfacing with external applications or raw system + sockets, where R serialization is not in use. When receiving, the mode + corresponding to the vector sent should be used. + + Mode 'next' sends serialised R objects, with native extensions enabled by + \code{\link{next_config}}. This allows 'refhook' functions to be + registered for custom serialization and unserialization of reference + objects, such as those accessed via an external pointer. When receiving, + mode 'serial' should be used as 'next' sends are fully compatible. +} + \examples{ req <- socket("req", listen = "tcp://127.0.0.1:6546") rep <- socket("rep", dial = "tcp://127.0.0.1:6546") diff --git a/man/send.Rd b/man/send.Rd index 4828aea24..de7765905 100644 --- a/man/send.Rd +++ b/man/send.Rd @@ -13,18 +13,15 @@ send(con, data, mode = c("serial", "raw", "next"), block = NULL) \item{mode}{[default 'serial'] one of 'serial' to send serialised R objects, 'raw' to send atomic vectors of any type as a raw byte vector, or 'next' -to send in a new R-compatible serialisation format. For Streams, 'raw' is -the only option and this argument is ignored. Use 'serial' to ensure -perfect reproducibility within R, although 'raw' must be used when -interfacing with external applications which do not understand R -serialisation. Alternatively, for performance, specify an integer position -in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' etc.} +(see 'Send Modes' section below). Alternatively, specify an integer +position in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' +etc. For Streams, 'raw' is the only option and this argument is ignored.} \item{block}{[default NULL] which applies the connection default (see section -'Blocking' below). Specify logical TRUE to block until successful or FALSE -to return immediately even if unsuccessful (e.g. if no connection is -available), or else an integer value specifying the maximum time to block -in milliseconds, after which the operation will time out.} +'Blocking' below). Specify logical TRUE to block until successful or +FALSE to return immediately even if unsuccessful (e.g. if no connection +is available), or else an integer value specifying the maximum time to +block in milliseconds, after which the operation will time out.} } \value{ Integer exit code (zero on success). @@ -46,9 +43,28 @@ Send data over a connection (Socket, Context or Stream). For Streams: the default behaviour is blocking with \code{block = TRUE}. This will wait until the send has completed. Set a timeout to ensure that - the function returns under all scenarios. As the underlying implementation - uses an asynchronous send with a wait, it is recommended to set a positive - integer value for \code{block} rather than FALSE. + the function returns under all scenarios. As the underlying + implementation uses an asynchronous send with a wait, it is recommended + to set a positive integer value for \code{block} rather than FALSE. +} + +\section{Send Modes}{ + + + The default mode 'serial' sends serialised R objects to ensure perfect + reproducibility within R. When receiving, the corresponding mode 'serial' + should be used. + + Mode 'raw' sends atomic vectors of any type as a raw byte vector, and + must be used when interfacing with external applications or raw system + sockets, where R serialization is not in use. When receiving, the mode + corresponding to the vector sent should be used. + + Mode 'next' sends serialised R objects, with native extensions enabled by + \code{\link{next_config}}. This allows 'refhook' functions to be + registered for custom serialization and unserialization of reference + objects, such as those accessed via an external pointer. When receiving, + mode 'serial' should be used as 'next' sends are fully compatible. } \examples{ diff --git a/man/send_aio.Rd b/man/send_aio.Rd index 65ccd97cc..95498d529 100644 --- a/man/send_aio.Rd +++ b/man/send_aio.Rd @@ -13,12 +13,9 @@ send_aio(con, data, mode = c("serial", "raw", "next"), timeout = NULL) \item{mode}{[default 'serial'] one of 'serial' to send serialised R objects, 'raw' to send atomic vectors of any type as a raw byte vector, or 'next' -to send in a new R-compatible serialisation format. For Streams, 'raw' is -the only option and this argument is ignored. Use 'serial' to ensure -perfect reproducibility within R, although 'raw' must be used when -interfacing with external applications which do not understand R -serialisation. Alternatively, for performance, specify an integer position -in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' etc.} +(see 'Send Modes' section below). Alternatively, specify an integer +position in the vector of choices e.g. 1L for 'serial' or 2L for 'raw' +etc. For Streams, 'raw' is the only option and this argument is ignored.} \item{timeout}{[default NULL] integer value in milliseconds or NULL, which applies a socket-specific default, usually the same as no timeout.} @@ -43,6 +40,25 @@ Async send is always non-blocking and returns a 'sendAio' Alternatively, to stop the async operation, use \code{\link{stop_aio}}. } +\section{Send Modes}{ + + + The default mode 'serial' sends serialised R objects to ensure perfect + reproducibility within R. When receiving, the corresponding mode 'serial' + should be used. + + Mode 'raw' sends atomic vectors of any type as a raw byte vector, and + must be used when interfacing with external applications or raw system + sockets, where R serialization is not in use. When receiving, the mode + corresponding to the vector sent should be used. + + Mode 'next' sends serialised R objects, with native extensions enabled by + \code{\link{next_config}}. This allows 'refhook' functions to be + registered for custom serialization and unserialization of reference + objects, such as those accessed via an external pointer. When receiving, + mode 'serial' should be used as 'next' sends are fully compatible. +} + \examples{ pub <- socket("pub", dial = "inproc://nanonext") diff --git a/src/core.c b/src/core.c index d487e6ec4..bd26e8a53 100644 --- a/src/core.c +++ b/src/core.c @@ -127,6 +127,48 @@ SEXP rawToChar(unsigned char *buf, const size_t sz) { } +static SEXP nano_inHook(SEXP x, SEXP fun) { + + SEXP list, names, out; + R_xlen_t xlen; + if (fun == R_NilValue) { + xlen = 0; + list = Rf_allocVector(VECSXP, 1); + } else { + xlen = Rf_xlength(fun); + list = Rf_lengthgets(fun, xlen + 1); + } + PROTECT(list); + SET_VECTOR_ELT(list, xlen, x); + PROTECT(names = Rf_getAttrib(list, R_NamesSymbol)); + char *idx = R_alloc(sizeof(char), NANONEXT_SERIAL_MAXLEN); + snprintf(idx, NANONEXT_SERIAL_MAXLEN, "%ld", xlen + 1); + PROTECT(out = Rf_mkChar(idx)); + if (names == R_NilValue) { + names = Rf_ScalarString(out); + } else { + SET_STRING_ELT(names, xlen, out); + } + Rf_namesgets(list, names); + if (xlen) + R_ReleaseObject(nano_refList); + + UNPROTECT(3); + R_PreserveObject(nano_refList = list); + return Rf_ScalarString(out); + +} + +static SEXP nano_outHook(SEXP x, SEXP fun) { + + SEXP index; + index = Rf_coerceVector(x, INTSXP); + int idx = INTEGER(index)[0] - 1; + + return VECTOR_ELT(fun, idx); + +} + void nano_serialize(nano_buf *buf, SEXP object) { NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE); @@ -156,7 +198,7 @@ void nano_serialize_next(nano_buf *buf, SEXP object) { NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE); buf->buf[0] = 7u; - buf->cur += 4; + buf->cur += 8; struct R_outpstream_st output_stream; @@ -171,12 +213,36 @@ void nano_serialize_next(nano_buf *buf, SEXP object) { NANONEXT_SERIAL_VER, nano_write_char, nano_write_bytes, - NULL, - R_NilValue + nano_inHook, + nano_refList ); R_Serialize(object, &output_stream); + *((int *) (buf->buf + 1)) = (int) buf->cur; + + if (nano_refList != R_NilValue) { + SEXP call, out; + PROTECT(call = Rf_lcons(CAR(nano_refHook), Rf_cons(nano_refList, R_NilValue))); + PROTECT(out = Rf_eval(call, R_GlobalEnv)); + if (TYPEOF(out) != RAWSXP) { + R_ReleaseObject(nano_refList); + nano_refList = R_NilValue; + Rf_error("serialization refhook did not return a raw vector"); + } + R_xlen_t xlen = XLENGTH(out); + if (buf->cur + xlen > buf->len) { + buf->len = buf->cur + xlen; + buf->buf = R_Realloc(buf->buf, buf->len, unsigned char); + } + memcpy(buf->buf + buf->cur, STDVEC_DATAPTR(out), xlen); + buf->cur += xlen; + + UNPROTECT(2); + R_ReleaseObject(nano_refList); + nano_refList = R_NilValue; + } + } void nano_serialize_xdr(nano_buf *buf, SEXP object) { @@ -206,13 +272,28 @@ SEXP nano_unserialize(unsigned char *buf, const size_t sz) { case 66: case 88: cur = 0; break; - case 7: - cur = 4; break; + case 7: ; + SEXP raw, call; + const int offset = *(int *) (buf + 1); + if (sz > offset) { + PROTECT(raw = Rf_allocVector(RAWSXP, sz - offset)); + memcpy(STDVEC_DATAPTR(raw), buf + offset, sz - offset); + PROTECT(call = Rf_lcons(CADR(nano_refHook), Rf_cons(raw, R_NilValue))); + nano_refList = Rf_eval(call, R_GlobalEnv); + if (TYPEOF(nano_refList) != VECSXP) { + nano_refList = R_NilValue; + Rf_error("unserialization refhook did not return a list"); + } + R_PreserveObject(nano_refList); + UNPROTECT(2); + } + cur = 8; break; default: Rf_warning("received data could not be unserialized"); return nano_decode(buf, sz, 8); } + SEXP out; nano_buf nbuf; struct R_inpstream_st input_stream; @@ -226,11 +307,15 @@ SEXP nano_unserialize(unsigned char *buf, const size_t sz) { R_pstream_any_format, nano_read_char, nano_read_bytes, - NULL, - R_NilValue + cur ? nano_outHook : NULL, + nano_refList ); - return R_Unserialize(&input_stream); + PROTECT(out = R_Unserialize(&input_stream)); + R_ReleaseObject(nano_refList); + nano_refList = R_NilValue; + UNPROTECT(1); + return out; } @@ -1327,3 +1412,31 @@ void rnng_fini(void) { nng_fini(); } + +SEXP rnng_next_config(SEXP infun, SEXP outfun) { + + switch(TYPEOF(infun)) { + case LISTSXP: + if (Rf_xlength(infun) == 2) + rnng_next_config(CAR(infun), CADR(infun)); + break; + case CLOSXP: + case BUILTINSXP: + case SPECIALSXP: + case NILSXP: + SETCAR(nano_refHook, infun); + break; + } + + switch(TYPEOF(outfun)) { + case CLOSXP: + case BUILTINSXP: + case SPECIALSXP: + case NILSXP: + SETCADR(nano_refHook, outfun); + break; + } + + return Rf_shallow_duplicate(nano_refHook); + +} diff --git a/src/init.c b/src/init.c index 93e093008..3096b5cc8 100644 --- a/src/init.c +++ b/src/init.c @@ -51,6 +51,8 @@ SEXP nano_error; SEXP nano_ncurlAio; SEXP nano_ncurlSession; SEXP nano_recvAio; +SEXP nano_refHook; +SEXP nano_refList; SEXP nano_sendAio; SEXP nano_success; SEXP nano_unresolved; @@ -106,6 +108,7 @@ static void PreserveObjects(void) { SET_TAG(nano_ncurlSession, R_ClassSymbol); R_PreserveObject(nano_recvAio = Rf_cons(Rf_mkString("recvAio"), R_NilValue)); SET_TAG(nano_recvAio, R_ClassSymbol); + R_PreserveObject(nano_refHook = Rf_list2(R_NilValue, R_NilValue)); R_PreserveObject(nano_sendAio = Rf_cons(Rf_mkString("sendAio"), R_NilValue)); SET_TAG(nano_sendAio, R_ClassSymbol); R_PreserveObject(nano_success = Rf_ScalarInteger(0)); @@ -117,6 +120,7 @@ static void ReleaseObjects(void) { R_ReleaseObject(nano_unresolved); R_ReleaseObject(nano_success); R_ReleaseObject(nano_sendAio); + R_ReleaseObject(nano_refHook); R_ReleaseObject(nano_recvAio); R_ReleaseObject(nano_ncurlSession); R_ReleaseObject(nano_ncurlAio); @@ -169,6 +173,7 @@ static const R_CallMethodDef callMethods[] = { {"rnng_ncurl_session", (DL_FUNC) &rnng_ncurl_session, 8}, {"rnng_ncurl_session_close", (DL_FUNC) &rnng_ncurl_session_close, 1}, {"rnng_ncurl_transact", (DL_FUNC) &rnng_ncurl_transact, 1}, + {"rnng_next_config", (DL_FUNC) &rnng_next_config, 2}, {"rnng_pipe_notify", (DL_FUNC) &rnng_pipe_notify, 6}, {"rnng_protocol_open", (DL_FUNC) &rnng_protocol_open, 2}, {"rnng_random", (DL_FUNC) &rnng_random, 2}, @@ -212,6 +217,7 @@ static const R_ExternalMethodDef externalMethods[] = { void attribute_visible R_init_nanonext(DllInfo* dll) { RegisterSymbols(); PreserveObjects(); + nano_refList = R_NilValue; #if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6 nng_mtx_alloc(&shr_mtx); #endif diff --git a/src/nanonext.h b/src/nanonext.h index 7de97a8fd..89502c166 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -163,6 +163,7 @@ typedef struct nano_cv_duo_s { #define ERROR_RET(xc) { Rf_warning("%d | %s", xc, nng_strerror(xc)); return mk_error(xc); } #define NANONEXT_INIT_BUFSIZE 16384 #define NANONEXT_SERIAL_VER 3 +#define NANONEXT_SERIAL_MAXLEN 65536 #define NANO_ALLOC(x, sz) \ (x)->buf = R_Calloc(sz, unsigned char); \ (x)->len = (R_xlen_t) sz; \ @@ -226,6 +227,8 @@ extern SEXP nano_error; extern SEXP nano_ncurlAio; extern SEXP nano_ncurlSession; extern SEXP nano_recvAio; +extern SEXP nano_refHook; +extern SEXP nano_refList; extern SEXP nano_sendAio; extern SEXP nano_success; extern SEXP nano_unresolved; @@ -269,6 +272,7 @@ extern SEXP rnng_ncurl_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); extern SEXP rnng_ncurl_session(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); extern SEXP rnng_ncurl_session_close(SEXP); extern SEXP rnng_ncurl_transact(SEXP); +extern SEXP rnng_next_config(SEXP, SEXP); extern SEXP rnng_pipe_notify(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); extern SEXP rnng_protocol_open(SEXP, SEXP); extern SEXP rnng_random(SEXP, SEXP); diff --git a/tests/tests.R b/tests/tests.R index 3344656d3..23659d99d 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -215,6 +215,14 @@ nanotestaio(rek <- request(req$context, c(1+3i, 4+2i), send_mode = 2L, recv_mode nanotest(is.integer(reply(ctx, execute = identity, recv_mode = 3L, send_mode = "ra", timeout = 500))) nanotest(is.complex(call_aio(rek)[["data"]])) +nanotest(length(nxt <- next_config(inhook = function(x) serialize(x, NULL), outhook = unserialize)) == 2L) +nanotest(is.integer(req$send(list(new.env(), new.env()), mode = 3L, block = 500))) +nanotest(is.environment(recv(rep, block = 500)[[1L]])) +nanotest(is.pairlist(nnl <- next_config(NULL, NULL))) +nanotest(is.function(next_config(nxt)[[1L]])) +nanotest(is.pairlist(next_config(nnl))) +nanotestn(unlist(next_config())) + nanotest(inherits(cv <- cv(), "conditionVariable")) nanotestp(cv) nanotest(!.until(cv, 10L))