Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 6, 2024
1 parent e9a5ffc commit b7814ef
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 15 deletions.
52 changes: 52 additions & 0 deletions examples/streaming/bidirectional.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env ruby

require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'

require 'protocol/http/body/streamable'
require 'protocol/http/body/writable'
require 'protocol/http/body/stream'

endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')

Async do
server = Async::HTTP::Server.for(endpoint) do |request|
output = Protocol::HTTP::Body::Streamable.response(request) do |stream|
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

Protocol::HTTP::Response[200, {}, output]
end

server_task = Async{server.run}

client = Async::HTTP::Client.new(endpoint)

streamable = Protocol::HTTP::Body::Streamable.request do |stream|
stream.write("Hello, ")
stream.write("World!")
stream.close_write

while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

response = client.get("/", body: streamable)
streamable.stream(response.body)
ensure
server_task.stop
end
57 changes: 57 additions & 0 deletions examples/streaming/gems.locked
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
PATH
remote: ../..
specs:
protocol-http (0.33.0)

GEM
remote: https://rubygems.org/
specs:
async (2.17.0)
console (~> 1.26)
fiber-annotation
io-event (~> 1.6, >= 1.6.5)
async-http (0.75.0)
async (>= 2.10.2)
async-pool (~> 0.7)
io-endpoint (~> 0.11)
io-stream (~> 0.4)
protocol-http (~> 0.30)
protocol-http1 (~> 0.20)
protocol-http2 (~> 0.18)
traces (>= 0.10)
async-pool (0.8.1)
async (>= 1.25)
metrics
traces
console (1.27.0)
fiber-annotation
fiber-local (~> 1.1)
json
fiber-annotation (0.2.0)
fiber-local (1.1.0)
fiber-storage
fiber-storage (1.0.0)
io-endpoint (0.13.1)
io-event (1.6.5)
io-stream (0.4.0)
json (2.7.2)
metrics (0.10.2)
protocol-hpack (1.5.0)
protocol-http1 (0.22.0)
protocol-http (~> 0.22)
protocol-http2 (0.18.0)
protocol-hpack (~> 1.4)
protocol-http (~> 0.18)
traces (0.13.1)

PLATFORMS
ruby
x86_64-linux

DEPENDENCIES
async
async-http
protocol-http!

BUNDLED WITH
2.5.16
5 changes: 5 additions & 0 deletions examples/streaming/gems.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
source "https://rubygems.org"

gem "async"
gem "async-http"
gem "protocol-http", path: "../../"
56 changes: 56 additions & 0 deletions examples/streaming/unidirectional.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env ruby

require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'

require 'protocol/http/body/stream'
require 'protocol/http/body/writable'

endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')

Async do
server = Async::HTTP::Server.for(endpoint) do |request|
output = Protocol::HTTP::Body::Writable.new
stream = Protocol::HTTP::Body::Stream.new(request.body, output)

Async do
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

Protocol::HTTP::Response[200, {}, output]
end

server_task = Async{server.run}

client = Async::HTTP::Client.new(endpoint)

input = Protocol::HTTP::Body::Writable.new
response = client.get("/", body: input)

begin
stream = Protocol::HTTP::Body::Stream.new(response.body, input)

stream.write("Hello, ")
stream.write("World!")
stream.close_write

while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
ensure
server_task.stop
end
2 changes: 1 addition & 1 deletion guides/links.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
getting-started:
order: 1
design-overview:
order: 2
order: 10
131 changes: 131 additions & 0 deletions guides/streaming/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Streaming

This guide gives an overview of how to implement streaming requests and responses.

## Independent Uni-directional Streaming

The request and response body work independently of each other can stream data in both directions. {ruby Protocol::HTTP::Body::Stream} provides an interface to merge these independent streams into an IO-like interface.

```ruby
#!/usr/bin/env ruby

require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'

require 'protocol/http/body/stream'
require 'protocol/http/body/writable'

endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')

Async do
server = Async::HTTP::Server.for(endpoint) do |request|
output = Protocol::HTTP::Body::Writable.new
stream = Protocol::HTTP::Body::Stream.new(request.body, output)

Async do
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

Protocol::HTTP::Response[200, {}, output]
end

server_task = Async{server.run}

client = Async::HTTP::Client.new(endpoint)

input = Protocol::HTTP::Body::Writable.new
response = client.get("/", body: input)

begin
stream = Protocol::HTTP::Body::Stream.new(response.body, input)

stream.write("Hello, ")
stream.write("World!")
stream.close_write

while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
ensure
server_task.stop
end
```

This approach works quite well, especially when the input and output bodies are independently compressed, decompressed, or chunked. However, some protocols, notably, WebSockets operate on the raw connection and don't require this level of abstraction.

## Bi-directional Streaming

While WebSockets can work on the above streaming interface, it's a bit more convenient to use the streaming interface directly, which gives raw access to the underlying stream where possible.

```ruby
#!/usr/bin/env ruby

require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'

require 'protocol/http/body/stream'
require 'protocol/http/body/writable'

endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')

Async do
server = Async::HTTP::Server.for(endpoint) do |request|
streamable = Protocol::HTTP::Body::Streamable.
output = Protocol::HTTP::Body::Writable.new
stream = Protocol::HTTP::Body::Stream.new(request.body, output)

Async do
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

Protocol::HTTP::Response[200, {}, output]
end

server_task = Async{server.run}

client = Async::HTTP::Client.new(endpoint)

input = Protocol::HTTP::Body::Writable.new
response = client.get("/", body: input)

begin
stream = Protocol::HTTP::Body::Stream.new(response.body, input)

stream.write("Hello, ")
stream.write("World!")
stream.close_write

while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
ensure
server_task.stop
end
2 changes: 2 additions & 0 deletions lib/protocol/http/body/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def stream?

def call(stream)
IO.copy_stream(@file, stream, @remaining)
ensure
stream.close
end

def join
Expand Down
10 changes: 7 additions & 3 deletions lib/protocol/http/body/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,14 @@ def stream?
false
end

# Write the body to the given stream.
# Invoke the body with the given stream.
#
# In some cases, the stream may also be readable, such as when hijacking an HTTP/1 connection. In that case, it may be acceptable to read and write to the stream directly.
# The default implementation simply writes each chunk to the stream. If the body is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs.
#
# If the stream is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs.
# Write the body to the given stream.
#
# @parameter stream [IO | Object] An `IO`-like object that responds to `#read`, `#write` and `#flush`.
# @returns [Boolean] Whether the ownership of the stream was transferred.
def call(stream)
self.each do |chunk|
stream.write(chunk)
Expand All @@ -113,6 +115,8 @@ def call(stream)
stream.flush
end
end
ensure
stream.close
end

# Read all remaining chunks into a buffered body and close the underlying input.
Expand Down
Loading

0 comments on commit b7814ef

Please sign in to comment.