Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passing GOSSIP_MAX_SIZE to snappy uncompressor #8899

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
- Added a startup script for unix systems to ensure that when jemalloc is installed the script sets the LD_PRELOAD environment variable to the use the jemalloc library
- Set `is_syncing` to `false` instead of `true` for the `/eth/v1/node/syncing` API endpoint when the head is optimistic and the sync distance is 0
- Fix libp2p direct peers handling
- Added check for gossip message maximum uncompressed size
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,24 @@
*/
public class SnappyBlockCompressor {

public Bytes uncompress(final Bytes compressedData, final SszLengthBounds lengthBounds)
public Bytes uncompress(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add a check to compress to ensure that the uncompressed size is <= maxBytesLength.

Likewise, clients MUST NOT emit or propagate messages larger than this limit.

https://github.com/ethereum/consensus-specs/blob/3d180786af8f661a9dc53d1aadf0f89972a56024/specs/phase0/p2p-interface.md?plain=1#L276

final Bytes compressedData, final SszLengthBounds lengthBounds, final long maxBytesLength)
throws DecodingException {

try {
final int actualLength = Snappy.uncompressedLength(compressedData.toArrayUnsafe());
if (!lengthBounds.isWithinBounds(actualLength)) {
final int uncompressedLength = Snappy.uncompressedLength(compressedData.toArrayUnsafe());

if (uncompressedLength > maxBytesLength) {
throw new DecodingException(
String.format(
"Uncompressed length %d exceeds max length in bytes of %s",
uncompressedLength, maxBytesLength));
}

if (!lengthBounds.isWithinBounds(uncompressedLength)) {
throw new DecodingException(
String.format(
"Uncompressed length %d is not within expected bounds %s",
actualLength, lengthBounds.toString()));
uncompressedLength, lengthBounds));
}
return Bytes.wrap(Snappy.uncompress(compressedData.toArrayUnsafe()));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public DecodedMessageResult getDecodedMessage() {
if (valueType == null) {
return DecodedMessageResult.failed();
} else {
final Bytes decodedMessage = uncompressPayload();
final Bytes decodedMessage = uncompressPayload(networkingConfig.getGossipMaxSize());
return DecodedMessageResult.successful(decodedMessage);
}
} catch (DecodingException e) {
Expand All @@ -141,8 +141,9 @@ private Optional<Bytes> getUncompressed() {
return decodedResult.get().getDecodedMessage();
}

private Bytes uncompressPayload() throws DecodingException {
return snappyCompressor.uncompress(compressedData, valueType.getSszLengthBounds());
private Bytes uncompressPayload(final long maxUncompressedLength) throws DecodingException {
return snappyCompressor.uncompress(
compressedData, valueType.getSszLengthBounds(), maxUncompressedLength);
}

@Override
Expand All @@ -159,7 +160,11 @@ public Optional<UInt64> getArrivalTimestamp() {

@FunctionalInterface
interface Uncompressor {
Bytes uncompress(final Bytes compressedData, final SszLengthBounds lengthBounds)

Bytes uncompress(
final Bytes compressedData,
final SszLengthBounds lengthBounds,
final long maxUncompressedLengthInBytes)
throws DecodingException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import tech.pegasys.teku.infrastructure.ssz.sos.SszLengthBounds;

public class SnappyBlockCompressorTest {

private static final long GOSSIP_MAX_SIZE = Long.MAX_VALUE;

private final SnappyBlockCompressor compressor = new SnappyBlockCompressor();

@Test
Expand All @@ -29,7 +32,8 @@ public void roundTrip() throws DecodingException {

final Bytes compressed = compressor.compress(original);
assertThat(compressed).isNotEqualTo(original);
final Bytes uncompressed = compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000));
final Bytes uncompressed =
compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000), GOSSIP_MAX_SIZE);

assertThat(uncompressed).isEqualTo(original);
}
Expand All @@ -38,25 +42,61 @@ public void roundTrip() throws DecodingException {
public void uncompress_randomData() {
final Bytes data = Bytes.fromHexString("0x0102");

assertThatThrownBy(() -> compressor.uncompress(data, SszLengthBounds.ofBytes(0, 1000)))
assertThatThrownBy(
() -> compressor.uncompress(data, SszLengthBounds.ofBytes(0, 1000), GOSSIP_MAX_SIZE))
.isInstanceOf(DecodingException.class);
}

@Test
void uncompress_uncompressedLengthLongerThanMaxLength() {
void uncompress_uncompressedLengthLongerThanSszLenghtBounds() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- uncompress_uncompressedLengthLongerThanSszLenghtBounds()
+ uncompress_uncompressedLengthLongerThanSszLenghthBounds()

final Bytes original = Bytes.fromHexString("0x010203040506");

final Bytes compressed = compressor.compress(original);
assertThatThrownBy(() -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 4)))
.isInstanceOf(DecodingException.class);
assertThatThrownBy(
() -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 4), GOSSIP_MAX_SIZE))
.isInstanceOf(DecodingException.class)
.hasMessageContaining("not within expected bounds");
}

@Test
void uncompress_uncompressedLengthShorterThanMinLength() {
void uncompress_uncompressedLengthShorterThanSszLengthBounds() {
final Bytes original = Bytes.fromHexString("0x010203040506");

final Bytes compressed = compressor.compress(original);
assertThatThrownBy(() -> compressor.uncompress(compressed, SszLengthBounds.ofBytes(100, 200)))
.isInstanceOf(DecodingException.class);

assertThatThrownBy(
() ->
compressor.uncompress(
compressed, SszLengthBounds.ofBytes(100, 200), GOSSIP_MAX_SIZE))
.isInstanceOf(DecodingException.class)
.hasMessageContaining("not within expected bounds");
}

@Test
void uncompress_uncompressedLengthLongerThanMaxBytesLength() {
final Bytes original = Bytes.fromHexString("0x010203040506");
final long smallMaxBytesLength = 3;
assertThat(smallMaxBytesLength).isLessThan(original.size());
Comment on lines +77 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be a little more precise, I would probably do something like:

assertThat(original.size()).isNotEqualTo(0);
final long smallMaxBytesLength = original.size() - 1;


final Bytes compressed = compressor.compress(original);
assertThatThrownBy(
() ->
compressor.uncompress(
compressed, SszLengthBounds.ofBytes(0, 1000), smallMaxBytesLength))
.isInstanceOf(DecodingException.class)
.hasMessageContaining("exceeds max length in bytes");
}

@Test
void uncompress_uncompressedLengthEqualThanMaxBytesLength() throws DecodingException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- uncompress_uncompressedLengthEqualThanMaxBytesLength()
+ uncompress_uncompressedLengthEqualToMaxBytesLength()

final Bytes original = Bytes.fromHexString("0x010203040506");
final long exactMaxBytesLength = original.size();

final Bytes compressed = compressor.compress(original);
assertThat(compressed).isNotEqualTo(original);
final Bytes uncompressed =
compressor.uncompress(compressed, SszLengthBounds.ofBytes(0, 1000), exactMaxBytesLength);

assertThat(uncompressed).isEqualTo(original);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -51,9 +57,9 @@ public class SnappyPreparedGossipMessageTest {
Map.of(phase0ForkDigest, SpecMilestone.PHASE0, altairForkDigest, SpecMilestone.ALTAIR));

final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY;
final Uncompressor validUncompressor = (bytes, __) -> bytes;
final Uncompressor validUncompressor = (bytes, bounds, maxLength) -> bytes;
final Uncompressor invalidUncompressor =
(bytes, __) -> {
(bytes, bounds, maxLength) -> {
throw new DecodingException("testing");
};

Expand Down Expand Up @@ -156,6 +162,22 @@ public void getMessageId_generateUniqueIdsBasedOnValidityAndMilestone() {
assertThat(messageIds).hasSize(preparedMessages.size());
}

@Test
public void getDecodedMessage_ShouldPassGossipMaxSizeToUncompressor() throws DecodingException {
final long gossipMaxSize = spec.getNetworkingConfig().getGossipMaxSize();
final Uncompressor uncompressor = mock(Uncompressor.class);
when(uncompressor.uncompress(any(), any(), anyLong())).thenReturn(Bytes.random(1000));

final String altairTopic = GossipTopics.getTopic(altairForkDigest, "test", gossipEncoding);
final SnappyPreparedGossipMessage message =
getAltairMessage(messageBytes, altairTopic, uncompressor);

message.getDecodedMessage();

verify(uncompressor)
.uncompress(eq(messageBytes), eq(schema.getSszLengthBounds()), eq(gossipMaxSize));
}

private SnappyPreparedGossipMessage getPhase0Message(
final Bytes rawMessage, final String topic, final Uncompressor uncompressor) {
return SnappyPreparedGossipMessage.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@
import tech.pegasys.teku.networking.p2p.gossip.config.GossipScoringConfig;
import tech.pegasys.teku.networking.p2p.gossip.config.GossipTopicScoringConfig;
import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;

public class LibP2PParamsFactory {

public static final int MAX_SUBSCRIPTIONS_PER_MESSAGE = 200;
public static final int MAX_COMPRESSED_GOSSIP_SIZE = 10 * (1 << 20);

public static GossipParams createGossipParams(final GossipConfig gossipConfig) {
public static GossipParams createGossipParams(
final GossipConfig gossipConfig, final NetworkingSpecConfig networkingSpecConfig) {
final GossipParamsBuilder builder = GossipParams.builder();
addGossipParamsDValues(gossipConfig, builder);
addGossipParamsMiscValues(gossipConfig, builder);
addGossipParamsMaxValues(builder);
addGossipParamsMaxValues(networkingSpecConfig, builder);
return builder.build();
}

Expand All @@ -65,9 +66,10 @@ private static void addGossipParamsDValues(
.DOut(Math.min(gossipConfig.getD() / 2, Math.max(0, gossipConfig.getDLow() - 1)));
}

private static void addGossipParamsMaxValues(final GossipParamsBuilder builder) {
private static void addGossipParamsMaxValues(
final NetworkingSpecConfig networkingSpecConfig, final GossipParamsBuilder builder) {
builder
.maxGossipMessageSize(MAX_COMPRESSED_GOSSIP_SIZE)
.maxGossipMessageSize(networkingSpecConfig.getGossipMaxSize())
.maxPublishedMessages(1000)
.maxTopicsPerPublishedMessage(1)
.maxSubscriptions(MAX_SUBSCRIPTIONS_PER_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public LibP2PGossipNetwork build() {
validate();
final GossipTopicHandlers topicHandlers = new GossipTopicHandlers();
final Gossip gossip =
createGossip(gossipConfig, logWireGossip, gossipTopicFilter, topicHandlers);
createGossip(
gossipConfig, networkingSpecConfig, logWireGossip, gossipTopicFilter, topicHandlers);
final PubsubPublisherApi publisher = gossip.createPublisher(null, NULL_SEQNO_GENERATOR);

return new LibP2PGossipNetwork(metricsSystem, gossip, publisher, topicHandlers);
Expand All @@ -100,10 +101,11 @@ private void assertNotNull(final String fieldName, final Object fieldValue) {

protected GossipRouter createGossipRouter(
final GossipConfig gossipConfig,
final NetworkingSpecConfig networkingSpecConfig,
final GossipTopicFilter gossipTopicFilter,
final GossipTopicHandlers topicHandlers) {

final GossipParams gossipParams = LibP2PParamsFactory.createGossipParams(gossipConfig);
final GossipParams gossipParams =
LibP2PParamsFactory.createGossipParams(gossipConfig, networkingSpecConfig);
final GossipScoreParams scoreParams =
LibP2PParamsFactory.createGossipScoreParams(gossipConfig.getScoringConfig());

Expand Down Expand Up @@ -145,7 +147,7 @@ protected GossipRouter createGossipRouter(
.map(handler -> handler.prepareMessage(payload, arrivalTimestamp))
.orElse(
defaultMessageFactory.create(
topic, payload, networkingSpecConfig, arrivalTimestamp));
topic, payload, this.networkingSpecConfig, arrivalTimestamp));

return new PreparedPubsubMessage(msg, preparedMessage);
});
Expand All @@ -155,11 +157,13 @@ protected GossipRouter createGossipRouter(

protected Gossip createGossip(
final GossipConfig gossipConfig,
final NetworkingSpecConfig networkingSpecConfig,
final boolean gossipLogsEnabled,
final GossipTopicFilter gossipTopicFilter,
final GossipTopicHandlers topicHandlers) {

final GossipRouter router = createGossipRouter(gossipConfig, gossipTopicFilter, topicHandlers);
final GossipRouter router =
createGossipRouter(gossipConfig, networkingSpecConfig, gossipTopicFilter, topicHandlers);

if (gossipLogsEnabled) {
if (debugGossipHandler != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,42 @@
package tech.pegasys.teku.networking.p2p.libp2p.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import io.libp2p.pubsub.gossip.GossipParams;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.networking.p2p.gossip.config.GossipConfig;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;

public class LibP2PParamsFactoryTest {

private final Spec spec = TestSpecFactory.createMinimalPhase0();

@Test
void createGossipParams_checkZeroDsSucceed() {
GossipConfig gossipConfig = GossipConfig.builder().d(0).dLow(0).dHigh(0).build();
final GossipConfig gossipConfig = GossipConfig.builder().d(0).dLow(0).dHigh(0).build();

GossipParams gossipParams = LibP2PParamsFactory.createGossipParams(gossipConfig);
final GossipParams gossipParams =
LibP2PParamsFactory.createGossipParams(gossipConfig, spec.getNetworkingConfig());

assertThat(gossipParams.getDOut()).isEqualTo(0);
}

@Test
public void createGossipParams_setGossipMaxSizeFromNetworkSpecConfig() {
final GossipConfig gossipConfig = GossipConfig.builder().build();
final NetworkingSpecConfig networkingSpecConfig = spy(spec.getNetworkingConfig());
final int expectedGossipMaxSize = networkingSpecConfig.getGossipMaxSize();
reset(networkingSpecConfig);

final GossipParams gossipParams =
LibP2PParamsFactory.createGossipParams(gossipConfig, networkingSpecConfig);

assertThat(gossipParams.getMaxGossipMessageSize()).isEqualTo(expectedGossipMaxSize);
verify(networkingSpecConfig).getGossipMaxSize();
}
}
Loading