diff --git a/ww/eventloop/CMakeLists.txt b/ww/eventloop/CMakeLists.txt index 05cd95b4..ef0f87d4 100644 --- a/ww/eventloop/CMakeLists.txt +++ b/ww/eventloop/CMakeLists.txt @@ -115,8 +115,8 @@ if(APPLE) endif() # see Makefile -set(ALL_SRCDIRS . base ssl event util cpputil evpp protocol http http/client http/server mqtt) -set(CORE_SRCDIRS . base ssl event) +set(ALL_SRCDIRS . base event util cpputil evpp protocol http http/client http/server mqtt) +set(CORE_SRCDIRS . base event) if(WIN32) if(WITH_WEPOLL) set(CORE_SRCDIRS ${CORE_SRCDIRS} event/wepoll) diff --git a/ww/eventloop/base/hchan.c b/ww/eventloop/base/hchan.c index 723fa5d1..e4abdd3e 100644 --- a/ww/eventloop/base/hchan.c +++ b/ww/eventloop/base/hchan.c @@ -1,12 +1,18 @@ #include "hchan.h" #include "hmutex.h" +#include "threads.h" + +#define align2(n, w) \ + ({ \ + assert(((w) & ((w) - 1)) == 0); /* alignment w is not a power of two */ \ + ((n) + ((w) - 1)) & ~((w) - 1); \ + }) // DEBUG_CHAN_LOG: define to enable debug logging of send and recv -//#define DEBUG_CHAN_LOG +// #define DEBUG_CHAN_LOG // DEBUG_CHAN_LOCK: define to enable debug logging of channel locks -//#define DEBUG_CHAN_LOCK - +// #define DEBUG_CHAN_LOCK // LINE_CACHE_SIZE is the size of a cache line of the target CPU. // The value 64 covers i386, x86_64, arm32, arm64. @@ -16,59 +22,62 @@ #define ATTR_ALIGNED_LINE_CACHE __attribute__((aligned(LINE_CACHE_SIZE))) - +typedef _Atomic(unsigned int) atomic_uint32_t; +typedef _Atomic(unsigned long long) atomic_uint64_t; +typedef _Atomic(unsigned long long) atomic_size_t_t; +typedef _Atomic(long long) atomic_ssize_t; +typedef _Atomic(unsigned char) atomic_uint8_t; // ---------------------------------------------------------------------------- // debugging #if defined(DEBUG_CHAN_LOG) && !defined(DEBUG) - #undef DEBUG_CHAN_LOG +#undef DEBUG_CHAN_LOG #endif #ifdef DEBUG_CHAN_LOG - #define THREAD_ID_INVALID SIZE_MAX +#define THREAD_ID_INVALID SIZE_MAX - static size_t thread_id() { +static size_t thread_id() { static thread_local size_t _thread_id = THREAD_ID_INVALID; static atomic_size _thread_id_counter = ATOMIC_VAR_INIT(0); size_t tid = _thread_id; if (tid == THREAD_ID_INVALID) { - tid = AtomicAdd(&_thread_id_counter, 1); - _thread_id = tid; + tid = atomic_fetch_add_explicit(&_thread_id_counter, 1); + _thread_id = tid; } return tid; - } +} - static const char* tcolor() { +static const char* tcolor() { static const char* colors[] = { - //"\x1b[1m", // bold (white) - "\x1b[93m", // yellow - "\x1b[92m", // green - "\x1b[91m", // red - "\x1b[94m", // blue - "\x1b[96m", // cyan - "\x1b[95m", // magenta + //"\x1b[1m", // bold (white) + "\x1b[93m", // yellow + "\x1b[92m", // green + "\x1b[91m", // red + "\x1b[94m", // blue + "\x1b[96m", // cyan + "\x1b[95m", // magenta }; return colors[thread_id() % countof(colors)]; - } +} - // _dlog_chan writes a log message to stderr along with a globally unique "causality" - // sequence number. It does not use libc FILEs as those use mutex locks which would alter - // the behavior of multi-threaded channel operations. Instead it uses a buffer on the stack, - // which of course is local per thread and then calls the write syscall with the one buffer. - // This all means that log messages may be out of order; use the "causality" sequence number - // to understand what other messages were produced according to the CPU. - ATTR_FORMAT(printf, 2, 3) - static void _dlog_chan(const char* fname, const char* fmt, ...) 
{
- static atomic_u32 seqnext = ATOMIC_VAR_INIT(1); // start at 1 to map to line nubmers
+// _dlog_chan writes a log message to stderr along with a globally unique "causality"
+// sequence number. It does not use libc FILEs as those use mutex locks which would alter
+// the behavior of multi-threaded channel operations. Instead it uses a buffer on the stack,
+// which of course is local per thread and then calls the write syscall with the one buffer.
+// This all means that log messages may be out of order; use the "causality" sequence number
+// to understand what other messages were produced according to the CPU.
+ATTR_FORMAT(printf, 2, 3)
+static void _dlog_chan(const char* fname, const char* fmt, ...) {
+ static atomic_uint32_t seqnext = ATOMIC_VAR_INIT(1); // start at 1 to map to line numbers
- uint32_t seq = AtomicAddx(&seqnext, 1, memory_order_acquire);
+ uint32_t seq = atomic_fetch_add_explicit(&seqnext, 1, memory_order_acquire);
 char buf[256];
 const ssize_t bufcap = (ssize_t)sizeof(buf);
 ssize_t buflen = 0;
- buflen += (ssize_t)snprintf(&buf[buflen], bufcap - buflen, - "%04u \x1b[1m%sT%02zu ", seq, tcolor(), thread_id());
+ buflen += (ssize_t)snprintf(&buf[buflen], bufcap - buflen, "%04u \x1b[1m%sT%02zu ", seq, tcolor(), thread_id());
 va_list ap;
 va_start(ap, fmt);
@@ -76,76 +85,80 @@
 va_end(ap);
 if (buflen > 0) {
- buflen += (ssize_t)snprintf(&buf[buflen], bufcap - buflen, "\x1b[0m (%s)\n", fname);
- if (buflen >= bufcap) {
- // truncated; make sure to end the line
- buf[buflen - 1] = '\n';
- }
+ buflen += (ssize_t)snprintf(&buf[buflen], bufcap - buflen, "\x1b[0m (%s)\n", fname);
+ if (buflen >= bufcap) {
+ // truncated; make sure to end the line
+ buf[buflen - 1] = '\n';
+ }
 }
- #undef FMT
+#undef FMT
 write(STDERR_FILENO, buf, buflen);
- }
+}
- #define dlog_chan(fmt, ...) _dlog_chan(__FUNCTION__, fmt, ##__VA_ARGS__)
- #define dlog_send(fmt, ...) dlog_chan("send: " fmt, ##__VA_ARGS__)
- #define dlog_recv(fmt, ...) dlog_chan("recv: " fmt, ##__VA_ARGS__)
- // #define dlog_send(fmt, ...) do{}while(0)
- // #define dlog_recv(fmt, ...) do{}while(0)
+#define dlog_chan(fmt, ...) _dlog_chan(__FUNCTION__, fmt, ##__VA_ARGS__)
+#define dlog_send(fmt, ...) dlog_chan("send: " fmt, ##__VA_ARGS__)
+#define dlog_recv(fmt, ...) dlog_chan("recv: " fmt, ##__VA_ARGS__)
+// #define dlog_send(fmt, ...) do{}while(0)
+// #define dlog_recv(fmt, ...) do{}while(0)
 #else
- #define dlog_chan(fmt, ...) do{}while(0)
- #define dlog_send(fmt, ...) do{}while(0)
- #define dlog_recv(fmt, ...) do{}while(0)
+#define dlog_chan(fmt, ...) \
+ do { \
+ } while (0)
+#define dlog_send(fmt, ...) \
+ do { \
+ } while (0)
+#define dlog_recv(fmt, ...) \
+ do { \
+ } while (0)
 #endif
- // ----------------------------------------------------------------------------
 // misc utils
-#define is_power_of_two(intval) \ - (intval) && (0 == ((intval) & ((intval) - 1)))
+#define is_power_of_two(intval) (intval) && (0 == ((intval) & ((intval) - 1)))
 // is_aligned checks if passed in pointer is aligned on a specific border. 
// bool is_aligned(T* pointer, uintptr_t alignment) -#define is_aligned(pointer, alignment) \ - (0 == ((uintptr_t)(pointer) & (((uintptr_t)alignment) - 1))) - +#define is_aligned(pointer, alignment) (0 == ((uintptr_t)(pointer) & (((uintptr_t)alignment) - 1))) // ------------------------------------------------------------------------- // channel lock #ifdef DEBUG_CHAN_LOCK - static uint32_t chlock_count = 0; - - #define CHAN_LOCK_T hmutex_t - #define chan_lock_init(lock) hmutex_init((lock), hmutex_plain) - #define chan_lock_dispose(lock) hmutex_destroy(lock) - - #define chan_lock(lock) do{ \ - uint32_t n = chlock_count++; \ - dlog("CL #%u LOCK %s:%d", n, __FILE__, __LINE__); \ - hmutex_lock(lock); \ - dlog("CL #%u UNLOCK %s:%d", n, __FILE__, __LINE__); \ - }while(0) - - #define chan_unlock(lock) do{ \ - /*dlog("CL UNLOCK %s:%d", __FILE__, __LINE__);*/ \ - hmutex_unlock(lock); \ - }while(0) +static uint32_t chlock_count = 0; + +#define CHAN_LOCK_T hmutex_t +#define chan_lock_init(lock) hmutex_init((lock), hmutex_plain) +#define chan_lock_destroy(lock) hmutex_destroy(lock) + +#define chan_lock(lock) \ + do { \ + uint32_t n = chlock_count++; \ + dlog("CL #%u LOCK %s:%d", n, __FILE__, __LINE__); \ + hmutex_lock(lock); \ + dlog("CL #%u UNLOCK %s:%d", n, __FILE__, __LINE__); \ + } while (0) + +#define chan_unlock(lock) \ + do { \ + /*dlog("CL UNLOCK %s:%d", __FILE__, __LINE__);*/ \ + hmutex_unlock(lock); \ + } while (0) #else - // // hmutex_t - // #define CHAN_LOCK_T hmutex_t - // #define chan_lock_init(lock) hmutex_init((lock), hmutex_plain) - // #define chan_lock_dispose(lock) hmutex_destroy(lock) - // #define chan_lock(lock) hmutex_lock(lock) - // #define chan_unlock(lock) hmutex_unlock(lock) - - // hhybridmutex_ - #define CHAN_LOCK_T hhybridmutex_t - #define chan_lock_init(lock) hhybridmutex_init(lock) - #define chan_lock_dispose(lock) hhybridmutex_dispose(lock) - #define chan_lock(lock) hhybridmutex_lock(lock) - #define chan_unlock(lock) hhybridmutex_unlock(lock) +// // hmutex_t +// #define CHAN_LOCK_T hmutex_t +// #define chan_lock_init(lock) hmutex_init((lock), hmutex_plain) +// #define chan_lock_destroy(lock) hmutex_destroy(lock) +// #define chan_lock(lock) hmutex_lock(lock) +// #define chan_unlock(lock) hmutex_unlock(lock) + +// hhybridmutex_ +#define CHAN_LOCK_T hhybridmutex_t +#define chan_lock_init(lock) hhybridmutex_init(lock) +#define chan_lock_destroy(lock) hhybridmutex_destroy(lock) +#define chan_lock(lock) hhybridmutex_lock(lock) +#define chan_unlock(lock) hhybridmutex_unlock(lock) #endif // ------------------------------------------------------------------------- @@ -154,485 +167,473 @@ typedef struct Thr Thr; // Thr holds thread-specific data and is owned by thread-local storage struct Thr { - size_t id; - bool init; - atomic_bool closed; - LSema sema; - Thr* next ATTR_ALIGNED_LINE_CACHE; // list link - _Atomic(void*) elemptr; + size_t id; + bool init; + atomic_bool closed; + hlsem_t sema; + Thr* next ATTR_ALIGNED_LINE_CACHE; // list link + _Atomic(void*) elemptr; }; typedef struct WaitQ { - _Atomic(Thr*) first; // head of linked list of parked threads - _Atomic(Thr*) last; // tail of linked list of parked threads + _Atomic(Thr*) first; // head of linked list of parked threads + _Atomic(Thr*) last; // tail of linked list of parked threads } WaitQ; -typedef struct Chan { - // These fields don't change after ChanOpen - uintptr_t memptr; // memory allocation pointer - Mem mem; // memory allocator this belongs to (immutable) - size_t elemsize; // size in bytes of elements sent on 
the channel - uint32_t qcap; // size of the circular queue buf (immutable) - - // These fields are frequently accessed and stored to. - // There's a perf opportunity here with a different more cache-efficient layout. - atomic_u32 qlen; // number of messages currently queued in buf - atomic_bool closed; // one way switch (once it becomes true, never becomes false again) - CHAN_LOCK_T lock; // guards the Chan struct - - // sendq is accessed on every call to chan_recv and only in some cases by chan_send, - // when parking a thread when there's no waiting receiver nor queued message. - // recvq is accessed on every call to chan_send and like sendq, only when parking a thread - // in chan_recv. - WaitQ sendq; // list of waiting send callers - WaitQ recvq; // list of waiting recv callers - - // sendx & recvx are likely to be falsely shared between threads. - // - sendx is loaded & stored by both chan_send and chan_recv - // - chan_send for buffered channels when no receiver is waiting - // - chan_recv when there's a waiting sender - // - recvx is only used by chan_recv - // So we make sure recvx ends up on a separate cache line. - atomic_u32 sendx; // send index in buf - atomic_u32 recvx ATTR_ALIGNED_LINE_CACHE; // receive index in buf - - // uint8_t pad[LINE_CACHE_SIZE]; - uint8_t buf[]; // queue storage -} ATTR_ALIGNED_LINE_CACHE Chan; - +typedef struct hchan_s { + // These fields don't change after hchan_Open + uintptr_t memptr; // memory allocation pointer + size_t elemsize; // size in bytes of elements sent on the channel + uint32_t qcap; // size of the circular queue buf (immutable) + + // These fields are frequently accessed and stored to. + // There's a perf opportunity here with a different more cache-efficient layout. + atomic_uint32_t qlen; // number of messages currently queued in buf + atomic_bool closed; // one way switch (once it becomes true, never becomes false again) + CHAN_LOCK_T lock; // guards the hchan_t struct + + // sendq is accessed on every call to chan_recv and only in some cases by chan_send, + // when parking a thread when there's no waiting receiver nor queued message. + // recvq is accessed on every call to chan_send and like sendq, only when parking a thread + // in chan_recv. + WaitQ sendq; // list of waiting send callers + WaitQ recvq; // list of waiting recv callers + + // sendx & recvx are likely to be falsely shared between threads. + // - sendx is loaded & stored by both chan_send and chan_recv + // - chan_send for buffered channels when no receiver is waiting + // - chan_recv when there's a waiting sender + // - recvx is only used by chan_recv + // So we make sure recvx ends up on a separate cache line. + atomic_uint32_t sendx; // send index in buf + atomic_uint32_t recvx ATTR_ALIGNED_LINE_CACHE; // receive index in buf + + // uint8_t pad[LINE_CACHE_SIZE]; + uint8_t buf[]; // queue storage +} ATTR_ALIGNED_LINE_CACHE hchan_t; static void thr_init(Thr* t) { - static atomic_size _thread_id_counter = ATOMIC_VAR_INIT(0); + static atomic_size_t _thread_id_counter = ATOMIC_VAR_INIT(0); - t->id = AtomicAdd(&_thread_id_counter, 1); - t->init = true; - LSemaInit(&t->sema, 0); // TODO: SemaDispose? + t->id = atomic_fetch_add_explicit(&_thread_id_counter, 1, memory_order_relaxed); + t->init = true; + LSemaInit(&t->sema, 0); // TODO: Semadestroy? 
} - inline static Thr* thr_current() { - static thread_local Thr _thr = {0}; + static thread_local Thr _thr = {0}; - Thr* t = &_thr; - if (R_UNLIKELY(!t->init)) - thr_init(t); - return t; + Thr* t = &_thr; + if (!t->init) thr_init(t); + return t; } - inline static void thr_signal(Thr* t) { - LSemaSignal(&t->sema, 1); // wake + LSemaSignal(&t->sema, 1); // wake } - inline static void thr_wait(Thr* t) { - dlog_chan("thr_wait ..."); - LSemaWait(&t->sema); // sleep + dlog_chan("thr_wait ..."); + LSemaWait(&t->sema); // sleep } - static void wq_enqueue(WaitQ* wq, Thr* t) { - // note: atomic loads & stores for cache reasons, not thread safety; c->lock is held. - if (AtomicLoadAcq(&wq->first)) { - // Note: compare first instead of last as we don't clear wq->last in wq_dequeue - AtomicLoadAcq(&wq->last)->next = t; - } else { - AtomicStoreRel(&wq->first, t); - } - AtomicStoreRel(&wq->last, t); + // note: atomic loads & stores for cache reasons, not thread safety; c->lock is held. + if (atomic_load_explicit(&wq->first, memory_order_acquire)) { + // Note: compare first instead of last as we don't clear wq->last in wq_dequeue + atomic_load_explicit(&wq->last, memory_order_acquire)->next = t; + } + else { + atomic_store_explicit(&wq->first, t, memory_order_release); + } + atomic_store_explicit(&wq->last, t, memory_order_release); } - -inline static Thr* nullable wq_dequeue(WaitQ* wq) { - Thr* t = AtomicLoadAcq(&wq->first); - if (t) { - AtomicStoreRel(&wq->first, t->next); - t->next = NULL; - // Note: intentionally not clearing wq->last in case wq->first==wq->last as we can - // avoid that branch by not checking wq->last in wq_enqueue. - } - return t; +static inline Thr* wq_dequeue(WaitQ* wq) { + Thr* t = atomic_load_explicit(&wq->first, memory_order_acquire); + if (t) { + atomic_store_explicit(&wq->first, t->next, memory_order_release); + t->next = NULL; + // Note: intentionally not clearing wq->last in case wq->first==wq->last as we can + // avoid that branch by not checking wq->last in wq_enqueue. + } + return t; } - // chan_bufptr returns the pointer to the i'th slot in the buffer -inline static void* chan_bufptr(Chan* c, uint32_t i) { - return (void*)&c->buf[(uintptr_t)i * (uintptr_t)c->elemsize]; +inline static void* chan_bufptr(hchan_t* c, uint32_t i) { + return (void*)&c->buf[(uintptr_t)i * (uintptr_t)c->elemsize]; } - // chan_park adds elemptr to wait queue wq, unlocks channel c and blocks the calling thread -static Thr* chan_park(Chan* c, WaitQ* wq, void* elemptr) { - // caller must hold lock on channel that owns wq - auto t = thr_current(); - AtomicStore(&t->elemptr, elemptr); - dlog_chan("park: elemptr %p", elemptr); - wq_enqueue(wq, t); - chan_unlock(&c->lock); - thr_wait(t); - return t; +static Thr* chan_park(hchan_t* c, WaitQ* wq, void* elemptr) { + // caller must hold lock on channel that owns wq + Thr* t = thr_current(); + atomic_store_explicit(&t->elemptr, elemptr, memory_order_relaxed); + dlog_chan("park: elemptr %p", elemptr); + wq_enqueue(wq, t); + chan_unlock(&c->lock); + thr_wait(t); + return t; } - -inline static bool chan_full(Chan* c) { - // c.qcap is immutable (never written after the channel is created) - // so it is safe to read at any time during channel operation. - if (c->qcap == 0) - return AtomicLoad(&c->recvq.first) == NULL; - return AtomicLoad(&c->qlen) == c->qcap; +inline static bool chan_full(hchan_t* c) { + // c.qcap is immutable (never written after the channel is created) + // so it is safe to read at any time during channel operation. 
+ if (c->qcap == 0) return atomic_load_explicit(&c->recvq.first, memory_order_relaxed) == NULL; + return atomic_load_explicit(&c->qlen, memory_order_relaxed) == c->qcap; } +static bool chan_send_direct(hchan_t* c, void* srcelemptr, Thr* recvt) { + // chan_send_direct processes a send operation on an empty channel c. + // element sent by the sender is copied to the receiver recvt. + // The receiver is then woken up to go on its merry way. + // hchan_nel c must be empty and locked. This function unlocks c with chan_unlock. + // recvt must already be dequeued from c. + + void* dstelemptr = atomic_load_explicit(&recvt->elemptr, memory_order_acquire); + assert(dstelemptr != NULL); + dlog_send("direct send of srcelemptr %p to [%zu] (dstelemptr %p)", srcelemptr, recvt->id, dstelemptr); + // store to address provided with chan_recv call + memcpy(dstelemptr, srcelemptr, c->elemsize); + atomic_store_explicit(&recvt->elemptr, NULL, memory_order_relaxed); // clear pointer (TODO: is this really needed?) -static bool chan_send_direct(Chan* c, void* srcelemptr, Thr* recvt) { - // chan_send_direct processes a send operation on an empty channel c. - // element sent by the sender is copied to the receiver recvt. - // The receiver is then woken up to go on its merry way. - // Channel c must be empty and locked. This function unlocks c with chan_unlock. - // recvt must already be dequeued from c. - - void* dstelemptr = AtomicLoadAcq(&recvt->elemptr); - assertnotnull(dstelemptr); - dlog_send("direct send of srcelemptr %p to [%zu] (dstelemptr %p)", - srcelemptr, recvt->id, dstelemptr); - // store to address provided with chan_recv call - memcpy(dstelemptr, srcelemptr, c->elemsize); - AtomicStore(&recvt->elemptr, NULL); // clear pointer (TODO: is this really needed?) - - chan_unlock(&c->lock); - thr_signal(recvt); // wake up chan_recv caller - return true; + chan_unlock(&c->lock); + thr_signal(recvt); // wake up chan_recv caller + return true; } +inline static bool chan_send(hchan_t* c, void* srcelemptr, bool* closed) { + bool block = closed == NULL; + dlog_send("srcelemptr %p", srcelemptr); -inline static bool chan_send(Chan* c, void* srcelemptr, bool* nullable closed) { - bool block = closed == NULL; - dlog_send("srcelemptr %p", srcelemptr); - - // fast path for non-blocking send on full channel - // - // From Go's chan implementation from which this logic is borrowed: - // After observing that the channel is not closed, we observe that the channel is - // not ready for sending. Each of these observations is a single word-sized read - // (first c.closed and second chan_full()). - // Because a closed channel cannot transition from 'ready for sending' to - // 'not ready for sending', even if the channel is closed between the two observations, - // they imply a moment between the two when the channel was both not yet closed - // and not ready for sending. We behave as if we observed the channel at that moment, - // and report that the send cannot proceed. - // - // It is okay if the reads are reordered here: if we observe that the channel is not - // ready for sending and then observe that it is not closed, that implies that the - // channel wasn't closed during the first observation. However, nothing here - // guarantees forward progress. We rely on the side effects of lock release in - // chan_recv() and ChanClose() to update this thread's view of c.closed and chan_full(). 
- if (!block && !c->closed && chan_full(c)) - return false; + // fast path for non-blocking send on full channel + // + // From Go's chan implementation from which this logic is borrowed: + // After observing that the channel is not closed, we observe that the channel is + // not ready for sending. Each of these observations is a single word-sized read + // (first c.closed and second chan_full()). + // Because a closed channel cannot transition from 'ready for sending' to + // 'not ready for sending', even if the channel is closed between the two observations, + // they imply a moment between the two when the channel was both not yet closed + // and not ready for sending. We behave as if we observed the channel at that moment, + // and report that the send cannot proceed. + // + // It is okay if the reads are reordered here: if we observe that the channel is not + // ready for sending and then observe that it is not closed, that implies that the + // channel wasn't closed during the first observation. However, nothing here + // guarantees forward progress. We rely on the side effects of lock release in + // chan_recv() and hchan_Close() to update this thread's view of c.closed and chan_full(). + if (!block && !c->closed && chan_full(c)) return false; + + chan_lock(&c->lock); + + if (atomic_load_explicit(&c->closed, memory_order_relaxed)) { + chan_unlock(&c->lock); + if (block) { + fprintf(stderr, "send on closed channel"); + exit(1); + } + else { + *closed = true; + } + return false; + } - chan_lock(&c->lock); + Thr* recvt = wq_dequeue(&c->recvq); + if (recvt) { + // Found a waiting receiver. recvt is blocked, waiting in chan_recv. + // We pass the value we want to send directly to the receiver, + // bypassing the channel buffer (if any). + // Note that chan_send_direct calls chan_unlock(&c->lock). + assert(recvt->init); + return chan_send_direct(c, srcelemptr, recvt); + } - if (R_UNLIKELY(AtomicLoad(&c->closed))) { - chan_unlock(&c->lock); - if (block) { - panic("send on closed channel"); - } else { - *closed = true; + if (atomic_load_explicit(&c->qlen, memory_order_relaxed) < c->qcap) { + // space available in message buffer -- enqueue + uint32_t i = atomic_fetch_add_explicit(&c->sendx, 1, memory_order_relaxed); + // copy *srcelemptr -> *dstelemptr + void* dstelemptr = chan_bufptr(c, i); + memcpy(dstelemptr, srcelemptr, c->elemsize); + dlog_send("enqueue elemptr %p at buf[%u]", srcelemptr, i); + if (i == c->qcap - 1) atomic_store_explicit(&c->sendx, 0, memory_order_relaxed); + atomic_fetch_add_explicit(&c->qlen, 1, memory_order_relaxed); + chan_unlock(&c->lock); + return true; } - return false; - } - - Thr* recvt = wq_dequeue(&c->recvq); - if (recvt) { - // Found a waiting receiver. recvt is blocked, waiting in chan_recv. - // We pass the value we want to send directly to the receiver, - // bypassing the channel buffer (if any). - // Note that chan_send_direct calls chan_unlock(&c->lock). 
- assert(recvt->init); - return chan_send_direct(c, srcelemptr, recvt); - } - - if (AtomicLoad(&c->qlen) < c->qcap) { - // space available in message buffer -- enqueue - uint32_t i = AtomicAdd(&c->sendx, 1); - // copy *srcelemptr -> *dstelemptr - void* dstelemptr = chan_bufptr(c, i); - memcpy(dstelemptr, srcelemptr, c->elemsize); - dlog_send("enqueue elemptr %p at buf[%u]", srcelemptr, i); - if (i == c->qcap - 1) - AtomicStore(&c->sendx, 0); - AtomicAdd(&c->qlen, 1); - chan_unlock(&c->lock); - return true; - } - // buffer is full and there is no waiting receiver - if (!block) { - chan_unlock(&c->lock); - return false; - } - - // park the calling thread. Some recv caller will wake us up. - // Note that chan_park calls chan_unlock(&c->lock) - dlog_send("wait... (elemptr %p)", srcelemptr); - chan_park(c, &c->sendq, srcelemptr); - dlog_send("woke up -- sent elemptr %p", srcelemptr); - return true; -} + // buffer is full and there is no waiting receiver + if (!block) { + chan_unlock(&c->lock); + return false; + } + // park the calling thread. Some recv caller will wake us up. + // Note that chan_park calls chan_unlock(&c->lock) + dlog_send("wait... (elemptr %p)", srcelemptr); + chan_park(c, &c->sendq, srcelemptr); + dlog_send("woke up -- sent elemptr %p", srcelemptr); + return true; +} // chan_empty reports whether a read from c would block (that is, the channel is empty). // It uses a single atomic read of mutable state. -inline static bool chan_empty(Chan* c) { - // Note: qcap is immutable - if (c->qcap == 0) - return AtomicLoad(&c->sendq.first) == NULL; - return AtomicLoad(&c->qlen) == 0; +inline static bool chan_empty(hchan_t* c) { + // Note: qcap is immutable + if (c->qcap == 0) return atomic_load_explicit(&c->sendq.first, memory_order_relaxed) == NULL; + return atomic_load_explicit(&c->qlen, memory_order_relaxed) == 0; } +static bool chan_recv_direct(hchan_t* c, void* dstelemptr, Thr* st); + +inline static bool chan_recv(hchan_t* c, void* dstelemptr, bool* closed) { + bool block = closed == NULL; // TODO: non-blocking path + dlog_recv("dstelemptr %p", dstelemptr); + + // Fast path: check for failed non-blocking operation without acquiring the lock. + if (!block && chan_empty(c)) { + // After observing that the channel is not ready for receiving, we observe whether the + // channel is closed. + // + // Reordering of these checks could lead to incorrect behavior when racing with a close. + // For example, if the channel was open and not empty, was closed, and then drained, + // reordered reads could incorrectly indicate "open and empty". To prevent reordering, + // we use atomic loads for both checks, and rely on emptying and closing to happen in + // separate critical sections under the same lock. This assumption fails when closing + // an unbuffered channel with a blocked send, but that is an error condition anyway. + if (atomic_load_explicit(&c->closed, memory_order_relaxed) == false) { + // Because a channel cannot be reopened, the later observation of the channel + // being not closed implies that it was also not closed at the moment of the + // first observation. We behave as if we observed the channel at that moment + // and report that the receive cannot proceed. + return false; + } + // The channel is irreversibly closed. Re-check whether the channel has any pending data + // to receive, which could have arrived between the empty and closed checks above. + // Sequential consistency is also required here, when racing with such a send. 
+ if (chan_empty(c)) { + // The channel is irreversibly closed and empty + memset(dstelemptr, 0, c->elemsize); + *closed = true; + return false; + } + } -static bool chan_recv_direct(Chan* c, void* dstelemptr, Thr* st); - - -inline static bool chan_recv(Chan* c, void* dstelemptr, bool* nullable closed) { - bool block = closed == NULL; // TODO: non-blocking path - dlog_recv("dstelemptr %p", dstelemptr); + chan_lock(&c->lock); - // Fast path: check for failed non-blocking operation without acquiring the lock. - if (!block && chan_empty(c)) { - // After observing that the channel is not ready for receiving, we observe whether the - // channel is closed. - // - // Reordering of these checks could lead to incorrect behavior when racing with a close. - // For example, if the channel was open and not empty, was closed, and then drained, - // reordered reads could incorrectly indicate "open and empty". To prevent reordering, - // we use atomic loads for both checks, and rely on emptying and closing to happen in - // separate critical sections under the same lock. This assumption fails when closing - // an unbuffered channel with a blocked send, but that is an error condition anyway. - if (AtomicLoad(&c->closed) == false) { - // Because a channel cannot be reopened, the later observation of the channel - // being not closed implies that it was also not closed at the moment of the - // first observation. We behave as if we observed the channel at that moment - // and report that the receive cannot proceed. - return false; + if (atomic_load_explicit(&c->closed, memory_order_relaxed) && atomic_load_explicit(&c->qlen, memory_order_relaxed) == 0) { + // channel is closed and the buffer queue is empty + dlog_recv("channel closed & empty queue"); + chan_unlock(&c->lock); + memset(dstelemptr, 0, c->elemsize); + if (closed) *closed = true; + return false; } - // The channel is irreversibly closed. Re-check whether the channel has any pending data - // to receive, which could have arrived between the empty and closed checks above. - // Sequential consistency is also required here, when racing with such a send. - if (chan_empty(c)) { - // The channel is irreversibly closed and empty - memset(dstelemptr, 0, c->elemsize); - *closed = true; - return false; + + Thr* t = wq_dequeue(&c->sendq); + if (t) { + // Found a waiting sender. + // If buffer is size 0, receive value directly from sender. + // Otherwise, receive from head of queue and add sender's value to the tail of the queue + // (both map to the same buffer slot because the queue is full). + // Note that chan_recv_direct calls chan_unlock(&c->lock). 
+ assert(t->init); + return chan_recv_direct(c, dstelemptr, t); } - } - chan_lock(&c->lock); + if (atomic_load_explicit(&c->qlen, memory_order_relaxed) > 0) { + // Receive directly from queue + uint32_t i = atomic_fetch_add_explicit(&c->recvx, 1, memory_order_relaxed); + if (i == c->qcap - 1) atomic_store_explicit(&c->recvx, 0, memory_order_relaxed); + atomic_fetch_sub_explicit(&c->qlen, 1, memory_order_relaxed); + + // copy *srcelemptr -> *dstelemptr + void* srcelemptr = chan_bufptr(c, i); + memcpy(dstelemptr, srcelemptr, c->elemsize); +#ifdef DEBUG + memset(srcelemptr, 0, c->elemsize); // zero buffer memory +#endif - if (AtomicLoad(&c->closed) && AtomicLoad(&c->qlen) == 0) { - // channel is closed and the buffer queue is empty - dlog_recv("channel closed & empty queue"); - chan_unlock(&c->lock); - memset(dstelemptr, 0, c->elemsize); - if (closed) - *closed = true; - return false; - } - - Thr* t = wq_dequeue(&c->sendq); - if (t) { - // Found a waiting sender. - // If buffer is size 0, receive value directly from sender. - // Otherwise, receive from head of queue and add sender's value to the tail of the queue - // (both map to the same buffer slot because the queue is full). - // Note that chan_recv_direct calls chan_unlock(&c->lock). - assert(t->init); - return chan_recv_direct(c, dstelemptr, t); - } - - if (AtomicLoad(&c->qlen) > 0) { - // Receive directly from queue - uint32_t i = AtomicAdd(&c->recvx, 1); - if (i == c->qcap - 1) - AtomicStore(&c->recvx, 0); - AtomicSub(&c->qlen, 1); - - // copy *srcelemptr -> *dstelemptr - void* srcelemptr = chan_bufptr(c, i); - memcpy(dstelemptr, srcelemptr, c->elemsize); - #ifdef DEBUG - memset(srcelemptr, 0, c->elemsize); // zero buffer memory - #endif + dlog_recv("dequeue elemptr %p from buf[%u]", srcelemptr, i); - dlog_recv("dequeue elemptr %p from buf[%u]", srcelemptr, i); + chan_unlock(&c->lock); + return true; + } - chan_unlock(&c->lock); - return true; - } + // No message available -- nothing queued and no waiting senders + if (!block) { + chan_unlock(&c->lock); + return false; + } - // No message available -- nothing queued and no waiting senders - if (!block) { - chan_unlock(&c->lock); - return false; - } + // Check if the channel is closed. + if (atomic_load_explicit(&c->closed, memory_order_relaxed)) { + chan_unlock(&c->lock); + goto ret_closed; + } - // Check if the channel is closed. - if (AtomicLoad(&c->closed)) { - chan_unlock(&c->lock); - goto ret_closed; - } - - // Block by parking the thread. Some send caller will wake us up. - // Note that chan_park calls chan_unlock(&c->lock) - dlog_recv("wait... (elemptr %p)", dstelemptr); - t = chan_park(c, &c->recvq, dstelemptr); - - // woken up by sender or close call - if (AtomicLoad(&t->closed)) { - // Note that we check "closed" on the Thr, not the Chan. - // This is important since c->closed may be true even as we receive a message. - dlog_recv("woke up -- channel closed"); - goto ret_closed; - } - - // message was delivered by storing to elemptr by some sender - dlog_recv("woke up -- received to elemptr %p", dstelemptr); - return true; + // Block by parking the thread. Some send caller will wake us up. + // Note that chan_park calls chan_unlock(&c->lock) + dlog_recv("wait... (elemptr %p)", dstelemptr); + t = chan_park(c, &c->recvq, dstelemptr); + + // woken up by sender or close call + if (atomic_load_explicit(&t->closed, memory_order_relaxed)) { + // Note that we check "closed" on the Thr, not the hchan_t. + // This is important since c->closed may be true even as we receive a message. 
+ dlog_recv("woke up -- channel closed"); + goto ret_closed; + } + + // message was delivered by storing to elemptr by some sender + dlog_recv("woke up -- received to elemptr %p", dstelemptr); + return true; ret_closed: - dlog_recv("channel closed"); - memset(dstelemptr, 0, c->elemsize); - return false; + dlog_recv("channel closed"); + memset(dstelemptr, 0, c->elemsize); + return false; } - // chan_recv_direct processes a receive operation on a full channel c -static bool chan_recv_direct(Chan* c, void* dstelemptr, Thr* sendert) { - // There are 2 parts: - // 1) The value sent by the sender sg is put into the channel and the sender - // is woken up to go on its merry way. - // 2) The value received by the receiver (the current G) is written to ep. - // For synchronous (unbuffered) channels, both values are the same. - // For asynchronous (buffered) channels, the receiver gets its data from - // the channel buffer and the sender's data is put in the channel buffer. - // Channel c must be full and locked. - // sendert must already be dequeued from c.sendq. - bool ok = true; - - if (AtomicLoad(&c->qlen) == 0) { - // Copy data from sender - void* srcelemptr = AtomicLoadx(&sendert->elemptr, memory_order_consume); - dlog_recv("direct recv of srcelemptr %p from [%zu] (dstelemptr %p, buffer empty)", - srcelemptr, sendert->id, dstelemptr); - assertnotnull(srcelemptr); - memcpy(dstelemptr, srcelemptr, c->elemsize); - } else { - // Queue is full. Take the item at the head of the queue. - // Make the sender enqueue its item at the tail of the queue. - // Since the queue is full, those are both the same slot. - dlog_recv("direct recv from [%zu] (dstelemptr %p, buffer full)", sendert->id, dstelemptr); - //assert_debug(AtomicLoad(&c->qlen) == c->qcap); // queue is full - - // copy element from queue to receiver - uint32_t i = AtomicAdd(&c->recvx, 1); - if (i == c->qcap - 1) { - AtomicStore(&c->recvx, 0); - AtomicStore(&c->sendx, 0); - } else { - AtomicStore(&c->sendx, i + 1); +static bool chan_recv_direct(hchan_t* c, void* dstelemptr, Thr* sendert) { + // There are 2 parts: + // 1) The value sent by the sender sg is put into the channel and the sender + // is woken up to go on its merry way. + // 2) The value received by the receiver (the current G) is written to ep. + // For synchronous (unbuffered) channels, both values are the same. + // For asynchronous (buffered) channels, the receiver gets its data from + // the channel buffer and the sender's data is put in the channel buffer. + // hchan_nel c must be full and locked. + // sendert must already be dequeued from c.sendq. + bool ok = true; + + if (atomic_load_explicit(&c->qlen, memory_order_relaxed) == 0) { + // Copy data from sender + void* srcelemptr = atomic_load_explicit(&sendert->elemptr, memory_order_consume); + dlog_recv("direct recv of srcelemptr %p from [%zu] (dstelemptr %p, buffer empty)", srcelemptr, sendert->id, dstelemptr); + assert(srcelemptr != NULL); + memcpy(dstelemptr, srcelemptr, c->elemsize); + } + else { + // Queue is full. Take the item at the head of the queue. + // Make the sender enqueue its item at the tail of the queue. + // Since the queue is full, those are both the same slot. 
+ dlog_recv("direct recv from [%zu] (dstelemptr %p, buffer full)", sendert->id, dstelemptr);
+ // assert_debug(atomic_load_explicit(&c->qlen) == c->qcap); // queue is full
+
+ // copy element from queue to receiver
+ uint32_t i = atomic_fetch_add_explicit(&c->recvx, 1, memory_order_relaxed);
+ if (i == c->qcap - 1) {
+ atomic_store_explicit(&c->recvx, 0, memory_order_relaxed);
+ atomic_store_explicit(&c->sendx, 0, memory_order_relaxed);
+ }
+ else {
+ atomic_store_explicit(&c->sendx, i + 1, memory_order_relaxed);
+ }
+
+ // copy c->buf[i] -> *dstelemptr
+ void* bufelemptr = chan_bufptr(c, i);
+ assert(bufelemptr != NULL);
+ memcpy(dstelemptr, bufelemptr, c->elemsize);
+ dlog_recv("dequeue srcelemptr %p from buf[%u]", bufelemptr, i);
+
+ // copy *sendert->elemptr -> c->buf[i]
+ void* srcelemptr = atomic_load_explicit(&sendert->elemptr, memory_order_consume);
+ assert(srcelemptr != NULL);
+ memcpy(bufelemptr, srcelemptr, c->elemsize);
+ dlog_recv("enqueue srcelemptr %p to buf[%u]", srcelemptr, i);
 }
- // copy c->buf[i] -> *dstelemptr
- void* bufelemptr = chan_bufptr(c, i);
- assertnotnull(bufelemptr);
- memcpy(dstelemptr, bufelemptr, c->elemsize);
- dlog_recv("dequeue srcelemptr %p from buf[%u]", bufelemptr, i);
-
- // copy *sendert->elemptr -> c->buf[i]
- void* srcelemptr = AtomicLoadx(&sendert->elemptr, memory_order_consume);
- assertnotnull(srcelemptr);
- memcpy(bufelemptr, srcelemptr, c->elemsize);
- dlog_recv("enqueue srcelemptr %p to buf[%u]", srcelemptr, i);
- }
-
- chan_unlock(&c->lock);
- thr_signal(sendert); // wake up chan_send caller
- return ok;
+ chan_unlock(&c->lock);
+ thr_signal(sendert); // wake up chan_send caller
+ return ok;
 }
+hchan_t* hchan_Open(size_t elemsize, uint32_t bufcap) {
+ int64_t memsize = (int64_t)sizeof(hchan_t) + ((int64_t)bufcap * (int64_t)elemsize);
-Chan* nullable ChanOpen(Mem mem, size_t elemsize, uint32_t bufcap) {
- i64 memsize = (i64)sizeof(Chan) + ((i64)bufcap * (i64)elemsize);
-
- // ensure we have enough space to offset the allocation by line cache (for alignment)
- memsize = align2(memsize + ((LINE_CACHE_SIZE+1) / 2), LINE_CACHE_SIZE);
+ // ensure we have enough space to offset the allocation by line cache (for alignment)
+ memsize = align2(memsize + ((LINE_CACHE_SIZE + 1) / 2), LINE_CACHE_SIZE);
- // check for overflow
- if (memsize < (i64)sizeof(Chan))
- panic("buffer size out of range");
+ // check for overflow
+ if (memsize < (int64_t)sizeof(hchan_t)) {
+ fprintf(stderr, "buffer size out of range");
+ exit(1);
+ }
- // allocate memory, placing Chan at a line cache address boundary
- uintptr_t ptr = (uintptr_t)memalloc(mem, memsize);
+ // allocate zeroed memory, placing hchan_t at a line cache address boundary;
+ // qlen, closed, sendx, recvx and the wait queues rely on starting out as zero
+ uintptr_t ptr = (uintptr_t)calloc(1, memsize);
- // align c to line cache boundary
- Chan* c = (Chan*)align2(ptr, LINE_CACHE_SIZE);
+ // align c to line cache boundary
+ hchan_t* c = (hchan_t*)align2(ptr, LINE_CACHE_SIZE);
- c->memptr = ptr;
- c->mem = mem;
- c->elemsize = elemsize;
- c->qcap = bufcap;
- chan_lock_init(&c->lock);
+ c->memptr = ptr;
+ c->elemsize = elemsize;
+ c->qcap = bufcap;
+ chan_lock_init(&c->lock);
- // make sure that the thread setting up the channel gets a low thread_id
- #ifdef DEBUG_CHAN_LOG
- thread_id();
- #endif
+// make sure that the thread setting up the channel gets a low thread_id
+#ifdef DEBUG_CHAN_LOG
+ thread_id();
+#endif
- return c;
+ return c;
 }
+void hchan_Close(hchan_t* c) {
+ dlog_chan("--- close ---");
-void ChanClose(Chan* c) {
- dlog_chan("--- close ---");
-
- chan_lock(&c->lock);
- 
dlog_chan("close: channel locked"); - - if (atomic_exchange_explicit(&c->closed, 1, memory_order_acquire) != 0) - panic("close of closed channel"); - atomic_thread_fence(memory_order_seq_cst); - - Thr* t = AtomicLoadAcq(&c->recvq.first); - while (t) { - dlog_chan("close: wake recv [%zu]", t->id); - Thr* next = t->next; - AtomicStore(&t->closed, true); - thr_signal(t); - t = next; - } - - t = AtomicLoadAcq(&c->sendq.first); - while (t) { - dlog_chan("close: wake send [%zu]", t->id); - Thr* next = t->next; - AtomicStore(&t->closed, true); - thr_signal(t); - t = next; - } - - chan_unlock(&c->lock); - dlog_chan("close: done"); -} + chan_lock(&c->lock); + dlog_chan("close: channel locked"); + if (atomic_exchange_explicit(&c->closed, 1, memory_order_acquire) != 0) { + fprintf(stderr, "close of closed channel"); + exit(1); + } + atomic_thread_fence(memory_order_seq_cst); + + Thr* t = atomic_load_explicit(&c->recvq.first, memory_order_acquire); + while (t) { + dlog_chan("close: wake recv [%zu]", t->id); + Thr* next = t->next; + atomic_store_explicit(&t->closed, true, memory_order_relaxed); + thr_signal(t); + t = next; + } -void ChanFree(Chan* c) { - assert(AtomicLoadAcq(&c->closed)); // must close channel before freeing its memory - chan_lock_dispose(&c->lock); - memfree(c->mem, (void*)c->memptr); -} + t = atomic_load_explicit(&c->sendq.first, memory_order_acquire); + while (t) { + dlog_chan("close: wake send [%zu]", t->id); + Thr* next = t->next; + atomic_store_explicit(&t->closed, true, memory_order_relaxed); + thr_signal(t); + t = next; + } + chan_unlock(&c->lock); + dlog_chan("close: done"); +} -uint32_t ChanCap(const Chan* c) { return c->qcap; } -bool ChanSend(Chan* c, void* elemptr) { return chan_send(c, elemptr, NULL); } -bool ChanRecv(Chan* c, void* elemptr) { return chan_recv(c, elemptr, NULL); } -bool ChanTrySend(Chan* c, void* elemptr, bool* closed) { return chan_send(c, elemptr, closed); } -bool ChanTryRecv(Chan* c, void* elemptr, bool* closed) { return chan_recv(c, elemptr, closed); } +void hchan_Free(hchan_t* c) { + assert(atomic_load_explicit(&c->closed, memory_order_acquire)); // must close channel before freeing its memory + chan_lock_destroy(&c->lock); + free((void*)c->memptr); +} +uint32_t hchan_Cap(const hchan_t* c) { + return c->qcap; +} +bool hchan_Send(hchan_t* c, void* elemptr) { + return chan_send(c, elemptr, NULL); +} +bool hchan_Recv(hchan_t* c, void* elemptr) { + return chan_recv(c, elemptr, NULL); +} +bool hchan_TrySend(hchan_t* c, void* elemptr, bool* closed) { + return chan_send(c, elemptr, closed); +} +bool hchan_TryRecv(hchan_t* c, void* elemptr, bool* closed) { + return chan_recv(c, elemptr, closed); +} diff --git a/ww/eventloop/base/hchan.h b/ww/eventloop/base/hchan.h index 2d4f8cce..3f07b1ca 100644 --- a/ww/eventloop/base/hchan.h +++ b/ww/eventloop/base/hchan.h @@ -1,5 +1,7 @@ #pragma once -#include "stdint.h" +#include +#include +#include // hchan_t is an optionally-buffered messaging channel for CSP-like processing. // Example: @@ -24,7 +26,7 @@ typedef struct hchan_s hchan_t; // opaque // hchannel_open creates and initializes a new channel which holds elements of elemsize byte. // If bufcap>0 then a buffered channel with the capacity to hold bufcap elements is created. -hchan_t* hchannel_open(Mem mem, size_t elemsize, uint32_t bufcap); +hchan_t* hchannel_open(size_t elemsize, uint32_t bufcap); // hchan_close cancels any waiting senders and receivers. 
// Messages sent before this call are guaranteed to be delivered, assuming there are diff --git a/ww/eventloop/base/hlsem.c b/ww/eventloop/base/hlsem.c new file mode 100644 index 00000000..7ec3969d --- /dev/null +++ b/ww/eventloop/base/hlsem.c @@ -0,0 +1,292 @@ +#include "hmutex.h" + +// This implementation is based on of Jeff Preshing's "lightweight semaphore" +// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h +// zlib license: +// +// Copyright (c) 2015 Jeff Preshing +// +// This software is provided 'as-is', without any express or implied +// warranty. In no event will the authors be held liable for any damages +// arising from the use of this software. +// +// Permission is granted to anyone to use this software for any purpose, +// including commercial applications, and to alter it and redistribute it +// freely, subject to the following restrictions: +// +// 1. The origin of this software must not be misrepresented; you must not +// claim that you wrote the original software. If you use this software +// in a product, an acknowledgement in the product documentation would be +// appreciated but is not required. +// 2. Altered source versions must be plainly marked as such, and must not be +// misrepresented as being the original software. +// 3. This notice may not be removed or altered from any source distribution. + +//#define USE_UNIX_SEMA + +#if defined(_WIN32) && !defined(USE_UNIX_SEMA) + #include + #undef min + #undef max +#elif defined(__MACH__) && !defined(USE_UNIX_SEMA) + #undef panic // mach/mach.h defines a function called panic() + #include + // redefine panic + #define panic(fmt, ...) _panic(__FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#elif defined(__unix__) || defined(USE_UNIX_SEMA) + #include +#else + #error Unsupported platform +#endif + +ASSUME_NONNULL_BEGIN + +#define USECS_IN_1_SEC 1000000 +#define NSECS_IN_1_SEC 1000000000 + + +//--------------------------------------------------------------------------------------------- +#if defined(_WIN32) && !defined(USE_UNIX_SEMA) + +static bool SemaInit(Sema* sp, u32 initcount) { + assert(initcount <= 0x7fffffff); + *sp = (Sema)CreateSemaphoreW(NULL, (int)initcount, 0x7fffffff, NULL); + return *sp != NULL; +} + +static void SemaDispose(Sema* sp) { + CloseHandle(*sp); +} + +static bool SemaWait(Sema* sp) { + const unsigned long infinite = 0xffffffff; + return WaitForSingleObject(*sp, infinite) == 0; +} + +static bool SemaTryWait(Sema* sp) { + return WaitForSingleObject(*sp, 0) == 0; +} + +static bool SemaTimedWait(Sema* sp, u64 timeout_usecs) { + return WaitForSingleObject(*sp, (unsigned long)(timeout_usecs / 1000)) == 0; +} + +static bool SemaSignal(Sema* sp, u32 count) { + assert(count > 0); + // while (!ReleaseSemaphore(*sp, count, NULL)) { + // } + return ReleaseSemaphore(*sp, count, NULL); +} + +//--------------------------------------------------------------------------------------------- +#elif defined(__MACH__) && !defined(USE_UNIX_SEMA) +// Can't use POSIX semaphores due to +// https://web.archive.org/web/20140109214515/ +// http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html + +static bool SemaInit(Sema* sp, u32 initcount) { + assert(initcount <= 0x7fffffff); + kern_return_t rc = + semaphore_create(mach_task_self(), (semaphore_t*)sp, SYNC_POLICY_FIFO, (int)initcount); + return rc == KERN_SUCCESS; +} + +static void SemaDispose(Sema* sp) { + semaphore_destroy(mach_task_self(), *(semaphore_t*)sp); +} + +static bool SemaWait(Sema* sp) { + semaphore_t s = 
*(semaphore_t*)sp; + while (1) { + kern_return_t rc = semaphore_wait(s); + if (rc != KERN_ABORTED) + return rc == KERN_SUCCESS; + } +} + +static bool SemaTryWait(Sema* sp) { + return SemaTimedWait(sp, 0); +} + +static bool SemaTimedWait(Sema* sp, u64 timeout_usecs) { + mach_timespec_t ts; + ts.tv_sec = (u32)(timeout_usecs / USECS_IN_1_SEC); + ts.tv_nsec = (int)((timeout_usecs % USECS_IN_1_SEC) * 1000); + // Note: + // semaphore_wait_deadline was introduced in macOS 10.6 + // semaphore_timedwait was introduced in macOS 10.10 + // https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/ + // APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html + semaphore_t s = *(semaphore_t*)sp; + while (1) { + kern_return_t rc = semaphore_timedwait(s, ts); + if (rc != KERN_ABORTED) + return rc == KERN_SUCCESS; + // TODO: update ts; subtract time already waited and retry (loop). + // For now, let's just return with an error: + return false; + } +} + +static bool SemaSignal(Sema* sp, u32 count) { + assert(count > 0); + semaphore_t s = *(semaphore_t*)sp; + kern_return_t rc = 0; // KERN_SUCCESS + while (count-- > 0) { + rc += semaphore_signal(s); // == ... + // auto rc1 = semaphore_signal(s); + // if (rc1 != KERN_SUCCESS) { + // rc = rc1; + // } + } + return rc == KERN_SUCCESS; +} + +//--------------------------------------------------------------------------------------------- +#elif defined(__unix__) || defined(USE_UNIX_SEMA) + +// TODO: implementation based on futex (for Linux and OpenBSD). See "__TBB_USE_FUTEX" of oneTBB + +static bool SemaInit(Sema* sp, u32 initcount) { + return sem_init((sem_t*)sp, 0, initcount) == 0; +} + +static void SemaDispose(Sema* sp) { + sem_destroy((sem_t*)sp); +} + +static bool SemaWait(Sema* sp) { + // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error + int rc; + do { + rc = sem_wait((sem_t*)sp); + } while (rc == -1 && errno == EINTR); + return rc == 0; +} + +static bool SemaTryWait(Sema* sp) { + int rc; + do { + rc = sem_trywait((sem_t*)sp); + } while (rc == -1 && errno == EINTR); + return rc == 0; +} + +static bool SemaTimedWait(Sema* sp, u64 timeout_usecs) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += (time_t)(timeout_usecs / USECS_IN_1_SEC); + ts.tv_nsec += (long)(timeout_usecs % USECS_IN_1_SEC) * 1000; + // sem_timedwait bombs if you have more than 1e9 in tv_nsec + // so we have to clean things up before passing it in + if (ts.tv_nsec >= NSECS_IN_1_SEC) { + ts.tv_nsec -= NSECS_IN_1_SEC; + ++ts.tv_sec; + } + int rc; + do { + rc = sem_timedwait((sem_t*)sp, &ts); + } while (rc == -1 && errno == EINTR); + return rc == 0; +} + +static bool SemaSignal(Sema* sp, u32 count) { + assert(count > 0); + while (count-- > 0) { + while (sem_post((sem_t*)sp) == -1) { + return false; + } + } + return true; +} + +//--------------------------------------------------------------------------------------------- +#endif /* system */ +// end of Sema implementations + +//--------------------------------------------------------------------------------------------- +// hlsem_t + +// LSEMA_MAX_SPINS is the upper limit of how many times to retry a CAS while spinning. +// After LSEMA_MAX_SPINS CAS attempts has failed (not gotten a signal), the implementation +// falls back on calling hlsem_Wait. +// +// The number 10000 has been choosen by looking at contention between a few threads competing +// for signal & wait on macOS 10.15 x86_64. 
In most observed cases two threads with zero overhead
+// racing to wait usually spend around 200–3000 loop cycles before succeeding. (clang -O0)
+//
+#define LSEMA_MAX_SPINS 10000
+
+static bool _hlsem_waitpartialspin(hlsem_t* s, uint64_t timeout_usecs) {
+ ssize_t oldCount;
+ int spin = LSEMA_MAX_SPINS;
+ while (--spin >= 0) {
+ oldCount = atomic_load_explicit(&s->count, memory_order_relaxed);
+ if (oldCount > 0 && atomic_compare_exchange_strong_explicit(&s->count, &oldCount, oldCount - 1, memory_order_acq_rel, memory_order_relaxed))
+ return true;
+ // Prevent the compiler from collapsing the loop
+ // [rsms]: Is this really needed? Find out. I think both clang and gcc will avoid
+ // messing with loops that contain atomic ops,
+ //__asm__ volatile("" ::: "memory");
+ atomic_signal_fence(memory_order_acquire);
+ }
+ oldCount = atomic_fetch_sub_explicit(&s->count, 1, memory_order_acquire);
+ if (oldCount > 0) return true;
+ if (timeout_usecs == 0) {
+ if (SemaWait((Sema*)&s->sema)) return true;
+ }
+ if (timeout_usecs > 0 && SemaTimedWait((Sema*)&s->sema, timeout_usecs)) return true;
+ // At this point, we've timed out waiting for the semaphore, but the
+ // count is still decremented indicating we may still be waiting on
+ // it. So we have to re-adjust the count, but only if the semaphore
+ // wasn't signaled enough times for us too since then. If it was, we
+ // need to release the semaphore too.
+ while (1) {
+ oldCount = atomic_load_explicit(&s->count, memory_order_acquire);
+ if (oldCount >= 0 && SemaTryWait((Sema*)&s->sema)) return true;
+ if (oldCount < 0 && atomic_compare_exchange_strong_explicit(&s->count, &oldCount, oldCount + 1, memory_order_relaxed, memory_order_relaxed)) return false;
+ }
+}
+
+// NOTE: the calls below assume the platform Sema helpers defined earlier in this file
+// wrap the storage behind the hsem_t member of hlsem_t.
+bool hlsem_init(hlsem_t* s, uint32_t initcount) {
+ atomic_init(&s->count, (long long)initcount);
+ return SemaInit((Sema*)&s->sema, initcount);
+}
+
+void hlsem_destroy(hlsem_t* s) {
+ SemaDispose((Sema*)&s->sema);
+}
+
+bool hlsem_wait(hlsem_t* s) {
+ return hlsem_trywait(s) || _hlsem_waitpartialspin(s, 0);
+}
+
+bool hlsem_trywait(hlsem_t* s) {
+ ssize_t oldCount = atomic_load_explicit(&s->count, memory_order_relaxed);
+ while (oldCount > 0) {
+ if (atomic_compare_exchange_weak_explicit(&s->count, &oldCount, oldCount - 1, memory_order_acquire, memory_order_relaxed)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool hlsem_timedwait(hlsem_t* s, uint64_t timeout_usecs) {
+ return hlsem_trywait(s) || _hlsem_waitpartialspin(s, timeout_usecs);
+}
+
+void hlsem_signal(hlsem_t* s, uint32_t count) {
+ assert(count > 0);
+ ssize_t oldCount = atomic_fetch_add_explicit(&s->count, (ssize_t)count, memory_order_release);
+ ssize_t toRelease = -oldCount < (ssize_t)count ? -oldCount : (ssize_t)count;
+ if (toRelease > 0) SemaSignal((Sema*)&s->sema, (uint32_t)toRelease);
+}
+
+size_t hlsem_approxavail(hlsem_t* s) {
+ ssize_t count = atomic_load_explicit(&s->count, memory_order_relaxed);
+ return count > 0 ? (size_t)(count) : 0;
+}
+
+ diff --git a/ww/eventloop/base/hmutex.h b/ww/eventloop/base/hmutex.h index cc472b1f..ba917b05 100644 --- a/ww/eventloop/base/hmutex.h +++ b/ww/eventloop/base/hmutex.h @@ -232,6 +232,26 @@ static inline int hsem_wait_for(hsem_t* sem, unsigned int ms) { #endif
+// hlsem_t is a "light-weight" semaphore which is more efficient than Sema under
+// high-contention conditions, by avoiding syscalls.
+// Waiting when there's already a signal available is extremely cheap and involves
+// no syscalls. 
If there's no signal the implementation will retry by spinning for +// a short while before eventually falling back to Sema. +typedef struct hlsem_s { + atomic_llong count; + hsem_t sema; +} hlsem_t; + +// returns false if system impl failed (rare) +bool hlsem_init(hlsem_t*, uint32_t initcount); +void hlsem_destroy(hlsem_t*); +bool hlsem_wait(hlsem_t*); +bool hlsem_trywait(hlsem_t*); +bool hlsem_timedwait(hlsem_t*, uint64_t timeout_usecs); +void hlsem_signal(hlsem_t*, uint32_t count /*must be >0*/); +size_t hlsem_approxavail(hlsem_t*); + + #define kYieldProcessorTries 1000 diff --git a/ww/managers/node_manager.c b/ww/managers/node_manager.c index 1aff1a26..b336dee9 100644 --- a/ww/managers/node_manager.c +++ b/ww/managers/node_manager.c @@ -243,7 +243,7 @@ static void startParsingFiles() exit(1); } getStringFromJsonObject(&(new_node->next), node_json, "next"); - getIntFromJsonObjectOrDefault(&(new_node->version), node_json, "version", 0); + getIntFromJsonObjectOrDefault((int*)&(new_node->version), node_json, "version", 0); registerNode(new_node, cJSON_GetObjectItemCaseSensitive(node_json, "settings")); } cycleProcess(); diff --git a/ww/shiftbuffer.h b/ww/shiftbuffer.h index 46cdfb24..fb7351ca 100644 --- a/ww/shiftbuffer.h +++ b/ww/shiftbuffer.h @@ -28,13 +28,13 @@ struct shift_buffer_s { + unsigned int *refc; + char *pbuf; unsigned int calc_len; unsigned int lenpos; unsigned int curpos; unsigned int cap; // half of full cap unsigned int full_cap; - unsigned int *refc; - char *pbuf; }; typedef struct shift_buffer_s shift_buffer_t;
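
A minimal producer/consumer sketch of how the renamed channel API is meant to be driven, using the wrapper names exactly as defined at the bottom of hchan.c (note the header above still declares the constructor as hchannel_open, so the prototype spelling may differ) and assuming C11 <threads.h> is available; treat this as an illustration rather than a tested example from the patch:

#include <stdio.h>
#include <threads.h>
#include "hchan.h"

static int producer(void* arg) {
    hchan_t* ch = arg;
    for (int i = 1; i <= 4; i++) {
        hchan_Send(ch, &i);       // blocks while the 2-slot buffer is full
    }
    hchan_Close(ch);              // wakes any parked receivers
    return 0;
}

int main(void) {
    hchan_t* ch = hchan_Open(sizeof(int), 2);  // buffered channel of int, capacity 2

    thrd_t t;
    thrd_create(&t, producer, ch);

    int v;
    while (hchan_Recv(ch, &v))    // returns false once the channel is closed and drained
        printf("got %d\n", v);

    thrd_join(t, NULL);
    hchan_Free(ch);               // only legal after hchan_Close
    return 0;
}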
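
The closed out-parameter on hchan_TrySend/hchan_TryRecv doubles as the non-blocking switch: passing a non-NULL pointer selects the non-blocking path, and the pointee is set only when the operation failed because the channel was closed rather than merely full or empty. A sketch of a poll-style drain loop built on that contract, under the same naming assumptions as the example above:

#include <stdbool.h>
#include <stdio.h>
#include "hchan.h"

// Drain whatever is currently queued without ever blocking.
// Returns true if the channel is still open, false once it is closed and fully drained.
static bool drain_pending(hchan_t* ch) {
    int v;
    bool closed = false;
    while (hchan_TryRecv(ch, &v, &closed)) {
        printf("drained %d\n", v);
    }
    // A false return with closed still false just means the queue was empty.
    return !closed;
}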
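
hchan_Open deliberately over-allocates, rounds the struct pointer up to the next cache-line boundary with align2, and remembers the raw allocation in memptr so hchan_Free can release it. The same pattern in isolation, using a conservative slack of one full cache line; the names CACHE_LINE, aligned_box_t, box_open and box_free are illustrative and not taken from the patch:

#include <stdint.h>
#include <stdlib.h>

#define CACHE_LINE 64
/* round n up to the next multiple of w; w must be a power of two */
#define align2(n, w) ((((uintptr_t)(n)) + ((w) - 1)) & ~((uintptr_t)(w) - 1))

typedef struct {
    uintptr_t memptr;   /* original allocation, needed by free() */
    /* ... hot fields, each kept on its own cache line ... */
} aligned_box_t;

static aligned_box_t* box_open(size_t payload) {
    /* reserve enough slack so the aligned struct (plus payload) still fits in the block */
    size_t memsize = align2(sizeof(aligned_box_t) + payload + CACHE_LINE - 1, CACHE_LINE);
    uintptr_t ptr = (uintptr_t)calloc(1, memsize);   /* zeroed, like the channel fields */
    if (ptr == 0) return NULL;
    aligned_box_t* b = (aligned_box_t*)align2(ptr, CACHE_LINE);
    b->memptr = ptr;                                 /* remember what to free later */
    return b;
}

static void box_free(aligned_box_t* b) {
    free((void*)b->memptr);
}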
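
hlsem_t keeps a signed atomic count where a negative value means waiters have fallen back (or are about to fall back) to the underlying OS semaphore; hlsem_wait first spins on that count and only parks when no permit shows up. A small pthreads sketch of the hmutex.h API declared above, assuming the functions behave as their declarations suggest:

#include <pthread.h>
#include <stdio.h>
#include "hmutex.h"

static hlsem_t ready;

static void* worker(void* arg) {
    (void)arg;
    hlsem_wait(&ready);          // cheap if a permit is already pending, otherwise spins then sleeps
    puts("worker released");
    return NULL;
}

int main(void) {
    if (!hlsem_init(&ready, 0))  // start with no permits
        return 1;

    pthread_t t;
    pthread_create(&t, NULL, worker, NULL);

    hlsem_signal(&ready, 1);     // hand out one permit; touches the OS semaphore only if the worker is parked
    pthread_join(t, NULL);

    hlsem_destroy(&ready);
    return 0;
}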
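
For reference, the patch swaps the colib-style atomic macros (AtomicAdd, AtomicLoadAcq, AtomicStoreRel, ...) for C11 <stdatomic.h> calls with explicit memory orders. A tiny self-contained example of the call forms used throughout; the specific orders below mirror common uses in the patch and are not a prescription:

#include <stdatomic.h>
#include <stdio.h>

int main(void) {
    _Atomic(unsigned int) qlen = 0;

    // AtomicAdd(&qlen, 1)  ->  fetch-add with an explicit order
    unsigned int before = atomic_fetch_add_explicit(&qlen, 1, memory_order_relaxed);

    // AtomicLoadAcq(&qlen)  ->  acquire load
    unsigned int now = atomic_load_explicit(&qlen, memory_order_acquire);

    // AtomicStoreRel(&qlen, 0)  ->  release store
    atomic_store_explicit(&qlen, 0, memory_order_release);

    printf("before=%u now=%u\n", before, now);
    return 0;
}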