In this tutorial we'll show how to build a simple fiber-based server using Polyphony, how to make it concurrent and how to make it resilient to errors. We'll assume you have read the overview. If you haven't yet, please go and read it now before continuing with this tutorial.
Here's what we want to build: a concurrent echo server. Our server will accept TCP connections and send back whatever it receives from the client.
We'll start by opening a server socket:
require 'polyphony'
server = TCPServer.open('127.0.0.1', 1234)
puts 'Echoing on port 1234...'
Next, we'll add a loop accepting connections:
while (client = server.accept)
handle_client(client)
end
The handle_client
method is almost trivial:
def handle_client(client)
while (data = client.gets)
client << data
end
rescue Errno::ECONNRESET
puts 'Connection reset by client'
end
Up until now, we did nothing about concurrency. In fact, our code will not be
able to handle more than one client at a time, because the accept loop cannot
continue to run until the call to #handle_client
returns, and that will not
happen as long as the read loop is not done.
Fortunately, Polyphony makes it super easy to do more than one thing at once. Let's spin up a separate fiber for each client:
while (client = server.accept)
spin { handle_client(client) }
end
Now, our little program can effectively handle thousands of clients, all with a
little sprinkling of spin
. The call to server.accept
suspends the main fiber
until a connection is made, allowing other fibers to run while it's waiting.
Likewise, the call to client.gets
suspends the client's fiber until incoming
data becomes available. Again, all of that is handled automatically by
Polyphony, and the only hint that our program is concurrent is that little
innocent call to #spin
.
Here's a flow chart showing the transfer of control between the different fibers:
Let's consider the advantage of the Polyphony concurrency model:
- We didn't need to create custom handler classes with callbacks.
- We didn't need to use custom classes or APIs for our networking code.
- Each task is expressed sequentially. Our code is terse, easy to read and, most importantly, expresses the order of events clearly and without having our logic split across different methods.
- We have a server that can scale to thousands of clients without breaking a sweat.
Now that we have a working concurrent echo server, let's add some bells and
whistles. First of all, let's get rid of clients that are not active. We'll do
this by wrapping our read loop in a call to cancel_after
:
def handle_client(client)
cancel_after(10) do |timeout|
while (data = client.gets)
timeout.restart
client << data
end
end
rescue Polyphony::Cancel
client.puts 'Closing connection due to inactivity.'
rescue Errno::ECONNRESET
puts 'Connection reset by client'
ensure
client.close
end
The call to #cancel_after
spins up a new fiber that will sleep for 10 seconds,
then cancel its parent. The call to client.gets
blocks until new data is
available. If no new data is available, the timeout
fiber will finish
sleeping, and then cancel the client handling fiber by raising a
Polyphony::Cancel
exception. However, if new data is received, the timeout
fiber is restarted, causing it to begin sleeping again for 10 seconds. If the
client has closed the connection, or some other exception occurs, the timeout
fiber is automatically stopped as it is a child of the fiber running the
handle_client
method.
The habit of always cleaning up using ensure
in the face of potential
interruptions is a fundamental element of using Polyphony correctly. This makes
your code robust, even in a highly chaotic concurrent execution environment
where fibers can be started, restarted and interrupted at any time.
Let's now add graceful shutdown to our server. This means that when the server is stopped we'll first stop accepting new connections, but we'll let any already connected clients keep their sessions.
Polyphony's concurrency model is structured. Fibers are limited to the lifetime
of their direct parent. When the main fiber terminates (on program exit), it
will terminate all its child fibers, each of which will in turn terminate its
own children. The termination of child fibers is implemented by sending each
child fiber a Polyphony::Terminate
exception. We can implement custom
termination logic simply by adding an exception handler for
Polyphony::Terminate
:
# We first refactor the echo loop into a method
def client_loop(client, timeout = nil)
while (data = client.gets)
timeout&.reset
client << data
end
end
def handle_client(client)
cancel_after(10) { |timeout| client_loop(client, timeout) }
rescue Polyphony::Cancel
client.puts 'Closing connection due to inactivity.'
rescue Polyphony::Terminate
# We add a handler for the Terminate exception, and give
client.puts 'Server is shutting down. You have 5 more seconds...'
move_on_after(5) do
client_loop(client)
end
rescue Errno::ECONNRESET
puts 'Connection reset by client'
ensure
timeout.stop
client.close
end
In this tutorial, we have shown how Polyphony can be used to create robust, highly concurrent Ruby applications. As we have discussed above, Polyphony provides a comprehensive set of tools that make it simple and intuitive to write concurrent applications, with features such as structured concurrency (for controlling fiber lifetime), timeouts (for handling inactive or slow clients), custom termination logic (for implementing graceful shutdown).
Here's the complete source code for our Polyphony-based echo server:
require 'polyphony/auto_run'
server = TCPServer.open('127.0.0.1', 1234)
puts 'Echoing on port 1234...'
def client_loop(client, timeout = nil)
while (data = client.gets)
timeout&.reset
client << data
end
end
def handle_client(client)
cancel_after(10) { |timeout| client_loop(client, timeout) }
rescue Polyphony::Cancel
client.puts 'Closing connection due to inactivity.'
rescue Polyphony::Terminate
client.puts 'Server is shutting down. You have 5 more seconds...'
move_on_after(5) do
client_loop(client)
end
rescue Errno::ECONNRESET
puts 'Connection reset by client'
ensure
timeout.stop
client.close
end
while (client = server.accept)
spin { handle_client(client) }
end