Skip to content

Commit

Permalink
Merge pull request #769 from valory-xyz/fix/acn-restart
Browse files Browse the repository at this point in the history
fix: `p2p_libp2p_client` connection drop handling
  • Loading branch information
Adamantios authored Nov 15, 2024
2 parents ae889bf + d79bac7 commit aded5cd
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 24 deletions.
2 changes: 1 addition & 1 deletion docs/development-setup.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


An AEA <a href="../package-imports">consists of packages </a>. When developing, it helps to be able to save packages in a local package registry, rather than pushing them to <a href="https://aea-registry.fetch.ai" target="_blank">remote registry</a>. This guide helps you set up a local package registry and configure the working directory for development.
An AEA <a href="../package-imports">consists of packages </a>. When developing, it helps to be able to save packages in a local package registry, rather than pushing them to remote registry. This guide helps you set up a local package registry and configure the working directory for development.

There are two ways to write code for an AEA:

Expand Down
4 changes: 2 additions & 2 deletions docs/package_list.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
| connection/valory/ledger/0.19.0 | `bafybeigntoericenpzvwejqfuc3kqzo2pscs76qoygg5dbj6f4zxusru5e` |
| connection/valory/http_server/0.22.0 | `bafybeihpgu56ovmq4npazdbh6y6ru5i7zuv6wvdglpxavsckyih56smu7m` |
| connection/valory/p2p_libp2p/0.1.0 | `bafybeic2u7azbwjny2nhaltqnbohlvysx3x6ectzbege7sxwrbzcz4lcma` |
| connection/valory/p2p_libp2p_client/0.1.0 | `bafybeid3xg5k2ol5adflqloy75ibgljmol6xsvzvezebsg7oudxeeolz7e` |
| connection/valory/p2p_libp2p_client/0.1.0 | `bafybeihs5zlwa5wlozct3rjlxsirm3ve3e4buse5nfehiky6ymnnfrobne` |
| connection/valory/p2p_libp2p_mailbox/0.1.0 | `bafybeiecclc65ogngs3piaxpwhiyl77mlpqun5ejlyv4kamwzrrh746guq` |
| connection/fetchai/local/0.20.0 | `bafybeiema4rnxi54luhzbrccb27pfrwlohemka45eqf4nidgmtkwwmxeyi` |
| connection/valory/http_client/0.23.0 | `bafybeihi772xgzpqeipp3fhmvpct4y6e6tpjp4sogwqrnf3wqspgeilg4u` |
| connection/valory/test_libp2p/0.1.0 | `bafybeid4uexpzjgb3m6npbekohqayn2oukf3fershneha2dptmwtkayxza` |
| connection/valory/test_libp2p/0.1.0 | `bafybeiasj7ppsmn24mes2yw3o72nlmcircscalpl3j5ujzxstknlassa3q` |
| skill/fetchai/echo/0.19.0 | `bafybeicoawiackcbgqo3na3e56tpdc62atag4yxknur77py37caqq4mmya` |
| skill/fetchai/error_test_skill/0.1.0 | `bafybeihsbtlpe7h6fsvoxban5rilkmwviwkokul5cqym6atoolirontiyu` |
| skill/fetchai/gym/0.20.0 | `bafybeie7y2fsxfuhsqxqcaluo5exskmrm5q3a6e2hfcskcuvzvxjjhijh4` |
Expand Down
2 changes: 1 addition & 1 deletion docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ This copies the <code>fetchai/echo:0.19.0</code> skill code containing the "beha

Just like humans, AEAs can have _skills_ to achieve their tasks. As an agent developer, you can create skills to add to your own AEAs. You can also choose to publish your skills so others add them to their AEAs. More details on skills can be found on <a href="../skill/"> this page </a>.

The above agent has an <a href="https://aea-registry.fetch.ai/details/skill/fetchai/echo/latest" target="_blank">echo skill</a>, fetched from <a href="https://aea-registry.fetch.ai" target="_blank">the registry</a>, which simply echoes any messages it receives back to its sender.
The above agent has an echo skill, fetched from the registry, which simply echoes any messages it receives back to its sender.

### Communication via envelopes and messages

Expand Down
4 changes: 2 additions & 2 deletions packages/packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
"connection/valory/ledger/0.19.0": "bafybeigntoericenpzvwejqfuc3kqzo2pscs76qoygg5dbj6f4zxusru5e",
"connection/valory/http_server/0.22.0": "bafybeihpgu56ovmq4npazdbh6y6ru5i7zuv6wvdglpxavsckyih56smu7m",
"connection/valory/p2p_libp2p/0.1.0": "bafybeic2u7azbwjny2nhaltqnbohlvysx3x6ectzbege7sxwrbzcz4lcma",
"connection/valory/p2p_libp2p_client/0.1.0": "bafybeid3xg5k2ol5adflqloy75ibgljmol6xsvzvezebsg7oudxeeolz7e",
"connection/valory/p2p_libp2p_client/0.1.0": "bafybeihs5zlwa5wlozct3rjlxsirm3ve3e4buse5nfehiky6ymnnfrobne",
"connection/valory/p2p_libp2p_mailbox/0.1.0": "bafybeiecclc65ogngs3piaxpwhiyl77mlpqun5ejlyv4kamwzrrh746guq",
"connection/fetchai/local/0.20.0": "bafybeiema4rnxi54luhzbrccb27pfrwlohemka45eqf4nidgmtkwwmxeyi",
"connection/valory/http_client/0.23.0": "bafybeihi772xgzpqeipp3fhmvpct4y6e6tpjp4sogwqrnf3wqspgeilg4u",
"connection/valory/test_libp2p/0.1.0": "bafybeid4uexpzjgb3m6npbekohqayn2oukf3fershneha2dptmwtkayxza",
"connection/valory/test_libp2p/0.1.0": "bafybeiasj7ppsmn24mes2yw3o72nlmcircscalpl3j5ujzxstknlassa3q",
"skill/fetchai/echo/0.19.0": "bafybeicoawiackcbgqo3na3e56tpdc62atag4yxknur77py37caqq4mmya",
"skill/fetchai/error_test_skill/0.1.0": "bafybeihsbtlpe7h6fsvoxban5rilkmwviwkokul5cqym6atoolirontiyu",
"skill/fetchai/gym/0.20.0": "bafybeie7y2fsxfuhsqxqcaluo5exskmrm5q3a6e2hfcskcuvzvxjjhijh4",
Expand Down
27 changes: 21 additions & 6 deletions packages/valory/connections/p2p_libp2p_client/connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2022-2023 Valory AG
# Copyright 2022-2024 Valory AG
# Copyright 2018-2019 Fetch.AI Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -279,6 +279,7 @@ class P2PLibp2pClientConnection(Connection):
connection_id = PUBLIC_ID

DEFAULT_CONNECT_RETRIES = 3
DEFAULT_RESEND_ENVELOPE_RETRY = 1
DEFAULT_TLS_CONNECTION_SIGNATURE_TIMEOUT = 5.0

def __init__(self, **kwargs: Any) -> None:
Expand All @@ -292,6 +293,9 @@ def __init__(self, **kwargs: Any) -> None:
self.connect_retries = self.configuration.config.get(
"connect_retries", self.DEFAULT_CONNECT_RETRIES
)
self.resend_envelope_retry = self.configuration.config.get(
"resend_envelope_retry", self.DEFAULT_RESEND_ENVELOPE_RETRY
)
ledger_id = self.configuration.config.get("ledger_id", DEFAULT_LEDGER)
if ledger_id not in SUPPORTED_LEDGER_IDS:
raise ValueError( # pragma: nocover
Expand Down Expand Up @@ -386,11 +390,17 @@ async def _send_loop(self) -> None:
)
await asyncio.shield(self.disconnect())

async def _send_envelope_with_node_client(self, envelope: Envelope) -> None:
async def _send_envelope_with_node_client(
self, envelope: Envelope, retry_counter: int = 0
) -> None:
"""Send envelope with node client, reconnect and retry on fail."""
if not self._node_client: # pragma: nocover
raise ValueError("Connection not connected to node!")

if retry_counter > self.resend_envelope_retry:
self.logger.warning(
f"Dropping envelope {envelope}. It failed after retry. "
)
return
self._ensure_valid_envelope_for_external_comms(envelope)
try:
await self._node_client.send_envelope(envelope)
Expand All @@ -399,7 +409,7 @@ async def _send_envelope_with_node_client(self, envelope: Envelope) -> None:
"Exception raised on message send. Try reconnect and send again."
)
await self._perform_connection_to_node()
await self._node_client.send_envelope(envelope)
await self._send_envelope_with_node_client(envelope, retry_counter + 1)

async def connect(self) -> None:
"""Set up the connection."""
Expand Down Expand Up @@ -584,9 +594,14 @@ async def _process_messages(self) -> None:
envelope = await self._read_envelope_from_node()
if self._in_queue is None:
raise ValueError("Input queue not initialized.") # pragma: nocover
self._in_queue.put_nowait(envelope)
self.logger.debug(f"Received envelope: {envelope}")
if envelope is None:
break # pragma: no cover
# give it time to recover
# twice the amount what we wait for ACK timeouts
timeout = NodeClient.ACN_ACK_TIMEOUT * 2
await asyncio.sleep(timeout)
continue # pragma: no cover
self._in_queue.put_nowait(envelope)


class TCPSocketChannelClientTLS(TCPSocketChannelClient):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ aea_version: '>=1.0.0, <2.0.0'
fingerprint:
README.md: bafybeiaf5kdnfdc2jifojgniib76zl2c4utnx7ofewc3szqkrxsby62ulu
__init__.py: bafybeid2azroxglu6fl7bxdfcsv3j77vyzgpikjnfwpxg73zeb5orez6ju
connection.py: bafybeiauso3ectgdvbhkt5j6wstfev5mnxiuj6hrkczyqgxurnvva3loqm
connection.py: bafybeihcwyo5nhxmsjxw7qrrvkvp7qu6izbgzgipmmnrpdvc6jtwmw7vgy
fingerprint_ignore_patterns: []
connections: []
protocols:
- valory/acn:1.1.0:bafybeidluaoeakae3exseupaea4i3yvvk5vivyt227xshjlffywwxzcxqe
class_name: P2PLibp2pClientConnection
config:
connect_retries: 3
resend_envelope_retry: 1
ledger_id: cosmos
nodes:
- uri: acn.staging.autonolas.tech:9005
Expand Down
6 changes: 3 additions & 3 deletions packages/valory/connections/test_libp2p/connection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ fingerprint:
tests/test_p2p_libp2p_client/__init__.py: bafybeihjzl7ireo5rbnxcdbghbncykgcgcbh26m4mjjofsseeflauuf6sy
tests/test_p2p_libp2p_client/test_aea_cli.py: bafybeie5yaezltj4xydnqghltt7f2t4vicdts5rm6bz4fdpt4kkkd42cyu
tests/test_p2p_libp2p_client/test_communication.py: bafybeiceec5zsd2u37t56oiajijgwx7frpo3wzu4qdfd3lcq3lcldyqszq
tests/test_p2p_libp2p_client/test_errors.py: bafybeiakajzr6jtecwnlzcrrx7paydz3obmleqzv7am5xbadu6wjtzabmm
tests/test_p2p_libp2p_client/test_errors.py: bafybeihopjaixftexdru35rxf75tr45itr6ygoao2ajxax3jvq6mcopw6m
tests/test_p2p_libp2p_mailbox/__init__.py: bafybeiad64wftnugaahubhqc6bcqzg7om4435dzojdv7oeq4zdmn7kxzui
tests/test_p2p_libp2p_mailbox/test_aea_cli.py: bafybeievjaiacpvcpabemtrmmeejbf4cbkrqzhu2ekxzyfyeg42ityxn5q
tests/test_p2p_libp2p_mailbox/test_communication.py: bafybeiaiwlqxuzp34nu33s5pz6sudvwmdi3h2ckfa4mmwyvd6wqn2elo4m
tests/test_p2p_libp2p_mailbox/test_errors.py: bafybeiampgm6dxpo4tqiyhyhpwr5d3kdsq5x5vwl323oauhdzmsnprf4bi
tests/test_p2p_libp2p_mailbox/test_errors.py: bafybeia7lhpakt6tv3jaskm7edawleyw4oxm3qcife7n3zlvr56zeo5x4m
tests/test_p2p_libp2p_mailbox/test_mailbox_service.py: bafybeibp3bkwkrw57qahvuysjdlumywtlk3te5gsvusgrvhsc75k7rrk4u
fingerprint_ignore_patterns: []
connections:
- valory/p2p_libp2p:0.1.0:bafybeic2u7azbwjny2nhaltqnbohlvysx3x6ectzbege7sxwrbzcz4lcma
- valory/p2p_libp2p_client:0.1.0:bafybeid3xg5k2ol5adflqloy75ibgljmol6xsvzvezebsg7oudxeeolz7e
- valory/p2p_libp2p_client:0.1.0:bafybeihs5zlwa5wlozct3rjlxsirm3ve3e4buse5nfehiky6ymnnfrobne
- valory/p2p_libp2p_mailbox:0.1.0:bafybeiecclc65ogngs3piaxpwhiyl77mlpqun5ejlyv4kamwzrrh746guq
protocols:
- fetchai/default:1.0.0:bafybeihdvtmnz7fzy7kwi3wlo6rfl27f6q3g5entplgvq7y23i3v5uoz24
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2022 Valory AG
# Copyright 2022-2024 Valory AG
# Copyright 2018-2019 Fetch.AI Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -103,16 +103,13 @@ async def test_reconnect_on_send_fail(self):
"""Test reconnect on send fails."""

self.connection._node_client = Mock()
f = Future()
f.set_exception(Exception("oops"))
self.connection._node_client.send_envelope.side_effect = Exception("oops")
with patch.object(
self.connection, "_perform_connection_to_node", return_value=DONE_FUTURE
) as connect_mock, patch.object(
self.connection, "_ensure_valid_envelope_for_external_comms"
):
with pytest.raises(Exception, match="oops"):
await self.connection._send_envelope_with_node_client(Mock())
await self.connection._send_envelope_with_node_client(Mock())
connect_mock.assert_called()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2022 Valory AG
# Copyright 2022-2024 Valory AG
# Copyright 2018-2019 Fetch.AI Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -19,8 +19,7 @@
# ------------------------------------------------------------------------------

"""This test module contains negative tests for Libp2p tcp client connection."""

# pylint: skip-file
from unittest.mock import Mock, patch

import pytest

Expand All @@ -31,6 +30,9 @@
BaseP2PLibp2pTest,
_make_libp2p_mailbox_connection,
)
from packages.valory.connections.test_libp2p.tests.test_p2p_libp2p_client.test_errors import (
DONE_FUTURE,
)
from packages.valory.connections.test_libp2p.tests.test_p2p_libp2p_client.test_errors import (
TestLibp2pClientConnectionFailureConnectionSetup as BaseFailureConnectionSetup,
)
Expand All @@ -39,6 +41,9 @@
)


# pylint: skip-file


@pytest.mark.asyncio
class TestLibp2pMailboxConnectionFailureNodeNotConnected(BaseFailureNodeNotConnected):
"""Test that connection fails when node not running"""
Expand All @@ -48,6 +53,21 @@ class TestLibp2pMailboxConnectionFailureNodeNotConnected(BaseFailureNodeNotConne
# overwrite, no mailbox equivalent of P2PLibp2pClientConnection (TCPSocketChannelClient)
test_connect_attempts = None

@pytest.mark.asyncio
async def test_reconnect_on_send_fail(self):
"""Test reconnect on send fails."""

self.connection._node_client = Mock()
self.connection._node_client.send_envelope.side_effect = Exception("oops")
with patch.object(
self.connection, "_perform_connection_to_node", return_value=DONE_FUTURE
) as connect_mock, patch.object(
self.connection, "_ensure_valid_envelope_for_external_comms"
):
with pytest.raises(Exception, match="oops"):
await self.connection._send_envelope_with_node_client(Mock())
connect_mock.assert_called()


class TestLibp2pMailboxConnectionFailureConnectionSetup(BaseFailureConnectionSetup):
"""Test that connection fails when setup incorrectly"""
Expand Down

0 comments on commit aded5cd

Please sign in to comment.