Skip to content

Commit

Permalink
refactored stream reader, cosmetics
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew-aladev committed Jul 17, 2020
1 parent 8ad5c3a commit 5a098a3
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 50 deletions.
2 changes: 1 addition & 1 deletion lib/zstds/stream/raw/abstract.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def flush(&writer)
nil
end

protected def flush_destination_buffer(&writer)
protected def more_destination(&writer)
result_bytesize = write_result(&writer)
raise NotEnoughDestinationError, "not enough destination" if result_bytesize.zero?
end
Expand Down
6 changes: 3 additions & 3 deletions lib/zstds/stream/raw/compressor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def write(source, &writer)

if need_more_destination
source = source.byteslice bytes_written, source.bytesize - bytes_written
flush_destination_buffer(&writer)
more_destination(&writer)
next
end

Expand All @@ -65,7 +65,7 @@ def flush(&writer)
need_more_destination = @native_stream.flush

if need_more_destination
flush_destination_buffer(&writer)
more_destination(&writer)
next
end

Expand All @@ -84,7 +84,7 @@ def close(&writer)
need_more_destination = @native_stream.finish

if need_more_destination
flush_destination_buffer(&writer)
more_destination(&writer)
next
end

Expand Down
2 changes: 1 addition & 1 deletion lib/zstds/stream/raw/decompressor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def read(source, &writer)

if need_more_destination
source = source.byteslice bytes_read, source.bytesize - bytes_read
flush_destination_buffer(&writer)
more_destination(&writer)
next
end

Expand Down
105 changes: 63 additions & 42 deletions lib/zstds/stream/reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def initialize(source_io, options = {}, *args)

initialize_source_buffer_length
reset_io_remainder
reset_need_to_flush

@lineno = 0
end
Expand All @@ -42,57 +43,37 @@ def initialize(source_io, options = {}, *args)
@io_remainder = ::String.new :encoding => ::Encoding::BINARY
end

protected def reset_need_to_flush
@need_to_flush = false
end

# -- synchronous --

def read(bytes_to_read = nil, out_buffer = nil)
Validation.validate_not_negative_integer bytes_to_read unless bytes_to_read.nil?
Validation.validate_string out_buffer unless out_buffer.nil?

return ::String.new :encoding => ::Encoding::BINARY if !bytes_to_read.nil? && bytes_to_read.zero?

unless bytes_to_read.nil?
return ::String.new :encoding => ::Encoding::BINARY if bytes_to_read.zero?
return nil if eof?

read_more_to_buffer until @buffer.bytesize >= bytes_to_read || @io.eof?
append_io_data @io.read(@source_buffer_length) while @buffer.bytesize < bytes_to_read && !@io.eof?
flush_io_data if @buffer.bytesize < bytes_to_read

return read_bytes_from_buffer bytes_to_read, out_buffer
end

read_more_to_buffer until @io.eof?

result = @buffer
reset_buffer
@pos += result.bytesize
append_io_data @io.read(@source_buffer_length) until @io.eof?
flush_io_data

result.force_encoding @external_encoding unless @external_encoding.nil?
result = transcode_to_internal result
result = out_buffer.replace result unless out_buffer.nil?

result
end

protected def read_more_to_buffer
io_data = @io.read @source_buffer_length
append_io_data_to_buffer io_data
end

def readpartial(bytes_to_read = nil, out_buffer = nil)
raise ::EOFError if eof?

readpartial_to_buffer until @buffer.bytesize >= bytes_to_read || @io.eof?

read_bytes_from_buffer bytes_to_read, out_buffer
end

protected def readpartial_to_buffer
io_data = @io.readpartial @source_buffer_length
append_io_data_to_buffer io_data
read_buffer out_buffer
end

def rewind
raw_wrapper :close

reset_io_remainder
reset_need_to_flush

super
end
Expand All @@ -104,32 +85,60 @@ def close
end

def eof?
@buffer.bytesize.zero? && @io.eof?
empty? && @io.eof?
end

# -- asynchronous --

def read_nonblock(bytes_to_read, out_buffer = nil, *options)
read_more_to_buffer_nonblock(*options) until @buffer.bytesize >= bytes_to_read || @io.eof?
def readpartial(bytes_to_read, out_buffer = nil)
read_more_nonblock(bytes_to_read, out_buffer) { @io.readpartial @source_buffer_length }
end

read_bytes_from_buffer bytes_to_read, out_buffer
def read_nonblock(bytes_to_read, out_buffer = nil, *options)
read_more_nonblock(bytes_to_read, out_buffer) { @io.read_nonblock(@source_buffer_length, *options) }
end

protected def read_more_to_buffer_nonblock(*options)
io_data = @io.read_nonblock @source_buffer_length, *options
append_io_data_to_buffer io_data
protected def read_more_nonblock(bytes_to_read, out_buffer, &_block)
Validation.validate_not_negative_integer bytes_to_read
Validation.validate_string out_buffer unless out_buffer.nil?

return ::String.new :encoding => ::Encoding::BINARY if bytes_to_read.zero?

io_provided_eof_error = false

if @buffer.bytesize < bytes_to_read
begin
append_io_data yield
rescue ::EOFError
io_provided_eof_error = true
end
end

flush_io_data if @buffer.bytesize < bytes_to_read
raise ::EOFError if empty? && io_provided_eof_error

read_bytes_from_buffer bytes_to_read, out_buffer
end

# -- common --

protected def append_io_data_to_buffer(io_data)
protected def append_io_data(io_data)
io_portion = @io_remainder + io_data
bytes_read = raw_wrapper :read, io_portion
@io_remainder = io_portion.byteslice bytes_read, io_portion.bytesize - bytes_read

# We should just ignore case when "io.eof?" appears but "io_remainder" is not empty.
# Ancient compress implementations can write bytes from not initialized buffer parts to output.
raw_wrapper :flush if @io.eof?
# Even empty io data may require flush.
@need_to_flush = true
end

protected def flush_io_data
raw_wrapper :flush

@need_to_flush = false
end

protected def empty?
!@need_to_flush && @buffer.bytesize.zero?
end

protected def read_bytes_from_buffer(bytes_to_read, out_buffer)
Expand All @@ -144,6 +153,18 @@ def read_nonblock(bytes_to_read, out_buffer = nil, *options)
result
end

protected def read_buffer(out_buffer)
result = @buffer
reset_buffer
@pos += result.bytesize

result.force_encoding @external_encoding unless @external_encoding.nil?
result = transcode_to_internal result

result = out_buffer.replace result unless out_buffer.nil?
result
end

protected def transcode_to_internal(data)
data = data.encode @internal_encoding, **@transcode_options unless @internal_encoding.nil?
data
Expand Down
5 changes: 5 additions & 0 deletions lib/zstds/stream/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def close

# -- asynchronous --

# IO write nonblock can raise wait writable error.
# After resolving this error user may provide same content again.
# It is not possible to revert accepted content after error.
# So we have to accept content after processing IO write nonblock.
# It means that first write nonblock won't call IO write nonblock.
def write_nonblock(object, *options)
return 0 unless write_remaining_buffer_nonblock(*options)

Expand Down
92 changes: 89 additions & 3 deletions test/stream/reader.test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def test_invalid_read
end
end

corrupted_compressed_io = ::StringIO.new String.compress("1111").reverse
instance = target.new corrupted_compressed_io
corrupted_compressed_text = String.compress("1111").reverse
instance = target.new ::StringIO.new(corrupted_compressed_text)

assert_raises DecompressorCorruptedSourceError do
instance.read
Expand All @@ -87,14 +87,21 @@ def test_read
assert_equal result, ""

loop do
prev_eof = instance.eof?

result =
if with_buffer
instance.read portion_length, prev_result
else
instance.read portion_length
end

break if result.nil?
if result.nil?
assert instance.eof?
break
end

refute prev_eof unless archive.bytesize.zero?

assert_equal result, prev_result if with_buffer
decompressed_text << result
Expand All @@ -118,13 +125,18 @@ def test_read
decompressed_text = nil

begin
prev_eof = instance.eof?

if with_buffer
decompressed_text = instance.read nil, prev_result
assert_equal decompressed_text, prev_result
else
decompressed_text = instance.read
end

assert instance.eof?
refute prev_eof unless archive.bytesize.zero?

assert_equal instance.pos, decompressed_text.bytesize
assert_equal instance.pos, instance.tell
ensure
Expand Down Expand Up @@ -246,9 +258,69 @@ def test_rewind
end
end

def test_eof
compressed_text = String.compress "ab"
instance = target.new ::StringIO.new(compressed_text)

refute instance.eof?

byte = instance.read 1
refute instance.eof?
assert_equal byte, "a"

byte = instance.read 1
assert instance.eof?
assert_equal byte, "b"
end

# -- asynchronous --

def test_invalid_readpartial_and_read_nonblock
instance = target.new ::StringIO.new

Validation::INVALID_NOT_NEGATIVE_INTEGERS.each do |invalid_integer|
assert_raises ValidateError do
instance.readpartial invalid_integer
end
assert_raises ValidateError do
instance.read_nonblock invalid_integer
end
end

(Validation::INVALID_STRINGS - [nil]).each do |invalid_string|
assert_raises ValidateError do
instance.readpartial 0, invalid_string
end
assert_raises ValidateError do
instance.read_nonblock 0, invalid_string
end
end

corrupted_compressed_text = String.compress("1111").reverse

instance = target.new ::StringIO.new(corrupted_compressed_text)

assert_raises DecompressorCorruptedSourceError do
instance.readpartial 1
end

instance = target.new ::StringIO.new(corrupted_compressed_text)

assert_raises DecompressorCorruptedSourceError do
instance.read_nonblock 1
end
end

def test_readpartial
IO.pipe do |read_io, write_io|
instance = target.new read_io
write_io.close

assert_raises ::EOFError do
instance.readpartial 1
end
end

start_server do |server|
TEXTS.each do |text|
PORTION_LENGTHS.each do |portion_length|
Expand Down Expand Up @@ -283,6 +355,20 @@ def test_readpartial
end

def test_read_nonblock
IO.pipe do |read_io, write_io|
instance = target.new read_io

assert_raises ::IO::WaitReadable do
instance.read_nonblock 1
end

write_io.close

assert_raises ::EOFError do
instance.read_nonblock 1
end
end

start_server do |server|
TEXTS.each do |text|
PORTION_LENGTHS.each do |portion_length|
Expand Down

0 comments on commit 5a098a3

Please sign in to comment.