Skip to content

Commit

Permalink
introduce new until() behaviour in .until()
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Oct 20, 2023
1 parent 8e597f5 commit 331b457
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 9 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 0.10.2.9005
Version: 0.10.2.9006
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library providing high-performance scalability protocols, a
cross-platform standard for messaging and communications. Serves as a
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ S3method(start,nanoListener)
export("opt<-")
export(.context)
export(.unresolved)
export(.until)
export(base64dec)
export(base64enc)
export(call_aio)
Expand Down
6 changes: 5 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# nanonext 0.10.2.9005 (development)
# nanonext 0.10.2.9006 (development)

#### New Features

* `.until()` contains revised behaviour for this synchronisation primitive that will be ported to `until()` in a future version. The function now returns FALSE instead of TRUE if the timeout has been reached.

#### Updates

Expand Down
12 changes: 10 additions & 2 deletions R/sync.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#' For \strong{wait} and \strong{until}: (invisibly) logical TRUE, or else
#' FALSE if a flag has been set.
#'
#' For \strong{.until}: (invisibly) logical TRUE if signalled, or else FALSE
#' if the timeout was reached.
#'
#' For \strong{cv_value}: integer value of the condition variable.
#'
#' For \strong{cv_reset} and \strong{cv_signal}: zero (invisibly).
Expand Down Expand Up @@ -60,8 +63,8 @@
#'
#' The condition variable also contains a flag that certain signalling
#' functions such as \code{\link{pipe_notify}} can set. When this flag has
#' been set, all subsequent \code{wait} or \code{until} calls will return
#' logical FALSE instead of TRUE.
#' been set, all subsequent \code{wait} calls will return logical FALSE
#' instead of TRUE.
#'
#' Note that the flag is not automatically reset, but may be reset manually
#' using \code{cv_reset}.
Expand Down Expand Up @@ -104,6 +107,11 @@ wait <- function(cv) invisible(.Call(rnng_cv_wait, cv))
#'
until <- function(cv, msec) invisible(.Call(rnng_cv_until, cv, msec))

#' @rdname cv
#' @export
#'
.until <- function(cv, msec) invisible(.Call(rnng_cv_until2, cv, msec))

#' Condition Variables - Value
#'
#' \code{cv_value} inspects the internal value of a condition variable.
Expand Down
10 changes: 8 additions & 2 deletions man/cv.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -1706,6 +1706,40 @@ SEXP rnng_cv_until(SEXP cvar, SEXP msec) {

}

SEXP rnng_cv_until2(SEXP cvar, SEXP msec) {

if (R_ExternalPtrTag(cvar) != nano_CvSymbol)
Rf_error("'cv' is not a valid Condition Variable");

nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar);
nng_cv *cv = ncv->cv;
nng_mtx *mtx = ncv->mtx;

nng_time time = nng_clock();
switch (TYPEOF(msec)) {
case INTSXP:
time = time + (nng_time) INTEGER(msec)[0];
break;
case REALSXP:
time = time + (nng_time) Rf_asInteger(msec);
break;
}

uint8_t signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
if (signalled) ncv->condition--;
nng_mtx_unlock(mtx);

return Rf_ScalarLogical(signalled);

}

SEXP rnng_cv_reset(SEXP cvar) {

if (R_ExternalPtrTag(cvar) != nano_CvSymbol)
Expand Down
1 change: 1 addition & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_cv_reset", (DL_FUNC) &rnng_cv_reset, 1},
{"rnng_cv_signal", (DL_FUNC) &rnng_cv_signal, 1},
{"rnng_cv_until", (DL_FUNC) &rnng_cv_until, 2},
{"rnng_cv_until2", (DL_FUNC) &rnng_cv_until2, 2},
{"rnng_cv_value", (DL_FUNC) &rnng_cv_value, 1},
{"rnng_cv_wait", (DL_FUNC) &rnng_cv_wait, 1},
{"rnng_dial", (DL_FUNC) &rnng_dial, 5},
Expand Down
1 change: 1 addition & 0 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ extern SEXP rnng_cv_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP rnng_cv_reset(SEXP);
extern SEXP rnng_cv_signal(SEXP);
extern SEXP rnng_cv_until(SEXP, SEXP);
extern SEXP rnng_cv_until2(SEXP, SEXP);
extern SEXP rnng_cv_value(SEXP);
extern SEXP rnng_cv_wait(SEXP);
extern SEXP rnng_dial(SEXP, SEXP, SEXP, SEXP, SEXP);
Expand Down
6 changes: 3 additions & 3 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ nanotest(is.complex(call_aio(rek)[["data"]]))

nanotest(inherits(cv <- cv(), "conditionVariable"))
nanotestp(cv)
nanotest(until(cv, 10L))
nanotest(until(cv, 10))
nanotest(!.until(cv, 10L))
nanotest(!.until(cv, 10))
nanotestz(cv_reset(cv))
nanotestz(cv_value(cv))
nanotestaio(cs <- request_signal(req$context, "test", send_mode = "next", cv = cv, timeout = 500))
Expand All @@ -239,7 +239,7 @@ nanotesterr(recv_aio_signal(err, cv = cv, timeout = 500))
nanotesterr(request_signal(.context(req$socket), "test", cv = err), "valid")
nanotesterr(recv_aio_signal(rep, "test", cv = err), "valid")
nanotesterr(wait(err), "valid")
nanotesterr(until(err, 10), "valid")
nanotesterr(.until(err, 10), "valid")
nanotesterr(cv_value(err), "valid")
nanotesterr(cv_reset(err), "valid")
nanotesterr(cv_signal(err), "valid")
Expand Down

0 comments on commit 331b457

Please sign in to comment.