From dbed1e9df96d7bdc0968f6bc41ef90594dc1f283 Mon Sep 17 00:00:00 2001 From: Lucas Saldanha Date: Wed, 11 Dec 2024 09:21:42 +1300 Subject: [PATCH] Passing GOSSIP_MAX_SIZE to snappy uncompressor (#8899) * Passing GOSSIP_MAX_SIZE to snappy uncompressor * Using GOSSIP_MAX_SIZE from network spec on LibP2PGossipNetworkBuilder --------- Co-authored-by: Enrico Del Fante --- CHANGELOG.md | 1 + .../encoding/SnappyBlockCompressor.java | 18 ++++-- .../encoding/SnappyPreparedGossipMessage.java | 13 +++-- .../encoding/SnappyBlockCompressorTest.java | 56 ++++++++++++++++--- .../SnappyPreparedGossipMessageTest.java | 26 ++++++++- .../libp2p/config/LibP2PParamsFactory.java | 12 ++-- .../gossip/LibP2PGossipNetworkBuilder.java | 14 +++-- .../config/LibP2PParamsFactoryTest.java | 27 ++++++++- 8 files changed, 136 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42fc0c2741b..c291d65dc4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,3 +17,4 @@ - Added a startup script for unix systems to ensure that when jemalloc is installed the script sets the LD_PRELOAD environment variable to the use the jemalloc library - Set `is_syncing` to `false` instead of `true` for the `/eth/v1/node/syncing` API endpoint when the head is optimistic and the sync distance is 0 - Fix libp2p direct peers handling +- Added check for gossip message maximum uncompressed size diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyBlockCompressor.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyBlockCompressor.java index 6c5ef38089a..e385883c87e 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyBlockCompressor.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyBlockCompressor.java @@ -24,16 +24,24 @@ */ public class SnappyBlockCompressor { - public Bytes uncompress(final Bytes compressedData, final SszLengthBounds lengthBounds) + public Bytes uncompress( + final Bytes compressedData, final SszLengthBounds lengthBounds, final long maxBytesLength) throws DecodingException { - try { - final int actualLength = Snappy.uncompressedLength(compressedData.toArrayUnsafe()); - if (!lengthBounds.isWithinBounds(actualLength)) { + final int uncompressedLength = Snappy.uncompressedLength(compressedData.toArrayUnsafe()); + + if (uncompressedLength > maxBytesLength) { + throw new DecodingException( + String.format( + "Uncompressed length %d exceeds max length in bytes of %s", + uncompressedLength, maxBytesLength)); + } + + if (!lengthBounds.isWithinBounds(uncompressedLength)) { throw new DecodingException( String.format( "Uncompressed length %d is not within expected bounds %s", - actualLength, lengthBounds.toString())); + uncompressedLength, lengthBounds)); } return Bytes.wrap(Snappy.uncompress(compressedData.toArrayUnsafe())); } catch (IOException e) { diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessage.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessage.java index 10006b308e7..a8bf01df95f 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessage.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessage.java @@ -124,7 +124,7 @@ public DecodedMessageResult getDecodedMessage() { if (valueType == null) { return DecodedMessageResult.failed(); } else { - final Bytes decodedMessage = uncompressPayload(); + final Bytes decodedMessage = uncompressPayload(networkingConfig.getGossipMaxSize()); return DecodedMessageResult.successful(decodedMessage); } } catch (DecodingException e) { @@ -141,8 +141,9 @@ private Optional getUncompressed() { return decodedResult.get().getDecodedMessage(); } - private Bytes uncompressPayload() throws DecodingException { - return snappyCompressor.uncompress(compressedData, valueType.getSszLengthBounds()); + private Bytes uncompressPayload(final long maxUncompressedLength) throws DecodingException { + return snappyCompressor.uncompress( + compressedData, valueType.getSszLengthBounds(), maxUncompressedLength); } @Override @@ -159,7 +160,11 @@ public Optional getArrivalTimestamp() { @FunctionalInterface interface Uncompressor { - Bytes uncompress(final Bytes compressedData, final SszLengthBounds lengthBounds) + + Bytes uncompress( + final Bytes compressedData, + final SszLengthBounds lengthBounds, + final long maxUncompressedLengthInBytes) throws DecodingException; } } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyBlockCompressorTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyBlockCompressorTest.java index 2c70c701e4c..21c7441ea92 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyBlockCompressorTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyBlockCompressorTest.java @@ -21,6 +21,9 @@ import tech.pegasys.teku.infrastructure.ssz.sos.SszLengthBounds; public class SnappyBlockCompressorTest { + + private static final long GOSSIP_MAX_SIZE = Long.MAX_VALUE; + private final SnappyBlockCompressor compressor = new SnappyBlockCompressor(); @Test @@ -29,7 +32,8 @@ public void roundTrip() throws DecodingException { final Bytes compressed = compressor.compress(original); assertThat(compressed).isNotEqualTo(original); - final Bytes uncompressed = compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000)); + final Bytes uncompressed = + compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000), GOSSIP_MAX_SIZE); assertThat(uncompressed).isEqualTo(original); } @@ -38,25 +42,61 @@ public void roundTrip() throws DecodingException { public void uncompress_randomData() { final Bytes data = Bytes.fromHexString("0x0102"); - assertThatThrownBy(() -> compressor.uncompress(data, SszLengthBounds.ofBytes(0, 1000))) + assertThatThrownBy( + () -> compressor.uncompress(data, SszLengthBounds.ofBytes(0, 1000), GOSSIP_MAX_SIZE)) .isInstanceOf(DecodingException.class); } @Test - void uncompress_uncompressedLengthLongerThanMaxLength() { + void uncompress_uncompressedLengthLongerThanSszLenghtBounds() { final Bytes original = Bytes.fromHexString("0x010203040506"); final Bytes compressed = compressor.compress(original); - assertThatThrownBy(() -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 4))) - .isInstanceOf(DecodingException.class); + assertThatThrownBy( + () -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 4), GOSSIP_MAX_SIZE)) + .isInstanceOf(DecodingException.class) + .hasMessageContaining("not within expected bounds"); } @Test - void uncompress_uncompressedLengthShorterThanMinLength() { + void uncompress_uncompressedLengthShorterThanSszLengthBounds() { final Bytes original = Bytes.fromHexString("0x010203040506"); final Bytes compressed = compressor.compress(original); - assertThatThrownBy(() -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(100, 200))) - .isInstanceOf(DecodingException.class); + + assertThatThrownBy( + () -> + compressor.uncompress( + compressed, SszLengthBounds.ofBytes(100, 200), GOSSIP_MAX_SIZE)) + .isInstanceOf(DecodingException.class) + .hasMessageContaining("not within expected bounds"); + } + + @Test + void uncompress_uncompressedLengthLongerThanMaxBytesLength() { + final Bytes original = Bytes.fromHexString("0x010203040506"); + final long smallMaxBytesLength = 3; + assertThat(smallMaxBytesLength).isLessThan(original.size()); + + final Bytes compressed = compressor.compress(original); + assertThatThrownBy( + () -> + compressor.uncompress( + compressed, SszLengthBounds.ofBytes(0, 1000), smallMaxBytesLength)) + .isInstanceOf(DecodingException.class) + .hasMessageContaining("exceeds max length in bytes"); + } + + @Test + void uncompress_uncompressedLengthEqualThanMaxBytesLength() throws DecodingException { + final Bytes original = Bytes.fromHexString("0x010203040506"); + final long exactMaxBytesLength = original.size(); + + final Bytes compressed = compressor.compress(original); + assertThat(compressed).isNotEqualTo(original); + final Bytes uncompressed = + compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000), exactMaxBytesLength); + + assertThat(uncompressed).isEqualTo(original); } } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageTest.java index c76f6c8eb16..d0d9ce682b4 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageTest.java @@ -15,6 +15,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.HashSet; import java.util.List; @@ -51,9 +57,9 @@ public class SnappyPreparedGossipMessageTest { Map.of(phase0ForkDigest, SpecMilestone.PHASE0, altairForkDigest, SpecMilestone.ALTAIR)); final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY; - final Uncompressor validUncompressor = (bytes, __) -> bytes; + final Uncompressor validUncompressor = (bytes, bounds, maxLength) -> bytes; final Uncompressor invalidUncompressor = - (bytes, __) -> { + (bytes, bounds, maxLength) -> { throw new DecodingException("testing"); }; @@ -156,6 +162,22 @@ public void getMessageId_generateUniqueIdsBasedOnValidityAndMilestone() { assertThat(messageIds).hasSize(preparedMessages.size()); } + @Test + public void getDecodedMessage_ShouldPassGossipMaxSizeToUncompressor() throws DecodingException { + final long gossipMaxSize = spec.getNetworkingConfig().getGossipMaxSize(); + final Uncompressor uncompressor = mock(Uncompressor.class); + when(uncompressor.uncompress(any(), any(), anyLong())).thenReturn(Bytes.random(1000)); + + final String altairTopic = GossipTopics.getTopic(altairForkDigest, "test", gossipEncoding); + final SnappyPreparedGossipMessage message = + getAltairMessage(messageBytes, altairTopic, uncompressor); + + message.getDecodedMessage(); + + verify(uncompressor) + .uncompress(eq(messageBytes), eq(schema.getSszLengthBounds()), eq(gossipMaxSize)); + } + private SnappyPreparedGossipMessage getPhase0Message( final Bytes rawMessage, final String topic, final Uncompressor uncompressor) { return SnappyPreparedGossipMessage.create( diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/config/LibP2PParamsFactory.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/config/LibP2PParamsFactory.java index c24629cd897..ea3abca876e 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/config/LibP2PParamsFactory.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/config/LibP2PParamsFactory.java @@ -29,17 +29,18 @@ import tech.pegasys.teku.networking.p2p.gossip.config.GossipScoringConfig; import tech.pegasys.teku.networking.p2p.gossip.config.GossipTopicScoringConfig; import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId; +import tech.pegasys.teku.spec.config.NetworkingSpecConfig; public class LibP2PParamsFactory { public static final int MAX_SUBSCRIPTIONS_PER_MESSAGE = 200; - public static final int MAX_COMPRESSED_GOSSIP_SIZE = 10 * (1 << 20); - public static GossipParams createGossipParams(final GossipConfig gossipConfig) { + public static GossipParams createGossipParams( + final GossipConfig gossipConfig, final NetworkingSpecConfig networkingSpecConfig) { final GossipParamsBuilder builder = GossipParams.builder(); addGossipParamsDValues(gossipConfig, builder); addGossipParamsMiscValues(gossipConfig, builder); - addGossipParamsMaxValues(builder); + addGossipParamsMaxValues(networkingSpecConfig, builder); return builder.build(); } @@ -65,9 +66,10 @@ private static void addGossipParamsDValues( .DOut(Math.min(gossipConfig.getD() / 2, Math.max(0, gossipConfig.getDLow() - 1))); } - private static void addGossipParamsMaxValues(final GossipParamsBuilder builder) { + private static void addGossipParamsMaxValues( + final NetworkingSpecConfig networkingSpecConfig, final GossipParamsBuilder builder) { builder - .maxGossipMessageSize(MAX_COMPRESSED_GOSSIP_SIZE) + .maxGossipMessageSize(networkingSpecConfig.getGossipMaxSize()) .maxPublishedMessages(1000) .maxTopicsPerPublishedMessage(1) .maxSubscriptions(MAX_SUBSCRIPTIONS_PER_MESSAGE) diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PGossipNetworkBuilder.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PGossipNetworkBuilder.java index 38fab38a0c2..d582da6eafe 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PGossipNetworkBuilder.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PGossipNetworkBuilder.java @@ -79,7 +79,8 @@ public LibP2PGossipNetwork build() { validate(); final GossipTopicHandlers topicHandlers = new GossipTopicHandlers(); final Gossip gossip = - createGossip(gossipConfig, logWireGossip, gossipTopicFilter, topicHandlers); + createGossip( + gossipConfig, networkingSpecConfig, logWireGossip, gossipTopicFilter, topicHandlers); final PubsubPublisherApi publisher = gossip.createPublisher(null, NULL_SEQNO_GENERATOR); return new LibP2PGossipNetwork(metricsSystem, gossip, publisher, topicHandlers); @@ -100,10 +101,11 @@ private void assertNotNull(final String fieldName, final Object fieldValue) { protected GossipRouter createGossipRouter( final GossipConfig gossipConfig, + final NetworkingSpecConfig networkingSpecConfig, final GossipTopicFilter gossipTopicFilter, final GossipTopicHandlers topicHandlers) { - - final GossipParams gossipParams = LibP2PParamsFactory.createGossipParams(gossipConfig); + final GossipParams gossipParams = + LibP2PParamsFactory.createGossipParams(gossipConfig, networkingSpecConfig); final GossipScoreParams scoreParams = LibP2PParamsFactory.createGossipScoreParams(gossipConfig.getScoringConfig()); @@ -145,7 +147,7 @@ protected GossipRouter createGossipRouter( .map(handler -> handler.prepareMessage(payload, arrivalTimestamp)) .orElse( defaultMessageFactory.create( - topic, payload, networkingSpecConfig, arrivalTimestamp)); + topic, payload, this.networkingSpecConfig, arrivalTimestamp)); return new PreparedPubsubMessage(msg, preparedMessage); }); @@ -155,11 +157,13 @@ protected GossipRouter createGossipRouter( protected Gossip createGossip( final GossipConfig gossipConfig, + final NetworkingSpecConfig networkingSpecConfig, final boolean gossipLogsEnabled, final GossipTopicFilter gossipTopicFilter, final GossipTopicHandlers topicHandlers) { - final GossipRouter router = createGossipRouter(gossipConfig, gossipTopicFilter, topicHandlers); + final GossipRouter router = + createGossipRouter(gossipConfig, networkingSpecConfig, gossipTopicFilter, topicHandlers); if (gossipLogsEnabled) { if (debugGossipHandler != null) { diff --git a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/config/LibP2PParamsFactoryTest.java b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/config/LibP2PParamsFactoryTest.java index 191fc456242..7d4003d6305 100644 --- a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/config/LibP2PParamsFactoryTest.java +++ b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/config/LibP2PParamsFactoryTest.java @@ -14,19 +14,42 @@ package tech.pegasys.teku.networking.p2p.libp2p.config; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import io.libp2p.pubsub.gossip.GossipParams; import org.junit.jupiter.api.Test; import tech.pegasys.teku.networking.p2p.gossip.config.GossipConfig; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.config.NetworkingSpecConfig; public class LibP2PParamsFactoryTest { + private final Spec spec = TestSpecFactory.createMinimalPhase0(); + @Test void createGossipParams_checkZeroDsSucceed() { - GossipConfig gossipConfig = GossipConfig.builder().d(0).dLow(0).dHigh(0).build(); + final GossipConfig gossipConfig = GossipConfig.builder().d(0).dLow(0).dHigh(0).build(); - GossipParams gossipParams = LibP2PParamsFactory.createGossipParams(gossipConfig); + final GossipParams gossipParams = + LibP2PParamsFactory.createGossipParams(gossipConfig, spec.getNetworkingConfig()); assertThat(gossipParams.getDOut()).isEqualTo(0); } + + @Test + public void createGossipParams_setGossipMaxSizeFromNetworkSpecConfig() { + final GossipConfig gossipConfig = GossipConfig.builder().build(); + final NetworkingSpecConfig networkingSpecConfig = spy(spec.getNetworkingConfig()); + final int expectedGossipMaxSize = networkingSpecConfig.getGossipMaxSize(); + reset(networkingSpecConfig); + + final GossipParams gossipParams = + LibP2PParamsFactory.createGossipParams(gossipConfig, networkingSpecConfig); + + assertThat(gossipParams.getMaxGossipMessageSize()).isEqualTo(expectedGossipMaxSize); + verify(networkingSpecConfig).getGossipMaxSize(); + } }