Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 9, 2024
1 parent 0bb3b0a commit 079cdd2
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 20 deletions.
13 changes: 13 additions & 0 deletions examples/streaming/bidirectional.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
output = Protocol::HTTP::Body::Streamable.response(request) do |stream|
# Simple echo server:
while chunk = stream.readpartial(1024)
$stderr.puts "Server chunk: #{chunk.inspect}"
stream.write(chunk)
end
rescue EOFError
$stderr.puts "Server EOF."
# Ignore EOF errors.
ensure
$stderr.puts "Server closing stream."
stream.close
end

Expand All @@ -38,18 +41,28 @@
streamable = Protocol::HTTP::Body::Streamable.request do |stream|
stream.write("Hello, ")
stream.write("World!")

$stderr.puts "Client closing write..."
stream.close_write

$stderr.puts "Client reading response..."

while chunk = stream.readpartial(1024)
$stderr.puts "Client chunk: #{chunk.inspect}"
puts chunk
end
$stderr.puts "Client done reading response."
rescue EOFError
$stderr.puts "Client EOF."
# Ignore EOF errors.
ensure
$stderr.puts "Client closing stream."
stream.close
end

$stderr.puts "Client sending request..."
response = client.get("/", body: streamable)
$stderr.puts "Client received response and streaming it..."
streamable.stream(response.body)
ensure
server_task.stop
Expand Down
15 changes: 15 additions & 0 deletions examples/streaming/gems.locked
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,20 @@ GEM
fiber-annotation
fiber-local (~> 1.1)
json
debug (1.9.2)
irb (~> 1.10)
reline (>= 0.3.8)
fiber-annotation (0.2.0)
fiber-local (1.1.0)
fiber-storage
fiber-storage (1.0.0)
io-console (0.7.2)
io-endpoint (0.13.1)
io-event (1.6.5)
io-stream (0.4.0)
irb (1.14.0)
rdoc (>= 4.0.0)
reline (>= 0.4.2)
json (2.7.2)
metrics (0.10.2)
protocol-hpack (1.5.0)
Expand All @@ -42,6 +49,13 @@ GEM
protocol-http2 (0.18.0)
protocol-hpack (~> 1.4)
protocol-http (~> 0.18)
psych (5.1.2)
stringio
rdoc (6.7.0)
psych (>= 4.0.0)
reline (0.5.10)
io-console (~> 0.5)
stringio (3.1.1)
traces (0.13.1)

PLATFORMS
Expand All @@ -51,6 +65,7 @@ PLATFORMS
DEPENDENCIES
async
async-http
debug
protocol-http!

BUNDLED WITH
Expand Down
2 changes: 2 additions & 0 deletions examples/streaming/gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@
gem "async"
gem "async-http"
gem "protocol-http", path: "../../"

gem "debug"
2 changes: 2 additions & 0 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@
gem "bake-test"
gem "bake-test-external"
end

gem "debug", ">= 1.0.0"
111 changes: 91 additions & 20 deletions lib/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ def self.new(*arguments)
end
end

def self.request(&block)
DeferredBody.new(block)
end

def self.response(request, &block)
Body.new(block, request.body)
end

# Represents an output wrapper around a stream, that can invoke a fiber when `#read`` is called.
#
# This behaves a little bit like a generator or lazy enumerator, in that it can be used to generate chunks of data on demand.
Expand All @@ -43,14 +51,12 @@ def initialize(input, block)
stream = Stream.new(input, self)

@from = nil

@fiber = Fiber.new do |from|
@from = from
@to = Fiber.new do
block.call(stream)
rescue => error
# Ignore.
ensure
@fiber = nil
@to = nil
self.close(error)
end
end
Expand All @@ -59,39 +65,45 @@ def initialize(input, block)
def write(chunk)
if from = @from
@from = nil
@from = from.transfer(chunk)
@to = Fiber.current

from.transfer(chunk)
else
raise ClosedError, "Stream is not being read!"
end
end

# Indicates that no further output will be generated.
def close_write(error = nil)
# We might want to specialize the implementation later...
close(error)
end

# Can be invoked by the block to close the stream. Closing the output means that no more chunks will be generated.
def close(error = nil)
$stderr.puts "Closing from: #{@from}, to: #{@to}, error: #{error}"
if from = @from
# We are closing from within the output fiber, so we need to transfer back to `@from`:
@from = nil
@to = Fiber.current

if error
from.raise(error)
else
$stderr.puts "Transferring to #{from}...", from.backtrace
from.transfer(nil)
end
elsif @fiber
elsif to = @to
# We are closing from outside the output fiber, so we need to resume the fiber appropriately:
@from = Fiber.current
@to = nil

if error
# The fiber will be resumed from where it last called write, and we will raise the error there:
@fiber.raise(error)
to.raise(error)
else
begin
# If we get here, it means we are closing the fiber from the outside, so we need to transfer control back to the fiber:
@fiber.transfer(nil)
to.transfer(nil)
rescue Protocol::HTTP::Body::Streamable::ClosedError
# If the fiber then tries to write to the stream, it will raise a ClosedError, and we will end up here. We can ignore it, as we are already closing the stream and don't care about further writes.
end
Expand All @@ -102,7 +114,14 @@ def close(error = nil)
def read
raise RuntimeError, "Stream is already being read!" if @from

@fiber&.transfer(Fiber.current)
if to = @to
@from = Fiber.current
@to = nil

to.transfer
else
return nil
end
end
end

Expand Down Expand Up @@ -155,30 +174,82 @@ def call(stream)

# Closing a stream indicates we are no longer interested in reading from it.
def close(error = nil)
$stderr.puts "Closing output #{@output}..."
if output = @output
@output = nil
# Closing the output here may take some time, as it may need to finish handling the stream:
output.close(error)
end

$stderr.puts "Closing input #{@input}..."
if input = @input
@input = nil
input.close(error)
end

if output = @output
@output = nil
output.close(error)
end
end

class Input
def initialize(from = Fiber.current)
@from = from
@to = nil
end

def read
if from = @from
@from = nil
@to = Fiber.current

return from.transfer
else
raise ClosedError, "Stream is not being written!"
end
end

def write(chunk)
if to = @to
@from = Fiber.current
@to = nil

to.transfer(chunk)
else
raise ClosedError, "Stream is not being read!"
end
end

def close_write(error = nil)
if to = @to
@from = Fiber.current
@to = nil

if error
to.raise(error)
else
to.transfer(nil)
end
end
end

def close(error = nil)
close_write(error)
end

def stream(body)
body&.each do |chunk|
self.write(chunk)
end
end
end

# A deferred body has an extra `stream` method which can be used to stream data into the body, as the response body won't be available until the request has been sent.
class DeferredBody < Body
def initialize(block)
super(block, Writable.new)
super(block, Input.new)
end

# Stream the response body into the block's input.
def stream(input)
input&.each do |chunk|
@input&.write(chunk)
end
@input&.close_write
def stream(body)
@input.stream(body)
end
end
end
Expand Down

0 comments on commit 079cdd2

Please sign in to comment.