diff --git a/chia/_tests/core/server/test_rate_limits.py b/chia/_tests/core/server/test_rate_limits.py index 8dc646f8074f..456c52f31144 100644 --- a/chia/_tests/core/server/test_rate_limits.py +++ b/chia/_tests/core/server/test_rate_limits.py @@ -5,6 +5,8 @@ import pytest from chia._tests.conftest import node_with_params +from chia._tests.util.time_out_assert import time_out_assert +from chia.protocols.full_node_protocol import RejectBlock, RejectBlocks, RespondBlock, RespondBlocks from chia.protocols.protocol_message_types import ProtocolMessageTypes from chia.protocols.shared_protocol import Capability from chia.server.outbound_message import make_msg @@ -13,7 +15,9 @@ from chia.server.rate_limits import RateLimiter from chia.server.server import ChiaServer from chia.server.ws_connection import WSChiaConnection +from chia.simulator.block_tools import BlockTools from chia.types.peer_info import PeerInfo +from chia.util.ints import uint32 rl_v2 = [Capability.BASE, Capability.BLOCK_HEADERS, Capability.RATE_LIMITS_V2] rl_v1 = [Capability.BASE] @@ -89,7 +93,7 @@ async def test_too_much_data(self): assert saw_disconnect r = RateLimiter(incoming=True) - block_message = make_msg(ProtocolMessageTypes.respond_block, bytes([1] * 1024 * 1024)) + block_message = make_msg(ProtocolMessageTypes.respond_unfinished_block, bytes([1] * 1024 * 1024)) for i in range(10): assert r.process_msg_and_check(block_message, rl_v2, rl_v2) is None @@ -124,7 +128,7 @@ async def test_non_tx_aggregate_limits(self): # Size limits r = RateLimiter(incoming=True) message_4 = make_msg(ProtocolMessageTypes.respond_proof_of_weight, bytes([1] * 49 * 1024 * 1024)) - message_5 = make_msg(ProtocolMessageTypes.respond_blocks, bytes([1] * 49 * 1024 * 1024)) + message_5 = make_msg(ProtocolMessageTypes.request_blocks, bytes([1] * 49 * 1024 * 1024)) for i in range(2): assert r.process_msg_and_check(message_4, rl_v2, rl_v2) is None @@ -183,7 +187,7 @@ async def test_percentage_limits(self): assert saw_disconnect r = RateLimiter(True, 60, 40) - block_message = make_msg(ProtocolMessageTypes.respond_block, bytes([1] * 1024 * 1024)) + block_message = make_msg(ProtocolMessageTypes.respond_unfinished_block, bytes([1] * 1024 * 1024)) for i in range(5): assert r.process_msg_and_check(block_message, rl_v2, rl_v2) is None @@ -215,7 +219,7 @@ async def test_percentage_limits(self): # Aggregate percentage limit max total size r = RateLimiter(True, 60, 40) message_4 = make_msg(ProtocolMessageTypes.respond_proof_of_weight, bytes([1] * 18 * 1024 * 1024)) - message_5 = make_msg(ProtocolMessageTypes.respond_blocks, bytes([1] * 24 * 1024 * 1024)) + message_5 = make_msg(ProtocolMessageTypes.respond_unfinished_block, bytes([1] * 24 * 1024 * 1024)) for i in range(2): assert r.process_msg_and_check(message_4, rl_v2, rl_v2) is None @@ -367,3 +371,109 @@ async def test_compose(self): # Otherwise, fall back to v1 assert ProtocolMessageTypes.request_block in rl_1["rate_limits_other"] assert ProtocolMessageTypes.request_block not in rl_1["rate_limits_tx"] + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "msg_type", + [ + ProtocolMessageTypes.respond_blocks, + ProtocolMessageTypes.reject_blocks, + ProtocolMessageTypes.respond_block, + ProtocolMessageTypes.reject_block, + ], +) +async def test_unlimited(msg_type: ProtocolMessageTypes): + r = RateLimiter(incoming=False) + + message = make_msg(msg_type, bytes([1] * 10 * 1024 * 1024)) + + for i in range(1000): + # since this is a backwards compatible change, it also affects V1 + assert r.process_msg_and_check(message, rl_v1, rl_v1) is None + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "msg_type", + [ + ProtocolMessageTypes.respond_blocks, + ProtocolMessageTypes.reject_blocks, + ProtocolMessageTypes.respond_block, + ProtocolMessageTypes.reject_block, + ], +) +@pytest.mark.parametrize( + "node_with_params", + [ + pytest.param( + dict( + disable_capabilities=[Capability.BLOCK_HEADERS, Capability.RATE_LIMITS_V2], + ), + id="V1", + ), + pytest.param( + dict( + disable_capabilities=[], + ), + id="V2", + ), + ], + indirect=True, +) +@pytest.mark.parametrize( + "node_with_params_b", + [ + pytest.param( + dict( + disable_capabilities=[Capability.BLOCK_HEADERS, Capability.RATE_LIMITS_V2], + ), + id="V1", + ), + pytest.param( + dict( + disable_capabilities=[], + ), + id="V2", + ), + ], + indirect=True, +) +async def test_unsolicited_responses( + node_with_params, node_with_params_b, self_hostname: str, msg_type: ProtocolMessageTypes, bt: BlockTools +): + node_a = node_with_params + node_b = node_with_params_b + + msg = { + ProtocolMessageTypes.respond_blocks: make_msg( + ProtocolMessageTypes.respond_blocks, bytes(RespondBlocks(uint32(1), uint32(2), [])) + ), + ProtocolMessageTypes.reject_blocks: make_msg( + ProtocolMessageTypes.reject_blocks, bytes(RejectBlocks(uint32(1), uint32(2))) + ), + ProtocolMessageTypes.respond_block: make_msg( + ProtocolMessageTypes.respond_block, bytes(RespondBlock(bt.get_consecutive_blocks(1)[0])) + ), + ProtocolMessageTypes.reject_block: make_msg(ProtocolMessageTypes.reject_block, bytes(RejectBlock(uint32(0)))), + }[msg_type] + + full_node_server_a: ChiaServer = node_a.full_node.server + full_node_server_b: ChiaServer = node_b.full_node.server + + await full_node_server_b.start_client(PeerInfo(self_hostname, full_node_server_a.get_port()), None) + + assert len(full_node_server_b.get_connections()) == 1 + assert len(full_node_server_a.get_connections()) == 1 + + a_con: WSChiaConnection = full_node_server_a.get_connections()[0] + b_con: WSChiaConnection = full_node_server_b.get_connections()[0] + + assert not a_con.closed + assert not b_con.closed + + await a_con.send_message(msg) + + # make sure the connection is closed because of the unsolicited response + # message + await time_out_assert(5, lambda: a_con.closed) diff --git a/chia/full_node/full_node_api.py b/chia/full_node/full_node_api.py index 6bf09d867d1d..aaa50715946c 100644 --- a/chia/full_node/full_node_api.py +++ b/chia/full_node/full_node_api.py @@ -34,6 +34,7 @@ from chia.protocols import farmer_protocol, full_node_protocol, introducer_protocol, timelord_protocol, wallet_protocol from chia.protocols.full_node_protocol import RejectBlock, RejectBlocks from chia.protocols.protocol_message_types import ProtocolMessageTypes +from chia.protocols.protocol_timing import RATE_LIMITER_BAN_SECONDS from chia.protocols.shared_protocol import Capability from chia.protocols.wallet_protocol import ( CoinState, @@ -402,17 +403,32 @@ async def request_blocks(self, request: full_node_protocol.RequestBlocks) -> Opt return msg - @metadata.request() - async def reject_block(self, request: full_node_protocol.RejectBlock) -> None: - self.log.debug(f"reject_block {request.height}") + @metadata.request(peer_required=True) + async def reject_block( + self, + request: full_node_protocol.RejectBlock, + peer: WSChiaConnection, + ) -> None: + self.log.warning(f"unsolicited reject_block {request.height}") + await peer.close(RATE_LIMITER_BAN_SECONDS) - @metadata.request() - async def reject_blocks(self, request: full_node_protocol.RejectBlocks) -> None: - self.log.debug(f"reject_blocks {request.start_height} {request.end_height}") + @metadata.request(peer_required=True) + async def reject_blocks( + self, + request: full_node_protocol.RejectBlocks, + peer: WSChiaConnection, + ) -> None: + self.log.warning(f"reject_blocks {request.start_height} {request.end_height}") + await peer.close(RATE_LIMITER_BAN_SECONDS) - @metadata.request() - async def respond_blocks(self, request: full_node_protocol.RespondBlocks) -> None: + @metadata.request(peer_required=True) + async def respond_blocks( + self, + request: full_node_protocol.RespondBlocks, + peer: WSChiaConnection, + ) -> None: self.log.warning("Received unsolicited/late blocks") + await peer.close(RATE_LIMITER_BAN_SECONDS) @metadata.request(peer_required=True) async def respond_block( @@ -420,11 +436,8 @@ async def respond_block( respond_block: full_node_protocol.RespondBlock, peer: WSChiaConnection, ) -> Optional[Message]: - """ - Receive a full block from a peer full node (or ourselves). - """ - self.log.warning(f"Received unsolicited/late block from peer {peer.get_peer_logging()}") + await peer.close(RATE_LIMITER_BAN_SECONDS) return None @metadata.request() diff --git a/chia/protocols/protocol_timing.py b/chia/protocols/protocol_timing.py index 215d4e2a1859..1015da5ba0a2 100644 --- a/chia/protocols/protocol_timing.py +++ b/chia/protocols/protocol_timing.py @@ -5,3 +5,4 @@ API_EXCEPTION_BAN_SECONDS = 10 INTERNAL_PROTOCOL_ERROR_BAN_SECONDS = 10 # Don't flap if our client is at fault CONSENSUS_ERROR_BAN_SECONDS = 600 +RATE_LIMITER_BAN_SECONDS = 300 diff --git a/chia/server/rate_limit_numbers.py b/chia/server/rate_limit_numbers.py index 521cf73ac134..20ab40deb19a 100644 --- a/chia/server/rate_limit_numbers.py +++ b/chia/server/rate_limit_numbers.py @@ -94,11 +94,11 @@ def compose_rate_limits(old_rate_limits: dict[str, Any], new_rate_limits: dict[s ProtocolMessageTypes.request_proof_of_weight: RLSettings(5, 100), ProtocolMessageTypes.respond_proof_of_weight: RLSettings(5, 50 * 1024 * 1024, 100 * 1024 * 1024), ProtocolMessageTypes.request_block: RLSettings(200, 100), - ProtocolMessageTypes.reject_block: RLSettings(200, 100), + ProtocolMessageTypes.reject_block: None, ProtocolMessageTypes.request_blocks: RLSettings(500, 100), - ProtocolMessageTypes.respond_blocks: RLSettings(100, 50 * 1024 * 1024, 5 * 50 * 1024 * 1024), - ProtocolMessageTypes.reject_blocks: RLSettings(100, 100), - ProtocolMessageTypes.respond_block: RLSettings(200, 2 * 1024 * 1024, 10 * 2 * 1024 * 1024), + ProtocolMessageTypes.respond_blocks: None, + ProtocolMessageTypes.reject_blocks: None, + ProtocolMessageTypes.respond_block: None, ProtocolMessageTypes.new_unfinished_block: RLSettings(200, 100), ProtocolMessageTypes.request_unfinished_block: RLSettings(200, 100), ProtocolMessageTypes.new_unfinished_block2: RLSettings(200, 100), diff --git a/chia/server/rate_limits.py b/chia/server/rate_limits.py index ab39b0d34afb..0c650baf7304 100644 --- a/chia/server/rate_limits.py +++ b/chia/server/rate_limits.py @@ -77,8 +77,20 @@ def process_msg_and_check( limits: RLSettings = rate_limits["default_settings"] if message_type in rate_limits["rate_limits_tx"]: limits = rate_limits["rate_limits_tx"][message_type] + if limits is None: + # this message type is not rate limited. This is used for + # response messages and must be combined with banning peers + # sending unsolicited responses of this type + ret = True + return None elif message_type in rate_limits["rate_limits_other"]: limits = rate_limits["rate_limits_other"][message_type] + if limits is None: + # this message type is not rate limited. This is used for + # response messages and must be combined with banning peers + # sending unsolicited responses of this type + ret = True + return None non_tx_freq = rate_limits["non_tx_freq"] non_tx_max_total_size = rate_limits["non_tx_max_total_size"] new_non_tx_count = self.non_tx_message_counts + 1 diff --git a/chia/server/ws_connection.py b/chia/server/ws_connection.py index 093c5efa7cfb..5528c1eba178 100644 --- a/chia/server/ws_connection.py +++ b/chia/server/ws_connection.py @@ -22,6 +22,7 @@ API_EXCEPTION_BAN_SECONDS, CONSENSUS_ERROR_BAN_SECONDS, INTERNAL_PROTOCOL_ERROR_BAN_SECONDS, + RATE_LIMITER_BAN_SECONDS, ) from chia.protocols.shared_protocol import Capability, Error, Handshake, protocol_version from chia.server.api_protocol import ApiMetadata, ApiProtocol @@ -713,7 +714,7 @@ async def _read_one_message(self) -> Optional[Message]: self.log.error(f"Peer has been rate limited and will be disconnected: {details}") # Only full node disconnects peers, to prevent abuse and crashing timelords, farmers, etc # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close(300)) # noqa: RUF006 + asyncio.create_task(self.close(RATE_LIMITER_BAN_SECONDS)) # noqa: RUF006 await asyncio.sleep(3) return None else: @@ -727,7 +728,7 @@ async def _read_one_message(self) -> Optional[Message]: self.log.error(f"WebSocket Error: {message}") if isinstance(message.data, WebSocketError) and message.data.code == WSCloseCode.MESSAGE_TOO_BIG: # TODO: stop dropping tasks on the floor - asyncio.create_task(self.close(300)) # noqa: RUF006 + asyncio.create_task(self.close(RATE_LIMITER_BAN_SECONDS)) # noqa: RUF006 else: # TODO: stop dropping tasks on the floor asyncio.create_task(self.close()) # noqa: RUF006