From d128f752d7c4c7a2fa62d4d532163f4690d466d3 Mon Sep 17 00:00:00 2001 From: SpringMT Date: Fri, 12 Apr 2024 21:55:13 +0900 Subject: [PATCH 1/8] feat: unlock GVL for compress --- benchmarks/multi_thread_comporess.rb | 9 +++- benchmarks/zstd_compress_memory.rb | 2 +- benchmarks/zstd_decompress_memory.rb | 2 +- benchmarks/zstd_streaming_compress_memory.rb | 2 +- .../zstd_streaming_decompress_memory.rb | 2 +- examples/sinatra/Gemfile.lock | 2 +- ext/zstdruby/common.h | 42 +++++++++++++++++-- ext/zstdruby/extconf.rb | 2 +- 8 files changed, 52 insertions(+), 11 deletions(-) diff --git a/benchmarks/multi_thread_comporess.rb b/benchmarks/multi_thread_comporess.rb index 50ccde2..42517e5 100644 --- a/benchmarks/multi_thread_comporess.rb +++ b/benchmarks/multi_thread_comporess.rb @@ -14,12 +14,19 @@ json_string = File.read("./samples/#{sample_file_name}") queue = Queue.new +# queue = [] GUESSES.times { queue << json_string } +# stream = Zstd::StreamingCompress.new(thread_num: THREADS) THREADS.times { queue << nil } THREADS.times.map { Thread.new { while str = queue.pop - Zstd.compress(str) + # stream = Zstd::StreamingCompress.new(thread_num: THREADS) + #stream << str + #stream << str + #stream << str + #stream.flush + Zstd.compress(str, thread_num: 1) end } }.each(&:join) diff --git a/benchmarks/zstd_compress_memory.rb b/benchmarks/zstd_compress_memory.rb index d18e6e9..7edfb20 100644 --- a/benchmarks/zstd_compress_memory.rb +++ b/benchmarks/zstd_compress_memory.rb @@ -10,7 +10,7 @@ sample_file_name = ARGV[0] -json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true) +json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true) json_string = json_data.to_json i = 0 diff --git a/benchmarks/zstd_decompress_memory.rb b/benchmarks/zstd_decompress_memory.rb index 3b35b4f..95aee0c 100644 --- a/benchmarks/zstd_decompress_memory.rb +++ b/benchmarks/zstd_decompress_memory.rb @@ -9,7 +9,7 @@ p 
"#{ObjectSpace.memsize_of_all/1000} #{ObjectSpace.count_objects} #{`ps -o rss= -p #{Process.pid}`.to_i}" sample_file_name = ARGV[0] -json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true) +json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true) json_string = json_data.to_json i = 0 diff --git a/benchmarks/zstd_streaming_compress_memory.rb b/benchmarks/zstd_streaming_compress_memory.rb index 3ae99cf..82306f4 100644 --- a/benchmarks/zstd_streaming_compress_memory.rb +++ b/benchmarks/zstd_streaming_compress_memory.rb @@ -10,7 +10,7 @@ sample_file_name = ARGV[0] -json_string = IO.read("./samples/#{sample_file_name}") +json_string = File.read("./samples/#{sample_file_name}") i = 0 start_time = Time.now diff --git a/benchmarks/zstd_streaming_decompress_memory.rb b/benchmarks/zstd_streaming_decompress_memory.rb index 166bd98..1c791bc 100644 --- a/benchmarks/zstd_streaming_decompress_memory.rb +++ b/benchmarks/zstd_streaming_decompress_memory.rb @@ -10,7 +10,7 @@ sample_file_name = ARGV[0] -cstr = IO.read("./results/#{sample_file_name}.zstd") +cstr = File.read("./results/#{sample_file_name}.zstd") i = 0 start_time = Time.now while true do diff --git a/examples/sinatra/Gemfile.lock b/examples/sinatra/Gemfile.lock index 8708d15..7f1fa19 100644 --- a/examples/sinatra/Gemfile.lock +++ b/examples/sinatra/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: ../.. 
specs: - zstd-ruby (1.5.6.1) + zstd-ruby (1.5.6.2) GEM remote: https://rubygems.org/ diff --git a/ext/zstdruby/common.h b/ext/zstdruby/common.h index 8bc953a..a2169e2 100644 --- a/ext/zstdruby/common.h +++ b/ext/zstdruby/common.h @@ -2,6 +2,9 @@ #define ZSTD_RUBY_H 1 #include <ruby.h> +#ifdef HAVE_RUBY_THREAD_H +#include <ruby/thread.h> +#endif #include "./libzstd/zstd.h" static int convert_compression_level(VALUE compression_level_value) @@ -12,18 +15,40 @@ static int convert_compression_level(VALUE compression_level_value) return NUM2INT(compression_level_value); } +struct compress_params { + ZSTD_CCtx* ctx; + ZSTD_outBuffer* output; + ZSTD_inBuffer* input; + ZSTD_EndDirective endOp; + size_t ret; +}; + +static void* compress_wrapper(void* args) +{ + struct compress_params* params = args; + params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp); + return NULL; +} + static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { - return ZSTD_compressStream2(ctx, output, input, endOp); +#ifdef HAVE_RUBY_THREAD_H + struct compress_params params = { ctx, output, input, endOp }; + rb_thread_call_without_gvl(compress_wrapper, &params, RUBY_UBF_IO, NULL); + return params.ret; +#else + return ZSTD_compressStream2(ctx, output, input, endOp); +#endif } static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VALUE kwargs) { - ID kwargs_keys[2]; + ID kwargs_keys[3]; kwargs_keys[0] = rb_intern("level"); kwargs_keys[1] = rb_intern("dict"); - VALUE kwargs_values[2]; - rb_get_kwargs(kwargs, kwargs_keys, 0, 2, kwargs_values); + kwargs_keys[2] = rb_intern("thread_num"); + VALUE kwargs_values[3]; + rb_get_kwargs(kwargs, kwargs_keys, 0, 3, kwargs_values); int compression_level = ZSTD_CLEVEL_DEFAULT; if (kwargs_values[0] != Qundef && kwargs_values[0] != Qnil) { @@ -43,6 +68,15 @@ static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VAL rb_raise(rb_eRuntimeError, "%s", 
"ZSTD_CCtx_loadDictionary failed"); } } + + if (kwargs_values[2] != Qundef && kwargs_values[2] != Qnil) { + int thread_num = NUM2INT(kwargs_values[2]); + size_t const r = ZSTD_CCtx_setParameter(ctx, ZSTD_c_nbWorkers, thread_num); + if (ZSTD_isError(r)) { + rb_warn("Note: the linked libzstd library doesn't support multithreading.Reverting to single-thread mode. \n"); + } + // ZSTD_CCtx_setParameter(ctx, ZSTD_c_jobSize, thread_num); + } } static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs) diff --git a/ext/zstdruby/extconf.rb b/ext/zstdruby/extconf.rb index 4575bd3..c470bfe 100644 --- a/ext/zstdruby/extconf.rb +++ b/ext/zstdruby/extconf.rb @@ -2,7 +2,7 @@ have_func('rb_gc_mark_movable') -$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY' +$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY -DZSTD_MULTITHREAD -pthread' $CPPFLAGS += " -fdeclspec" if CONFIG['CXX'] =~ /clang/ Dir.chdir File.expand_path('..', __FILE__) do From e399267f64821d57af516936708a657063bef4aa Mon Sep 17 00:00:00 2001 From: SpringMT Date: Fri, 12 Apr 2024 23:56:28 +0900 Subject: [PATCH 2/8] fix: remove thread_num --- benchmarks/multi_thread_comporess.rb | 12 ++++-------- ext/zstdruby/common.h | 16 +++------------- ext/zstdruby/extconf.rb | 2 +- 3 files changed, 8 insertions(+), 22 deletions(-) diff --git a/benchmarks/multi_thread_comporess.rb b/benchmarks/multi_thread_comporess.rb index 42517e5..473f22a 100644 --- a/benchmarks/multi_thread_comporess.rb +++ b/benchmarks/multi_thread_comporess.rb @@ -4,6 +4,7 @@ require 'objspace' require 'zstd-ruby' require 'thread' +require "zstds" GUESSES = (ENV['GUESSES'] || 1000).to_i THREADS = (ENV['THREADS'] || 1).to_i @@ -14,19 +15,14 @@ json_string = File.read("./samples/#{sample_file_name}") queue = Queue.new -# queue = [] GUESSES.times { queue << json_string } -# stream = Zstd::StreamingCompress.new(thread_num: THREADS) THREADS.times { queue << nil } THREADS.times.map { Thread.new { while str = queue.pop - # stream = 
Zstd::StreamingCompress.new(thread_num: THREADS) - #stream << str - #stream << str - #stream << str - #stream.flush - Zstd.compress(str, thread_num: 1) + stream = Zstd::StreamingCompress.new + stream << str + stream.finish end } }.each(&:join) diff --git a/ext/zstdruby/common.h b/ext/zstdruby/common.h index a2169e2..a503878 100644 --- a/ext/zstdruby/common.h +++ b/ext/zstdruby/common.h @@ -43,12 +43,11 @@ static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_i static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VALUE kwargs) { - ID kwargs_keys[3]; + ID kwargs_keys[2]; kwargs_keys[0] = rb_intern("level"); kwargs_keys[1] = rb_intern("dict"); - kwargs_keys[2] = rb_intern("thread_num"); - VALUE kwargs_values[3]; - rb_get_kwargs(kwargs, kwargs_keys, 0, 3, kwargs_values); + VALUE kwargs_values[2]; + rb_get_kwargs(kwargs, kwargs_keys, 0, 2, kwargs_values); int compression_level = ZSTD_CLEVEL_DEFAULT; if (kwargs_values[0] != Qundef && kwargs_values[0] != Qnil) { @@ -68,15 +67,6 @@ static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VAL rb_raise(rb_eRuntimeError, "%s", "ZSTD_CCtx_loadDictionary failed"); } } - - if (kwargs_values[2] != Qundef && kwargs_values[2] != Qnil) { - int thread_num = NUM2INT(kwargs_values[2]); - size_t const r = ZSTD_CCtx_setParameter(ctx, ZSTD_c_nbWorkers, thread_num); - if (ZSTD_isError(r)) { - rb_warn("Note: the linked libzstd library doesn't support multithreading.Reverting to single-thread mode. \n"); - } - // ZSTD_CCtx_setParameter(ctx, ZSTD_c_jobSize, thread_num); - } } static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs) diff --git a/ext/zstdruby/extconf.rb b/ext/zstdruby/extconf.rb index c470bfe..f153b7e 100644 --- a/ext/zstdruby/extconf.rb +++ b/ext/zstdruby/extconf.rb @@ -2,7 +2,7 @@ have_func('rb_gc_mark_movable') -$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY -DZSTD_MULTITHREAD -pthread' +$CFLAGS = '-I. 
-O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY -DZSTD_MULTITHREAD -pthread -DDEBUGLEVEL=0' $CPPFLAGS += " -fdeclspec" if CONFIG['CXX'] =~ /clang/ Dir.chdir File.expand_path('..', __FILE__) do From 3be703a8c1afed4f1d2f7ba629abf266534d4a5f Mon Sep 17 00:00:00 2001 From: SpringMT Date: Sat, 13 Apr 2024 22:49:53 +0900 Subject: [PATCH 3/8] feat: decompress refactoring --- ext/zstdruby/common.h | 27 ++++++++++++- ext/zstdruby/streaming_decompress.c | 2 +- ext/zstdruby/zstdruby.c | 63 +++++++++++++---------------- 3 files changed, 54 insertions(+), 38 deletions(-) diff --git a/ext/zstdruby/common.h b/ext/zstdruby/common.h index a503878..9574191 100644 --- a/ext/zstdruby/common.h +++ b/ext/zstdruby/common.h @@ -34,7 +34,7 @@ static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_i { #ifdef HAVE_RUBY_THREAD_H struct compress_params params = { ctx, output, input, endOp }; - rb_thread_call_without_gvl(compress_wrapper, &params, RUBY_UBF_IO, NULL); + rb_thread_call_without_gvl(compress_wrapper, &params, NULL, NULL); return params.ret; #else return ZSTD_compressStream2(ctx, output, input, endOp); @@ -69,6 +69,31 @@ static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VAL } } +struct decompress_params { + ZSTD_DCtx* dctx; + ZSTD_outBuffer* output; + ZSTD_inBuffer* input; + size_t ret; +}; + +static void* decompress_wrapper(void* args) +{ + struct decompress_params* params = args; + params->ret = ZSTD_decompressStream(params->dctx, params->output, params->input); + return NULL; +} + +static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +{ +#ifdef HAVE_RUBY_THREAD_H + struct decompress_params params = { dctx, output, input }; + rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL); + return params.ret; +#else + return ZSTD_decompressStream(dctx, output, input); +#endif +} + static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs) { ID kwargs_keys[1]; diff --git 
a/ext/zstdruby/streaming_decompress.c b/ext/zstdruby/streaming_decompress.c index 6152d21..6f696cd 100644 --- a/ext/zstdruby/streaming_decompress.c +++ b/ext/zstdruby/streaming_decompress.c @@ -104,7 +104,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src) VALUE result = rb_str_new(0, 0); while (input.pos < input.size) { ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 }; - size_t const ret = ZSTD_decompressStream(sd->dctx, &output, &input); + size_t const ret = zstd_decompress(sd->dctx, &output, &input); if (ZSTD_isError(ret)) { rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret)); } diff --git a/ext/zstdruby/zstdruby.c b/ext/zstdruby/zstdruby.c index ba2fcd6..3ba1fd0 100644 --- a/ext/zstdruby/zstdruby.c +++ b/ext/zstdruby/zstdruby.c @@ -26,6 +26,7 @@ static VALUE rb_compress(int argc, VALUE *argv, VALUE self) char* input_data = RSTRING_PTR(input_value); size_t input_size = RSTRING_LEN(input_value); ZSTD_inBuffer input = { input_data, input_size, 0 }; + // ZSTD_compressBound causes SEGV under multi-thread size_t max_compressed_size = ZSTD_compressBound(input_size); VALUE buf = rb_str_new(NULL, max_compressed_size); char* output_data = RSTRING_PTR(buf); @@ -87,19 +88,8 @@ static VALUE rb_compress_using_dict(int argc, VALUE *argv, VALUE self) } -static VALUE decompress_buffered(const char* input_data, size_t input_size) +static VALUE decompress_buffered(ZSTD_DCtx* dctx, const char* input_data, size_t input_size) { - ZSTD_DStream* const dstream = ZSTD_createDStream(); - if (dstream == NULL) { - rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDStream failed"); - } - - size_t initResult = ZSTD_initDStream(dstream); - if (ZSTD_isError(initResult)) { - ZSTD_freeDStream(dstream); - rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_initDStream failed", ZSTD_getErrorName(initResult)); - } - VALUE output_string = rb_str_new(NULL, 0); ZSTD_outBuffer output = { NULL, 0, 0 }; @@ -109,15 +99,14 @@ static VALUE 
decompress_buffered(const char* input_data, size_t input_size) rb_str_resize(output_string, output.size); output.dst = RSTRING_PTR(output_string); - size_t readHint = ZSTD_decompressStream(dstream, &output, &input); - if (ZSTD_isError(readHint)) { - ZSTD_freeDStream(dstream); - rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(readHint)); + size_t ret = ZSTD_decompressStream(dctx, &output, &input); + if (ZSTD_isError(ret)) { + ZSTD_freeDCtx(dctx); + rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(ret)); } } - - ZSTD_freeDStream(dstream); rb_str_resize(output_string, output.pos); + ZSTD_freeDCtx(dctx); return output_string; } @@ -129,6 +118,11 @@ static VALUE rb_decompress(int argc, VALUE *argv, VALUE self) StringValue(input_value); char* input_data = RSTRING_PTR(input_value); size_t input_size = RSTRING_LEN(input_value); + ZSTD_DCtx* const dctx = ZSTD_createDCtx(); + if (dctx == NULL) { + rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDCtx failed"); + } + set_decompress_params(dctx, kwargs); unsigned long long const uncompressed_size = ZSTD_getFrameContentSize(input_data, input_size); if (uncompressed_size == ZSTD_CONTENTSIZE_ERROR) { @@ -137,15 +131,9 @@ static VALUE rb_decompress(int argc, VALUE *argv, VALUE self) // ZSTD_decompressStream may be called multiple times when ZSTD_CONTENTSIZE_UNKNOWN, causing slowness. 
// Therefore, we will not standardize on ZSTD_decompressStream if (uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) { - return decompress_buffered(input_data, input_size); + return decompress_buffered(dctx, input_data, input_size); } - ZSTD_DCtx* const dctx = ZSTD_createDCtx(); - if (dctx == NULL) { - rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDCtx failed"); - } - set_decompress_params(dctx, kwargs); - VALUE output = rb_str_new(NULL, uncompressed_size); char* output_data = RSTRING_PTR(output); @@ -167,15 +155,6 @@ static VALUE rb_decompress_using_dict(int argc, VALUE *argv, VALUE self) StringValue(input_value); char* input_data = RSTRING_PTR(input_value); size_t input_size = RSTRING_LEN(input_value); - unsigned long long const uncompressed_size = ZSTD_getFrameContentSize(input_data, input_size); - if (uncompressed_size == ZSTD_CONTENTSIZE_ERROR) { - rb_raise(rb_eRuntimeError, "%s: %s", "not compressed by zstd", ZSTD_getErrorName(uncompressed_size)); - } - if (uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) { - return decompress_buffered(input_data, input_size); - } - VALUE output = rb_str_new(NULL, uncompressed_size); - char* output_data = RSTRING_PTR(output); char* dict_buffer = RSTRING_PTR(dict); size_t dict_size = RSTRING_LEN(dict); @@ -183,12 +162,11 @@ static VALUE rb_decompress_using_dict(int argc, VALUE *argv, VALUE self) if (ddict == NULL) { rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDDict failed"); } - unsigned const expected_dict_id = ZSTD_getDictID_fromDDict(ddict); unsigned const actual_dict_id = ZSTD_getDictID_fromFrame(input_data, input_size); if (expected_dict_id != actual_dict_id) { ZSTD_freeDDict(ddict); - rb_raise(rb_eRuntimeError, "%s: %s", "DictID mismatch", ZSTD_getErrorName(uncompressed_size)); + rb_raise(rb_eRuntimeError, "DictID mismatch"); } ZSTD_DCtx* const ctx = ZSTD_createDCtx(); @@ -196,6 +174,19 @@ static VALUE rb_decompress_using_dict(int argc, VALUE *argv, VALUE self) ZSTD_freeDDict(ddict); rb_raise(rb_eRuntimeError, "%s", 
"ZSTD_createDCtx failed"); } + + unsigned long long const uncompressed_size = ZSTD_getFrameContentSize(input_data, input_size); + if (uncompressed_size == ZSTD_CONTENTSIZE_ERROR) { + ZSTD_freeDDict(ddict); + ZSTD_freeDCtx(ctx); + rb_raise(rb_eRuntimeError, "%s: %s", "not compressed by zstd", ZSTD_getErrorName(uncompressed_size)); + } + if (uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) { + return decompress_buffered(ctx, input_data, input_size); + } + + VALUE output = rb_str_new(NULL, uncompressed_size); + char* output_data = RSTRING_PTR(output); size_t const decompress_size = ZSTD_decompress_usingDDict(ctx, output_data, uncompressed_size, input_data, input_size, ddict); if (ZSTD_isError(decompress_size)) { ZSTD_freeDDict(ddict); From e754f091d5e712582b72f9994245fa79dcff50ee Mon Sep 17 00:00:00 2001 From: SpringMT Date: Sat, 13 Apr 2024 22:53:50 +0900 Subject: [PATCH 4/8] feat: unlock GVL for decompress --- ext/zstdruby/common.h | 70 ++++++++++++++++++++--------------------- ext/zstdruby/zstdruby.c | 2 +- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/ext/zstdruby/common.h b/ext/zstdruby/common.h index 9574191..813c0f3 100644 --- a/ext/zstdruby/common.h +++ b/ext/zstdruby/common.h @@ -15,32 +15,6 @@ static int convert_compression_level(VALUE compression_level_value) return NUM2INT(compression_level_value); } -struct compress_params { - ZSTD_CCtx* ctx; - ZSTD_outBuffer* output; - ZSTD_inBuffer* input; - ZSTD_EndDirective endOp; - size_t ret; -}; - -static void* compress_wrapper(void* args) -{ - struct compress_params* params = args; - params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp); - return NULL; -} - -static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) -{ -#ifdef HAVE_RUBY_THREAD_H - struct compress_params params = { ctx, output, input, endOp }; - rb_thread_call_without_gvl(compress_wrapper, &params, NULL, NULL); - return params.ret; 
-#else - return ZSTD_compressStream2(ctx, output, input, endOp); -#endif -} - static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VALUE kwargs) { ID kwargs_keys[2]; @@ -69,28 +43,29 @@ static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VAL } } -struct decompress_params { - ZSTD_DCtx* dctx; +struct compress_params { + ZSTD_CCtx* ctx; ZSTD_outBuffer* output; ZSTD_inBuffer* input; + ZSTD_EndDirective endOp; size_t ret; }; -static void* decompress_wrapper(void* args) +static void* compress_wrapper(void* args) { - struct decompress_params* params = args; - params->ret = ZSTD_decompressStream(params->dctx, params->output, params->input); + struct compress_params* params = args; + params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp); return NULL; } -static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { #ifdef HAVE_RUBY_THREAD_H - struct decompress_params params = { dctx, output, input }; - rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL); + struct compress_params params = { ctx, output, input, endOp }; + rb_thread_call_without_gvl(compress_wrapper, &params, NULL, NULL); return params.ret; #else - return ZSTD_decompressStream(dctx, output, input); + return ZSTD_compressStream2(ctx, output, input, endOp); #endif } @@ -112,4 +87,29 @@ static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs) } } +struct decompress_params { + ZSTD_DCtx* dctx; + ZSTD_outBuffer* output; + ZSTD_inBuffer* input; + size_t ret; +}; + +static void* decompress_wrapper(void* args) +{ + struct decompress_params* params = args; + params->ret = ZSTD_decompressStream(params->dctx, params->output, params->input); + return NULL; +} + +static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* 
input) +{ +#ifdef HAVE_RUBY_THREAD_H + struct decompress_params params = { dctx, output, input }; + rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL); + return params.ret; +#else + return ZSTD_decompressStream(dctx, output, input); +#endif +} + #endif /* ZSTD_RUBY_H */ diff --git a/ext/zstdruby/zstdruby.c b/ext/zstdruby/zstdruby.c index 3ba1fd0..8f17f4d 100644 --- a/ext/zstdruby/zstdruby.c +++ b/ext/zstdruby/zstdruby.c @@ -99,7 +99,7 @@ static VALUE decompress_buffered(ZSTD_DCtx* dctx, const char* input_data, size_t rb_str_resize(output_string, output.size); output.dst = RSTRING_PTR(output_string); - size_t ret = ZSTD_decompressStream(dctx, &output, &input); + size_t ret = zstd_decompress(dctx, &output, &input); if (ZSTD_isError(ret)) { ZSTD_freeDCtx(dctx); rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(ret)); From 3bb926278097bcfb47e86b621921b412db8b7207 Mon Sep 17 00:00:00 2001 From: SpringMT Date: Sat, 13 Apr 2024 23:12:13 +0900 Subject: [PATCH 5/8] fix: add gvl flag --- ext/zstdruby/common.h | 24 ++++++++++++++++-------- ext/zstdruby/streaming_compress.c | 6 +++--- ext/zstdruby/streaming_decompress.c | 2 +- ext/zstdruby/zstdruby.c | 4 ++-- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/ext/zstdruby/common.h b/ext/zstdruby/common.h index 813c0f3..de0d0d1 100644 --- a/ext/zstdruby/common.h +++ b/ext/zstdruby/common.h @@ -58,12 +58,16 @@ static void* compress_wrapper(void* args) return NULL; } -static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) +static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp, bool gvl) { #ifdef HAVE_RUBY_THREAD_H - struct compress_params params = { ctx, output, input, endOp }; - rb_thread_call_without_gvl(compress_wrapper, &params, NULL, NULL); - return params.ret; + if (gvl) { + return ZSTD_compressStream2(ctx, output, input, endOp); + } 
else { + struct compress_params params = { ctx, output, input, endOp }; + rb_thread_call_without_gvl(compress_wrapper, &params, NULL, NULL); + return params.ret; + } #else return ZSTD_compressStream2(ctx, output, input, endOp); #endif @@ -101,12 +105,16 @@ static void* decompress_wrapper(void* args) return NULL; } -static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, bool gvl) { #ifdef HAVE_RUBY_THREAD_H - struct decompress_params params = { dctx, output, input }; - rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL); - return params.ret; + if (gvl) { + return ZSTD_decompressStream(dctx, output, input); + } else { + struct decompress_params params = { dctx, output, input }; + rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL); + return params.ret; + } #else return ZSTD_decompressStream(dctx, output, input); #endif diff --git a/ext/zstdruby/streaming_compress.c b/ext/zstdruby/streaming_compress.c index d6ca1cb..6628510 100644 --- a/ext/zstdruby/streaming_compress.c +++ b/ext/zstdruby/streaming_compress.c @@ -106,7 +106,7 @@ no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp) do { ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 }; - size_t const ret = zstd_compress(sc->ctx, &output, &input, endOp); + size_t const ret = zstd_compress(sc->ctx, &output, &input, endOp, false); if (ZSTD_isError(ret)) { rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret)); } @@ -130,7 +130,7 @@ rb_streaming_compress_compress(VALUE obj, VALUE src) VALUE result = rb_str_new(0, 0); while (input.pos < input.size) { ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 }; - size_t const ret = zstd_compress(sc->ctx, &output, &input, ZSTD_e_continue); + size_t const ret = zstd_compress(sc->ctx, &output, &input, ZSTD_e_continue, false); if (ZSTD_isError(ret)) { 
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret)); } @@ -157,7 +157,7 @@ rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj) while (input.pos < input.size) { ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 }; - size_t const ret = zstd_compress(sc->ctx, &output, &input, ZSTD_e_continue); + size_t const ret = zstd_compress(sc->ctx, &output, &input, ZSTD_e_continue, false); if (ZSTD_isError(ret)) { rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret)); } diff --git a/ext/zstdruby/streaming_decompress.c b/ext/zstdruby/streaming_decompress.c index 6f696cd..56ff472 100644 --- a/ext/zstdruby/streaming_decompress.c +++ b/ext/zstdruby/streaming_decompress.c @@ -104,7 +104,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src) VALUE result = rb_str_new(0, 0); while (input.pos < input.size) { ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 }; - size_t const ret = zstd_decompress(sd->dctx, &output, &input); + size_t const ret = zstd_decompress(sd->dctx, &output, &input, false); if (ZSTD_isError(ret)) { rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret)); } diff --git a/ext/zstdruby/zstdruby.c b/ext/zstdruby/zstdruby.c index 8f17f4d..e43d266 100644 --- a/ext/zstdruby/zstdruby.c +++ b/ext/zstdruby/zstdruby.c @@ -32,7 +32,7 @@ static VALUE rb_compress(int argc, VALUE *argv, VALUE self) char* output_data = RSTRING_PTR(buf); ZSTD_outBuffer output = { (void*)output_data, max_compressed_size, 0 }; - size_t const ret = zstd_compress(ctx, &output, &input, ZSTD_e_end); + size_t const ret = zstd_compress(ctx, &output, &input, ZSTD_e_end, true); if (ZSTD_isError(ret)) { ZSTD_freeCCtx(ctx); rb_raise(rb_eRuntimeError, "%s: %s", "compress failed", ZSTD_getErrorName(ret)); @@ -99,7 +99,7 @@ static VALUE decompress_buffered(ZSTD_DCtx* dctx, const char* input_data, size_t rb_str_resize(output_string, output.size); output.dst = RSTRING_PTR(output_string); 
- size_t ret = zstd_decompress(dctx, &output, &input); + size_t ret = zstd_decompress(dctx, &output, &input, true); if (ZSTD_isError(ret)) { ZSTD_freeDCtx(dctx); rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(ret)); From 5d55741e0fa1a65c6d934924cb2ef909230cf84b Mon Sep 17 00:00:00 2001 From: SpringMT Date: Sat, 13 Apr 2024 23:20:47 +0900 Subject: [PATCH 6/8] feat: add stdbool.h --- ext/zstdruby/common.h | 1 + 1 file changed, 1 insertion(+) diff --git a/ext/zstdruby/common.h b/ext/zstdruby/common.h index de0d0d1..5971242 100644 --- a/ext/zstdruby/common.h +++ b/ext/zstdruby/common.h @@ -5,6 +5,7 @@ #ifdef HAVE_RUBY_THREAD_H #include <ruby/thread.h> #endif +#include <stdbool.h> #include "./libzstd/zstd.h" static int convert_compression_level(VALUE compression_level_value) From 82780e29e77cc0c873302da04e63d1d962cbb05f Mon Sep 17 00:00:00 2001 From: SpringMT Date: Sat, 13 Apr 2024 23:26:54 +0900 Subject: [PATCH 7/8] feat: github actions run only PR --- .github/workflows/ruby.yml | 13 +++++++-- ...rb => multi_thread_streaming_comporess.rb} | 5 ++-- .../multi_thread_streaming_decomporess.rb | 28 +++++++++++++++++++ 3 files changed, 42 insertions(+), 4 deletions(-) rename benchmarks/{multi_thread_comporess.rb => multi_thread_streaming_comporess.rb} (88%) create mode 100644 benchmarks/multi_thread_streaming_decomporess.rb diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml index 8438c89..439b6a9 100644 --- a/.github/workflows/ruby.yml +++ b/.github/workflows/ruby.yml @@ -7,11 +7,20 @@ name: Ruby -on: [push, pull_request] +on: + push: + branches: + - main + paths-ignore: + - 'README.md' + pull_request: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true jobs: test: - runs-on: ubuntu-latest strategy: matrix: diff --git a/benchmarks/multi_thread_comporess.rb b/benchmarks/multi_thread_streaming_comporess.rb similarity index 88% rename from 
benchmarks/multi_thread_streaming_comporess.rb index 473f22a..8fbe93c 100644 --- a/benchmarks/multi_thread_comporess.rb +++ b/benchmarks/multi_thread_streaming_comporess.rb @@ -4,7 +4,6 @@ require 'objspace' require 'zstd-ruby' require 'thread' -require "zstds" GUESSES = (ENV['GUESSES'] || 1000).to_i THREADS = (ENV['THREADS'] || 1).to_i @@ -22,7 +21,9 @@ while str = queue.pop stream = Zstd::StreamingCompress.new stream << str - stream.finish + res = stream.flush + stream << str + res << stream.finish end } }.each(&:join) diff --git a/benchmarks/multi_thread_streaming_decomporess.rb b/benchmarks/multi_thread_streaming_decomporess.rb new file mode 100644 index 0000000..2e516a2 --- /dev/null +++ b/benchmarks/multi_thread_streaming_decomporess.rb @@ -0,0 +1,28 @@ +require 'benchmark/ips' +$LOAD_PATH.unshift '../lib' +require 'json' +require 'objspace' +require 'zstd-ruby' +require 'thread' + +GUESSES = (ENV['GUESSES'] || 1000).to_i +THREADS = (ENV['THREADS'] || 1).to_i + +p GUESSES: GUESSES, THREADS: THREADS + +sample_file_name = ARGV[0] +json_string = File.read("./samples/#{sample_file_name}") +target = Zstd.compress(json_string) + +queue = Queue.new +GUESSES.times { queue << target } +THREADS.times { queue << nil } +THREADS.times.map { + Thread.new { + while str = queue.pop + stream = Zstd::StreamingDecompress.new + stream.decompress(str) + stream.decompress(str) + end + } +}.each(&:join) From 04d74fc816e0735a7cdbc645f29860ee5cd92cea Mon Sep 17 00:00:00 2001 From: SpringMT Date: Sun, 14 Apr 2024 00:00:52 +0900 Subject: [PATCH 8/8] fix: benchmark scripts --- benchmarks/multi_thread_streaming_comporess.rb | 3 --- benchmarks/multi_thread_streaming_decomporess.rb | 3 --- 2 files changed, 6 deletions(-) diff --git a/benchmarks/multi_thread_streaming_comporess.rb b/benchmarks/multi_thread_streaming_comporess.rb index 8fbe93c..fa59867 100644 --- a/benchmarks/multi_thread_streaming_comporess.rb +++ b/benchmarks/multi_thread_streaming_comporess.rb @@ -1,7 +1,4 @@ -require 
'benchmark/ips' $LOAD_PATH.unshift '../lib' -require 'json' -require 'objspace' require 'zstd-ruby' require 'thread' diff --git a/benchmarks/multi_thread_streaming_decomporess.rb b/benchmarks/multi_thread_streaming_decomporess.rb index 2e516a2..e6dd9cf 100644 --- a/benchmarks/multi_thread_streaming_decomporess.rb +++ b/benchmarks/multi_thread_streaming_decomporess.rb @@ -1,7 +1,4 @@ -require 'benchmark/ips' $LOAD_PATH.unshift '../lib' -require 'json' -require 'objspace' require 'zstd-ruby' require 'thread'