diff --git a/fixtures/protocol/http/body/a_writable_body.rb b/fixtures/protocol/http/body/a_writable_body.rb new file mode 100644 index 0000000..dc58deb --- /dev/null +++ b/fixtures/protocol/http/body/a_writable_body.rb @@ -0,0 +1,110 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2019-2023, by Samuel Williams. + +require 'protocol/http/body/deflate' + +module Protocol + module HTTP + module Body + AWritableBody = Sus::Shared("a writable body") do + it "can write and read data" do + 3.times do |i| + body.write("Hello World #{i}") + expect(body.read).to be == "Hello World #{i}" + end + end + + it "can buffer data in order" do + 3.times do |i| + body.write("Hello World #{i}") + end + + 3.times do |i| + expect(body.read).to be == "Hello World #{i}" + end + end + + with '#join' do + it "can join chunks" do + 3.times do |i| + body.write("#{i}") + end + + body.close + + expect(body.join).to be == "012" + end + end + + with '#each' do + it "can read all data in order" do + 3.times do |i| + body.write("Hello World #{i}") + end + + body.close + + 3.times do |i| + chunk = body.read + expect(chunk).to be == "Hello World #{i}" + end + end + + # it "can propagate failures" do + # reactor.async do + # expect do + # body.each do |chunk| + # raise RuntimeError.new("It was too big!") + # end + # end.to raise_exception(RuntimeError, message: be =~ /big/) + # end + + # expect{ + # body.write("Beep boop") # This will cause a failure. + # ::Async::Task.current.yield + # body.write("Beep boop") # This will fail. + # }.to raise_exception(RuntimeError, message: be =~ /big/) + # end + + # it "can propagate failures in nested bodies" do + # nested = ::Protocol::HTTP::Body::Deflate.for(body) + + # reactor.async do + # expect do + # nested.each do |chunk| + # raise RuntimeError.new("It was too big!") + # end + # end.to raise_exception(RuntimeError, message: be =~ /big/) + # end + + # expect{ + # body.write("Beep boop") # This will cause a failure. + # ::Async::Task.current.yield + # body.write("Beep boop") # This will fail. + # }.to raise_exception(RuntimeError, message: be =~ /big/) + # end + + # it "will stop after finishing" do + # output_task = reactor.async do + # body.each do |chunk| + # expect(chunk).to be == "Hello World!" + # end + # end + + # body.write("Hello World!") + # body.close + + # expect(body).not.to be(:empty?) + + # ::Async::Task.current.yield + + # expect(output_task).to be(:finished?) + # expect(body).to be(:empty?) + # end + end + end + end + end +end diff --git a/lib/protocol/http/body/completable.rb b/lib/protocol/http/body/completable.rb index cac31ac..e540805 100644 --- a/lib/protocol/http/body/completable.rb +++ b/lib/protocol/http/body/completable.rb @@ -32,15 +32,6 @@ def rewind false end - def finish - super.tap do - if @callback - @callback.call - @callback = nil - end - end - end - def close(error = nil) super.tap do if @callback diff --git a/lib/protocol/http/body/deflate.rb b/lib/protocol/http/body/deflate.rb index 74f7983..9c68472 100644 --- a/lib/protocol/http/body/deflate.rb +++ b/lib/protocol/http/body/deflate.rb @@ -62,12 +62,6 @@ def self.for(body, window_size = GZIP, level = DEFAULT_LEVEL) self.new(body, Zlib::Deflate.new(level, window_size)) end - def stream? - # We might want to revisit this design choice. - # We could wrap the streaming body in a Deflate stream, but that would require an extra stream wrapper which we don't have right now. See also `Digestable#stream?`. - false - end - def read return if @stream.finished? diff --git a/lib/protocol/http/body/digestable.rb b/lib/protocol/http/body/digestable.rb index df04d6a..f4ff966 100644 --- a/lib/protocol/http/body/digestable.rb +++ b/lib/protocol/http/body/digestable.rb @@ -38,10 +38,6 @@ def etag(weak: false) end end - def stream? - false - end - def read if chunk = super @digest.update(chunk) diff --git a/lib/protocol/http/body/file.rb b/lib/protocol/http/body/file.rb index 00b4ace..996463b 100644 --- a/lib/protocol/http/body/file.rb +++ b/lib/protocol/http/body/file.rb @@ -56,10 +56,6 @@ def rewind @remaining = @length end - def stream? - false - end - def read if @remaining > 0 amount = [@remaining, @block_size].min @@ -72,6 +68,14 @@ def read end end + def stream? + true + end + + def call(stream) + IO.copy_stream(@file, stream, @remaining) + end + def join return "" if @remaining == 0 diff --git a/lib/protocol/http/body/inflate.rb b/lib/protocol/http/body/inflate.rb index c29e55a..12d940d 100644 --- a/lib/protocol/http/body/inflate.rb +++ b/lib/protocol/http/body/inflate.rb @@ -15,10 +15,6 @@ def self.for(body, encoding = GZIP) self.new(body, Zlib::Inflate.new(encoding)) end - def stream? - false - end - def read return if @stream.finished? diff --git a/lib/protocol/http/body/readable.rb b/lib/protocol/http/body/readable.rb index 024115b..226c897 100644 --- a/lib/protocol/http/body/readable.rb +++ b/lib/protocol/http/body/readable.rb @@ -7,9 +7,15 @@ module Protocol module HTTP module Body - # An interface for reading data from a body. + # Represents a readable input streams. # # Typically, you'd override `#read` to return chunks of data. + # + # I n general, you read chunks of data from a body until it is empty and returns `nil`. Upon reading `nil`, the body is considered consumed and should not be read from again. + # + # Reading can also fail, for example if the body represents a streaming upload, and the connection is lost. In this case, the body will raise some kind of error. + # + # If you don't want to read from a stream, and instead want to close it immediately, you can call `close` on the body. If the body is already completely consumed, `close` will do nothing, but if there is still data to be read, it will cause the underlying stream to be reset (and possibly closed). class Readable # Close the stream immediately. def close(error = nil) @@ -29,63 +35,46 @@ def ready? false end + # Whether the stream can be rewound using {rewind}. def rewindable? false end + # Rewind the stream to the beginning. + # @returns [Boolean] Whether the stream was successfully rewound. def rewind false end + # The total length of the body, if known. + # @returns [Integer | Nil] The total length of the body, or `nil` if the length is unknown. def length nil end # Read the next available chunk. # @returns [String | Nil] The chunk of data, or `nil` if the stream has finished. + # @raises [StandardError] If an error occurs while reading. def read nil end - # Should the internal mechanism prefer to use {call}? - # @returns [Boolean] - def stream? - false - end - - # Write the body to the given stream. - def call(stream) - while chunk = self.read - stream.write(chunk) - - # Flush the stream unless we are immediately expecting more data: - unless self.ready? - stream.flush - end - end - end - - # Read all remaining chunks into a buffered body and close the underlying input. - # @returns [Buffered] The buffered body. - def finish - # Internally, this invokes `self.each` which then invokes `self.close`. - Buffered.read(self) - end - # Enumerate all chunks until finished, then invoke `#close`. # + # Closes the stream when finished or if an error occurs. + # # @yields {|chunk| ...} The block to call with each chunk of data. # @parameter chunk [String | Nil] The chunk of data, or `nil` if the stream has finished. def each - return to_enum(:each) unless block_given? + return to_enum unless block_given? - begin - while chunk = self.read - yield chunk - end - ensure - self.close($!) + while chunk = self.read + yield chunk end + rescue => error + raise + ensure + self.close(error) end # Read all remaining chunks into a single binary string using `#each`. @@ -105,6 +94,35 @@ def join end end + def stream? + false + end + + # Write the body to the given stream. + # + # In some cases, the stream may also be readable, such as when hijacking an HTTP/1 connection. In that case, it may be acceptable to read and write to the stream directly. + # + # If the stream is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs. + # + def call(stream) + self.each do |chunk| + stream.write(chunk) + + # Flush the stream unless we are immediately expecting more data: + unless self.ready? + stream.flush + end + end + end + + # Read all remaining chunks into a buffered body and close the underlying input. + # + # @returns [Buffered] The buffered body. + def finish + # Internally, this invokes `self.each` which then invokes `self.close`. + Buffered.read(self) + end + def as_json(...) { class: self.class.name, diff --git a/lib/protocol/http/body/rewindable.rb b/lib/protocol/http/body/rewindable.rb index 438a0f9..3436c2a 100644 --- a/lib/protocol/http/body/rewindable.rb +++ b/lib/protocol/http/body/rewindable.rb @@ -43,10 +43,6 @@ def buffered Buffered.new(@chunks) end - def stream? - false - end - def read if @index < @chunks.size chunk = @chunks[@index] diff --git a/lib/protocol/http/body/streamable.rb b/lib/protocol/http/body/streamable.rb new file mode 100644 index 0000000..9e8f641 --- /dev/null +++ b/lib/protocol/http/body/streamable.rb @@ -0,0 +1,113 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2022, by Samuel Williams. + +require_relative 'readable' +require_relative 'stream' + +module Protocol + module HTTP + module Body + # A body that invokes a block that can read and write to a stream. + # + # In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement `stream?` and return `true`. When `stream?` returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using `#each`. + # + # When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server. + class Streamable < Readable + class Closed < StandardError + end + + def initialize(block, input = nil) + @block = block + @input = input + @output = nil + end + + # Closing a stream indicates we are no longer interested in reading from it. + def close(error = nil) + if @input + @input.close + @input = nil + end + + if @output + @output.close(error) + end + end + + attr :block + + class Output + def initialize(input, block) + stream = Stream.new(input, self) + + @from = nil + + @fiber = Fiber.new do |from| + @from = from + block.call(stream) + rescue Closed + # Ignore. + ensure + @fiber = nil + + # No more chunks will be generated: + if from = @from + @from = nil + from.transfer(nil) + end + end + end + + # Can be invoked by the block to write to the stream. + def write(chunk) + if from = @from + @from = nil + @from = from.transfer(chunk) + else + raise RuntimeError, "Stream is not being read!" + end + end + + # Can be invoked by the block to close the stream. + def close(error = nil) + if from = @from + @from = nil + from.transfer(nil) + elsif @fiber + @fiber.raise(error || Closed) + end + end + + def read + raise RuntimeError, "Stream is already being read!" if @from + + @fiber&.transfer(Fiber.current) + end + end + + # Invokes the block in a fiber which yields chunks when they are available. + def read + @output ||= Output.new(@input, @block) + + return @output.read + end + + def stream? + true + end + + def call(stream) + raise "Streaming body has already been read!" if @output + + @block.call(stream) + rescue => error + raise + ensure + self.close(error) + end + end + end + end +end diff --git a/lib/protocol/http/body/wrapper.rb b/lib/protocol/http/body/wrapper.rb index 82bb252..7c4189b 100644 --- a/lib/protocol/http/body/wrapper.rb +++ b/lib/protocol/http/body/wrapper.rb @@ -27,15 +27,11 @@ def initialize(body) # The wrapped body. attr :body - # Buffer any remaining body. - def finish - @body.finish - end - def close(error = nil) @body.close(error) - super + # It's a no-op: + # super end def empty? @@ -77,14 +73,6 @@ def to_json(...) def inspect @body.inspect end - - def stream? - @body.stream? - end - - def call(stream) - @body.call(stream) - end end end end diff --git a/lib/protocol/http/body/writable.rb b/lib/protocol/http/body/writable.rb new file mode 100644 index 0000000..c9bf874 --- /dev/null +++ b/lib/protocol/http/body/writable.rb @@ -0,0 +1,103 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2018-2023, by Samuel Williams. + +require_relative 'readable' + +module Protocol + module HTTP + module Body + # A dynamic body which you can write to and read from. + class Writable < Readable + class Closed < StandardError + end + + # @param [Integer] length The length of the response body if known. + # @param [Async::Queue] queue Specify a different queue implementation, e.g. `Async::LimitedQueue.new(8)` to enable back-pressure streaming. + def initialize(length = nil, queue: Thread::Queue.new) + @queue = queue + + @length = length + + @count = 0 + + @finished = false + + @closed = false + @error = nil + end + + def length + @length + end + + # Stop generating output; cause the next call to write to fail with the given error. Does not prevent existing chunks from being read. In other words, this indicates both that no more data will be or should be written to the body. + def close(error = nil) + unless @closed + @queue.close + + @closed = true + @error = error + end + + super + end + + def closed? + @closed + end + + def ready? + !@queue.empty? || @queue.closed? + end + + # Has the producer called #finish and has the reader consumed the nil token? + def empty? + @queue.empty? && @queue.closed? + end + + # Read the next available chunk. + def read + @queue.pop + end + + # Write a single chunk to the body. Signal completion by calling `#finish`. + def write(chunk) + # If the reader breaks, the writer will break. + # The inverse of this is less obvious (*) + if @closed + raise(@error || Closed) + end + + @count += 1 + @queue.push(chunk) + end + + alias << write + + def inspect + "\#<#{self.class} #{@count} chunks written, #{status}>" + end + + private + + def status + if @queue.empty? + if @queue.closed? + 'closed' + else + 'waiting' + end + else + if @queue.closed? + 'closing' + else + 'ready' + end + end + end + end + end + end +end diff --git a/releases.md b/releases.md index 2c00a08..39e6113 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,10 @@ # Releases +## Unreleased + +- Clarify behaviour of streaming bodies and copy `Protocol::Rack::Body::Streaming` to `Protocol::HTTP::Body::Streamable`. +- Copy `Async::HTTP::Body::Writable` to `Protocol::HTTP::Body::Writable`. + ## v0.31.0 - Ensure chunks are flushed if required, when streaming. diff --git a/test/protocol/http/body/deflate.rb b/test/protocol/http/body/deflate.rb index 9f191bb..75e93f3 100644 --- a/test/protocol/http/body/deflate.rb +++ b/test/protocol/http/body/deflate.rb @@ -15,11 +15,6 @@ let(:compressed_body) {Protocol::HTTP::Body::Deflate.for(body)} let(:decompressed_body) {Protocol::HTTP::Body::Inflate.for(compressed_body)} - it "should not be a stream" do - expect(compressed_body).not.to be(:stream?) - expect(decompressed_body).not.to be(:stream?) - end - it "should round-trip data" do body.write("Hello World!") body.close diff --git a/test/protocol/http/body/digestable.rb b/test/protocol/http/body/digestable.rb index 65a03bc..046b3b6 100644 --- a/test/protocol/http/body/digestable.rb +++ b/test/protocol/http/body/digestable.rb @@ -10,10 +10,6 @@ let(:source) {Protocol::HTTP::Body::Buffered.new} let(:body) {subject.new(source)} - it "should not be a stream" do - expect(body).not.to be(:stream?) - end - with '.wrap' do let(:source) {Protocol::HTTP::Body::Buffered.wrap("HelloWorld")} let(:message) {Protocol::HTTP::Request.new(nil, nil, 'GET', '/', nil, Protocol::HTTP::Headers.new, body)} diff --git a/test/protocol/http/body/file.rb b/test/protocol/http/body/file.rb index 101d282..e54d090 100644 --- a/test/protocol/http/body/file.rb +++ b/test/protocol/http/body/file.rb @@ -9,8 +9,10 @@ let(:path) {File.expand_path('file_spec.txt', __dir__)} let(:body) {subject.open(path)} - it "should not be a stream" do - expect(body).not.to be(:stream?) + with '#stream?' do + it "should be streamable" do + expect(body).to be(:stream?) + end end with '#join' do @@ -74,4 +76,26 @@ expect(body.read).to be == "ll" end end + + with "#call" do + let(:output) {StringIO.new} + + it "can stream output" do + body.call(output) + + expect(output.string).to be == "Hello World" + end + + with "/dev/zero" do + it "can stream partial output" do + skip unless File.exist?('/dev/zero') + + body = subject.open('/dev/zero', 0...10) + + body.call(output) + + expect(output.string).to be == "\x00" * 10 + end + end + end end diff --git a/test/protocol/http/body/readable.rb b/test/protocol/http/body/readable.rb index 35d5022..fadba64 100644 --- a/test/protocol/http/body/readable.rb +++ b/test/protocol/http/body/readable.rb @@ -17,10 +17,6 @@ expect(body).not.to be(:ready?) end - it "should not be a stream" do - expect(body).not.to be(:stream?) - end - with '#finish' do it "should return empty buffered representation" do expect(body.finish).to be(:empty?) diff --git a/test/protocol/http/body/rewindable.rb b/test/protocol/http/body/rewindable.rb index b2fbb66..37fd440 100644 --- a/test/protocol/http/body/rewindable.rb +++ b/test/protocol/http/body/rewindable.rb @@ -10,10 +10,6 @@ let(:source) {Protocol::HTTP::Body::Buffered.new} let(:body) {subject.new(source)} - it "should not be a stream" do - expect(body).not.to be(:stream?) - end - it "can write and read data" do 3.times do |i| source.write("Hello World #{i}") diff --git a/test/protocol/http/body/streamable.rb b/test/protocol/http/body/streamable.rb new file mode 100644 index 0000000..2f4e91f --- /dev/null +++ b/test/protocol/http/body/streamable.rb @@ -0,0 +1,155 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2022, by Samuel Williams. + +require 'protocol/http/body/streamable' + +describe Protocol::HTTP::Body::Streamable do + let(:block) do + proc do |stream| + stream.write("Hello") + stream.write("World") + stream.close + end + end + + let(:input) {nil} + let(:body) {subject.new(block, input)} + + with "#stream?" do + it "should be streamable" do + expect(body).to be(:stream?) + end + end + + with '#block' do + it "should wrap block" do + expect(body.block).to be == block + end + end + + with '#read' do + it "can read the body" do + expect(body.read).to be == "Hello" + expect(body.read).to be == "World" + expect(body.read).to be == nil + end + + with "block that doesn't close" do + let(:block) do + proc do |stream| + stream.write("Hello") + stream.write("World") + end + end + + it "can read the body" do + expect(body.read).to be == "Hello" + expect(body.read).to be == "World" + expect(body.read).to be == nil + end + end + + with "a block that allows stream to escape" do + let(:block) do + proc do |stream| + @stream = stream + end + end + + it "can read the body" do + expect(body.read).to be == nil + + expect do + @stream.write("!") + end.to raise_exception(RuntimeError, message: be =~ /Stream is not being read!/) + end + end + end + + with '#each' do + it "can read the body" do + chunks = [] + body.each{|chunk| chunks << chunk} + expect(chunks).to be == ["Hello", "World"] + end + end + + with '#call' do + it "can read the body" do + stream = StringIO.new + body.call(stream) + expect(stream.string).to be == "HelloWorld" + end + + with "a block that raises an error" do + let(:block) do + proc do |stream| + stream.write("Hello") + + raise "Oh no... a wild error appeared!" + end + end + + it "closes the stream if an error occurs" do + stream = StringIO.new + expect(body).to receive(:close) + + expect do + body.call(stream) + end.to raise_exception(RuntimeError, message: be =~ /Oh no... a wild error appeared!/) + + expect(stream.string).to be == "Hello" + end + end + end + + with '#close' do + it "can close the body" do + expect(body.read).to be == "Hello" + body.close + end + end + + with "nested fiber" do + let(:block) do + proc do |stream| + Fiber.new do + stream.write("Hello") + end.resume + end + end + + it "can read a chunk" do + expect(body.read).to be == "Hello" + end + end + + with "buffered input" do + let(:input) {Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"])} + + let(:block) do + proc do |stream| + while chunk = stream.read_partial + stream.write(chunk) + end + end + end + + it "can read from input" do + expect(body.read).to be == "Hello" + expect(body.read).to be == " " + expect(body.read).to be == "World" + end + + it "can stream to output" do + output = StringIO.new + stream = Protocol::HTTP::Body::Stream.new(input, output) + + body.call(stream) + + expect(output.string).to be == "Hello World" + end + end +end diff --git a/test/protocol/http/body/wrapper.rb b/test/protocol/http/body/wrapper.rb index 55bdf15..991225a 100644 --- a/test/protocol/http/body/wrapper.rb +++ b/test/protocol/http/body/wrapper.rb @@ -3,15 +3,21 @@ # Released under the MIT License. # Copyright, 2023-2024, by Samuel Williams. -require 'protocol/http/body/readable' +require 'protocol/http/body/wrapper' +require 'protocol/http/body/buffered' +require 'protocol/http/request' + +require 'json' +require 'stringio' describe Protocol::HTTP::Body::Wrapper do let(:source) {Protocol::HTTP::Body::Buffered.new} let(:body) {subject.new(source)} - it "should proxy finish" do - expect(source).to receive(:finish).and_return(nil) - body.finish + with '#stream?' do + it "should not be streamable" do + expect(body).not.to be(:stream?) + end end it "should proxy close" do @@ -34,11 +40,6 @@ expect(body.length).to be == 1 end - it "should proxy stream?" do - expect(source).to receive(:stream?).and_return(true) - expect(body.stream?).to be == true - end - it "should proxy read" do expect(source).to receive(:read).and_return("!") expect(body.read).to be == "!" @@ -46,12 +47,7 @@ it "should proxy inspect" do expect(source).to receive(:inspect).and_return("!") - expect(body.inspect).to be == "!" - end - - it "should proxy call" do - expect(source).to receive(:call).and_return(nil) - body.call(nil) + expect(body.inspect).to be(:include?, "!") end with '.wrap' do @@ -90,4 +86,22 @@ expect(JSON.dump(body)).to be == body.to_json end end + + with "#each" do + it "should invoke close correctly" do + expect(body).to receive(:close) + + body.each{} + end + end + + with "#stream" do + let(:stream) {StringIO.new} + + it "should invoke close correctly" do + expect(body).to receive(:close) + + body.call(stream) + end + end end diff --git a/test/protocol/http/body/writable.rb b/test/protocol/http/body/writable.rb new file mode 100644 index 0000000..3c6ea4d --- /dev/null +++ b/test/protocol/http/body/writable.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2018-2023, by Samuel Williams. + +require 'protocol/http/body/writable' +require 'protocol/http/body/a_writable_body' + +describe Protocol::HTTP::Body::Writable do + let(:body) {subject.new} + + it_behaves_like Protocol::HTTP::Body::AWritableBody + + with "#length" do + it "should be unspecified by default" do + expect(body.length).to be_nil + end + end + + with "#closed?" do + it "should not be closed by default" do + expect(body).not.to be(:closed?) + end + end + + with "#ready?" do + it "should be ready if chunks are available" do + expect(body).not.to be(:ready?) + + body.write("Hello") + + expect(body).to be(:ready?) + end + + it "should be ready if closed" do + body.close + + expect(body).to be(:ready?) + end + end + + with "#empty?" do + it "should be empty if closed with no pending chunks" do + expect(body).not.to be(:empty?) + + body.close + + expect(body).to be(:empty?) + end + + it "should become empty when pending chunks are read" do + body.write("Hello") + body.close + + expect(body).not.to be(:empty?) + body.read + expect(body).to be(:empty?) + end + + it "should not be empty if chunks are available" do + body.write("Hello") + expect(body).not.to be(:empty?) + end + end + + with "#write" do + it "should write chunks" do + body.write("Hello") + body.write("World") + + expect(body.read).to be == "Hello" + expect(body.read).to be == "World" + end + + it "can't write to closed body" do + body.close + + expect do + body.write("Hello") + end.to raise_exception(Protocol::HTTP::Body::Writable::Closed) + end + end +end