Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix command could timeout without setting an exception #556

Merged
merged 17 commits into from
Dec 3, 2024
118 changes: 21 additions & 97 deletions moler/runner_single_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self):
self._stop_loop_runner = threading.Event()
self._stop_loop_runner.clear()
self._tick = 0.001
self._time_to_wait_for_connection_observer_done = 1000 * self._tick
self._in_shutdown = False
self._loop_thread = threading.Thread(
target=self._runner_loop,
Expand Down Expand Up @@ -98,13 +99,8 @@ def wait_for(self, connection_observer, connection_observer_future, timeout=10.0
)
self.logger.debug(f"go foreground: {connection_observer} - {msg}")
connection_observer.life_status.start_time = start_time
self._execute_till_eol(
connection_observer=connection_observer,
max_timeout=max_timeout,
await_timeout=await_timeout,
remain_time=remain_time,
)
connection_observer.set_end_of_life()
connection_observer.life_status.timeout = current_time + await_timeout - start_time
self._wait_for_connection_observer_done(connection_observer=connection_observer, timeout=max_timeout)
return None

def wait_for_iterator(self, connection_observer, connection_observer_future):
Expand Down Expand Up @@ -201,89 +197,33 @@ def _its_remaining_time(cls, prefix, timeout, from_start_time):
msg = f"{prefix} {remain_time:.3f} [sec], already passed {already_passed:.3f} [sec]"
return remain_time, msg

def _execute_till_eol(
self, connection_observer, max_timeout, await_timeout, remain_time
):
"""
Execute till end of life of connection_observer (done or timeout).
:param connection_observer: Instance of ConnectionObserver (command or timeout).
:param max_timeout: max timeout
:param await_timeout: await timeout
:param remain_time: remain time
:return: True if done normally, False if timeout.
"""
eol_remain_time = remain_time
# either we wait forced-max-timeout or we check done-status each 0.1sec tick
if eol_remain_time > 0.0:
# future = connection_observer_future or connection_observer._future
# assert future is not None
if max_timeout:
connection_observer_timeout = max_timeout
check_timeout = False
else:
connection_observer_timeout = await_timeout
check_timeout = True
if connection_observer.done():
return True
was_done = self._wait_till_done(
def _wait_for_connection_observer_done(self, connection_observer, timeout):
while not connection_observer.done() and time.monotonic() < self._get_max_time(connection_observer=connection_observer):
time.sleep(self._tick)
if not connection_observer.done():
self._timeout_observer(
connection_observer=connection_observer,
timeout=connection_observer_timeout,
check_timeout=check_timeout,
)
if was_done:
return True
self._prepare_for_time_out(connection_observer, timeout=await_timeout)
if connection_observer.life_status.terminating_timeout > 0.0:
connection_observer.life_status.in_terminating = True
was_done = self._wait_till_done(
connection_observer=connection_observer,
timeout=connection_observer.life_status.terminating_timeout,
check_timeout=check_timeout,
)
if was_done:
return True

self._wait_for_not_started_connection_observer_is_done(
connection_observer=connection_observer
timeout=timeout,
passed_time=time.monotonic() - connection_observer.life_status.start_time,
runner_logger=self.logger,
kind="await_done"
)
return False

def _wait_till_done(self, connection_observer, timeout, check_timeout):
"""
Wait till connection_obsertver is done.
:param connection_observer: ConnectionObserver (command or event)
:param timeout: timeout
:param check_timeout: True to check timeout from connection_observer
:return: True if done normally, False if timeout.
"""
timeout_add = 0.1
term_timeout = (
0
if connection_observer.life_status.terminating_timeout is None
else connection_observer.life_status.terminating_timeout
)
remain_time = timeout - (time.monotonic() - connection_observer.life_status.start_time) + term_timeout + timeout_add
while remain_time >= 0:
if connection_observer.done():
return True
time.sleep(self._tick)
if check_timeout:
timeout = connection_observer.timeout
term_timeout = (
0
if connection_observer.life_status.terminating_timeout is None
else connection_observer.life_status.terminating_timeout
)
remain_time = timeout - (time.monotonic() - connection_observer.life_status.start_time) + term_timeout + timeout_add
return False
def _get_max_time(self, connection_observer):
start_time = connection_observer.life_status.start_time
max_time = start_time + connection_observer.timeout
if connection_observer.life_status.terminating_timeout is not None:
max_time += connection_observer.life_status.terminating_timeout
max_time += self._time_to_wait_for_connection_observer_done
return max_time

def _wait_for_not_started_connection_observer_is_done(self, connection_observer):
"""
Wait for not started connection observer (command or event) is done.
:param connection_observer: ConnectionObserver (command or event)
:return: None
"""
# Have to wait till connection_observer is done with terminaing timeout.
# Have to wait till connection_observer is done with terminating timeout.
eol_remain_time = connection_observer.life_status.terminating_timeout
start_time = time.monotonic()
while not connection_observer.done() and eol_remain_time > 0.0:
Expand Down Expand Up @@ -363,22 +303,6 @@ def _check_timeout_connection_observers(self):
else:
connection_observer.set_end_of_life()

def _prepare_for_time_out(self, connection_observer, timeout):
"""
Prepare ConnectionObserver (command or event) for timeout.
:param connection_observer: ConnectionObserver.
:param timeout: timeout in seconds.
:return: None
"""
passed = time.monotonic() - connection_observer.life_status.start_time
self._timeout_observer(
connection_observer=connection_observer,
timeout=timeout,
passed_time=passed,
runner_logger=self.logger,
kind="await_done",
)

def _timeout_observer(
self,
connection_observer,
Expand Down Expand Up @@ -462,7 +386,7 @@ def _remove_unnecessary_connection_observers(self):

def _start_command(self, connection_observer):
"""
Srart command if connection_observer is an instance of a command. If an instance of event then do nothing.
Start command if connection_observer is an instance of a command. If an instance of event then do nothing.
:param connection_observer: ConnectionObserver
:return: None
"""
Expand Down
4 changes: 2 additions & 2 deletions test/cmd/at/test_cmd_at_attach.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_calling_at_cmd_attach_timeouts_after_500ms(buffer_connection):
at_cmd_attach()
duration = time.monotonic() - start_time
assert duration > 0.5
assert duration < 0.7
assert duration < 1.0


def test_calling_at_cmd_attach_timeouts_on_no_output(buffer_connection):
Expand All @@ -45,7 +45,7 @@ def test_calling_at_cmd_attach_timeouts_on_no_output(buffer_connection):
at_cmd_attach()
duration = time.monotonic() - start_time
assert duration > 0.5
assert duration < 0.7
assert duration < 1.0


def test_calling_at_cmd_attach_returns_expected_result(buffer_connection):
Expand Down
23 changes: 22 additions & 1 deletion test/cmd/unix/test_cmd_cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Testing of cat command.
"""
__author__ = 'Sylwester Golonka, Marcin Usielski'
__copyright__ = 'Copyright (C) 2018-2023, Nokia'
__copyright__ = 'Copyright (C) 2018-2024, Nokia'
__email__ = '[email protected], [email protected]'

from moler.cmd.unix.cat import Cat
Expand Down Expand Up @@ -39,6 +39,27 @@ def test_cat_raise_timeout_exception(buffer_connection, command_output_timeout_e
cat_cmd(timeout=0.2)


def test_cat_raise_minimal_timeout_timeout_exception(buffer_connection, command_output_timeout_exception):
command_output = command_output_timeout_exception
buffer_connection.remote_inject_response([command_output])
timeout = 0.1
while timeout > 0:
cat_cmd = Cat(connection=buffer_connection.moler_connection, path="/home/test/test")
cat_cmd.terminating_timeout = 0
cat_cmd.timeout = timeout
try:
cat_cmd()
except CommandTimeout:
pass # we expect timeout exception
except Exception as ex:
msg = f"Unexpected exception {ex} for timeout={timeout}"
raise ex
else:
msg = f"No exception for {timeout}, ref = {cat_cmd.result()}"
raise Exception(msg)
timeout /= 32.


def test_cat_prompt_in_the_same_line(buffer_connection, command_output_prompt_in_the_same_line):
buffer_connection.remote_inject_response([command_output_prompt_in_the_same_line])
cat_cmd = Cat(connection=buffer_connection.moler_connection, path="/home/test/test", prompt=r"^moler_bash#$")
Expand Down
7 changes: 5 additions & 2 deletions test/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@ def test_thread_test_job():
assert (3 == values['number'])


@pytest.mark.skipif(sys.version_info < (3, 4), reason="requires python3.4 or higher")
@pytest.mark.skipif(sys.version_info < (3, 9), reason="Apscheduler")
def test_asyncio_test_job():
loop = asyncio.get_event_loop()
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return # No working asyncio
Scheduler.change_kind("asyncio")
values = {'number': 0}
job = Scheduler.get_job(callback=callback, interval=0.1, callback_params={'param_dict': values})
Expand Down
Loading