Skip to content

Commit

Permalink
Split out the request and response body implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 7, 2024
1 parent a975411 commit af0a266
Showing 1 changed file with 86 additions and 76 deletions.
162 changes: 86 additions & 76 deletions lib/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,12 @@ module Body
# In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement `stream?` and return `true`. When `stream?` returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using `#each`.
#
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
class Streamable < Readable
def self.response(request, &block)
self.new(block, request.body)
end

def self.request(&block)
self.new(block)
end

class Closed < StandardError
end

def initialize(block, input = nil)
@block = block

if input
@input = input
@finishing = true
else
# If input is nil, it means we are on the client side.
@input = Writable.new
@finishing = false
end

@output = nil
end

attr :block

module Streamable
# Represents an output wrapper around a stream, that can invoke a fiber when `#read`` is called.
#
# This behaves a little bit like a generator or lazy enumerator, in that it can be used to generate chunks of data on demand.
#
# When closing the the output, the block is invoked one last time with `nil` to indicate the end of the stream.
class Output
def initialize(input, block)
stream = Stream.new(input, self)
Expand Down Expand Up @@ -100,70 +77,103 @@ def read
end
end

# Invokes the block in a fiber which yields chunks when they are available.
def read
if @output.nil?
class Body < Readable
def initialize(block, input = nil)
@block = block
@input = input
@output = nil
end

attr :block

def stream?
true
end

# Invokes the block in a fiber which yields chunks when they are available.
def read
if @output.nil?
if @block.nil?
raise "Streaming body has already been consumed!"
end

@output = Output.new(@input, @block)
@block = nil
end

@output.read
end

# Invoke the block with the given stream.
#
# The block can read and write to the stream, and must close the stream when finishing.
def call(stream)
if @block.nil?
raise "Streaming body has already been consumed!"
raise "Streaming block has already been consumed!"
end

@output = Output.new(@input, @block)
@block = nil
block = @block

@input = @output = @block = nil

# Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream.
block.call(stream)
rescue => error
# If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here:
stream.close
raise
end

@output.read
# Closing a stream indicates we are no longer interested in reading from it.
def close(error = nil)
if output = @output
@output = nil
output.close(error)
end

if input = @input
@input = nil
input.close(error)
end
end
end

# Closing a stream indicates we are no longer interested in reading from it.
def close(error = nil)
return unless @finishing
# A request body has an extra `stream` method which can be used to stream data into the body, as the response body won't be available until the request has been sent.
class RequestBody < Body
def initialize(block)
super(block, Writable.new)
@finishing = false
end

if output = @output
@output = nil
output.close(error)
def close(error = nil)
return unless @finishing
super
end

if input = @input
@input = nil
input.close(error)
# Stream the response body into the block's input.
def stream(input)
input&.each do |chunk|
@input&.write(chunk)
end
rescue => error
raise
ensure
@finishing = true
@input&.close

self.close(error)
end
end

def stream?
true
def self.request(&block)
RequestBody.new(block)
end

# Invoke the block with the given stream.
#
# The block can read and write to the stream, and must close the stream when finishing.
def call(stream)
if @block.nil?
raise "Streaming block has already been consumed!"
end

block = @block

@input = @output = @block = nil

# Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream.
block.call(stream)
rescue => error
# If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here:
stream.close
raise
class ResponseBody < Body
end

def stream(input)
input&.each do |chunk|
@input&.write(chunk)
end
rescue => error
raise
ensure
@finishing = true
@input&.close

self.close(error)
def self.response(request, &block)
ResponseBody.new(block, request.body)
end
end
end
Expand Down

0 comments on commit af0a266

Please sign in to comment.