Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better semantics for invoking #close. #66

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions fixtures/protocol/http/body/a_writable_body.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2023, by Samuel Williams.

require 'protocol/http/body/deflate'

module Protocol
module HTTP
module Body
AWritableBody = Sus::Shared("a writable body") do
it "can write and read data" do
3.times do |i|
body.write("Hello World #{i}")
expect(body.read).to be == "Hello World #{i}"
end
end

it "can buffer data in order" do
3.times do |i|
body.write("Hello World #{i}")
end

3.times do |i|
expect(body.read).to be == "Hello World #{i}"
end
end

with '#join' do
it "can join chunks" do
3.times do |i|
body.write("#{i}")
end

body.close

expect(body.join).to be == "012"
end
end

with '#each' do
it "can read all data in order" do
3.times do |i|
body.write("Hello World #{i}")
end

body.close

3.times do |i|
chunk = body.read
expect(chunk).to be == "Hello World #{i}"
end
end

# it "can propagate failures" do
# reactor.async do
# expect do
# body.each do |chunk|
# raise RuntimeError.new("It was too big!")
# end
# end.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# expect{
# body.write("Beep boop") # This will cause a failure.
# ::Async::Task.current.yield
# body.write("Beep boop") # This will fail.
# }.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# it "can propagate failures in nested bodies" do
# nested = ::Protocol::HTTP::Body::Deflate.for(body)

# reactor.async do
# expect do
# nested.each do |chunk|
# raise RuntimeError.new("It was too big!")
# end
# end.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# expect{
# body.write("Beep boop") # This will cause a failure.
# ::Async::Task.current.yield
# body.write("Beep boop") # This will fail.
# }.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# it "will stop after finishing" do
# output_task = reactor.async do
# body.each do |chunk|
# expect(chunk).to be == "Hello World!"
# end
# end

# body.write("Hello World!")
# body.close

# expect(body).not.to be(:empty?)

# ::Async::Task.current.yield

# expect(output_task).to be(:finished?)
# expect(body).to be(:empty?)
# end
end
end
end
end
end
9 changes: 0 additions & 9 deletions lib/protocol/http/body/completable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@ def rewind
false
end

def finish
super.tap do
if @callback
@callback.call
@callback = nil
end
end
end

def close(error = nil)
super.tap do
if @callback
Expand Down
6 changes: 0 additions & 6 deletions lib/protocol/http/body/deflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ def self.for(body, window_size = GZIP, level = DEFAULT_LEVEL)
self.new(body, Zlib::Deflate.new(level, window_size))
end

def stream?
# We might want to revisit this design choice.
# We could wrap the streaming body in a Deflate stream, but that would require an extra stream wrapper which we don't have right now. See also `Digestable#stream?`.
false
end

def read
return if @stream.finished?

Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/digestable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ def etag(weak: false)
end
end

def stream?
false
end

def read
if chunk = super
@digest.update(chunk)
Expand Down
12 changes: 8 additions & 4 deletions lib/protocol/http/body/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ def rewind
@remaining = @length
end

def stream?
false
end

def read
if @remaining > 0
amount = [@remaining, @block_size].min
Expand All @@ -72,6 +68,14 @@ def read
end
end

def stream?
true
end

def call(stream)
IO.copy_stream(@file, stream, @remaining)
end

def join
return "" if @remaining == 0

Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/inflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ def self.for(body, encoding = GZIP)
self.new(body, Zlib::Inflate.new(encoding))
end

def stream?
false
end

def read
return if @stream.finished?

Expand Down
84 changes: 51 additions & 33 deletions lib/protocol/http/body/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
module Protocol
module HTTP
module Body
# An interface for reading data from a body.
# Represents a readable input streams.
#
# Typically, you'd override `#read` to return chunks of data.
#
# I n general, you read chunks of data from a body until it is empty and returns `nil`. Upon reading `nil`, the body is considered consumed and should not be read from again.
#
# Reading can also fail, for example if the body represents a streaming upload, and the connection is lost. In this case, the body will raise some kind of error.
#
# If you don't want to read from a stream, and instead want to close it immediately, you can call `close` on the body. If the body is already completely consumed, `close` will do nothing, but if there is still data to be read, it will cause the underlying stream to be reset (and possibly closed).
class Readable
# Close the stream immediately.
def close(error = nil)
Expand All @@ -29,63 +35,46 @@ def ready?
false
end

# Whether the stream can be rewound using {rewind}.
def rewindable?
false
end

# Rewind the stream to the beginning.
# @returns [Boolean] Whether the stream was successfully rewound.
def rewind
false
end

# The total length of the body, if known.
# @returns [Integer | Nil] The total length of the body, or `nil` if the length is unknown.
def length
nil
end

# Read the next available chunk.
# @returns [String | Nil] The chunk of data, or `nil` if the stream has finished.
# @raises [StandardError] If an error occurs while reading.
def read
nil
end

# Should the internal mechanism prefer to use {call}?
# @returns [Boolean]
def stream?
false
end

# Write the body to the given stream.
def call(stream)
while chunk = self.read
stream.write(chunk)

# Flush the stream unless we are immediately expecting more data:
unless self.ready?
stream.flush
end
end
end

# Read all remaining chunks into a buffered body and close the underlying input.
# @returns [Buffered] The buffered body.
def finish
# Internally, this invokes `self.each` which then invokes `self.close`.
Buffered.read(self)
end

# Enumerate all chunks until finished, then invoke `#close`.
#
# Closes the stream when finished or if an error occurs.
#
# @yields {|chunk| ...} The block to call with each chunk of data.
# @parameter chunk [String | Nil] The chunk of data, or `nil` if the stream has finished.
def each
return to_enum(:each) unless block_given?
return to_enum unless block_given?

begin
while chunk = self.read
yield chunk
end
ensure
self.close($!)
while chunk = self.read
yield chunk
end
rescue => error
raise
ensure
self.close(error)
end

# Read all remaining chunks into a single binary string using `#each`.
Expand All @@ -105,6 +94,35 @@ def join
end
end

def stream?
false
end

# Write the body to the given stream.
#
# In some cases, the stream may also be readable, such as when hijacking an HTTP/1 connection. In that case, it may be acceptable to read and write to the stream directly.
#
# If the stream is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs.
#
def call(stream)
self.each do |chunk|
stream.write(chunk)

# Flush the stream unless we are immediately expecting more data:
unless self.ready?
stream.flush
end
end
end

# Read all remaining chunks into a buffered body and close the underlying input.
#
# @returns [Buffered] The buffered body.
def finish
# Internally, this invokes `self.each` which then invokes `self.close`.
Buffered.read(self)
end

def as_json(...)
{
class: self.class.name,
Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/rewindable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ def buffered
Buffered.new(@chunks)
end

def stream?
false
end

def read
if @index < @chunks.size
chunk = @chunks[@index]
Expand Down
Loading
Loading