From aa64a2eb40932453642f595b0d7029edaf2be823 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 18 Dec 2024 18:21:51 +0100 Subject: [PATCH] Support waiting on condition variable until a specific time point (#665) * Add z_clock_advance functions * Add z_condvar_timedwait function * Use monotonic clock in unix port of condvar * Add timeout argument * Fix build on macos * Add missing docstrings * Use POSIX implementation in arduino, espidf and zephyr ports * Add missing errno includes * Fix formatting * Implement z_clock_advance for mbed port * Implement z_clock_advance for freertos_plus_tcp port * Implement z_clock_advance for windows port * Implement z_condvar_wait_until in windows port * Implement z_condvar_wait_until for freertos_plus_tcp and mbed ports * Implement z_condvar_wait_until for rpi_pico port * Add opencv and flipper clock advance implementations * Add emscripten implementation * Return a timeout result code instead of setting timeout variable --- include/zenoh-pico/system/common/platform.h | 44 +++++++++++++++ include/zenoh-pico/utils/result.h | 1 + src/system/arduino/esp32/system.c | 40 +++++++++++++- src/system/arduino/opencr/system.c | 23 ++++++++ src/system/common/platform.c | 3 ++ src/system/emscripten/system.c | 30 ++++++++++- src/system/espidf/system.c | 40 +++++++++++++- src/system/flipper/system.c | 24 +++++++++ src/system/freertos_plus_tcp/system.c | 38 +++++++++++++ src/system/mbed/system.cpp | 52 ++++++++++++++++++ src/system/rpi_pico/system.c | 51 ++++++++++++++++++ src/system/unix/system.c | 42 ++++++++++++++- src/system/windows/system.c | 60 +++++++++++++++++++++ src/system/zephyr/system.c | 40 +++++++++++++- 14 files changed, 481 insertions(+), 7 deletions(-) diff --git a/include/zenoh-pico/system/common/platform.h b/include/zenoh-pico/system/common/platform.h index 9ab0d4fc9..4b3937abd 100644 --- a/include/zenoh-pico/system/common/platform.h +++ b/include/zenoh-pico/system/common/platform.h @@ -274,6 +274,7 @@ z_result_t _z_condvar_drop(_z_condvar_t *cv); z_result_t _z_condvar_signal(_z_condvar_t *cv); z_result_t _z_condvar_signal_all(_z_condvar_t *cv); z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m); +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime); /** * Initializes a condition variable. @@ -323,6 +324,22 @@ z_result_t z_condvar_signal(z_loaned_condvar_t *cv); */ z_result_t z_condvar_wait(z_loaned_condvar_t *cv, z_loaned_mutex_t *m); +/** + * Waits for a signal on the condition variable while holding a mutex until a specified time. + * + * The calling thread is blocked until the condition variable is signaled or the timeout occurs. + * The associated mutex must be locked by the calling thread, and it will be automatically unlocked while waiting. + * + * Parameters: + * cv: Pointer to a :c:type:`z_loaned_condvar_t` on which to wait. + * m: Pointer to a :c:type:`z_loaned_mutex_t` that will be unlocked during the wait. + * abstime: Absolute end time. + * + * Returns: + * ``0`` if the wait is successful, ``Z_ETIMEDOUT`` if a timeout occurred, other negative value otherwise. + */ +z_result_t z_condvar_wait_until(z_loaned_condvar_t *cv, z_loaned_mutex_t *m, const z_clock_t *abstime); + /*------------------ Sleep ------------------*/ /** * Suspends execution for a specified amount of time in microseconds. @@ -396,6 +413,33 @@ unsigned long z_clock_elapsed_ms(z_clock_t *time); */ unsigned long z_clock_elapsed_s(z_clock_t *time); +/** + * Offsets the clock by a specified duration in microseconds. + * + * Parameters: + * clock: Pointer to a `z_clock_t` to offset. + * duration: The duration in microseconds. + */ +void z_clock_advance_us(z_clock_t *clock, unsigned long duration); + +/** + * Offsets the clock by a specified duration in milliseconds. + * + * Parameters: + * clock: Pointer to a `z_clock_t` to offset. + * duration: The duration in milliseconds. + */ +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration); + +/** + * Offsets the clock by a specified duration in seconds. + * + * Parameters: + * clock: Pointer to a `z_clock_t` to offset. + * duration: The duration in seconds. + */ +void z_clock_advance_s(z_clock_t *clock, unsigned long duration); + /*------------------ Time ------------------*/ /** diff --git a/include/zenoh-pico/utils/result.h b/include/zenoh-pico/utils/result.h index 6b1061d21..7ac80ee1f 100644 --- a/include/zenoh-pico/utils/result.h +++ b/include/zenoh-pico/utils/result.h @@ -86,6 +86,7 @@ typedef enum { _Z_ERR_OVERFLOW = -74, _Z_ERR_SESSION_CLOSED = -73, Z_EDESERIALIZE = -72, + Z_ETIMEDOUT = -71, _Z_ERR_GENERIC = -128 } _z_res_t; diff --git a/src/system/arduino/esp32/system.c b/src/system/arduino/esp32/system.c index 77f6b3459..e0d4a630d 100644 --- a/src/system/arduino/esp32/system.c +++ b/src/system/arduino/esp32/system.c @@ -13,6 +13,7 @@ // #include +#include #include #include #include @@ -114,7 +115,12 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_try z_result_t _z_mutex_unlock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_unlock(m)); } /*------------------ Condvar ------------------*/ -z_result_t _z_condvar_init(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_init(cv, NULL)); } +z_result_t _z_condvar_init(_z_condvar_t *cv) { + pthread_condattr_t attr; + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + _Z_CHECK_SYS_ERR(pthread_cond_init(cv, &attr)); +} z_result_t _z_condvar_drop(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_destroy(cv)); } @@ -122,7 +128,15 @@ z_result_t _z_condvar_signal(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_s z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_broadcast(cv)); } -z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_cond_wait(cv, m)); } +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + int error = pthread_cond_timedwait(cv, m, abstime); + + if (error == ETIMEDOUT) { + return Z_ETIMEDOUT; + } + + _Z_CHECK_SYS_ERR(error); +} #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -177,6 +191,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) { return elapsed; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000000; + clock->tv_nsec += (duration % 1000000) * 1000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000; + clock->tv_nsec += (duration % 1000) * 1000000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/arduino/opencr/system.c b/src/system/arduino/opencr/system.c index e6ea10fce..ab49a1e5c 100644 --- a/src/system/arduino/opencr/system.c +++ b/src/system/arduino/opencr/system.c @@ -97,6 +97,7 @@ z_result_t _z_condvar_signal(_z_condvar_t *cv) { return -1; } z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { return -1; } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { return -1; } +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { return -1; } #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -160,6 +161,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) { return elapsed; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000000; + clock->tv_nsec += (duration % 1000000) * 1000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000; + clock->tv_nsec += (duration % 1000) * 1000000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/common/platform.c b/src/system/common/platform.c index 7697b9ed1..9d042a4f9 100644 --- a/src/system/common/platform.c +++ b/src/system/common/platform.c @@ -61,5 +61,8 @@ z_result_t z_condvar_drop(z_moved_condvar_t *cv) { return _z_condvar_drop(&cv->_ z_result_t z_condvar_signal(z_loaned_condvar_t *cv) { return _z_condvar_signal(cv); } z_result_t z_condvar_wait(z_loaned_condvar_t *cv, z_loaned_mutex_t *m) { return _z_condvar_wait(cv, m); } +z_result_t z_condvar_wait_until(z_loaned_condvar_t *cv, z_loaned_mutex_t *m, const z_clock_t *abstime) { + return _z_condvar_wait_until(cv, m, abstime); +} #endif // Z_FEATURE_MULTI_THREAD == 1 diff --git a/src/system/emscripten/system.c b/src/system/emscripten/system.c index 52065d2db..b224c0f6b 100644 --- a/src/system/emscripten/system.c +++ b/src/system/emscripten/system.c @@ -13,6 +13,7 @@ // #include +#include #include #include #include @@ -69,7 +70,12 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_try z_result_t _z_mutex_unlock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_unlock(m)); } /*------------------ Condvar ------------------*/ -z_result_t _z_condvar_init(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_init(cv, 0)); } +z_result_t _z_condvar_init(_z_condvar_t *cv) { + pthread_condattr_t attr; + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + _Z_CHECK_SYS_ERR(pthread_cond_init(cv, &attr)); +} z_result_t _z_condvar_drop(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_destroy(cv)); } @@ -78,6 +84,20 @@ z_result_t _z_condvar_signal(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_s z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_broadcast(cv)); } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_cond_wait(cv, m)); } + +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + struct timespec ts; + ts.tv_sec = (time_t)(*abstime / 1000); + ts.tv_nsec = (long)((*abstime - (ts.tv_sec * 1000)) * 1000000); + + int error = pthread_cond_timedwait(cv, m, &ts); + + if (error == ETIMEDOUT) { + return Z_ETIMEDOUT; + } + + _Z_CHECK_SYS_ERR(error); +} #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -111,7 +131,13 @@ unsigned long z_clock_elapsed_us(z_clock_t *instant) { return z_clock_elapsed_ms unsigned long z_clock_elapsed_ms(z_clock_t *instant) { return z_time_elapsed_ms(instant); } -unsigned long z_clock_elapsed_s(z_clock_t *instant) { return z_time_elapsed_ms(instant) * 1000; } +unsigned long z_clock_elapsed_s(z_clock_t *instant) { return z_time_elapsed_ms(instant) / 1000; } + +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { *clock += (double)(duration / 1000); } + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { *clock += (double)duration; } + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { *clock += (double)(duration * 1000); } /*------------------ Time ------------------*/ z_time_t z_time_now(void) { return emscripten_get_now(); } diff --git a/src/system/espidf/system.c b/src/system/espidf/system.c index b66bb94c2..648679a59 100644 --- a/src/system/espidf/system.c +++ b/src/system/espidf/system.c @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // +#include #include #include #include @@ -142,7 +143,12 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_try z_result_t _z_mutex_unlock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_unlock(m)); } /*------------------ Condvar ------------------*/ -z_result_t _z_condvar_init(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_init(cv, NULL)); } +z_result_t _z_condvar_init(_z_condvar_t *cv) { + pthread_condattr_t attr; + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + _Z_CHECK_SYS_ERR(pthread_cond_init(cv, &attr)); +} z_result_t _z_condvar_drop(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_destroy(cv)); } @@ -151,6 +157,16 @@ z_result_t _z_condvar_signal(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_s z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_broadcast(cv)); } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_cond_wait(cv, m)); } + +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + int error = pthread_cond_timedwait(cv, m, abstime); + + if (error == ETIMEDOUT) { + return Z_ETIMEDOUT; + } + + _Z_CHECK_SYS_ERR(error); +} #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -202,6 +218,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) { return elapsed; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000000; + clock->tv_nsec += (duration % 1000000) * 1000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000; + clock->tv_nsec += (duration % 1000) * 1000000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/flipper/system.c b/src/system/flipper/system.c index 3d27aaeb1..92c1091ca 100644 --- a/src/system/flipper/system.c +++ b/src/system/flipper/system.c @@ -153,6 +153,8 @@ z_result_t _z_condvar_signal_all(_z_condvar_t* cv) { return -1; } z_result_t _z_condvar_wait(_z_condvar_t* cv, _z_mutex_t* m) { return -1; } +z_result_t _z_condvar_wait_until(_z_condvar_t* cv, _z_mutex_t* m, const z_clock_t* abstime) { return -1; } + /*------------------ Sleep ------------------*/ z_result_t z_sleep_us(size_t time) { furi_delay_us(time); @@ -214,6 +216,28 @@ unsigned long z_clock_elapsed_s(z_clock_t* instant) { return elapsed; } +void z_clock_advance_us(z_clock_t* clock, unsigned long duration) { + clock->tv_sec += duration / 1000000; + clock->tv_nsec += (duration % 1000000) * 1000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_ms(z_clock_t* clock, unsigned long duration) { + clock->tv_sec += duration / 1000; + clock->tv_nsec += (duration % 1000) * 1000000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_s(z_clock_t* clock, unsigned long duration) { clock->tv_sec += duration; } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/freertos_plus_tcp/system.c b/src/system/freertos_plus_tcp/system.c index 632a9b9c5..c1740843d 100644 --- a/src/system/freertos_plus_tcp/system.c +++ b/src/system/freertos_plus_tcp/system.c @@ -264,6 +264,35 @@ z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { return _Z_RES_OK; } + +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + if (!cv || !m) { + return _Z_ERR_GENERIC; + } + + TickType_t now = xTaskGetTickCount(); + TickType_t target_time = *abstime; + TickType_t block_duration = (target_time > now) ? (target_time - now) : 0; + + xSemaphoreTake(cv->mutex, portMAX_DELAY); + cv->waiters++; + xSemaphoreGive(cv->mutex); + + _z_mutex_unlock(m); + + bool timed_out = xSemaphoreTake(cv->sem, block_duration) == pdFALSE; + + _z_mutex_lock(m); + + if (timed_out) { + xSemaphoreTake(cv->mutex, portMAX_DELAY); + cv->waiters--; + xSemaphoreGive(cv->mutex); + return Z_ETIMEDOUT; + } + + return _Z_RES_OK; +} #endif // Z_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -296,6 +325,15 @@ unsigned long z_clock_elapsed_ms(z_clock_t *instant) { unsigned long z_clock_elapsed_s(z_clock_t *instant) { return z_clock_elapsed_ms(instant) / 1000; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { z_clock_advance_ms(clock, duration / 1000); } + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + unsigned long ticks = pdMS_TO_TICKS(duration); + *clock += ticks; +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { z_clock_advance_ms(clock, duration * 1000); } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/mbed/system.cpp b/src/system/mbed/system.cpp index 92ed064ac..5ab66242c 100644 --- a/src/system/mbed/system.cpp +++ b/src/system/mbed/system.cpp @@ -171,6 +171,36 @@ z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { return _Z_RES_OK; } + +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + if (!cv || !m) { + return _Z_ERR_GENERIC; + } + + auto &cond_var = *(condvar *)*cv; + + auto target_time = + Kernel::Clock::time_point(Kernel::Clock::duration(abstime->tv_sec * 1000LL + abstime->tv_nsec / 1000000)); + + cond_var.mutex.lock(); + cond_var.waiters++; + cond_var.mutex.unlock(); + + _z_mutex_unlock(m); + + bool timed_out = cond_var.sem.try_acquire_until(target_time) == false; + + _z_mutex_lock(m); + + if (timed_out) { + cond_var.mutex.lock(); + cond_var.waiters--; + cond_var.mutex.unlock(); + return Z_ETIMEDOUT; + } + + return _Z_RES_OK; +} #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -222,6 +252,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) { return elapsed; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000000; + clock->tv_nsec += (duration % 1000000) * 1000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000; + clock->tv_nsec += (duration % 1000) * 1000000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/rpi_pico/system.c b/src/system/rpi_pico/system.c index eb7f6c4cc..26f3b011a 100644 --- a/src/system/rpi_pico/system.c +++ b/src/system/rpi_pico/system.c @@ -206,6 +206,35 @@ z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { return _z_mutex_lock(m); } + +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + if (!cv || !m) { + return _Z_ERR_GENERIC; + } + + TickType_t now = xTaskGetTickCount(); + TickType_t target_time = pdMS_TO_TICKS(abstime->tv_sec * 1000 + abstime->tv_nsec / 1000000); + TickType_t block_duration = (target_time > now) ? (target_time - now) : 0; + + xSemaphoreTake(cv->mutex, portMAX_DELAY); + cv->waiters++; + xSemaphoreGive(cv->mutex); + + _z_mutex_unlock(m); + + bool timed_out = xSemaphoreTake(cv->sem, block_duration) == pdFALSE; + + _z_mutex_lock(m); + + if (timed_out) { + xSemaphoreTake(cv->mutex, portMAX_DELAY); + cv->waiters--; + xSemaphoreGive(cv->mutex); + return Z_ETIMEDOUT; + } + + return _Z_RES_OK; +} #endif // Z_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -263,6 +292,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) { return elapsed; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000000; + clock->tv_nsec += (duration % 1000000) * 1000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000; + clock->tv_nsec += (duration % 1000) * 1000000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/unix/system.c b/src/system/unix/system.c index bd28018b7..2fcf2b3c8 100644 --- a/src/system/unix/system.c +++ b/src/system/unix/system.c @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // +#include #include #include #include @@ -127,7 +128,14 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_try z_result_t _z_mutex_unlock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_unlock(m)); } /*------------------ Condvar ------------------*/ -z_result_t _z_condvar_init(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_init(cv, 0)); } +z_result_t _z_condvar_init(_z_condvar_t *cv) { + pthread_condattr_t attr; + pthread_condattr_init(&attr); +#ifndef ZENOH_MACOS + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); +#endif + _Z_CHECK_SYS_ERR(pthread_cond_init(cv, &attr)); +} z_result_t _z_condvar_drop(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_destroy(cv)); } @@ -136,6 +144,16 @@ z_result_t _z_condvar_signal(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_s z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_broadcast(cv)); } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_cond_wait(cv, m)); } + +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + int error = pthread_cond_timedwait(cv, m, abstime); + + if (error == ETIMEDOUT) { + return Z_ETIMEDOUT; + } + + _Z_CHECK_SYS_ERR(error); +} #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -192,6 +210,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) { return elapsed; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000000; + clock->tv_nsec += (duration % 1000000) * 1000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000; + clock->tv_nsec += (duration % 1000) * 1000000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/windows/system.c b/src/system/windows/system.c index 51acd4146..78086a6c0 100644 --- a/src/system/windows/system.c +++ b/src/system/windows/system.c @@ -158,6 +158,30 @@ z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { SleepConditionVariableSRW(cv, m, INFINITE, 0); return ret; } + +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + z_clock_t now = z_clock_now(); + LARGE_INTEGER frequency; + QueryPerformanceFrequency(&frequency); // ticks per second + + // Hardware not supporting QueryPerformanceFrequency + if (frequency.QuadPart == 0) { + return _Z_ERR_GENERIC; + } + + double remaining = (double)(abstime->QuadPart - now.QuadPart) / frequency.QuadPart * 1000.0; + DWORD block_duration = remaining > 0.0 ? (DWORD)remaining : 0; + + if (SleepConditionVariableSRW(cv, m, block_duration, 0) == 0) { + if (GetLastError() == ERROR_TIMEOUT) { + return Z_ETIMEDOUT; + } else { + return _Z_ERR_GENERIC; + } + } + + return _Z_RES_OK; +} #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -237,6 +261,42 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) { return (unsigned long)elapsed; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { + LARGE_INTEGER frequency; + QueryPerformanceFrequency(&frequency); // ticks per second + + // Hardware not supporting QueryPerformanceFrequency + if (frequency.QuadPart == 0) { + return; + } + double ticks = (double)duration * frequency.QuadPart / 1000000.0; + clock->QuadPart += (LONGLONG)ticks; +} + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + LARGE_INTEGER frequency; + QueryPerformanceFrequency(&frequency); // ticks per second + + // Hardware not supporting QueryPerformanceFrequency + if (frequency.QuadPart == 0) { + return; + } + double ticks = (double)duration * frequency.QuadPart / 1000.0; + clock->QuadPart += (LONGLONG)ticks; +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { + LARGE_INTEGER frequency; + QueryPerformanceFrequency(&frequency); // ticks per second + + // Hardware not supporting QueryPerformanceFrequency + if (frequency.QuadPart == 0) { + return; + } + double ticks = (double)duration * frequency.QuadPart; + clock->QuadPart += (LONGLONG)ticks; +} + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now; diff --git a/src/system/zephyr/system.c b/src/system/zephyr/system.c index 7818c9993..3cd8ee634 100644 --- a/src/system/zephyr/system.c +++ b/src/system/zephyr/system.c @@ -20,6 +20,7 @@ #include #endif +#include #include #include #include @@ -108,7 +109,12 @@ z_result_t _z_mutex_try_lock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_try z_result_t _z_mutex_unlock(_z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_mutex_unlock(m)); } /*------------------ Condvar ------------------*/ -z_result_t _z_condvar_init(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_init(cv, 0)); } +z_result_t _z_condvar_init(_z_condvar_t *cv) { + pthread_condattr_t attr; + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + _Z_CHECK_SYS_ERR(pthread_cond_init(cv, &attr)); +} z_result_t _z_condvar_drop(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_destroy(cv)); } @@ -117,6 +123,16 @@ z_result_t _z_condvar_signal(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_s z_result_t _z_condvar_signal_all(_z_condvar_t *cv) { _Z_CHECK_SYS_ERR(pthread_cond_broadcast(cv)); } z_result_t _z_condvar_wait(_z_condvar_t *cv, _z_mutex_t *m) { _Z_CHECK_SYS_ERR(pthread_cond_wait(cv, m)); } + +z_result_t _z_condvar_wait_until(_z_condvar_t *cv, _z_mutex_t *m, const z_clock_t *abstime) { + int error = pthread_cond_timedwait(cv, m, abstime); + + if (error == ETIMEDOUT) { + return Z_ETIMEDOUT; + } + + _Z_CHECK_SYS_ERR(error); +} #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Sleep ------------------*/ @@ -183,6 +199,28 @@ unsigned long z_clock_elapsed_s(z_clock_t *instant) { return elapsed; } +void z_clock_advance_us(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000000; + clock->tv_nsec += (duration % 1000000) * 1000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_ms(z_clock_t *clock, unsigned long duration) { + clock->tv_sec += duration / 1000; + clock->tv_nsec += (duration % 1000) * 1000000; + + if (clock->tv_nsec >= 1000000000) { + clock->tv_sec += 1; + clock->tv_nsec -= 1000000000; + } +} + +void z_clock_advance_s(z_clock_t *clock, unsigned long duration) { clock->tv_sec += duration; } + /*------------------ Time ------------------*/ z_time_t z_time_now(void) { z_time_t now;