From efe339e8fe3031d68c859f8060c0ccfeb5c8a4f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sa=C3=BAl=20Ibarra=20Corretg=C3=A9?= Date: Mon, 15 Jul 2024 23:41:46 +0200 Subject: [PATCH 1/2] worker: allow sending huge messages When sending a payload, prefix it with its length. Then on the receiving end, we read the length and the entire buffer in chunks. --- src/worker.c | 124 +++++++++++++++++++++++++---- tests/helpers/worker-echo.js | 3 + tests/test-worker-large-payload.js | 26 ++++++ 3 files changed, 139 insertions(+), 14 deletions(-) create mode 100644 tests/test-worker-large-payload.js diff --git a/src/worker.c b/src/worker.c index 7e1414c3..01376a8c 100644 --- a/src/worker.c +++ b/src/worker.c @@ -26,6 +26,7 @@ #include "private.h" #include "tjs.h" +#include #include extern const uint8_t tjs__worker_bootstrap[]; @@ -46,12 +47,22 @@ typedef struct { uv_stream_t stream; uv_tcp_t tcp; } h; + struct { + bool partial; + uint8_t *data; + size_t total_len; + size_t read_len; + } reading; JSValue events[MSGPIPE_EVENT_MAX]; } TJSMessagePipe; typedef struct { uv_write_t req; uint8_t *data; + union { + uint64_t u64; + uint8_t u8[8]; + } data_size; } TJSMessagePipeWriteReq; static void uv__close_cb(uv_handle_t *handle) { @@ -113,12 +124,31 @@ static void emit_msgpipe_event(TJSMessagePipe *p, int event, JSValue arg) { CHECK_EQ(JS_EnqueueJob(ctx, emit_event, 2, (JSValue *) &args), 0); } +static void tjs_msgpipe_read_object(TJSMessagePipe *p, const uint8_t *data, size_t len) { + CHECK_NOT_NULL(p); + + JSContext *ctx = p->ctx; + + int flags = JS_READ_OBJ_REFERENCE; + JSValue obj = JS_ReadObject(ctx, data, len, flags); + if (JS_IsException(obj)) + emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx)); + else + emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj); + JS_FreeValue(ctx, obj); +} + static void uv__alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { TJSMessagePipe *p = handle->data; CHECK_NOT_NULL(p); - buf->base = js_malloc(p->ctx, suggested_size); - buf->len = suggested_size; + if (p->reading.partial) { + buf->base = (char *) p->reading.data + p->reading.read_len; + buf->len = p->reading.total_len - p->reading.read_len; + } else { + buf->base = js_malloc(p->ctx, suggested_size); + buf->len = suggested_size; + } } static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { @@ -129,7 +159,10 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) if (nread < 0) { uv_read_stop(&p->h.stream); - js_free(ctx, buf->base); + if (p->reading.partial) + js_free(ctx, p->reading.data); + else + js_free(ctx, buf->base); if (nread != UV_EOF) { JSValue error = tjs_new_error(ctx, nread); emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, error); @@ -138,15 +171,76 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) return; } - // TODO: the entire object might not have come in a single packet. - int flags = JS_READ_OBJ_REFERENCE; - JSValue obj = JS_ReadObject(ctx, (const uint8_t *) buf->base, buf->len, flags); - if (JS_IsException(obj)) - emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx)); - else - emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj); - JS_FreeValue(ctx, obj); - js_free(ctx, buf->base); + if (!p->reading.partial) { + union { + uint64_t u64; + uint8_t u8[8]; + } u; + size_t len_size = sizeof(u.u8); + + /* This is a bogus read, likely a zero-read. Just return the buffer. */ + if (nread < len_size) { + js_free(ctx, buf->base); + return; + } + + memcpy(u.u8, buf->base, len_size); + size_t total_len = u.u64; + + /* Check if the total length exceeds what we got in this single read operation. */ + if (total_len > nread - len_size) { + size_t read_len = nread - len_size; + + p->reading.data = js_malloc(ctx, total_len); + if (!p->reading.data) { + js_free(ctx, buf->base); + return; + } + + p->reading.partial = true; + p->reading.total_len = total_len; + p->reading.read_len = read_len; + memcpy(p->reading.data, buf->base + len_size, read_len); + js_free(ctx, buf->base); + + return; + } + + /* We managed to read the whole packet in a single read operation. */ + tjs_msgpipe_read_object(p, (const uint8_t *) buf->base + len_size, total_len); + size_t remaining = nread - len_size - total_len; + if (remaining > 0) { + char *data = js_malloc(ctx, remaining); + if (!data) { + goto fail1; + } + memcpy(data, buf->base + len_size + total_len, remaining); + uv_buf_t b = uv_buf_init(data, remaining); + uv__read_cb(handle, remaining, &b); + } + + fail1:; + js_free(ctx, buf->base); + + return; + } + + /* We are continuing a partial read. */ + size_t remaining = p->reading.total_len - p->reading.read_len; + size_t chunk_size = remaining <= nread ? remaining : nread; + memcpy(p->reading.data + p->reading.read_len, buf->base, chunk_size); + p->reading.read_len += chunk_size; + + if (p->reading.read_len < p->reading.total_len) { + /* We still need to read more. */ + + return; + } + + /* We have a complete buffer now. */ + tjs_msgpipe_read_object(p, (const uint8_t *) p->reading.data, p->reading.total_len); + js_free(ctx, p->reading.data); + memset(&p->reading, 0, sizeof(p->reading)); } static JSValue tjs_new_msgpipe(JSContext *ctx, uv_os_sock_t fd) { @@ -211,9 +305,11 @@ static JSValue tjs_msgpipe_postmessage(JSContext *ctx, JSValue this_val, int arg wr->req.data = wr; wr->data = buf; + wr->data_size.u64 = len; - uv_buf_t b = uv_buf_init((char *) buf, len); - int r = uv_write(&wr->req, &p->h.stream, &b, 1, uv__write_cb); + uv_buf_t bufs[2] = { uv_buf_init((char *) wr->data_size.u8, sizeof(wr->data_size.u8)), + uv_buf_init((char *) buf, len) }; + int r = uv_write(&wr->req, &p->h.stream, bufs, 2, uv__write_cb); if (r != 0) { js_free(ctx, buf); js_free(ctx, wr); diff --git a/tests/helpers/worker-echo.js b/tests/helpers/worker-echo.js index 5dd2e859..10def5d9 100644 --- a/tests/helpers/worker-echo.js +++ b/tests/helpers/worker-echo.js @@ -2,3 +2,6 @@ addEventListener('message', function(e) { postMessage(e.data); }); +addEventListener('messageerror', function(e) { + throw new Error(`Opps! ${e}`); +}); diff --git a/tests/test-worker-large-payload.js b/tests/test-worker-large-payload.js new file mode 100644 index 00000000..ea68fb47 --- /dev/null +++ b/tests/test-worker-large-payload.js @@ -0,0 +1,26 @@ +import assert from 'tjs:assert'; +import path from 'tjs:path'; + + +const data = { + x: new Array(65536).fill('x').join(''), + y: new Array(65536).fill('y').join(''), + z: 1234 +}; +const w = new Worker(path.join(import.meta.dirname, 'helpers', 'worker-echo.js')); +const timer = setTimeout(() => { + w.terminate(); + assert.fail('Timeout out waiting for worker'); +}, 10000); +w.onmessage = event => { + clearTimeout(timer); + const recvData = event.data; + assert.eq(data.x, recvData.x, 'Message received matches'); + assert.eq(data.y, recvData.y, 'Message received matches'); + assert.eq(data.z, recvData.z, 'Message received matches'); + w.terminate(); +}; +w.onmessageerror = event => { + assert.fail(`Error receiving message from worker: ${event}`); +}; +w.postMessage(data); From 9eec75c6e3a05371db2f57283db5a3688db0f4c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sa=C3=BAl=20Ibarra=20Corretg=C3=A9?= Date: Tue, 16 Jul 2024 00:28:50 +0200 Subject: [PATCH 2/2] fixup! simplify --- src/worker.c | 109 +++++++++++++++------------------------------------ 1 file changed, 32 insertions(+), 77 deletions(-) diff --git a/src/worker.c b/src/worker.c index 01376a8c..0c3bd45f 100644 --- a/src/worker.c +++ b/src/worker.c @@ -48,10 +48,12 @@ typedef struct { uv_tcp_t tcp; } h; struct { - bool partial; + union { + uint64_t u64; + uint8_t u8[8]; + } total_size; uint8_t *data; - size_t total_len; - size_t read_len; + uint64_t nread; } reading; JSValue events[MSGPIPE_EVENT_MAX]; } TJSMessagePipe; @@ -124,30 +126,17 @@ static void emit_msgpipe_event(TJSMessagePipe *p, int event, JSValue arg) { CHECK_EQ(JS_EnqueueJob(ctx, emit_event, 2, (JSValue *) &args), 0); } -static void tjs_msgpipe_read_object(TJSMessagePipe *p, const uint8_t *data, size_t len) { - CHECK_NOT_NULL(p); - - JSContext *ctx = p->ctx; - - int flags = JS_READ_OBJ_REFERENCE; - JSValue obj = JS_ReadObject(ctx, data, len, flags); - if (JS_IsException(obj)) - emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx)); - else - emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj); - JS_FreeValue(ctx, obj); -} - static void uv__alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { TJSMessagePipe *p = handle->data; CHECK_NOT_NULL(p); - if (p->reading.partial) { - buf->base = (char *) p->reading.data + p->reading.read_len; - buf->len = p->reading.total_len - p->reading.read_len; + if (p->reading.data) { + buf->base = (char *) p->reading.data + p->reading.nread; + uint64_t remaining = p->reading.total_size.u64 - p->reading.nread; + buf->len = remaining > suggested_size ? suggested_size : remaining; } else { - buf->base = js_malloc(p->ctx, suggested_size); - buf->len = suggested_size; + buf->base = (char *) p->reading.total_size.u8; + buf->len = sizeof(p->reading.total_size.u8); } } @@ -159,10 +148,9 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) if (nread < 0) { uv_read_stop(&p->h.stream); - if (p->reading.partial) + if (p->reading.data) js_free(ctx, p->reading.data); - else - js_free(ctx, buf->base); + memset(&p->reading, 0, sizeof(p->reading)); if (nread != UV_EOF) { JSValue error = tjs_new_error(ctx, nread); emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, error); @@ -171,74 +159,41 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) return; } - if (!p->reading.partial) { - union { - uint64_t u64; - uint8_t u8[8]; - } u; - size_t len_size = sizeof(u.u8); + if (!p->reading.data) { + size_t len_size = sizeof(p->reading.total_size.u8); /* This is a bogus read, likely a zero-read. Just return the buffer. */ - if (nread < len_size) { - js_free(ctx, buf->base); + if (nread != len_size) return; - } - - memcpy(u.u8, buf->base, len_size); - size_t total_len = u.u64; - - /* Check if the total length exceeds what we got in this single read operation. */ - if (total_len > nread - len_size) { - size_t read_len = nread - len_size; - - p->reading.data = js_malloc(ctx, total_len); - if (!p->reading.data) { - js_free(ctx, buf->base); - return; - } - - p->reading.partial = true; - p->reading.total_len = total_len; - p->reading.read_len = read_len; - memcpy(p->reading.data, buf->base + len_size, read_len); - js_free(ctx, buf->base); - - return; - } - - /* We managed to read the whole packet in a single read operation. */ - tjs_msgpipe_read_object(p, (const uint8_t *) buf->base + len_size, total_len); - size_t remaining = nread - len_size - total_len; - if (remaining > 0) { - char *data = js_malloc(ctx, remaining); - if (!data) { - goto fail1; - } - memcpy(data, buf->base + len_size + total_len, remaining); - uv_buf_t b = uv_buf_init(data, remaining); - uv__read_cb(handle, remaining, &b); - } - fail1:; - js_free(ctx, buf->base); + uint64_t total_size = p->reading.total_size.u64; + CHECK_GE(total_size, 0); + p->reading.data = js_malloc(ctx, total_size); return; } /* We are continuing a partial read. */ - size_t remaining = p->reading.total_len - p->reading.read_len; - size_t chunk_size = remaining <= nread ? remaining : nread; - memcpy(p->reading.data + p->reading.read_len, buf->base, chunk_size); - p->reading.read_len += chunk_size; + uint64_t total_size = p->reading.total_size.u64; + p->reading.nread += nread; - if (p->reading.read_len < p->reading.total_len) { + if (p->reading.nread < total_size) { /* We still need to read more. */ return; } + CHECK_EQ(p->reading.nread, total_size); + /* We have a complete buffer now. */ - tjs_msgpipe_read_object(p, (const uint8_t *) p->reading.data, p->reading.total_len); + int flags = JS_READ_OBJ_REFERENCE; + JSValue obj = JS_ReadObject(ctx, (const uint8_t *) p->reading.data, total_size, flags); + if (JS_IsException(obj)) + emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx)); + else + emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj); + JS_FreeValue(ctx, obj); + js_free(ctx, p->reading.data); memset(&p->reading, 0, sizeof(p->reading)); }