Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.5.0 #56

Merged
merged 2 commits into from
Aug 2, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
* Changed keep synced behaviour to override keep linked.
* Fixed unit tests.
* Added default retry strategy.
DobromirM committed Aug 2, 2024
commit 874bd8f1a72e34fa51e723dcfd550f30d058fe24
30 changes: 16 additions & 14 deletions swimos/client/_connections.py
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@
# limitations under the License.

import asyncio
from abc import ABC, abstractmethod
import websockets

from enum import Enum
@@ -27,20 +26,15 @@
from ._downlinks._downlinks import _DownlinkView


class RetryStrategy(ABC):
@abstractmethod
class RetryStrategy:
async def retry(self) -> bool:
"""
Wait for a period of time that is defined by the retry strategy.
"""
raise NotImplementedError
return False

@abstractmethod
def reset(self):
"""
Reset the retry strategy to its original state.
"""
raise NotImplementedError
pass


class IntervalStrategy(RetryStrategy):
@@ -52,7 +46,7 @@ def __init__(self, retries_limit=None, delay=3) -> None:
self.retries = 0

async def retry(self) -> bool:
if self.retries_limit is None or self.retries_limit >= self.retries:
if self.retries_limit is None or self.retries_limit > self.retries:
await asyncio.sleep(self.delay)
self.retries += 1
return True
@@ -85,7 +79,7 @@ def reset(self):

class _ConnectionPool:

def __init__(self, retry_strategy: RetryStrategy = None) -> None:
def __init__(self, retry_strategy: RetryStrategy = RetryStrategy()) -> None:
self.__connections = dict()
self.retry_strategy = retry_strategy

@@ -161,7 +155,7 @@ async def _remove_downlink_view(self, downlink_view: '_DownlinkView') -> None:
class _WSConnection:

def __init__(self, host_uri: str, scheme: str, keep_linked, keep_synced,
retry_strategy: RetryStrategy = None) -> None:
retry_strategy: RetryStrategy = RetryStrategy()) -> None:
self.host_uri = host_uri
self.scheme = scheme
self.retry_strategy = retry_strategy
@@ -191,7 +185,7 @@ async def _open(self) -> None:
self.retry_strategy.reset()
self.status = _ConnectionStatus.IDLE
except Exception as error:
if self.keep_linked and await self.retry_strategy.retry():
if self.should_reconnect() and await self.retry_strategy.retry():
exception_warn(error)
continue
else:
@@ -209,6 +203,14 @@ async def _close(self) -> None:
await self.websocket.close()
self.connected.clear()

def should_reconnect(self) -> bool:
"""
Return a boolean flag indicating whether the connection should try to reconnect on failure.

:return: - True if the connection should try to reconnect on failure.
"""
return self.keep_linked or self.keep_synced

def _set_init_message(self, message: str) -> None:
"""
Set the initial message that gets sent when the underlying downlink is established.
@@ -285,7 +287,7 @@ async def _wait_for_messages(self) -> None:
except ConnectionClosed as error:
exception_warn(error)
await self._close()
if self.keep_linked and await self.retry_strategy.retry():
if self.should_reconnect() and await self.retry_strategy.retry():
await self._open()
await self._send_init_message()
continue
6 changes: 3 additions & 3 deletions swimos/client/_downlinks/_downlinks.py
Original file line number Diff line number Diff line change
@@ -188,11 +188,11 @@ def registered_classes(self) -> dict:

def open(self) -> '_DownlinkView':
if self._host_uri is None:
raise Exception(f'Downlink cannot be opened without first setting the host URI!')
raise Exception('Downlink cannot be opened without first setting the host URI!')
if self._node_uri is None:
raise Exception(f'Downlink cannot be opened without first setting the node URI!')
raise Exception('Downlink cannot be opened without first setting the node URI!')
if self._lane_uri is None:
raise Exception(f'Downlink cannot be opened without first setting the lane URI!')
raise Exception('Downlink cannot be opened without first setting the lane URI!')

if not self._is_open:
task = self._client._schedule_task(self._client._add_downlink_view, self)
6 changes: 4 additions & 2 deletions test/client/downlinks/test_downlink_utils.py
Original file line number Diff line number Diff line change
@@ -93,7 +93,8 @@ def test_after_open_invalid_with_args(self, mock_warn):
# When
downlink_view.get(False)
# Then
self.assertEqual('Cannot execute "get" before the downlink has been opened!', mock_warn.call_args_list[0][0][0])
self.assertEqual('Cannot execute "get" before the downlink has been opened or after it has closed!',
mock_warn.call_args_list[0][0][0])

def test_after_open_valid_with_kwargs(self):
# Given
@@ -115,7 +116,8 @@ def test_after_open_invalid_with_kwargs(self, mock_warn):
# When
downlink_view.get(wait_sync=False)
# Then
self.assertEqual('Cannot execute "get" before the downlink has been opened!', mock_warn.call_args_list[0][0][0])
self.assertEqual('Cannot execute "get" before the downlink has been opened or after it has closed!',
mock_warn.call_args_list[0][0][0])

def test_map_request_get_key_item_primitive(self):
# Given
17 changes: 11 additions & 6 deletions test/client/downlinks/test_downlinks.py
Original file line number Diff line number Diff line change
@@ -1140,6 +1140,7 @@ async def test_value_downlink_model_establish_downlink(self):
downlink_model = _ValueDownlinkModel(client)
downlink_model.node_uri = 'foo'
downlink_model.lane_uri = 'bar'
downlink_model.keep_synced = True
downlink_model.connection = MockConnection()

# When
@@ -1448,7 +1449,8 @@ async def test_value_downlink_view_get_before_open(self, mock_warn):
downlink_view.get()

# Then
self.assertEqual('Cannot execute "get" before the downlink has been opened!', mock_warn.call_args_list[0][0][0])
self.assertEqual('Cannot execute "get" before the downlink has been opened or after it has closed!',
mock_warn.call_args_list[0][0][0])

@patch('concurrent.futures._base.Future.result')
async def test_value_downlink_view_set_blocking(self, mock_result):
@@ -1511,7 +1513,8 @@ async def test_value_downlink_view_set_before_open(self, mock_warn):
downlink_view.set('value')

# Then
self.assertEqual('Cannot execute "set" before the downlink has been opened!', mock_warn.call_args_list[0][0][0])
self.assertEqual('Cannot execute "set" before the downlink has been opened or after it has closed!',
mock_warn.call_args_list[0][0][0])

async def test_value_downlink_view_execute_did_set(self):
# Given
@@ -1580,6 +1583,7 @@ async def test_map_downlink_model_establish_downlink(self):
downlink_model = _MapDownlinkModel(client)
downlink_model.node_uri = 'dog'
downlink_model.lane_uri = 'bark'
downlink_model.keep_synced = True
downlink_model.connection = MockConnection()

# When
@@ -2060,7 +2064,8 @@ async def test_map_downlink_view_get_before_open(self, mock_warn):
downlink_view.get('a')

# Then
self.assertEqual('Cannot execute "get" before the downlink has been opened!', mock_warn.call_args_list[0][0][0])
self.assertEqual('Cannot execute "get" before the downlink has been opened or after it has closed!',
mock_warn.call_args_list[0][0][0])

async def test_map_downlink_view_get_all_immediate(self):
# Given
@@ -2147,7 +2152,7 @@ async def test_map_downlink_view_get_all_before_open(self, mock_warn):
downlink_view.get_all()

# Then
self.assertEqual('Cannot execute "get_all" before the downlink has been opened!',
self.assertEqual('Cannot execute "get_all" before the downlink has been opened or after it has closed!',
mock_warn.call_args_list[0][0][0])

@patch('concurrent.futures._base.Future.result')
@@ -2207,7 +2212,7 @@ async def test_map_downlink_view_put_before_open(self, mock_warn):
downlink_view.put('key_map', 'value_map')

# Then
self.assertEqual('Cannot execute "put" before the downlink has been opened!',
self.assertEqual('Cannot execute "put" before the downlink has been opened or after it has closed!',
mock_warn.call_args_list[0][0][0])

@patch('concurrent.futures._base.Future.result')
@@ -2267,7 +2272,7 @@ async def test_map_downlink_view_remove_before_open(self, mock_warn):
downlink_view.remove('key_map_remove')

# Then
self.assertEqual('Cannot execute "remove" before the downlink has been opened!',
self.assertEqual('Cannot execute "remove" before the downlink has been opened or after it has closed!',
mock_warn.call_args_list[0][0][0])

async def test_map_downlink_view_execute_did_update(self):
Loading