From 29d94979184f5842155b2250ad87cbef0f9df138 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 31 Jul 2022 18:09:28 +0100 Subject: [PATCH] =?UTF-8?q?Fixed=20race=20condition=20with=20native=20clie?= =?UTF-8?q?nts=20when=20global=20timeout=20setting=20=E2=80=A6=20(#353)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixed race condition with native clients when global timeout setting is used and running short lived commands. Resolves #344. * Updated changelog * Updated default log formatter set by `pssh.utils` enable logger functions. --- Changelog.rst | 14 ++++++++++++++ pssh/clients/base/single.py | 16 ++++++---------- pssh/clients/native/single.py | 2 +- pssh/clients/reader.py | 5 +---- pssh/clients/ssh/single.py | 7 ++++--- pssh/utils.py | 2 +- tests/native/test_single_client.py | 14 ++++++++++++++ 7 files changed, 41 insertions(+), 19 deletions(-) diff --git a/Changelog.rst b/Changelog.rst index 285cda7c..8f1b93dc 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -1,6 +1,20 @@ Change Log ============ +2.11.1 ++++++++ + +Changes +-------- + +* Updated default log formatter set by `pssh.utils` enable logger functions. + +Fixes +------ + +* Using native clients under `pssh.clients.native` with very short lived commands would sometimes cause unexpected + stalls/delays in reading output from completed commands when a client ``timeout`` setting was used - #344. + 2.11.0 +++++++ diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 56bc2c16..4255a14f 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -559,7 +559,7 @@ def _eagain_write_errcode(self, write_func, data, eagain, timeout=None): rc, bytes_written = write_func(data[total_written:]) total_written += bytes_written if rc == eagain: - self.poll(timeout=timeout) + self.poll() sleep() def _eagain_errcode(self, func, eagain, *args, **kwargs): @@ -685,21 +685,17 @@ def _remote_paths_split(self, file_path): if _sep > 0: return file_path[:_sep] - def poll(self, timeout=None): + def poll(self): raise NotImplementedError - def _poll_socket(self, events, timeout=None): + def _poll_socket(self, events): if self.sock is None: return - # gevent.select.poll converts seconds to miliseconds to match python socket - # implementation - timeout = timeout * 1000 if timeout is not None else 100 poller = poll() poller.register(self.sock, eventmask=events) - poller.poll(timeout=timeout) + poller.poll(timeout=1) - def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None): - timeout = self.timeout if timeout is None else timeout + def _poll_errcodes(self, directions_func, inbound, outbound): directions = directions_func() if directions == 0: return @@ -708,4 +704,4 @@ def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None): events = POLLIN if directions & outbound: events |= POLLOUT - self._poll_socket(events, timeout=timeout) + self._poll_socket(events) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 7f3ebaf4..2dc4e7cf 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -726,12 +726,12 @@ def poll(self, timeout=None): Blocks current greenlet only if socket has pending read or write operations in the appropriate direction. + :param timeout: Deprecated and unused - to be removed. """ self._poll_errcodes( self.session.block_directions, LIBSSH2_SESSION_BLOCK_INBOUND, LIBSSH2_SESSION_BLOCK_OUTBOUND, - timeout=timeout, ) def _eagain_write(self, write_func, data, timeout=None): diff --git a/pssh/clients/reader.py b/pssh/clients/reader.py index f9ab1e9f..2fb19094 100644 --- a/pssh/clients/reader.py +++ b/pssh/clients/reader.py @@ -15,10 +15,7 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -try: - from io import BytesIO -except ImportError: - from cStringIO import StringIO as BytesIO +from io import BytesIO from gevent import sleep from gevent.event import Event diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 6a11c2ab..855ada98 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -243,7 +243,7 @@ def execute(self, cmd, use_pty=False, channel=None): def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): while True: - self.poll(timeout=self.timeout) + self.poll() try: size, data = channel.read_nonblocking(is_stderr=is_stderr) except EOF: @@ -316,12 +316,13 @@ def close_channel(self, channel): self._eagain(channel.close, timeout=self.timeout) def poll(self, timeout=None): - """ssh-python based co-operative gevent poll on session socket.""" + """ssh-python based co-operative gevent poll on session socket. + :param timeout: Deprecated and unused - to be removed. + """ self._poll_errcodes( self.session.get_poll_flags, SSH_READ_PENDING, SSH_WRITE_PENDING, - timeout=timeout, ) def _eagain(self, func, *args, **kwargs): diff --git a/pssh/utils.py b/pssh/utils.py index 9eca4d23..3ee6ef2b 100644 --- a/pssh/utils.py +++ b/pssh/utils.py @@ -33,7 +33,7 @@ def enable_logger(_logger, level=logging.INFO): logger.warning("Logger already has a StreamHandler attached") return handler = logging.StreamHandler() - host_log_format = logging.Formatter('%(message)s') + host_log_format = logging.Formatter('%(asctime)s %(levelname)-8s %(name)-15s %(message)s') handler.setFormatter(host_log_format) _logger.addHandler(handler) diff --git a/tests/native/test_single_client.py b/tests/native/test_single_client.py index b99a4065..61df944e 100644 --- a/tests/native/test_single_client.py +++ b/tests/native/test_single_client.py @@ -1039,6 +1039,20 @@ def test_copy_remote_dir_encoding(self): ] self.assertListEqual(remote_file_mock.call_args_list, call_args) + def test_many_short_lived_commands(self): + for _ in range(20): + timeout = 2 + start = datetime.now() + client = SSHClient(self.host, port=self.port, + pkey=self.user_key, + num_retries=1, + allow_agent=False, + timeout=timeout) + host_out = client.run_command(self.cmd) + _ = list(host_out.stdout) + end = datetime.now() - start + duration = end.total_seconds() + self.assertTrue(duration < timeout * 0.9, msg=f"Duration of instant cmd is {duration}") # TODO # * read output callback