diff --git a/src/thread.c b/src/thread.c index 0ec191cc4..c1a6d6ad8 100644 --- a/src/thread.c +++ b/src/thread.c @@ -193,6 +193,9 @@ SEXP rnng_messenger_thread_create(SEXP args) { // threaded functions ---------------------------------------------------------- +// # nocov start +// tested interactively + static void thread_aio_finalizer(SEXP xptr) { if (NANO_PTR(xptr) == NULL) return; @@ -209,6 +212,82 @@ static void thread_aio_finalizer(SEXP xptr) { } + +static void rnng_wait_thread_single(void *args) { + + nano_thread_aio *taio = (nano_thread_aio *) args; + nano_cv *ncv = taio->cv; + nng_mtx *mtx = ncv->mtx; + nng_cv *cv = ncv->cv; + + nng_aio_wait(taio->aio); + + nng_mtx_lock(mtx); + ncv->condition = 1; + nng_cv_wake(cv); + nng_mtx_unlock(mtx); + +} + +void single_wait_thread_create(SEXP x) { + + nano_aio *aiop = (nano_aio *) NANO_PTR(x); + nano_thread_aio *taio = R_Calloc(1, nano_thread_aio); + nano_cv *ncv = R_Calloc(1, nano_cv); + taio->aio = aiop->aio; + taio->cv = ncv; + nng_mtx *mtx; + nng_cv *cv; + int xc, signalled; + + if ((xc = nng_mtx_alloc(&mtx))) + goto exitlevel1; + + if ((xc = nng_cv_alloc(&cv, mtx))) + goto exitlevel2; + + ncv->mtx = mtx; + ncv->cv = cv; + + if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio))) + goto exitlevel3; + + SEXP xptr; + PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue)); + R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE); + R_MakeWeakRef(x, xptr, R_NilValue, TRUE); + UNPROTECT(1); + + nng_time time = nng_clock(); + + while (1) { + time = time + 400; + signalled = 1; + nng_mtx_lock(mtx); + while (ncv->condition == 0) { + if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) { + signalled = 0; + break; + } + } + nng_mtx_unlock(mtx); + if (signalled) break; + R_CheckUserInterrupt(); + } + + return; + + exitlevel3: + nng_cv_free(cv); + exitlevel2: + nng_mtx_free(mtx); + exitlevel1: + ERROR_OUT(xc); + +} + +// # nocov end + static void thread_duo_finalizer(SEXP xptr) { if (NANO_PTR(xptr) == NULL) return; @@ -286,79 +365,6 @@ static void rnng_wait_thread(void *args) { } -static void rnng_wait_thread_single(void *args) { - - nano_thread_aio *taio = (nano_thread_aio *) args; - nano_cv *ncv = taio->cv; - nng_mtx *mtx = ncv->mtx; - nng_cv *cv = ncv->cv; - - nng_aio_wait(taio->aio); - - nng_mtx_lock(mtx); - ncv->condition = 1; - nng_cv_wake(cv); - nng_mtx_unlock(mtx); - -} - -void single_wait_thread_create(SEXP x) { - - nano_aio *aiop = (nano_aio *) NANO_PTR(x); - nano_thread_aio *taio = R_Calloc(1, nano_thread_aio); - nano_cv *ncv = R_Calloc(1, nano_cv); - taio->aio = aiop->aio; - taio->cv = ncv; - nng_mtx *mtx; - nng_cv *cv; - int xc, signalled; - - if ((xc = nng_mtx_alloc(&mtx))) - goto exitlevel1; - - if ((xc = nng_cv_alloc(&cv, mtx))) - goto exitlevel2; - - ncv->mtx = mtx; - ncv->cv = cv; - - if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio))) - goto exitlevel3; - - SEXP xptr; - PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue)); - R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE); - R_MakeWeakRef(x, xptr, R_NilValue, TRUE); - UNPROTECT(1); - - nng_time time = nng_clock(); - - while (1) { - time = time + 400; - signalled = 1; - nng_mtx_lock(mtx); - while (ncv->condition == 0) { - if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) { - signalled = 0; - break; - } - } - nng_mtx_unlock(mtx); - if (signalled) break; - R_CheckUserInterrupt(); - } - - return; - - exitlevel3: - nng_cv_free(cv); - exitlevel2: - nng_mtx_free(mtx); - exitlevel1: - ERROR_OUT(xc); - -} - SEXP rnng_wait_thread_create(SEXP x) { const SEXPTYPE typ = TYPEOF(x);