Skip to content

Commit

Permalink
Fixed race condition with native clients when global timeout setting … (
Browse files Browse the repository at this point in the history
#353)

* Fixed race condition with native clients when global timeout setting is used and running short lived commands.
Resolves #344.
* Updated changelog
* Updated default log formatter set by `pssh.utils` enable logger functions.
  • Loading branch information
pkittenis authored Jul 31, 2022
1 parent a0787bd commit 29d9497
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 19 deletions.
14 changes: 14 additions & 0 deletions Changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
Change Log
============

2.11.1
+++++++

Changes
--------

* Updated default log formatter set by `pssh.utils` enable logger functions.

Fixes
------

* Using native clients under `pssh.clients.native` with very short lived commands would sometimes cause unexpected
stalls/delays in reading output from completed commands when a client ``timeout`` setting was used - #344.

2.11.0
+++++++

Expand Down
16 changes: 6 additions & 10 deletions pssh/clients/base/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ def _eagain_write_errcode(self, write_func, data, eagain, timeout=None):
rc, bytes_written = write_func(data[total_written:])
total_written += bytes_written
if rc == eagain:
self.poll(timeout=timeout)
self.poll()
sleep()

def _eagain_errcode(self, func, eagain, *args, **kwargs):
Expand Down Expand Up @@ -685,21 +685,17 @@ def _remote_paths_split(self, file_path):
if _sep > 0:
return file_path[:_sep]

def poll(self, timeout=None):
def poll(self):
raise NotImplementedError

def _poll_socket(self, events, timeout=None):
def _poll_socket(self, events):
if self.sock is None:
return
# gevent.select.poll converts seconds to miliseconds to match python socket
# implementation
timeout = timeout * 1000 if timeout is not None else 100
poller = poll()
poller.register(self.sock, eventmask=events)
poller.poll(timeout=timeout)
poller.poll(timeout=1)

def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None):
timeout = self.timeout if timeout is None else timeout
def _poll_errcodes(self, directions_func, inbound, outbound):
directions = directions_func()
if directions == 0:
return
Expand All @@ -708,4 +704,4 @@ def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None):
events = POLLIN
if directions & outbound:
events |= POLLOUT
self._poll_socket(events, timeout=timeout)
self._poll_socket(events)
2 changes: 1 addition & 1 deletion pssh/clients/native/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,12 +726,12 @@ def poll(self, timeout=None):
Blocks current greenlet only if socket has pending read or write operations
in the appropriate direction.
:param timeout: Deprecated and unused - to be removed.
"""
self._poll_errcodes(
self.session.block_directions,
LIBSSH2_SESSION_BLOCK_INBOUND,
LIBSSH2_SESSION_BLOCK_OUTBOUND,
timeout=timeout,
)

def _eagain_write(self, write_func, data, timeout=None):
Expand Down
5 changes: 1 addition & 4 deletions pssh/clients/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

try:
from io import BytesIO
except ImportError:
from cStringIO import StringIO as BytesIO
from io import BytesIO

from gevent import sleep
from gevent.event import Event
Expand Down
7 changes: 4 additions & 3 deletions pssh/clients/ssh/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def execute(self, cmd, use_pty=False, channel=None):

def _read_output_to_buffer(self, channel, _buffer, is_stderr=False):
while True:
self.poll(timeout=self.timeout)
self.poll()
try:
size, data = channel.read_nonblocking(is_stderr=is_stderr)
except EOF:
Expand Down Expand Up @@ -316,12 +316,13 @@ def close_channel(self, channel):
self._eagain(channel.close, timeout=self.timeout)

def poll(self, timeout=None):
"""ssh-python based co-operative gevent poll on session socket."""
"""ssh-python based co-operative gevent poll on session socket.
:param timeout: Deprecated and unused - to be removed.
"""
self._poll_errcodes(
self.session.get_poll_flags,
SSH_READ_PENDING,
SSH_WRITE_PENDING,
timeout=timeout,
)

def _eagain(self, func, *args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion pssh/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def enable_logger(_logger, level=logging.INFO):
logger.warning("Logger already has a StreamHandler attached")
return
handler = logging.StreamHandler()
host_log_format = logging.Formatter('%(message)s')
host_log_format = logging.Formatter('%(asctime)s %(levelname)-8s %(name)-15s %(message)s')
handler.setFormatter(host_log_format)
_logger.addHandler(handler)

Expand Down
14 changes: 14 additions & 0 deletions tests/native/test_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,20 @@ def test_copy_remote_dir_encoding(self):
]
self.assertListEqual(remote_file_mock.call_args_list, call_args)

def test_many_short_lived_commands(self):
for _ in range(20):
timeout = 2
start = datetime.now()
client = SSHClient(self.host, port=self.port,
pkey=self.user_key,
num_retries=1,
allow_agent=False,
timeout=timeout)
host_out = client.run_command(self.cmd)
_ = list(host_out.stdout)
end = datetime.now() - start
duration = end.total_seconds()
self.assertTrue(duration < timeout * 0.9, msg=f"Duration of instant cmd is {duration}")

# TODO
# * read output callback

0 comments on commit 29d9497

Please sign in to comment.