pipe_push blocking #6

Open
edwar64896 opened this issue Dec 23, 2016 · 10 comments

@edwar64896

Any way of blocking pipe_push if the pipe is full?

@cgaebel
Owner

cgaebel commented Dec 24, 2016

As far as I know, pipe_push does block if full. Do you have an example showing it doesn't?

@theojepsen

Hi @cgaebel,

Here's an example where pipe_push doesn't block:

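#include <stdio.h>
#include <stdlib.h>
#include "pipe.h"

int main(void) {
    // Pipe of chars, limited to 10 elements.
    pipe_t* pipe = pipe_new(sizeof(char), 10);
    pipe_producer_t* p = pipe_producer_new(pipe);

    char a[2000] = {0};
    // Push 20 elements, twice the limit, while nothing pops from the pipe.
    for (int i = 0; i < 20; i++)
        pipe_push(p, a+i, 1);

    printf("Didn't block\n");

    pipe_producer_free(p);
    pipe_free(pipe);
    return 0;
}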

Furthermore, if you increase the limit in the code above, for example to 100:

    ...
    pipe_t* pipe = pipe_new(sizeof(char), 100);
    ...
    for (int i = 0; i < 200; i++)
    ...

Then this assertion fails:

push_block: pipe.c:518: check_invariants: Assertion `in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)' failed.

@pbtrung

pbtrung commented Jan 6, 2018

Any news?
Thanks.

@1pakch

1pakch commented Apr 30, 2018

I reiterate that the example above does not block when built against master. It does block, however, when built against the single_mutex and lock_free branches.

1pakch added a commit to 1pakch/hice that referenced this issue May 1, 2018
The reason to use the branch `lock_free` is because the `master` has
the issue of not blocking on `pipe_push()` when a pipe is full. See
cgaebel/pipe#6 (comment)

@RSully

RSully commented May 14, 2018

Was this going to be fixed?

@gpenghe

gpenghe commented Jul 2, 2018

I found that the issue is caused by the pipe's minimum capacity of 32, defined by DEFAULT_MINCAP. Even though the test code above sets the limit to 10, pipe_new() raises it to 32. If you change the push count from 20 to a number larger than 32, it blocks as expected.
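To make the arithmetic concrete, here is a minimal sketch of that explanation. It does not call the library: `effective_limit` and `expect_block` are hypothetical helpers, the value 32 is the one quoted above, and treating a limit of 0 as "no limit" is an assumption. It models only the clamping described in this comment, not the library's actual resize logic.

```c
#include <assert.h>
#include <stddef.h>

/* Value quoted in this thread; confirm against pipe.c before relying on it. */
#define DEFAULT_MINCAP 32

/* Hypothetical helper, not part of pipe.h. Models the limit a pipe would
 * effectively enforce if pipe_new() raises small limits to DEFAULT_MINCAP.
 * A limit of 0 is passed through unchanged, assuming it means "no limit". */
static size_t effective_limit(size_t requested)
{
    if (requested == 0)
        return 0;
    return requested < DEFAULT_MINCAP ? DEFAULT_MINCAP : requested;
}

/* Under this model, would pushing `count` elements while nothing pops
 * exceed the limit (and so be expected to block)? */
static int expect_block(size_t requested, size_t count)
{
    size_t limit = effective_limit(requested);
    return limit != 0 && count > limit;
}

/* The numbers from the example earlier in the thread. */
static void check_thread_example(void)
{
    assert(effective_limit(10) == 32);
    assert(!expect_block(10, 20));  /* 20 pushes fit under the raised limit */
    assert(expect_block(10, 33));   /* more than 32 pushes would not */
}
```

Under this model, a requested limit of 10 behaves like 32, which would be why 20 pushes go through without blocking.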

@git-blame

It still triggers an assertion failure even with a larger value (for example 32). I've modified the unit test:

DEF_TEST(issue_5)
{
//  static const int NUM = 32;
#define NUM 64
  pipe_t* pipe = pipe_new(sizeof(int), 32);
  pipe_producer_t* p = pipe_producer_new(pipe);
  pipe_consumer_t* c = pipe_consumer_new(pipe);
  pipe_free(pipe);

  int data[NUM];
  for(int i=0; i < NUM; ++i)
  {
    data[i] = i;
    pipe_push(p, data, 1);
  }
//  pipe_push(p, data, NUM);
  pipe_producer_free(p);

  int buf[NUM];
  size_t ret = pipe_pop(c, buf, NUM);
  assert(ret == NUM);
  for(int i=0; i < NUM; ++i)
    assert(buf[i] == data[i]);

  pipe_consumer_free(c);
}

Stack trace of the failure:

233	  pipe_t* pipe = pipe_new(sizeof(int), 32);
(gdb) c
Continuing.
pipe_test: pipe.c:519: void check_invariants(pipe_t *): Assertion `in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)' failed.
issue_5 ->
Program received signal SIGABRT, Aborted.
__GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
51	../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
#1  0x00007ffff734942a in __GI_abort () at abort.c:89
#2  0x00007ffff7340e67 in __assert_fail_base (fmt=<optimized out>,
    assertion=assertion@entry=0x403e35 "in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)",
    file=file@entry=0x403b7f "pipe.c", line=line@entry=519,
    function=function@entry=0x403c9b "void check_invariants(pipe_t *)") at assert.c:92
#3  0x00007ffff7340f12 in __GI___assert_fail (
    assertion=0x403e35 "in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)", file=0x403b7f "pipe.c", line=519,
    function=0x403c9b "void check_invariants(pipe_t *)") at assert.c:101
#4  0x0000000000401d24 in check_invariants (p=0x606420) at pipe.c:519
#5  0x0000000000403289 in resize_buffer (p=0x606420, new_size=256) at pipe.c:762
#6  0x00000000004028b9 in validate_size (p=0x606420, s=..., new_bytes=4) at pipe.c:786
#7  0x0000000000402351 in __pipe_push (p=0x606420, elems=0x7fffffffe1d0, count=4) at pipe.c:883
#8  0x0000000000402b7f in pipe_push (p=0x606420, elems=0x7fffffffe1d0, count=4) at pipe.c:911
#9  0x000000000040139a in test_issue_5 () at pipe_test.c:242
#10 0x00000000004012ea in pipe_run_test_suite () at pipe_test.c:317
#11 0x0000000000400b84 in main () at main.c:5
(gdb) fr 9
#9  0x000000000040139a in test_issue_5 () at pipe_test.c:242
242	    pipe_push(p, data, 1);
(gdb) p i
$19 = 31

@git-blame

The following patch:

  • fixes the assertion failure and the non-blocking problem when a non-zero value is used when creating a pipe with pipe_new
  • allows the mincap to be reduced below DEFAULT_MINCAP with pipe_reserve

diff --git a/pipe.c b/pipe.c
index 9af421a..e3c8c26 100644
--- a/pipe.c
+++ b/pipe.c
@@ -514,7 +514,7 @@ static inline void check_invariants(pipe_t* p)
     if(s.begin == s.end)
         assertume(bytes_in_use(s) == capacity(s));
 
-    assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
+    //assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
     assertume(in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap));
 }
 
@@ -743,6 +743,8 @@ static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
                  elem_size = __pipe_elem_size(p);
 
     assertume(new_size >= bytes_in_use(make_snapshot(p)));
+    assertume(new_size + elem_size > new_size); // overflow
+    new_size += elem_size; // include sentinel
 
     if(unlikely(new_size >= max_cap))
         new_size = max_cap;
@@ -750,13 +752,13 @@ static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
     if(new_size <= min_cap)
         return make_snapshot(p);
 
-    char* new_buf = malloc(new_size + elem_size);
+    char* new_buf = malloc(new_size);
     p->end = copy_pipe_into_new_buf(make_snapshot(p), new_buf);
 
     p->begin  =
     p->buffer = (free(p->buffer), new_buf);
 
-    p->bufend = new_buf + new_size + elem_size;
+    p->bufend = new_buf + new_size;
 
     check_invariants(p);
 
@@ -782,7 +784,7 @@ static inline snapshot_t validate_size(pipe_t* p,
             size_t elems_needed = bytes_needed / elem_size;
 
             if(likely(bytes_needed > cap))
-                s = resize_buffer(p, next_pow2(elems_needed+1)*elem_size);
+                s = resize_buffer(p, next_pow2(elems_needed)*elem_size);
         }
 
         // Unlock the pipe if requested.
@@ -844,9 +846,11 @@ static inline snapshot_t wait_for_room(pipe_t* p, size_t* max_cap)
 
     size_t consumer_refcount = p->consumer_refcount;
 
+    size_t elem_size = __pipe_elem_size(p);
+
     *max_cap = p->max_cap;
 
-    for(; unlikely(bytes_used == *max_cap) && likely(consumer_refcount > 0);
+    for(; unlikely(bytes_used + elem_size >= *max_cap) && likely(consumer_refcount > 0);
           s                 = make_snapshot(p),
           bytes_used        = bytes_in_use(s),
           consumer_refcount = p->consumer_refcount,
@@ -883,7 +887,7 @@ void __pipe_push(pipe_t* p,
         // Finally, we can now begin with pushing as many elements into the
         // queue as possible.
         p->end = process_push(s, elems,
-                     pushed = min(count, max_cap - bytes_in_use(s)));
+                     pushed = min(count, capacity(s) - bytes_in_use(s)));
     } mutex_unlock(&p->end_lock);
 
     assertume(pushed > 0);
diff --git a/pipe_test.c b/pipe_test.c
index 6c8781d..57d160b 100644
--- a/pipe_test.c
+++ b/pipe_test.c
@@ -29,6 +29,10 @@
 #include <stdlib.h>
 #include <string.h>
 
+#ifdef __GNUC__
+#pragma GCC diagnostic ignored "-Wunused-result"
+#endif
+
 #define UNUSED_PARAMETER(var) (var) = (var)
 
 // All this hackery is just to get asserts to work in release build.
@@ -245,6 +249,145 @@ DEF_TEST(issue_5)
   pipe_consumer_free(c);
 }
 
+// set max cap (not infinite)
+DEF_TEST(issue_6_a)
+{
+  static const int NUM = 32;
+  pipe_t* pipe = pipe_new(sizeof(int), NUM);
+  pipe_producer_t* p = pipe_producer_new(pipe);
+  pipe_consumer_t* c = pipe_consumer_new(pipe);
+  pipe_free(pipe);
+
+  int data[NUM];
+  for(int i=0; i < NUM; ++i)
+    data[i] = i;
+  pipe_push(p, data, NUM);
+  pipe_producer_free(p);
+
+  int buf[NUM];
+  size_t ret = pipe_pop(c, buf, NUM);
+  assert(ret == NUM);
+  for(int i=0; i < NUM; ++i)
+    assert(buf[i] == data[i]);
+
+  pipe_consumer_free(c);
+}
+
+// set smaller min cap
+DEF_TEST(issue_6_b)
+{
+  static const int NUM = 16;
+  pipe_t* pipe = pipe_new(sizeof(int), NUM * 2);
+  pipe_reserve(PIPE_GENERIC(pipe), NUM);
+  pipe_producer_t* p = pipe_producer_new(pipe);
+  pipe_consumer_t* c = pipe_consumer_new(pipe);
+  pipe_free(pipe);
+
+  int data[NUM];
+  for(int i=0; i < NUM; ++i)
+    data[i] = i;
+  pipe_push(p, data, NUM);
+  pipe_producer_free(p);
+
+  int buf[NUM];
+  size_t ret = pipe_pop(c, buf, NUM);
+  assert(ret == NUM);
+  for(int i=0; i < NUM; ++i)
+    assert(buf[i] == data[i]);
+
+  pipe_consumer_free(c);
+}
+
+#ifdef _WIN32 // use the native win32 API on Windows
+
+#include <windows.h>
+
+#define thread_create(f, p) CloseHandle(            \
+        CreateThread(NULL,                          \
+                     0,                             \
+                     (LPTHREAD_START_ROUTINE)(f),   \
+                     (p),                           \
+                     0,                             \
+                     NULL))
+
+#define thread_sleep(s) Sleep((s) * 1000)
+
+#else // fall back on pthreads
+
+#include <pthread.h>
+#include <unistd.h>
+
+static inline void thread_create(void *(*f) (void*), void* p)
+{
+    pthread_t t;
+    pthread_create(&t, NULL, f, p);
+}
+
+#define thread_sleep(s) sleep(s)
+
+#endif
+
+typedef struct __issue_6_t {
+  pipe_consumer_t* c;
+  int writing;
+  int read;
+} issue_6_t;
+
+static void* process_pipe_issue_6_c(void* param)
+{
+  static const int NUM = 32;
+  issue_6_t* v = (issue_6_t *)param;
+
+  //printf("Consumer waiting for a bit ...\n");
+  //printf("Consumer starts to read pipe ...\n");
+  thread_sleep(1);
+  assert(v->writing); // producer still writing, blocked from finishing
+  int buf[NUM];
+  size_t ret = pipe_pop(v->c, buf, NUM);
+  assert(ret == NUM);
+  for(int i=0; i < NUM; ++i)
+    assert(buf[i] == i);
+
+  v->read = NUM;
+  pipe_consumer_free(v->c);
+  return NULL;
+}
+
+// producer blocking on push
+// Note: pipe rounds up to power of 2 so initial
+// value of 32 is rounded up to 64 so we block on
+// writing 64 values, not 32
+DEF_TEST(issue_6_c)
+{
+  static const int NUM = 32;
+  pipe_t* pipe = pipe_new(sizeof(int), NUM);
+  pipe_producer_t* p = pipe_producer_new(pipe);
+
+  issue_6_t* params = malloc(sizeof(*params));
+  memset(params, 0, sizeof(*params));
+  params->c = pipe_consumer_new(pipe);
+
+  pipe_free(pipe);
+
+  thread_create(&process_pipe_issue_6_c, params);
+
+  int data[NUM];
+  for(int i=0; i < NUM; ++i)
+    data[i] = i;
+  //printf("Producer pushing ok ...\n");
+  params->writing = 1;
+  pipe_push(p, data, NUM);
+  //printf("Producer pushing should be blocked ...\n");
+  pipe_push(p, data, NUM);
+  //printf("Producer unblocked ...\n");
+  params->writing = 0;
+  thread_sleep(1);
+  assert(params->read == NUM);
+
+  free(params);
+  pipe_producer_free(p);
+}
+
 /*
 // This test is only legal if DEFAULT_MINCAP is less than or equal to 8.
 //
@@ -307,6 +450,9 @@ void pipe_run_test_suite(void)
     RUN_TEST(parallel_multiplier);
     RUN_TEST(issue_4);
     RUN_TEST(issue_5);
+    RUN_TEST(issue_6_a);
+    RUN_TEST(issue_6_b);
+    RUN_TEST(issue_6_c);
 /*
 #ifdef PIPE_DEBUG
     RUN_TEST(clobbering);

@mgood7123
Contributor

mgood7123 commented Aug 30, 2020

The following patch:

* fixes the assertion and non-blocking problem if a non-zero value is used when creating pipe with `pipe_new`

* can reduce the mincap to less than DEFAULT_MINCAP with `pipe_reserve`

Applied in https://github.com/mgood7123/pipe

Also see #11.

Note that you do not join your threads, which also creates a memory leak.

@git-blame

There is a commit at https://github.com/git-blame/pipe.
It returns thread handles so that pipe_test.c can join the child threads and clean up their resources.
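For the pthreads side, the idea can be sketched roughly as follows. This is not the code from that commit: `thread_start`, `thread_wait` and `mark_done` are hypothetical names chosen for illustration, and only `pthread_create` and `pthread_join` are real API calls.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical replacement for a fire-and-forget thread helper:
 * hands the thread handle back to the caller. Returns 0 on success. */
static int thread_start(pthread_t *out, void *(*f)(void *), void *p)
{
    assert(out != NULL);
    return pthread_create(out, NULL, f, p);
}

/* Waits for the thread to finish and releases its resources.
 * Returns 0 on success. */
static int thread_wait(pthread_t t)
{
    return pthread_join(t, NULL);
}

/* Example worker used only for illustration: sets the int it is given to 1. */
static void *mark_done(void *param)
{
    *(int *)param = 1;
    return NULL;
}
```

A test would keep the `pthread_t` filled in by `thread_start` and call `thread_wait` on it before freeing anything the worker touches. That also removes the need to sleep and hope the worker has finished.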
