Skip to content

Commit

Permalink
maintain @readiness in Watcher::IO, fix #1
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr committed Jan 6, 2018
1 parent 6e07fc7 commit 09464f5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
16 changes: 11 additions & 5 deletions lib/lightio/library/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ def wait_readable
@io_watcher.wait_readable
end

def io_watcher
@io_watcher
end

class << self
def open(*args)
io = self.new(*args)
Expand All @@ -141,14 +145,16 @@ def pipe(*args)

def select(read_fds, write_fds=nil, _except_fds=nil, timeout=nil)
timer = timeout && Time.now
# run once ioloop
LightIO.sleep 0
loop do
r_fds = (read_fds || []).select {|fd| fd.closed? ? raise(IOError, 'closed stream') : fd.instance_variable_get(:@io_watcher).readable?}
w_fds = (write_fds || []).select {|fd| fd.closed? ? raise(IOError, 'closed stream') : fd.instance_variable_get(:@io_watcher).writable?}
# clear io watcher status
read_fds.each {|fd| fd.send(:io_watcher).clear_status}
write_fds.each {|fd| fd.send(:io_watcher).clear_status}
# run ioloop once
LightIO.sleep 0
r_fds = (read_fds || []).select {|fd| fd.closed? ? raise(IOError, 'closed stream') : fd.send(:io_watcher).readable?}
w_fds = (write_fds || []).select {|fd| fd.closed? ? raise(IOError, 'closed stream') : fd.send(:io_watcher).writable?}
e_fds = []
if r_fds.empty? && w_fds.empty?
LightIO.sleep 0
if timeout && Time.now - timer > timeout
return nil
end
Expand Down
16 changes: 14 additions & 2 deletions lib/lightio/watchers/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def initialize(io, interests=:rw)
@waiting = false
ObjectSpace.define_finalizer(self, self.class.finalizer(@monitor))
@error = nil
# maintain socket status, see https://github.com/socketry/lightio/issues/1
@readiness = nil
end

# NIO::Monitor
Expand All @@ -41,18 +43,26 @@ def finalizer(monitor)
extend Forwardable
def_delegators :monitor, :interests, :interests=, :closed?

# this method return previous IO.select status
# should avoid to directly use
def readable?
check_monitor_read
monitor.readable?
@readiness == :r || @readiness == :rw
end

# this method return previous IO.select status
# should avoid to directly use
def writable?
check_monitor_write
monitor.writable?
@readiness == :w || @readiness == :rw
end

alias :writeable? :writable?

def clear_status
@readiness = nil
end

# Blocking until io is readable
# @param [Numeric] timeout return nil after timeout seconds, otherwise return self
# @return [LightIO::Watchers::IO, nil]
Expand Down Expand Up @@ -142,6 +152,8 @@ def in_waiting(mode)
end

def callback_on_waiting
# update readiness on callback
@readiness = monitor.readiness
# only call callback on waiting
return unless io_is_ready?
if @error
Expand Down

0 comments on commit 09464f5

Please sign in to comment.