From af0a266d5f086eace239b974ff57c41c4d079777 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 7 Sep 2024 12:02:36 +1200 Subject: [PATCH] Split out the request and response body implementations. --- lib/protocol/http/body/streamable.rb | 162 ++++++++++++++------------- 1 file changed, 86 insertions(+), 76 deletions(-) diff --git a/lib/protocol/http/body/streamable.rb b/lib/protocol/http/body/streamable.rb index c9ef9c8..f695f54 100644 --- a/lib/protocol/http/body/streamable.rb +++ b/lib/protocol/http/body/streamable.rb @@ -16,35 +16,12 @@ module Body # In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement `stream?` and return `true`. When `stream?` returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using `#each`. # # When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server. - class Streamable < Readable - def self.response(request, &block) - self.new(block, request.body) - end - - def self.request(&block) - self.new(block) - end - - class Closed < StandardError - end - - def initialize(block, input = nil) - @block = block - - if input - @input = input - @finishing = true - else - # If input is nil, it means we are on the client side. - @input = Writable.new - @finishing = false - end - - @output = nil - end - - attr :block - + module Streamable + # Represents an output wrapper around a stream, that can invoke a fiber when `#read`` is called. + # + # This behaves a little bit like a generator or lazy enumerator, in that it can be used to generate chunks of data on demand. + # + # When closing the the output, the block is invoked one last time with `nil` to indicate the end of the stream. class Output def initialize(input, block) stream = Stream.new(input, self) @@ -100,70 +77,103 @@ def read end end - # Invokes the block in a fiber which yields chunks when they are available. - def read - if @output.nil? + class Body < Readable + def initialize(block, input = nil) + @block = block + @input = input + @output = nil + end + + attr :block + + def stream? + true + end + + # Invokes the block in a fiber which yields chunks when they are available. + def read + if @output.nil? + if @block.nil? + raise "Streaming body has already been consumed!" + end + + @output = Output.new(@input, @block) + @block = nil + end + + @output.read + end + + # Invoke the block with the given stream. + # + # The block can read and write to the stream, and must close the stream when finishing. + def call(stream) if @block.nil? - raise "Streaming body has already been consumed!" + raise "Streaming block has already been consumed!" end - @output = Output.new(@input, @block) - @block = nil + block = @block + + @input = @output = @block = nil + + # Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream. + block.call(stream) + rescue => error + # If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here: + stream.close + raise end - @output.read + # Closing a stream indicates we are no longer interested in reading from it. + def close(error = nil) + if output = @output + @output = nil + output.close(error) + end + + if input = @input + @input = nil + input.close(error) + end + end end - # Closing a stream indicates we are no longer interested in reading from it. - def close(error = nil) - return unless @finishing + # A request body has an extra `stream` method which can be used to stream data into the body, as the response body won't be available until the request has been sent. + class RequestBody < Body + def initialize(block) + super(block, Writable.new) + @finishing = false + end - if output = @output - @output = nil - output.close(error) + def close(error = nil) + return unless @finishing + super end - if input = @input - @input = nil - input.close(error) + # Stream the response body into the block's input. + def stream(input) + input&.each do |chunk| + @input&.write(chunk) + end + rescue => error + raise + ensure + @finishing = true + @input&.close + + self.close(error) end end - def stream? - true + def self.request(&block) + RequestBody.new(block) end - # Invoke the block with the given stream. - # - # The block can read and write to the stream, and must close the stream when finishing. - def call(stream) - if @block.nil? - raise "Streaming block has already been consumed!" - end - - block = @block - - @input = @output = @block = nil - - # Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream. - block.call(stream) - rescue => error - # If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here: - stream.close - raise + class ResponseBody < Body end - def stream(input) - input&.each do |chunk| - @input&.write(chunk) - end - rescue => error - raise - ensure - @finishing = true - @input&.close - - self.close(error) + def self.response(request, &block) + ResponseBody.new(block, request.body) end end end