Skip to content

Commit

Permalink
lock() adds argument 'decr'
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Oct 17, 2023
1 parent c2a9e15 commit b9b6594
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 12 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 0.10.2.9003
Version: 0.10.2.9004
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library providing high-performance scalability protocols, a
cross-platform standard for messaging and communications. Serves as a
Expand Down
6 changes: 5 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# nanonext 0.10.2.9003 (development)
# nanonext 0.10.2.9004 (development)

#### New Features

* `lock()` adds logical argument 'decr', which allows the socket locking mechanism to work correctly with 'conditionVariables' for which a pipe removal notification has been registered.

#### Updates

Expand Down
12 changes: 10 additions & 2 deletions R/sync.R
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,19 @@ pipe_notify <- function(socket, cv, cv2 = NULL, add = TRUE, remove = TRUE, flag

#' Lock / Unlock a Socket
#'
#' Prevents further pipe connections from being established at a Socket.
#' Prevents further pipe connections from being established at a Socket. If a
#' socket is locked, new pipe connections are closed before they can be
#' added to the socket.
#'
#' @param socket a Socket.
#' @param cv (optional) a 'conditionVariable'. If supplied, the socket is locked
#' only while the value of the condition variable is non-zero.
#' @param decr [default FALSE] applicable only if 'cv' is specified, should be
#' set to TRUE only if a \code{\link{pipe_notify}} event on removal has been
#' registered on 'cv'. In this case, the counter within the
#' 'conditionVariable' is decremented to offset the increment that is
#' registered for the pipe removal, even though this occurs before it has
#' been added to the socket.
#'
#' @return Invisibly, zero on success (will otherwise error).
#'
Expand All @@ -229,7 +237,7 @@ pipe_notify <- function(socket, cv, cv2 = NULL, add = TRUE, remove = TRUE, flag
#'
#' @export
#'
lock <- function(socket, cv = NULL) invisible(.Call(rnng_socket_lock, socket, cv))
lock <- function(socket, cv = NULL, decr = FALSE) invisible(.Call(rnng_socket_lock, socket, cv, decr))

#' @rdname lock
#' @export
Expand Down
13 changes: 11 additions & 2 deletions man/lock.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 30 additions & 4 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,32 @@ static void pipe_cb_dropcon(nng_pipe p, nng_pipe_ev ev, void *arg) {

if (arg != NULL) {
nano_cv *ncv = (nano_cv *) arg;
if (ncv->condition)
nng_mtx *mtx = ncv->mtx;
int cond;
nng_mtx_lock(mtx);
cond = ncv->condition;
nng_mtx_unlock(mtx);
if (cond)
nng_pipe_close(p);

} else {
nng_pipe_close(p);
}

}

static void pipe_cb_dropcon_decr(nng_pipe p, nng_pipe_ev ev, void *arg) {

if (arg != NULL) {
nano_cv *ncv = (nano_cv *) arg;
nng_mtx *mtx = ncv->mtx;
int cond;
nng_mtx_lock(mtx);
cond = ncv->condition;
if (cond)
ncv->condition--;
nng_mtx_unlock(mtx);
if (cond)
nng_pipe_close(p);
} else {
nng_pipe_close(p);
Expand Down Expand Up @@ -792,7 +817,7 @@ SEXP rnng_unresolved(SEXP x) {
SEXP value = Rf_findVarInFrame(x, nano_DataSymbol);
if (value == R_UnboundValue)
value = Rf_findVarInFrame(x, nano_ResultSymbol);
xc = value == nano_unresolved || value == nano_UnresSymbol || TYPEOF(value) == ENVSXP && Rf_inherits(value, "unresolvedValue");
xc = value == nano_unresolved || value == nano_UnresSymbol || (TYPEOF(value) == ENVSXP && Rf_inherits(value, "unresolvedValue"));
break;
case LGLSXP:
xc = x == nano_unresolved;
Expand Down Expand Up @@ -2002,7 +2027,7 @@ SEXP rnng_pipe_notify(SEXP socket, SEXP cv, SEXP cv2, SEXP add, SEXP remove, SEX

}

SEXP rnng_socket_lock(SEXP socket, SEXP cv) {
SEXP rnng_socket_lock(SEXP socket, SEXP cv, SEXP decrement) {

if (R_ExternalPtrTag(socket) != nano_SocketSymbol)
Rf_error("'socket' is not a valid Socket");
Expand All @@ -2012,8 +2037,9 @@ SEXP rnng_socket_lock(SEXP socket, SEXP cv) {
if (cv != R_NilValue) {
if (R_ExternalPtrTag(cv) != nano_CvSymbol)
Rf_error("'cv' is not a valid Condition Variable");
const int decr = LOGICAL(decrement)[0];
nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cv);
xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_PRE, pipe_cb_dropcon, ncv);
xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_PRE, decr ? pipe_cb_dropcon_decr : pipe_cb_dropcon, ncv);
} else {
xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_PRE, pipe_cb_dropcon, NULL);
}
Expand Down
2 changes: 1 addition & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_sha384", (DL_FUNC) &rnng_sha384, 3},
{"rnng_sha512", (DL_FUNC) &rnng_sha512, 3},
{"rnng_sleep", (DL_FUNC) &rnng_sleep, 1},
{"rnng_socket_lock", (DL_FUNC) &rnng_socket_lock, 2},
{"rnng_socket_lock", (DL_FUNC) &rnng_socket_lock, 3},
{"rnng_socket_unlock", (DL_FUNC) &rnng_socket_unlock, 1},
{"rnng_stats_get", (DL_FUNC) &rnng_stats_get, 2},
{"rnng_status_code", (DL_FUNC) &rnng_status_code, 1},
Expand Down
2 changes: 1 addition & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ extern SEXP rnng_sha256(SEXP, SEXP, SEXP);
extern SEXP rnng_sha384(SEXP, SEXP, SEXP);
extern SEXP rnng_sha512(SEXP, SEXP, SEXP);
extern SEXP rnng_sleep(SEXP);
extern SEXP rnng_socket_lock(SEXP, SEXP);
extern SEXP rnng_socket_lock(SEXP, SEXP, SEXP);
extern SEXP rnng_socket_unlock(SEXP);
extern SEXP rnng_stats_get(SEXP, SEXP);
extern SEXP rnng_status_code(SEXP);
Expand Down
1 change: 1 addition & 0 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ nanotestnano(s3 <- socket("bus", dial = "inproc://nanolock"))
nanotestz(send(s, "test", block = 500))
nanotestnn(recv(s3, block = 500))
nanotestz(lock(s, cv = cv))
nanotestz(lock(s, cv = cv, decr = TRUE))
nanotesterr(lock(cv), "valid Socket")
nanotesterr(lock(s, cv = s), "valid Condition Variable")
nanotestnano(s4 <- socket("bus", dial = "inproc://nanolock"))
Expand Down

0 comments on commit b9b6594

Please sign in to comment.