From b7814ef45af072fe8821af9a04fcd0f10586226d Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 6 Sep 2024 21:42:06 +1200 Subject: [PATCH] WIP --- examples/streaming/bidirectional.rb | 52 +++++++++++ examples/streaming/gems.locked | 57 ++++++++++++ examples/streaming/gems.rb | 5 + examples/streaming/unidirectional.rb | 56 ++++++++++++ guides/links.yaml | 2 +- guides/streaming/readme.md | 131 +++++++++++++++++++++++++++ lib/protocol/http/body/file.rb | 2 + lib/protocol/http/body/readable.rb | 10 +- lib/protocol/http/body/streamable.rb | 49 +++++++--- 9 files changed, 349 insertions(+), 15 deletions(-) create mode 100755 examples/streaming/bidirectional.rb create mode 100644 examples/streaming/gems.locked create mode 100644 examples/streaming/gems.rb create mode 100755 examples/streaming/unidirectional.rb create mode 100644 guides/streaming/readme.md diff --git a/examples/streaming/bidirectional.rb b/examples/streaming/bidirectional.rb new file mode 100755 index 0000000..0807b4e --- /dev/null +++ b/examples/streaming/bidirectional.rb @@ -0,0 +1,52 @@ +#!/usr/bin/env ruby + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' + +require 'protocol/http/body/streamable' +require 'protocol/http/body/writable' +require 'protocol/http/body/stream' + +endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000') + +Async do + server = Async::HTTP::Server.for(endpoint) do |request| + output = Protocol::HTTP::Body::Streamable.response(request) do |stream| + # Simple echo server: + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore EOF errors. 
+ ensure + stream.close + end + + Protocol::HTTP::Response[200, {}, output] + end + + server_task = Async{server.run} + + client = Async::HTTP::Client.new(endpoint) + + streamable = Protocol::HTTP::Body::Streamable.request do |stream| + stream.write("Hello, ") + stream.write("World!") + stream.close_write + + while chunk = stream.readpartial(1024) + puts chunk + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + response = client.get("/", body: streamable) + streamable.stream(response.body) +ensure + server_task.stop +end diff --git a/examples/streaming/gems.locked b/examples/streaming/gems.locked new file mode 100644 index 0000000..4af7498 --- /dev/null +++ b/examples/streaming/gems.locked @@ -0,0 +1,57 @@ +PATH + remote: ../.. + specs: + protocol-http (0.33.0) + +GEM + remote: https://rubygems.org/ + specs: + async (2.17.0) + console (~> 1.26) + fiber-annotation + io-event (~> 1.6, >= 1.6.5) + async-http (0.75.0) + async (>= 2.10.2) + async-pool (~> 0.7) + io-endpoint (~> 0.11) + io-stream (~> 0.4) + protocol-http (~> 0.30) + protocol-http1 (~> 0.20) + protocol-http2 (~> 0.18) + traces (>= 0.10) + async-pool (0.8.1) + async (>= 1.25) + metrics + traces + console (1.27.0) + fiber-annotation + fiber-local (~> 1.1) + json + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.0) + io-endpoint (0.13.1) + io-event (1.6.5) + io-stream (0.4.0) + json (2.7.2) + metrics (0.10.2) + protocol-hpack (1.5.0) + protocol-http1 (0.22.0) + protocol-http (~> 0.22) + protocol-http2 (0.18.0) + protocol-hpack (~> 1.4) + protocol-http (~> 0.18) + traces (0.13.1) + +PLATFORMS + ruby + x86_64-linux + +DEPENDENCIES + async + async-http + protocol-http! 
+ +BUNDLED WITH + 2.5.16 diff --git a/examples/streaming/gems.rb b/examples/streaming/gems.rb new file mode 100644 index 0000000..eef9f94 --- /dev/null +++ b/examples/streaming/gems.rb @@ -0,0 +1,5 @@ +source "https://rubygems.org" + +gem "async" +gem "async-http" +gem "protocol-http", path: "../../" diff --git a/examples/streaming/unidirectional.rb b/examples/streaming/unidirectional.rb new file mode 100755 index 0000000..87f26d1 --- /dev/null +++ b/examples/streaming/unidirectional.rb @@ -0,0 +1,56 @@ +#!/usr/bin/env ruby + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' + +require 'protocol/http/body/stream' +require 'protocol/http/body/writable' + +endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000') + +Async do + server = Async::HTTP::Server.for(endpoint) do |request| + output = Protocol::HTTP::Body::Writable.new + stream = Protocol::HTTP::Body::Stream.new(request.body, output) + + Async do + # Simple echo server: + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + Protocol::HTTP::Response[200, {}, output] + end + + server_task = Async{server.run} + + client = Async::HTTP::Client.new(endpoint) + + input = Protocol::HTTP::Body::Writable.new + response = client.get("/", body: input) + + begin + stream = Protocol::HTTP::Body::Stream.new(response.body, input) + + stream.write("Hello, ") + stream.write("World!") + stream.close_write + + while chunk = stream.readpartial(1024) + puts chunk + end + rescue EOFError + # Ignore EOF errors. 
+ ensure + stream.close + end +ensure + server_task.stop +end diff --git a/guides/links.yaml b/guides/links.yaml index e240013..23c0f9f 100644 --- a/guides/links.yaml +++ b/guides/links.yaml @@ -1,4 +1,4 @@ getting-started: order: 1 design-overview: - order: 2 + order: 10 diff --git a/guides/streaming/readme.md b/guides/streaming/readme.md new file mode 100644 index 0000000..ae24ee6 --- /dev/null +++ b/guides/streaming/readme.md @@ -0,0 +1,131 @@ +# Streaming + +This guide gives an overview of how to implement streaming requests and responses. + +## Independent Uni-directional Streaming + +The request and response bodies work independently of each other and can stream data in both directions. {ruby Protocol::HTTP::Body::Stream} provides an interface to merge these independent streams into an IO-like interface. + +```ruby +#!/usr/bin/env ruby + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' + +require 'protocol/http/body/stream' +require 'protocol/http/body/writable' + +endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000') + +Async do + server = Async::HTTP::Server.for(endpoint) do |request| + output = Protocol::HTTP::Body::Writable.new + stream = Protocol::HTTP::Body::Stream.new(request.body, output) + + Async do + # Simple echo server: + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + Protocol::HTTP::Response[200, {}, output] + end + + server_task = Async{server.run} + + client = Async::HTTP::Client.new(endpoint) + + input = Protocol::HTTP::Body::Writable.new + response = client.get("/", body: input) + + begin + stream = Protocol::HTTP::Body::Stream.new(response.body, input) + + stream.write("Hello, ") + stream.write("World!") + stream.close_write + + while chunk = stream.readpartial(1024) + puts chunk + end + rescue EOFError + # Ignore EOF errors. 
+ ensure + stream.close + end +ensure + server_task.stop +end +``` + +This approach works quite well, especially when the input and output bodies are independently compressed, decompressed, or chunked. However, some protocols, notably WebSockets, operate on the raw connection and don't require this level of abstraction. + +## Bi-directional Streaming + +While WebSockets can work on the above streaming interface, it's a bit more convenient to use the streaming interface directly, which gives raw access to the underlying stream where possible. + +```ruby +#!/usr/bin/env ruby + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' + +require 'protocol/http/body/stream' +require 'protocol/http/body/writable' + +endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000') + +Async do + server = Async::HTTP::Server.for(endpoint) do |request| + streamable = Protocol::HTTP::Body::Streamable. + output = Protocol::HTTP::Body::Writable.new + stream = Protocol::HTTP::Body::Stream.new(request.body, output) + + Async do + # Simple echo server: + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + Protocol::HTTP::Response[200, {}, output] + end + + server_task = Async{server.run} + + client = Async::HTTP::Client.new(endpoint) + + input = Protocol::HTTP::Body::Writable.new + response = client.get("/", body: input) + + begin + stream = Protocol::HTTP::Body::Stream.new(response.body, input) + + stream.write("Hello, ") + stream.write("World!") + stream.close_write + + while chunk = stream.readpartial(1024) + puts chunk + end + rescue EOFError + # Ignore EOF errors. 
+ ensure + stream.close + end +ensure + server_task.stop +end \ No newline at end of file diff --git a/lib/protocol/http/body/file.rb b/lib/protocol/http/body/file.rb index 996463b..4854f28 100644 --- a/lib/protocol/http/body/file.rb +++ b/lib/protocol/http/body/file.rb @@ -74,6 +74,8 @@ def stream? def call(stream) IO.copy_stream(@file, stream, @remaining) + ensure + stream.close end def join diff --git a/lib/protocol/http/body/readable.rb b/lib/protocol/http/body/readable.rb index 226c897..889f27b 100644 --- a/lib/protocol/http/body/readable.rb +++ b/lib/protocol/http/body/readable.rb @@ -98,12 +98,14 @@ def stream? false end - # Write the body to the given stream. + # Invoke the body with the given stream. # - # In some cases, the stream may also be readable, such as when hijacking an HTTP/1 connection. In that case, it may be acceptable to read and write to the stream directly. + # The default implementation simply writes each chunk to the stream. If the body is not ready, the stream will be flushed after each chunk. Closes the stream when finished or if an error occurs. # - # If the stream is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs. + # Write the body to the given stream. # + # @parameter stream [IO | Object] An `IO`-like object that responds to `#read`, `#write` and `#flush`. + # @returns [Boolean] Whether the ownership of the stream was transferred. def call(stream) self.each do |chunk| stream.write(chunk) @@ -113,6 +115,8 @@ def call(stream) stream.flush end end + ensure + stream.close end # Read all remaining chunks into a buffered body and close the underlying input. diff --git a/lib/protocol/http/body/streamable.rb b/lib/protocol/http/body/streamable.rb index 9e8f641..1da360b 100644 --- a/lib/protocol/http/body/streamable.rb +++ b/lib/protocol/http/body/streamable.rb @@ -4,6 +4,8 @@ # Copyright, 2022, by Samuel Williams. 
require_relative 'readable' +require_relative 'writable' + require_relative 'stream' module Protocol @@ -15,6 +17,14 @@ module Body # # When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server. class Streamable < Readable + def self.response(request, &block) + self.new(block, request.body) + end + + def self.request(&block) + self.new(block, Writable.new) + end + class Closed < StandardError end @@ -27,13 +37,9 @@ def initialize(block, input = nil) # Closing a stream indicates we are no longer interested in reading from it. def close(error = nil) if @input - @input.close + @input.close(error) @input = nil end - - if @output - @output.close(error) - end end attr :block @@ -89,23 +95,44 @@ def read # Invokes the block in a fiber which yields chunks when they are available. def read - @output ||= Output.new(@input, @block) + if @output.nil? + @output = Output.new(@input, @block) + @block = nil + end - return @output.read + @output.read end def stream? true end + # Invoke the block with the given stream. + # + # The block can read and write to the stream, and must close the stream when finished. def call(stream) - raise "Streaming body has already been read!" if @output - - @block.call(stream) + if block = @block + @input = @block = nil + + # Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream. + block.call(stream) + else + raise "Streaming body has already been read!" + end + rescue => error + # If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here: + stream.close + raise + end + + def stream(input) + input.each do |chunk| + @input&.write(chunk) + end rescue => error raise ensure - self.close(error) + @input&.close(error) end end end