Skip to content

Commit

Permalink
never rate limit outgoing response messages (RespondBlock, RespondBlo…
Browse files Browse the repository at this point in the history
…cks, RejectBlock, RejectBlocks). Instead, disconnect any peer sending unsolicited response blocks
  • Loading branch information
arvidn committed Dec 6, 2024
1 parent e03042c commit 10e6e76
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 22 deletions.
118 changes: 114 additions & 4 deletions chia/_tests/core/server/test_rate_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import pytest

from chia._tests.conftest import node_with_params
from chia._tests.util.time_out_assert import time_out_assert
from chia.protocols.full_node_protocol import RejectBlock, RejectBlocks, RespondBlock, RespondBlocks
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.protocols.shared_protocol import Capability
from chia.server.outbound_message import make_msg
Expand All @@ -13,7 +15,9 @@
from chia.server.rate_limits import RateLimiter
from chia.server.server import ChiaServer
from chia.server.ws_connection import WSChiaConnection
from chia.simulator.block_tools import BlockTools
from chia.types.peer_info import PeerInfo
from chia.util.ints import uint32

rl_v2 = [Capability.BASE, Capability.BLOCK_HEADERS, Capability.RATE_LIMITS_V2]
rl_v1 = [Capability.BASE]
Expand Down Expand Up @@ -89,7 +93,7 @@ async def test_too_much_data(self):
assert saw_disconnect

r = RateLimiter(incoming=True)
block_message = make_msg(ProtocolMessageTypes.respond_block, bytes([1] * 1024 * 1024))
block_message = make_msg(ProtocolMessageTypes.respond_unfinished_block, bytes([1] * 1024 * 1024))
for i in range(10):
assert r.process_msg_and_check(block_message, rl_v2, rl_v2) is None

Expand Down Expand Up @@ -124,7 +128,7 @@ async def test_non_tx_aggregate_limits(self):
# Size limits
r = RateLimiter(incoming=True)
message_4 = make_msg(ProtocolMessageTypes.respond_proof_of_weight, bytes([1] * 49 * 1024 * 1024))
message_5 = make_msg(ProtocolMessageTypes.respond_blocks, bytes([1] * 49 * 1024 * 1024))
message_5 = make_msg(ProtocolMessageTypes.request_blocks, bytes([1] * 49 * 1024 * 1024))

for i in range(2):
assert r.process_msg_and_check(message_4, rl_v2, rl_v2) is None
Expand Down Expand Up @@ -183,7 +187,7 @@ async def test_percentage_limits(self):
assert saw_disconnect

r = RateLimiter(True, 60, 40)
block_message = make_msg(ProtocolMessageTypes.respond_block, bytes([1] * 1024 * 1024))
block_message = make_msg(ProtocolMessageTypes.respond_unfinished_block, bytes([1] * 1024 * 1024))
for i in range(5):
assert r.process_msg_and_check(block_message, rl_v2, rl_v2) is None

Expand Down Expand Up @@ -215,7 +219,7 @@ async def test_percentage_limits(self):
# Aggregate percentage limit max total size
r = RateLimiter(True, 60, 40)
message_4 = make_msg(ProtocolMessageTypes.respond_proof_of_weight, bytes([1] * 18 * 1024 * 1024))
message_5 = make_msg(ProtocolMessageTypes.respond_blocks, bytes([1] * 24 * 1024 * 1024))
message_5 = make_msg(ProtocolMessageTypes.respond_unfinished_block, bytes([1] * 24 * 1024 * 1024))

for i in range(2):
assert r.process_msg_and_check(message_4, rl_v2, rl_v2) is None
Expand Down Expand Up @@ -367,3 +371,109 @@ async def test_compose(self):
# Otherwise, fall back to v1
assert ProtocolMessageTypes.request_block in rl_1["rate_limits_other"]
assert ProtocolMessageTypes.request_block not in rl_1["rate_limits_tx"]


@pytest.mark.anyio
@pytest.mark.parametrize(
    "msg_type",
    [
        ProtocolMessageTypes.respond_blocks,
        ProtocolMessageTypes.reject_blocks,
        ProtocolMessageTypes.respond_block,
        ProtocolMessageTypes.reject_block,
    ],
)
async def test_unlimited(msg_type: ProtocolMessageTypes):
    """Outgoing response messages must never trip the rate limiter."""
    limiter = RateLimiter(incoming=False)
    payload = bytes([1] * 10 * 1024 * 1024)
    message = make_msg(msg_type, payload)

    # since this is a backwards compatible change, it also affects V1
    for _ in range(1000):
        assert limiter.process_msg_and_check(message, rl_v1, rl_v1) is None


@pytest.mark.anyio
@pytest.mark.parametrize(
    "msg_type",
    [
        ProtocolMessageTypes.respond_blocks,
        ProtocolMessageTypes.reject_blocks,
        ProtocolMessageTypes.respond_block,
        ProtocolMessageTypes.reject_block,
    ],
)
@pytest.mark.parametrize(
    "node_with_params",
    [
        pytest.param(
            dict(
                disable_capabilities=[Capability.BLOCK_HEADERS, Capability.RATE_LIMITS_V2],
            ),
            id="V1",
        ),
        pytest.param(
            dict(
                disable_capabilities=[],
            ),
            id="V2",
        ),
    ],
    indirect=True,
)
@pytest.mark.parametrize(
    "node_with_params_b",
    [
        pytest.param(
            dict(
                disable_capabilities=[Capability.BLOCK_HEADERS, Capability.RATE_LIMITS_V2],
            ),
            id="V1",
        ),
        pytest.param(
            dict(
                disable_capabilities=[],
            ),
            id="V2",
        ),
    ],
    indirect=True,
)
async def test_unsolicited_responses(
    node_with_params, node_with_params_b, self_hostname: str, msg_type: ProtocolMessageTypes, bt: BlockTools
):
    """A peer that sends an unsolicited response message must be disconnected."""
    # build the payload for every response type, then pick the one under test
    payloads = {
        ProtocolMessageTypes.respond_blocks: bytes(RespondBlocks(uint32(1), uint32(2), [])),
        ProtocolMessageTypes.reject_blocks: bytes(RejectBlocks(uint32(1), uint32(2))),
        ProtocolMessageTypes.respond_block: bytes(RespondBlock(bt.get_consecutive_blocks(1)[0])),
        ProtocolMessageTypes.reject_block: bytes(RejectBlock(uint32(0))),
    }
    msg = make_msg(msg_type, payloads[msg_type])

    server_a: ChiaServer = node_with_params.full_node.server
    server_b: ChiaServer = node_with_params_b.full_node.server

    await server_b.start_client(PeerInfo(self_hostname, server_a.get_port()), None)

    assert len(server_b.get_connections()) == 1
    assert len(server_a.get_connections()) == 1

    a_con: WSChiaConnection = server_a.get_connections()[0]
    b_con: WSChiaConnection = server_b.get_connections()[0]

    assert not a_con.closed
    assert not b_con.closed

    await a_con.send_message(msg)

    # make sure the connection is closed because of the unsolicited response
    # message
    await time_out_assert(5, lambda: a_con.closed)
37 changes: 25 additions & 12 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from chia.protocols import farmer_protocol, full_node_protocol, introducer_protocol, timelord_protocol, wallet_protocol
from chia.protocols.full_node_protocol import RejectBlock, RejectBlocks
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.protocols.protocol_timing import RATE_LIMITER_BAN_SECONDS
from chia.protocols.shared_protocol import Capability
from chia.protocols.wallet_protocol import (
CoinState,
Expand Down Expand Up @@ -402,29 +403,41 @@ async def request_blocks(self, request: full_node_protocol.RequestBlocks) -> Opt

return msg

@metadata.request()
async def reject_block(self, request: full_node_protocol.RejectBlock) -> None:
self.log.debug(f"reject_block {request.height}")
@metadata.request(peer_required=True)
async def reject_block(
    self,
    request: full_node_protocol.RejectBlock,
    peer: WSChiaConnection,
) -> None:
    """Handle an unsolicited reject_block by banning the sender.

    reject_block is no longer rate limited, so an unsolicited one is
    punished by disconnecting the peer for RATE_LIMITER_BAN_SECONDS.
    """
    self.log.warning(f"unsolicited reject_block {request.height}")
    await peer.close(RATE_LIMITER_BAN_SECONDS)

@metadata.request()
async def reject_blocks(self, request: full_node_protocol.RejectBlocks) -> None:
self.log.debug(f"reject_blocks {request.start_height} {request.end_height}")
@metadata.request(peer_required=True)
async def reject_blocks(
    self,
    request: full_node_protocol.RejectBlocks,
    peer: WSChiaConnection,
) -> None:
    """Handle an unsolicited reject_blocks by banning the sender.

    reject_blocks is no longer rate limited, so an unsolicited one is
    punished by disconnecting the peer for RATE_LIMITER_BAN_SECONDS.
    """
    # use the same "unsolicited ..." wording as the sibling handlers so a
    # single log search finds all four unsolicited response types
    self.log.warning(f"unsolicited reject_blocks {request.start_height} {request.end_height}")
    await peer.close(RATE_LIMITER_BAN_SECONDS)

@metadata.request()
async def respond_blocks(self, request: full_node_protocol.RespondBlocks) -> None:
@metadata.request(peer_required=True)
async def respond_blocks(
    self,
    request: full_node_protocol.RespondBlocks,
    peer: WSChiaConnection,
) -> None:
    """Handle an unsolicited/late respond_blocks by banning the sender.

    respond_blocks is no longer rate limited, so an unsolicited one is
    punished by disconnecting the peer for RATE_LIMITER_BAN_SECONDS.
    """
    self.log.warning("Received unsolicited/late blocks")
    await peer.close(RATE_LIMITER_BAN_SECONDS)

@metadata.request(peer_required=True)
async def respond_block(
    self,
    respond_block: full_node_protocol.RespondBlock,
    peer: WSChiaConnection,
) -> Optional[Message]:
    """
    Handle an unsolicited/late respond_block by banning the sender.

    respond_block is no longer rate limited, so an unsolicited one is
    punished by disconnecting the peer for RATE_LIMITER_BAN_SECONDS.
    Always returns None; no reply message is sent.
    """
    self.log.warning(f"Received unsolicited/late block from peer {peer.get_peer_logging()}")
    await peer.close(RATE_LIMITER_BAN_SECONDS)
    return None

@metadata.request()
Expand Down
1 change: 1 addition & 0 deletions chia/protocols/protocol_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
API_EXCEPTION_BAN_SECONDS = 10
INTERNAL_PROTOCOL_ERROR_BAN_SECONDS = 10 # Don't flap if our client is at fault
CONSENSUS_ERROR_BAN_SECONDS = 600
RATE_LIMITER_BAN_SECONDS = 300
8 changes: 4 additions & 4 deletions chia/server/rate_limit_numbers.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ def compose_rate_limits(old_rate_limits: dict[str, Any], new_rate_limits: dict[s
ProtocolMessageTypes.request_proof_of_weight: RLSettings(5, 100),
ProtocolMessageTypes.respond_proof_of_weight: RLSettings(5, 50 * 1024 * 1024, 100 * 1024 * 1024),
ProtocolMessageTypes.request_block: RLSettings(200, 100),
ProtocolMessageTypes.reject_block: RLSettings(200, 100),
ProtocolMessageTypes.reject_block: None,
ProtocolMessageTypes.request_blocks: RLSettings(500, 100),
ProtocolMessageTypes.respond_blocks: RLSettings(100, 50 * 1024 * 1024, 5 * 50 * 1024 * 1024),
ProtocolMessageTypes.reject_blocks: RLSettings(100, 100),
ProtocolMessageTypes.respond_block: RLSettings(200, 2 * 1024 * 1024, 10 * 2 * 1024 * 1024),
ProtocolMessageTypes.respond_blocks: None,
ProtocolMessageTypes.reject_blocks: None,
ProtocolMessageTypes.respond_block: None,
ProtocolMessageTypes.new_unfinished_block: RLSettings(200, 100),
ProtocolMessageTypes.request_unfinished_block: RLSettings(200, 100),
ProtocolMessageTypes.new_unfinished_block2: RLSettings(200, 100),
Expand Down
12 changes: 12 additions & 0 deletions chia/server/rate_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,20 @@ def process_msg_and_check(
limits: RLSettings = rate_limits["default_settings"]
if message_type in rate_limits["rate_limits_tx"]:
limits = rate_limits["rate_limits_tx"][message_type]
if limits is None:
# this message type is not rate limited. This is used for
# response messages and must be combined with banning peers
# sending unsolicited responses of this type
ret = True
return None
elif message_type in rate_limits["rate_limits_other"]:
limits = rate_limits["rate_limits_other"][message_type]
if limits is None:
# this message type is not rate limited. This is used for
# response messages and must be combined with banning peers
# sending unsolicited responses of this type
ret = True
return None
non_tx_freq = rate_limits["non_tx_freq"]
non_tx_max_total_size = rate_limits["non_tx_max_total_size"]
new_non_tx_count = self.non_tx_message_counts + 1
Expand Down
5 changes: 3 additions & 2 deletions chia/server/ws_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
API_EXCEPTION_BAN_SECONDS,
CONSENSUS_ERROR_BAN_SECONDS,
INTERNAL_PROTOCOL_ERROR_BAN_SECONDS,
RATE_LIMITER_BAN_SECONDS,
)
from chia.protocols.shared_protocol import Capability, Error, Handshake, protocol_version
from chia.server.api_protocol import ApiMetadata, ApiProtocol
Expand Down Expand Up @@ -713,7 +714,7 @@ async def _read_one_message(self) -> Optional[Message]:
self.log.error(f"Peer has been rate limited and will be disconnected: {details}")
# Only full node disconnects peers, to prevent abuse and crashing timelords, farmers, etc
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close(300)) # noqa: RUF006
asyncio.create_task(self.close(RATE_LIMITER_BAN_SECONDS)) # noqa: RUF006
await asyncio.sleep(3)
return None
else:
Expand All @@ -727,7 +728,7 @@ async def _read_one_message(self) -> Optional[Message]:
self.log.error(f"WebSocket Error: {message}")
if isinstance(message.data, WebSocketError) and message.data.code == WSCloseCode.MESSAGE_TOO_BIG:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close(300)) # noqa: RUF006
asyncio.create_task(self.close(RATE_LIMITER_BAN_SECONDS)) # noqa: RUF006
else:
# TODO: stop dropping tasks on the floor
asyncio.create_task(self.close()) # noqa: RUF006
Expand Down

0 comments on commit 10e6e76

Please sign in to comment.