-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pipe_push blocking #6
Comments
As far as I know, pipe_push does block if full. Do you have an example showing it doesn't? |
Hi @cgaebel, Here's an example where
Furthermore, if you increase the
Then this assertion is thrown:
|
Any news? |
I reiterate that the example above does not block when built against |
The reason to use the branch `lock_free` is because the `master` has the issue of not blocking on `pipe_push()` when a pipe is full. See cgaebel/pipe#6 (comment)
Was this going to be fixed? |
I found the issue is caused by there is a minimum capacity of the pipe which is 32, defined by |
It still issues an assertion error even if using larger value (for example 32). I've modified the unit test: DEF_TEST(issue_5)
{
// static const int NUM = 32;
#define NUM 64
pipe_t* pipe = pipe_new(sizeof(int), 32);
pipe_producer_t* p = pipe_producer_new(pipe);
pipe_consumer_t* c = pipe_consumer_new(pipe);
pipe_free(pipe);
int data[NUM];
for(int i=0; i < NUM; ++i)
{
data[i] = i;
pipe_push(p, data, 1);
}
// pipe_push(p, data, NUM);
pipe_producer_free(p);
int buf[NUM];
size_t ret = pipe_pop(c, buf, NUM);
assert(ret == NUM);
for(int i=0; i < NUM; ++i)
assert(buf[i] == data[i]);
pipe_consumer_free(c);
} Stack failure:
|
The following patch:
diff --git a/pipe.c b/pipe.c
index 9af421a..e3c8c26 100644
--- a/pipe.c
+++ b/pipe.c
@@ -514,7 +514,7 @@ static inline void check_invariants(pipe_t* p)
if(s.begin == s.end)
assertume(bytes_in_use(s) == capacity(s));
- assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
+ //assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
assertume(in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap));
}
@@ -743,6 +743,8 @@ static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
elem_size = __pipe_elem_size(p);
assertume(new_size >= bytes_in_use(make_snapshot(p)));
+ assertume(new_size + elem_size > new_size); // overflow
+ new_size += elem_size; // include sentinel
if(unlikely(new_size >= max_cap))
new_size = max_cap;
@@ -750,13 +752,13 @@ static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
if(new_size <= min_cap)
return make_snapshot(p);
- char* new_buf = malloc(new_size + elem_size);
+ char* new_buf = malloc(new_size);
p->end = copy_pipe_into_new_buf(make_snapshot(p), new_buf);
p->begin =
p->buffer = (free(p->buffer), new_buf);
- p->bufend = new_buf + new_size + elem_size;
+ p->bufend = new_buf + new_size;
check_invariants(p);
@@ -782,7 +784,7 @@ static inline snapshot_t validate_size(pipe_t* p,
size_t elems_needed = bytes_needed / elem_size;
if(likely(bytes_needed > cap))
- s = resize_buffer(p, next_pow2(elems_needed+1)*elem_size);
+ s = resize_buffer(p, next_pow2(elems_needed)*elem_size);
}
// Unlock the pipe if requested.
@@ -844,9 +846,11 @@ static inline snapshot_t wait_for_room(pipe_t* p, size_t* max_cap)
size_t consumer_refcount = p->consumer_refcount;
+ size_t elem_size = __pipe_elem_size(p);
+
*max_cap = p->max_cap;
- for(; unlikely(bytes_used == *max_cap) && likely(consumer_refcount > 0);
+ for(; unlikely(bytes_used + elem_size >= *max_cap) && likely(consumer_refcount > 0);
s = make_snapshot(p),
bytes_used = bytes_in_use(s),
consumer_refcount = p->consumer_refcount,
@@ -883,7 +887,7 @@ void __pipe_push(pipe_t* p,
// Finally, we can now begin with pushing as many elements into the
// queue as possible.
p->end = process_push(s, elems,
- pushed = min(count, max_cap - bytes_in_use(s)));
+ pushed = min(count, capacity(s) - bytes_in_use(s)));
} mutex_unlock(&p->end_lock);
assertume(pushed > 0);
diff --git a/pipe_test.c b/pipe_test.c
index 6c8781d..57d160b 100644
--- a/pipe_test.c
+++ b/pipe_test.c
@@ -29,6 +29,10 @@
#include <stdlib.h>
#include <string.h>
+#ifdef __GNUC__
+#pragma GCC diagnostic ignored "-Wunused-result"
+#endif
+
#define UNUSED_PARAMETER(var) (var) = (var)
// All this hackery is just to get asserts to work in release build.
@@ -245,6 +249,145 @@ DEF_TEST(issue_5)
pipe_consumer_free(c);
}
+// set max cap (not infinite)
+DEF_TEST(issue_6_a)
+{
+ static const int NUM = 32;
+ pipe_t* pipe = pipe_new(sizeof(int), NUM);
+ pipe_producer_t* p = pipe_producer_new(pipe);
+ pipe_consumer_t* c = pipe_consumer_new(pipe);
+ pipe_free(pipe);
+
+ int data[NUM];
+ for(int i=0; i < NUM; ++i)
+ data[i] = i;
+ pipe_push(p, data, NUM);
+ pipe_producer_free(p);
+
+ int buf[NUM];
+ size_t ret = pipe_pop(c, buf, NUM);
+ assert(ret == NUM);
+ for(int i=0; i < NUM; ++i)
+ assert(buf[i] == data[i]);
+
+ pipe_consumer_free(c);
+}
+
+// set smaller min cap
+DEF_TEST(issue_6_b)
+{
+ static const int NUM = 16;
+ pipe_t* pipe = pipe_new(sizeof(int), NUM * 2);
+ pipe_reserve(PIPE_GENERIC(pipe), NUM);
+ pipe_producer_t* p = pipe_producer_new(pipe);
+ pipe_consumer_t* c = pipe_consumer_new(pipe);
+ pipe_free(pipe);
+
+ int data[NUM];
+ for(int i=0; i < NUM; ++i)
+ data[i] = i;
+ pipe_push(p, data, NUM);
+ pipe_producer_free(p);
+
+ int buf[NUM];
+ size_t ret = pipe_pop(c, buf, NUM);
+ assert(ret == NUM);
+ for(int i=0; i < NUM; ++i)
+ assert(buf[i] == data[i]);
+
+ pipe_consumer_free(c);
+}
+
+#ifdef _WIN32 // use the native win32 API on Windows
+
+#include <windows.h>
+
+#define thread_create(f, p) CloseHandle( \
+ CreateThread(NULL, \
+ 0, \
+ (LPTHREAD_START_ROUTINE)(f), \
+ (p), \
+ 0, \
+ NULL))
+
+#define thread_sleep(s) Sleep((s) * 1000)
+
+#else // fall back on pthreads
+
+#include <pthread.h>
+#include <unistd.h>
+
+static inline void thread_create(void *(*f) (void*), void* p)
+{
+ pthread_t t;
+ pthread_create(&t, NULL, f, p);
+}
+
+#define thread_sleep(s) sleep(s)
+
+#endif
+
+typedef struct __issue_6_t {
+ pipe_consumer_t* c;
+ int writing;
+ int read;
+} issue_6_t;
+
+static void* process_pipe_issue_6_c(void* param)
+{
+ static const int NUM = 32;
+ issue_6_t* v = (issue_6_t *)param;
+
+ //printf("Consumer waiting for a bit ...\n");
+ //printf("Consumer starts to read pipe ...\n");
+ thread_sleep(1);
+ assert(v->writing); // producer still writing, blocked from finishing
+ int buf[NUM];
+ size_t ret = pipe_pop(v->c, buf, NUM);
+ assert(ret == NUM);
+ for(int i=0; i < NUM; ++i)
+ assert(buf[i] == i);
+
+ v->read = NUM;
+ pipe_consumer_free(v->c);
+ return NULL;
+}
+
+// producer blocking on push
+// Note: pipe rounds up to power of 2 so initial
+// value of 32 is rounded up to 64 so we block on
+// writing 64 values, not 32
+DEF_TEST(issue_6_c)
+{
+ static const int NUM = 32;
+ pipe_t* pipe = pipe_new(sizeof(int), NUM);
+ pipe_producer_t* p = pipe_producer_new(pipe);
+
+ issue_6_t* params = malloc(sizeof(*params));
+ memset(params, 0, sizeof(*params));
+ params->c = pipe_consumer_new(pipe);
+
+ pipe_free(pipe);
+
+ thread_create(&process_pipe_issue_6_c, params);
+
+ int data[NUM];
+ for(int i=0; i < NUM; ++i)
+ data[i] = i;
+ //printf("Producer pushing ok ...\n");
+ params->writing = 1;
+ pipe_push(p, data, NUM);
+ //printf("Producer pushing should be blocked ...\n");
+ pipe_push(p, data, NUM);
+ //printf("Producer unblocked ...\n");
+ params->writing = 0;
+ thread_sleep(1);
+ assert(params->read == NUM);
+
+ free(params);
+ pipe_producer_free(p);
+}
+
/*
// This test is only legal if DEFAULT_MINCAP is less than or equal to 8.
//
@@ -307,6 +450,9 @@ void pipe_run_test_suite(void)
RUN_TEST(parallel_multiplier);
RUN_TEST(issue_4);
RUN_TEST(issue_5);
+ RUN_TEST(issue_6_a);
+ RUN_TEST(issue_6_b);
+ RUN_TEST(issue_6_c);
/*
#ifdef PIPE_DEBUG
RUN_TEST(clobbering); |
applied in https://github.com/mgood7123/pipe also see #11 you do not join your threads, which also creates a memory leak |
There is a commit on https://github.com/git-blame/pipe. |
Any way of blocking pipe_push if the pipe is full?
The text was updated successfully, but these errors were encountered: