Skip to content

Commit

Permalink
Passing GOSSIP_MAX_SIZE to snappy uncompressor (#8899)
Browse files Browse the repository at this point in the history
* Passing GOSSIP_MAX_SIZE to snappy uncompressor
* Using GOSSIP_MAX_SIZE from network spec on LibP2PGossipNetworkBuilder
---------

Co-authored-by: Enrico Del Fante <[email protected]>
  • Loading branch information
lucassaldanha and tbenr authored Dec 10, 2024
1 parent 005f007 commit dbed1e9
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
- Added a startup script for unix systems to ensure that when jemalloc is installed the script sets the LD_PRELOAD environment variable to the use the jemalloc library
- Set `is_syncing` to `false` instead of `true` for the `/eth/v1/node/syncing` API endpoint when the head is optimistic and the sync distance is 0
- Fix libp2p direct peers handling
- Added check for gossip message maximum uncompressed size
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,24 @@
*/
public class SnappyBlockCompressor {

public Bytes uncompress(final Bytes compressedData, final SszLengthBounds lengthBounds)
public Bytes uncompress(
final Bytes compressedData, final SszLengthBounds lengthBounds, final long maxBytesLength)
throws DecodingException {

try {
final int actualLength = Snappy.uncompressedLength(compressedData.toArrayUnsafe());
if (!lengthBounds.isWithinBounds(actualLength)) {
final int uncompressedLength = Snappy.uncompressedLength(compressedData.toArrayUnsafe());

if (uncompressedLength > maxBytesLength) {
throw new DecodingException(
String.format(
"Uncompressed length %d exceeds max length in bytes of %s",
uncompressedLength, maxBytesLength));
}

if (!lengthBounds.isWithinBounds(uncompressedLength)) {
throw new DecodingException(
String.format(
"Uncompressed length %d is not within expected bounds %s",
actualLength, lengthBounds.toString()));
uncompressedLength, lengthBounds));
}
return Bytes.wrap(Snappy.uncompress(compressedData.toArrayUnsafe()));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public DecodedMessageResult getDecodedMessage() {
if (valueType == null) {
return DecodedMessageResult.failed();
} else {
final Bytes decodedMessage = uncompressPayload();
final Bytes decodedMessage = uncompressPayload(networkingConfig.getGossipMaxSize());
return DecodedMessageResult.successful(decodedMessage);
}
} catch (DecodingException e) {
Expand All @@ -141,8 +141,9 @@ private Optional<Bytes> getUncompressed() {
return decodedResult.get().getDecodedMessage();
}

private Bytes uncompressPayload() throws DecodingException {
return snappyCompressor.uncompress(compressedData, valueType.getSszLengthBounds());
private Bytes uncompressPayload(final long maxUncompressedLength) throws DecodingException {
return snappyCompressor.uncompress(
compressedData, valueType.getSszLengthBounds(), maxUncompressedLength);
}

@Override
Expand All @@ -159,7 +160,11 @@ public Optional<UInt64> getArrivalTimestamp() {

@FunctionalInterface
interface Uncompressor {
Bytes uncompress(final Bytes compressedData, final SszLengthBounds lengthBounds)

Bytes uncompress(
final Bytes compressedData,
final SszLengthBounds lengthBounds,
final long maxUncompressedLengthInBytes)
throws DecodingException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import tech.pegasys.teku.infrastructure.ssz.sos.SszLengthBounds;

public class SnappyBlockCompressorTest {

private static final long GOSSIP_MAX_SIZE = Long.MAX_VALUE;

private final SnappyBlockCompressor compressor = new SnappyBlockCompressor();

@Test
Expand All @@ -29,7 +32,8 @@ public void roundTrip() throws DecodingException {

final Bytes compressed = compressor.compress(original);
assertThat(compressed).isNotEqualTo(original);
final Bytes uncompressed = compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000));
final Bytes uncompressed =
compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000), GOSSIP_MAX_SIZE);

assertThat(uncompressed).isEqualTo(original);
}
Expand All @@ -38,25 +42,61 @@ public void roundTrip() throws DecodingException {
public void uncompress_randomData() {
final Bytes data = Bytes.fromHexString("0x0102");

assertThatThrownBy(() -> compressor.uncompress(data, SszLengthBounds.ofBytes(0, 1000)))
assertThatThrownBy(
() -> compressor.uncompress(data, SszLengthBounds.ofBytes(0, 1000), GOSSIP_MAX_SIZE))
.isInstanceOf(DecodingException.class);
}

@Test
void uncompress_uncompressedLengthLongerThanMaxLength() {
void uncompress_uncompressedLengthLongerThanSszLenghtBounds() {
final Bytes original = Bytes.fromHexString("0x010203040506");

final Bytes compressed = compressor.compress(original);
assertThatThrownBy(() -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 4)))
.isInstanceOf(DecodingException.class);
assertThatThrownBy(
() -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 4), GOSSIP_MAX_SIZE))
.isInstanceOf(DecodingException.class)
.hasMessageContaining("not within expected bounds");
}

@Test
void uncompress_uncompressedLengthShorterThanMinLength() {
void uncompress_uncompressedLengthShorterThanSszLengthBounds() {
final Bytes original = Bytes.fromHexString("0x010203040506");

final Bytes compressed = compressor.compress(original);
assertThatThrownBy(() -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(100, 200)))
.isInstanceOf(DecodingException.class);

assertThatThrownBy(
() ->
compressor.uncompress(
compressed, SszLengthBounds.ofBytes(100, 200), GOSSIP_MAX_SIZE))
.isInstanceOf(DecodingException.class)
.hasMessageContaining("not within expected bounds");
}

@Test
void uncompress_uncompressedLengthLongerThanMaxBytesLength() {
final Bytes original = Bytes.fromHexString("0x010203040506");
final long smallMaxBytesLength = 3;
assertThat(smallMaxBytesLength).isLessThan(original.size());

final Bytes compressed = compressor.compress(original);
assertThatThrownBy(
() ->
compressor.uncompress(
compressed, SszLengthBounds.ofBytes(0, 1000), smallMaxBytesLength))
.isInstanceOf(DecodingException.class)
.hasMessageContaining("exceeds max length in bytes");
}

@Test
void uncompress_uncompressedLengthEqualThanMaxBytesLength() throws DecodingException {
final Bytes original = Bytes.fromHexString("0x010203040506");
final long exactMaxBytesLength = original.size();

final Bytes compressed = compressor.compress(original);
assertThat(compressed).isNotEqualTo(original);
final Bytes uncompressed =
compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000), exactMaxBytesLength);

assertThat(uncompressed).isEqualTo(original);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -51,9 +57,9 @@ public class SnappyPreparedGossipMessageTest {
Map.of(phase0ForkDigest, SpecMilestone.PHASE0, altairForkDigest, SpecMilestone.ALTAIR));

final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY;
final Uncompressor validUncompressor = (bytes, __) -> bytes;
final Uncompressor validUncompressor = (bytes, bounds, maxLength) -> bytes;
final Uncompressor invalidUncompressor =
(bytes, __) -> {
(bytes, bounds, maxLength) -> {
throw new DecodingException("testing");
};

Expand Down Expand Up @@ -156,6 +162,22 @@ public void getMessageId_generateUniqueIdsBasedOnValidityAndMilestone() {
assertThat(messageIds).hasSize(preparedMessages.size());
}

@Test
public void getDecodedMessage_ShouldPassGossipMaxSizeToUncompressor() throws DecodingException {
final long gossipMaxSize = spec.getNetworkingConfig().getGossipMaxSize();
final Uncompressor uncompressor = mock(Uncompressor.class);
when(uncompressor.uncompress(any(), any(), anyLong())).thenReturn(Bytes.random(1000));

final String altairTopic = GossipTopics.getTopic(altairForkDigest, "test", gossipEncoding);
final SnappyPreparedGossipMessage message =
getAltairMessage(messageBytes, altairTopic, uncompressor);

message.getDecodedMessage();

verify(uncompressor)
.uncompress(eq(messageBytes), eq(schema.getSszLengthBounds()), eq(gossipMaxSize));
}

private SnappyPreparedGossipMessage getPhase0Message(
final Bytes rawMessage, final String topic, final Uncompressor uncompressor) {
return SnappyPreparedGossipMessage.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@
import tech.pegasys.teku.networking.p2p.gossip.config.GossipScoringConfig;
import tech.pegasys.teku.networking.p2p.gossip.config.GossipTopicScoringConfig;
import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;

public class LibP2PParamsFactory {

public static final int MAX_SUBSCRIPTIONS_PER_MESSAGE = 200;
public static final int MAX_COMPRESSED_GOSSIP_SIZE = 10 * (1 << 20);

public static GossipParams createGossipParams(final GossipConfig gossipConfig) {
public static GossipParams createGossipParams(
final GossipConfig gossipConfig, final NetworkingSpecConfig networkingSpecConfig) {
final GossipParamsBuilder builder = GossipParams.builder();
addGossipParamsDValues(gossipConfig, builder);
addGossipParamsMiscValues(gossipConfig, builder);
addGossipParamsMaxValues(builder);
addGossipParamsMaxValues(networkingSpecConfig, builder);
return builder.build();
}

Expand All @@ -65,9 +66,10 @@ private static void addGossipParamsDValues(
.DOut(Math.min(gossipConfig.getD() / 2, Math.max(0, gossipConfig.getDLow() - 1)));
}

private static void addGossipParamsMaxValues(final GossipParamsBuilder builder) {
private static void addGossipParamsMaxValues(
final NetworkingSpecConfig networkingSpecConfig, final GossipParamsBuilder builder) {
builder
.maxGossipMessageSize(MAX_COMPRESSED_GOSSIP_SIZE)
.maxGossipMessageSize(networkingSpecConfig.getGossipMaxSize())
.maxPublishedMessages(1000)
.maxTopicsPerPublishedMessage(1)
.maxSubscriptions(MAX_SUBSCRIPTIONS_PER_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public LibP2PGossipNetwork build() {
validate();
final GossipTopicHandlers topicHandlers = new GossipTopicHandlers();
final Gossip gossip =
createGossip(gossipConfig, logWireGossip, gossipTopicFilter, topicHandlers);
createGossip(
gossipConfig, networkingSpecConfig, logWireGossip, gossipTopicFilter, topicHandlers);
final PubsubPublisherApi publisher = gossip.createPublisher(null, NULL_SEQNO_GENERATOR);

return new LibP2PGossipNetwork(metricsSystem, gossip, publisher, topicHandlers);
Expand All @@ -100,10 +101,11 @@ private void assertNotNull(final String fieldName, final Object fieldValue) {

protected GossipRouter createGossipRouter(
final GossipConfig gossipConfig,
final NetworkingSpecConfig networkingSpecConfig,
final GossipTopicFilter gossipTopicFilter,
final GossipTopicHandlers topicHandlers) {

final GossipParams gossipParams = LibP2PParamsFactory.createGossipParams(gossipConfig);
final GossipParams gossipParams =
LibP2PParamsFactory.createGossipParams(gossipConfig, networkingSpecConfig);
final GossipScoreParams scoreParams =
LibP2PParamsFactory.createGossipScoreParams(gossipConfig.getScoringConfig());

Expand Down Expand Up @@ -145,7 +147,7 @@ protected GossipRouter createGossipRouter(
.map(handler -> handler.prepareMessage(payload, arrivalTimestamp))
.orElse(
defaultMessageFactory.create(
topic, payload, networkingSpecConfig, arrivalTimestamp));
topic, payload, this.networkingSpecConfig, arrivalTimestamp));

return new PreparedPubsubMessage(msg, preparedMessage);
});
Expand All @@ -155,11 +157,13 @@ protected GossipRouter createGossipRouter(

protected Gossip createGossip(
final GossipConfig gossipConfig,
final NetworkingSpecConfig networkingSpecConfig,
final boolean gossipLogsEnabled,
final GossipTopicFilter gossipTopicFilter,
final GossipTopicHandlers topicHandlers) {

final GossipRouter router = createGossipRouter(gossipConfig, gossipTopicFilter, topicHandlers);
final GossipRouter router =
createGossipRouter(gossipConfig, networkingSpecConfig, gossipTopicFilter, topicHandlers);

if (gossipLogsEnabled) {
if (debugGossipHandler != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,42 @@
package tech.pegasys.teku.networking.p2p.libp2p.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import io.libp2p.pubsub.gossip.GossipParams;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.networking.p2p.gossip.config.GossipConfig;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;

public class LibP2PParamsFactoryTest {

private final Spec spec = TestSpecFactory.createMinimalPhase0();

@Test
void createGossipParams_checkZeroDsSucceed() {
GossipConfig gossipConfig = GossipConfig.builder().d(0).dLow(0).dHigh(0).build();
final GossipConfig gossipConfig = GossipConfig.builder().d(0).dLow(0).dHigh(0).build();

GossipParams gossipParams = LibP2PParamsFactory.createGossipParams(gossipConfig);
final GossipParams gossipParams =
LibP2PParamsFactory.createGossipParams(gossipConfig, spec.getNetworkingConfig());

assertThat(gossipParams.getDOut()).isEqualTo(0);
}

@Test
public void createGossipParams_setGossipMaxSizeFromNetworkSpecConfig() {
final GossipConfig gossipConfig = GossipConfig.builder().build();
final NetworkingSpecConfig networkingSpecConfig = spy(spec.getNetworkingConfig());
final int expectedGossipMaxSize = networkingSpecConfig.getGossipMaxSize();
reset(networkingSpecConfig);

final GossipParams gossipParams =
LibP2PParamsFactory.createGossipParams(gossipConfig, networkingSpecConfig);

assertThat(gossipParams.getMaxGossipMessageSize()).isEqualTo(expectedGossipMaxSize);
verify(networkingSpecConfig).getGossipMaxSize();
}
}

0 comments on commit dbed1e9

Please sign in to comment.