From 5a098a355c6eccb814ba30d4062c1f36180489e6 Mon Sep 17 00:00:00 2001 From: Andrew Aladjev Date: Sat, 18 Jul 2020 00:51:28 +0300 Subject: [PATCH] refactored stream reader, cosmetics --- lib/zstds/stream/raw/abstract.rb | 2 +- lib/zstds/stream/raw/compressor.rb | 6 +- lib/zstds/stream/raw/decompressor.rb | 2 +- lib/zstds/stream/reader.rb | 105 ++++++++++++++++----------- lib/zstds/stream/writer.rb | 5 ++ test/stream/reader.test.rb | 92 ++++++++++++++++++++++- 6 files changed, 162 insertions(+), 50 deletions(-) diff --git a/lib/zstds/stream/raw/abstract.rb b/lib/zstds/stream/raw/abstract.rb index bbb3eb4..7ff382d 100644 --- a/lib/zstds/stream/raw/abstract.rb +++ b/lib/zstds/stream/raw/abstract.rb @@ -23,7 +23,7 @@ def flush(&writer) nil end - protected def flush_destination_buffer(&writer) + protected def more_destination(&writer) result_bytesize = write_result(&writer) raise NotEnoughDestinationError, "not enough destination" if result_bytesize.zero? end diff --git a/lib/zstds/stream/raw/compressor.rb b/lib/zstds/stream/raw/compressor.rb index a216442..97e3bf1 100644 --- a/lib/zstds/stream/raw/compressor.rb +++ b/lib/zstds/stream/raw/compressor.rb @@ -39,7 +39,7 @@ def write(source, &writer) if need_more_destination source = source.byteslice bytes_written, source.bytesize - bytes_written - flush_destination_buffer(&writer) + more_destination(&writer) next end @@ -65,7 +65,7 @@ def flush(&writer) need_more_destination = @native_stream.flush if need_more_destination - flush_destination_buffer(&writer) + more_destination(&writer) next end @@ -84,7 +84,7 @@ def close(&writer) need_more_destination = @native_stream.finish if need_more_destination - flush_destination_buffer(&writer) + more_destination(&writer) next end diff --git a/lib/zstds/stream/raw/decompressor.rb b/lib/zstds/stream/raw/decompressor.rb index 341ab3b..4fd5f9a 100644 --- a/lib/zstds/stream/raw/decompressor.rb +++ b/lib/zstds/stream/raw/decompressor.rb @@ -34,7 +34,7 @@ def read(source, &writer) if need_more_destination source = source.byteslice bytes_read, source.bytesize - bytes_read - flush_destination_buffer(&writer) + more_destination(&writer) next end diff --git a/lib/zstds/stream/reader.rb b/lib/zstds/stream/reader.rb index 8c50c5a..17b4d10 100644 --- a/lib/zstds/stream/reader.rb +++ b/lib/zstds/stream/reader.rb @@ -20,6 +20,7 @@ def initialize(source_io, options = {}, *args) initialize_source_buffer_length reset_io_remainder + reset_need_to_flush @lineno = 0 end @@ -42,57 +43,37 @@ def initialize(source_io, options = {}, *args) @io_remainder = ::String.new :encoding => ::Encoding::BINARY end + protected def reset_need_to_flush + @need_to_flush = false + end + # -- synchronous -- def read(bytes_to_read = nil, out_buffer = nil) Validation.validate_not_negative_integer bytes_to_read unless bytes_to_read.nil? Validation.validate_string out_buffer unless out_buffer.nil? - return ::String.new :encoding => ::Encoding::BINARY if !bytes_to_read.nil? && bytes_to_read.zero? - unless bytes_to_read.nil? + return ::String.new :encoding => ::Encoding::BINARY if bytes_to_read.zero? return nil if eof? - read_more_to_buffer until @buffer.bytesize >= bytes_to_read || @io.eof? + append_io_data @io.read(@source_buffer_length) while @buffer.bytesize < bytes_to_read && !@io.eof? + flush_io_data if @buffer.bytesize < bytes_to_read return read_bytes_from_buffer bytes_to_read, out_buffer end - read_more_to_buffer until @io.eof? - - result = @buffer - reset_buffer - @pos += result.bytesize + append_io_data @io.read(@source_buffer_length) until @io.eof? + flush_io_data - result.force_encoding @external_encoding unless @external_encoding.nil? - result = transcode_to_internal result - result = out_buffer.replace result unless out_buffer.nil? - - result - end - - protected def read_more_to_buffer - io_data = @io.read @source_buffer_length - append_io_data_to_buffer io_data - end - - def readpartial(bytes_to_read = nil, out_buffer = nil) - raise ::EOFError if eof? - - readpartial_to_buffer until @buffer.bytesize >= bytes_to_read || @io.eof? - - read_bytes_from_buffer bytes_to_read, out_buffer - end - - protected def readpartial_to_buffer - io_data = @io.readpartial @source_buffer_length - append_io_data_to_buffer io_data + read_buffer out_buffer end def rewind raw_wrapper :close reset_io_remainder + reset_need_to_flush super end @@ -104,32 +85,60 @@ def close end def eof? - @buffer.bytesize.zero? && @io.eof? + empty? && @io.eof? end # -- asynchronous -- - def read_nonblock(bytes_to_read, out_buffer = nil, *options) - read_more_to_buffer_nonblock(*options) until @buffer.bytesize >= bytes_to_read || @io.eof? + def readpartial(bytes_to_read, out_buffer = nil) + read_more_nonblock(bytes_to_read, out_buffer) { @io.readpartial @source_buffer_length } + end - read_bytes_from_buffer bytes_to_read, out_buffer + def read_nonblock(bytes_to_read, out_buffer = nil, *options) + read_more_nonblock(bytes_to_read, out_buffer) { @io.read_nonblock(@source_buffer_length, *options) } end - protected def read_more_to_buffer_nonblock(*options) - io_data = @io.read_nonblock @source_buffer_length, *options - append_io_data_to_buffer io_data + protected def read_more_nonblock(bytes_to_read, out_buffer, &_block) + Validation.validate_not_negative_integer bytes_to_read + Validation.validate_string out_buffer unless out_buffer.nil? + + return ::String.new :encoding => ::Encoding::BINARY if bytes_to_read.zero? + + io_provided_eof_error = false + + if @buffer.bytesize < bytes_to_read + begin + append_io_data yield + rescue ::EOFError + io_provided_eof_error = true + end + end + + flush_io_data if @buffer.bytesize < bytes_to_read + raise ::EOFError if empty? && io_provided_eof_error + + read_bytes_from_buffer bytes_to_read, out_buffer end # -- common -- - protected def append_io_data_to_buffer(io_data) + protected def append_io_data(io_data) io_portion = @io_remainder + io_data bytes_read = raw_wrapper :read, io_portion @io_remainder = io_portion.byteslice bytes_read, io_portion.bytesize - bytes_read - # We should just ignore case when "io.eof?" appears but "io_remainder" is not empty. - # Ancient compress implementations can write bytes from not initialized buffer parts to output. - raw_wrapper :flush if @io.eof? + # Even empty io data may require flush. + @need_to_flush = true + end + + protected def flush_io_data + raw_wrapper :flush + + @need_to_flush = false + end + + protected def empty? + !@need_to_flush && @buffer.bytesize.zero? end protected def read_bytes_from_buffer(bytes_to_read, out_buffer) @@ -144,6 +153,18 @@ def read_nonblock(bytes_to_read, out_buffer = nil, *options) result end + protected def read_buffer(out_buffer) + result = @buffer + reset_buffer + @pos += result.bytesize + + result.force_encoding @external_encoding unless @external_encoding.nil? + result = transcode_to_internal result + + result = out_buffer.replace result unless out_buffer.nil? + result + end + protected def transcode_to_internal(data) data = data.encode @internal_encoding, **@transcode_options unless @internal_encoding.nil? data diff --git a/lib/zstds/stream/writer.rb b/lib/zstds/stream/writer.rb index 717691e..c253140 100644 --- a/lib/zstds/stream/writer.rb +++ b/lib/zstds/stream/writer.rb @@ -77,6 +77,11 @@ def close # -- asynchronous -- + # IO write nonblock can raise wait writable error. + # After resolving this error user may provide same content again. + # It is not possible to revert accepted content after error. + # So we have to accept content after processing IO write nonblock. + # It means that first write nonblock won't call IO write nonblock. def write_nonblock(object, *options) return 0 unless write_remaining_buffer_nonblock(*options) diff --git a/test/stream/reader.test.rb b/test/stream/reader.test.rb index 499c09a..d2f95ed 100644 --- a/test/stream/reader.test.rb +++ b/test/stream/reader.test.rb @@ -62,8 +62,8 @@ def test_invalid_read end end - corrupted_compressed_io = ::StringIO.new String.compress("1111").reverse - instance = target.new corrupted_compressed_io + corrupted_compressed_text = String.compress("1111").reverse + instance = target.new ::StringIO.new(corrupted_compressed_text) assert_raises DecompressorCorruptedSourceError do instance.read @@ -87,6 +87,8 @@ def test_read assert_equal result, "" loop do + prev_eof = instance.eof? + result = if with_buffer instance.read portion_length, prev_result @@ -94,7 +96,12 @@ def test_read instance.read portion_length end - break if result.nil? + if result.nil? + assert instance.eof? + break + end + + refute prev_eof unless archive.bytesize.zero? assert_equal result, prev_result if with_buffer decompressed_text << result @@ -118,6 +125,8 @@ def test_read decompressed_text = nil begin + prev_eof = instance.eof? + if with_buffer decompressed_text = instance.read nil, prev_result assert_equal decompressed_text, prev_result @@ -125,6 +134,9 @@ def test_read decompressed_text = instance.read end + assert instance.eof? + refute prev_eof unless archive.bytesize.zero? + assert_equal instance.pos, decompressed_text.bytesize assert_equal instance.pos, instance.tell ensure @@ -246,9 +258,69 @@ def test_rewind end end + def test_eof + compressed_text = String.compress "ab" + instance = target.new ::StringIO.new(compressed_text) + + refute instance.eof? + + byte = instance.read 1 + refute instance.eof? + assert_equal byte, "a" + + byte = instance.read 1 + assert instance.eof? + assert_equal byte, "b" + end + # -- asynchronous -- + def test_invalid_readpartial_and_read_nonblock + instance = target.new ::StringIO.new + + Validation::INVALID_NOT_NEGATIVE_INTEGERS.each do |invalid_integer| + assert_raises ValidateError do + instance.readpartial invalid_integer + end + assert_raises ValidateError do + instance.read_nonblock invalid_integer + end + end + + (Validation::INVALID_STRINGS - [nil]).each do |invalid_string| + assert_raises ValidateError do + instance.readpartial 0, invalid_string + end + assert_raises ValidateError do + instance.read_nonblock 0, invalid_string + end + end + + corrupted_compressed_text = String.compress("1111").reverse + + instance = target.new ::StringIO.new(corrupted_compressed_text) + + assert_raises DecompressorCorruptedSourceError do + instance.readpartial 1 + end + + instance = target.new ::StringIO.new(corrupted_compressed_text) + + assert_raises DecompressorCorruptedSourceError do + instance.read_nonblock 1 + end + end + def test_readpartial + IO.pipe do |read_io, write_io| + instance = target.new read_io + write_io.close + + assert_raises ::EOFError do + instance.readpartial 1 + end + end + start_server do |server| TEXTS.each do |text| PORTION_LENGTHS.each do |portion_length| @@ -283,6 +355,20 @@ def test_readpartial end def test_read_nonblock + IO.pipe do |read_io, write_io| + instance = target.new read_io + + assert_raises ::IO::WaitReadable do + instance.read_nonblock 1 + end + + write_io.close + + assert_raises ::EOFError do + instance.read_nonblock 1 + end + end + start_server do |server| TEXTS.each do |text| PORTION_LENGTHS.each do |portion_length|