Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark local-el retrieved blobs as seen WRT gossip equivocation cache #8675

Merged
merged 5 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand Down Expand Up @@ -63,6 +64,7 @@
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackerFactory;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator;
import tech.pegasys.teku.storage.client.RecentChainData;

public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHistoricalSlot
Expand Down Expand Up @@ -98,6 +100,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
private final ExecutionLayerChannel executionLayer;
private final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier;
private final Consumer<BlobSidecar> blobSidecarGossipPublisher;
private final int maxTrackers;

Expand Down Expand Up @@ -129,6 +132,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final UInt64 historicalSlotTolerance,
final UInt64 futureSlotTolerance,
Expand All @@ -140,6 +144,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
this.asyncRunner = asyncRunner;
this.recentChainData = recentChainData;
this.executionLayer = executionLayer;
this.gossipValidatorSupplier = gossipValidatorSupplier;
this.blobSidecarGossipPublisher = blobSidecarGossipPublisher;
this.maxTrackers = maxTrackers;
this.sizeGauge = sizeGauge;
Expand All @@ -159,6 +164,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final UInt64 historicalSlotTolerance,
final UInt64 futureSlotTolerance,
Expand All @@ -171,6 +177,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
this.asyncRunner = asyncRunner;
this.recentChainData = recentChainData;
this.executionLayer = executionLayer;
this.gossipValidatorSupplier = gossipValidatorSupplier;
this.blobSidecarGossipPublisher = blobSidecarGossipPublisher;
this.maxTrackers = maxTrackers;
this.sizeGauge = sizeGauge;
Expand Down Expand Up @@ -224,8 +231,8 @@ public synchronized void onNewBlobSidecar(
sizeGauge.set(++totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL);
countBlobSidecar(remoteOrigin);
newBlobSidecarSubscribers.deliver(NewBlobSidecarSubscriber::onNewBlobSidecar, blobSidecar);
if (remoteOrigin.equals(LOCAL_EL)) {
blobSidecarGossipPublisher.accept(blobSidecar);
if (remoteOrigin.equals(LOCAL_EL) && slotAndBlockRoot.getSlot().equals(getCurrentSlot())) {
publishRecoveredBlobSidecar(blobSidecar);
}
} else {
countDuplicateBlobSidecar(remoteOrigin);
Expand All @@ -236,6 +243,12 @@ public synchronized void onNewBlobSidecar(
}
}

private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) {
LOG.debug("Publishing recovered blob sidecar {}", blobSidecar::toLogString);
gossipValidatorSupplier.get().markForEquivocation(blobSidecar);
blobSidecarGossipPublisher.accept(blobSidecar);
}

private void countBlobSidecar(final RemoteOrigin origin) {
switch (origin) {
case RPC -> poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_RPC_SUBTYPE).inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
Expand All @@ -31,6 +32,7 @@
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackerFactory;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator;
import tech.pegasys.teku.storage.client.RecentChainData;

public class PoolFactory {
Expand Down Expand Up @@ -117,6 +119,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher) {
return createPoolForBlockBlobSidecarsTrackers(
blockImportChannel,
Expand All @@ -125,6 +128,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
asyncRunner,
recentChainData,
executionLayer,
gossipValidatorSupplier,
blobSidecarGossipPublisher,
DEFAULT_HISTORICAL_SLOT_TOLERANCE,
FutureItems.DEFAULT_FUTURE_SLOT_TOLERANCE,
Expand All @@ -138,6 +142,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final UInt64 historicalBlockTolerance,
final UInt64 futureBlockTolerance,
Expand All @@ -151,6 +156,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
asyncRunner,
recentChainData,
executionLayer,
gossipValidatorSupplier,
blobSidecarGossipPublisher,
historicalBlockTolerance,
futureBlockTolerance,
Expand All @@ -165,6 +171,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final UInt64 historicalBlockTolerance,
final UInt64 futureBlockTolerance,
Expand All @@ -179,6 +186,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
asyncRunner,
recentChainData,
executionLayer,
gossipValidatorSupplier,
blobSidecarGossipPublisher,
historicalBlockTolerance,
futureBlockTolerance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,7 @@ public SafeFuture<InternalValidationResult> validate(final BlobSidecar blobSidec
* [IGNORE] The sidecar is the first sidecar for the tuple (block_header.slot, block_header.proposer_index, blob_sidecar.index)
* with valid header signature, sidecar inclusion proof, and kzg proof.
*/
if (!receivedValidBlobSidecarInfoSet.add(
new SlotProposerIndexAndBlobIndex(
blockHeader.getSlot(),
blockHeader.getProposerIndex(),
blobSidecar.getIndex()))) {
if (!markForEquivocation(blockHeader, blobSidecar.getIndex())) {
return ignore(
"BlobSidecar is not the first valid for its slot and index. It will be dropped.");
}
Expand All @@ -277,6 +273,17 @@ public SafeFuture<InternalValidationResult> validate(final BlobSidecar blobSidec
});
}

private boolean markForEquivocation(final BeaconBlockHeader blockHeader, final UInt64 index) {
return receivedValidBlobSidecarInfoSet.add(
new SlotProposerIndexAndBlobIndex(
blockHeader.getSlot(), blockHeader.getProposerIndex(), index));
}

public boolean markForEquivocation(final BlobSidecar blobSidecar) {
return markForEquivocation(
blobSidecar.getSignedBeaconBlockHeader().getMessage(), blobSidecar.getIndex());
}

private SafeFuture<InternalValidationResult> validateBlobSidecarWithKnownValidHeader(
final BlobSidecar blobSidecar, final BeaconBlockHeader blockHeader) {

Expand Down Expand Up @@ -310,9 +317,7 @@ private SafeFuture<InternalValidationResult> validateBlobSidecarWithKnownValidHe
* [IGNORE] The sidecar is the first sidecar for the tuple (block_header.slot, block_header.proposer_index, blob_sidecar.index)
* with valid header signature, sidecar inclusion proof, and kzg proof.
*/
if (!receivedValidBlobSidecarInfoSet.add(
new SlotProposerIndexAndBlobIndex(
blockHeader.getSlot(), blockHeader.getProposerIndex(), blobSidecar.getIndex()))) {
if (!markForEquivocation(blockHeader, blobSidecar.getIndex())) {
return SafeFuture.completedFuture(
ignore("BlobSidecar is not the first valid for its slot and index. It will be dropped."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTracker;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator;
import tech.pegasys.teku.storage.client.RecentChainData;

public class BlockBlobSidecarsTrackersPoolImplTest {
Expand All @@ -82,6 +83,9 @@ public class BlockBlobSidecarsTrackersPoolImplTest {
@SuppressWarnings("unchecked")
private final Consumer<BlobSidecar> blobSidecarPublisher = mock(Consumer.class);

private final BlobSidecarGossipValidator blobSidecarGossipValidator =
mock(BlobSidecarGossipValidator.class);

private final BlockImportChannel blockImportChannel = mock(BlockImportChannel.class);
private final int maxItems = 15;
private final BlockBlobSidecarsTrackersPoolImpl blockBlobSidecarsTrackersPool =
Expand All @@ -93,6 +97,7 @@ public class BlockBlobSidecarsTrackersPoolImplTest {
asyncRunner,
recentChainData,
executionLayer,
() -> blobSidecarGossipValidator,
blobSidecarPublisher,
historicalTolerance,
futureTolerance,
Expand Down Expand Up @@ -203,7 +208,7 @@ public void onNewBlobSidecar_shouldIgnoreDuplicates() {
}

@Test
public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalEL() {
public void onNewBlobSidecar_shouldMarkForEquivocationAndPublishWhenOriginIsLocalEL() {
final BlobSidecar blobSidecar1 =
dataStructureUtil
.createRandomBlobSidecarBuilder()
Expand All @@ -220,16 +225,58 @@ public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalEL() {
.signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot))
.build();

when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(true);

blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL);
blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar2, RemoteOrigin.GOSSIP);
blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar3, RemoteOrigin.RPC);

assertBlobSidecarsCount(3);
assertBlobSidecarsTrackersCount(3);

verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, times(1)).accept(blobSidecar1);
}

@Test
public void onNewBlobSidecar_shouldPublishEvenWhenOriginIsLocalELButEquivocating() {
tbenr marked this conversation as resolved.
Show resolved Hide resolved
final BlobSidecar blobSidecar1 =
dataStructureUtil
.createRandomBlobSidecarBuilder()
.signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot))
.build();

when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(false);

blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL);

assertBlobSidecarsCount(1);
assertBlobSidecarsTrackersCount(1);

verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, times(1)).accept(blobSidecar1);
}

@Test
public void onNewBlobSidecar_shouldNotPublishWhenOriginIsLocalELIsNotCurrentSlot() {
final BlobSidecar blobSidecar1 =
dataStructureUtil
.createRandomBlobSidecarBuilder()
.signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot))
.build();

when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(false);
blockBlobSidecarsTrackersPool.onSlot(currentSlot.plus(1));

blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL);

assertBlobSidecarsCount(1);
assertBlobSidecarsTrackersCount(1);

verify(blobSidecarGossipValidator, never()).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, never()).accept(blobSidecar1);
}

@Test
public void onNewBlock_shouldIgnorePreDenebBlocks() {
final Spec spec = TestSpecFactory.createMainnetCapella();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.statetransition.validation;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
Expand Down Expand Up @@ -277,6 +278,16 @@ void shouldTrackValidInfoSet() {
.isCompletedWithValueMatching(InternalValidationResult::isIgnore);
}

@TestTemplate
void shouldMarkForEquivocation() {
assertThat(blobSidecarValidator.markForEquivocation(blobSidecar)).isTrue();

assertThat(blobSidecarValidator.markForEquivocation(blobSidecar)).isFalse();

SafeFutureAssert.assertThatSafeFuture(blobSidecarValidator.validate(blobSidecar))
.isCompletedWithValueMatching(InternalValidationResult::isIgnore);
}

@TestTemplate
void shouldIgnoreImmediatelyWhenBlobFromValidInfoSet() {
SafeFutureAssert.assertThatSafeFuture(blobSidecarValidator.validate(blobSidecar))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public class BeaconChainController extends Service implements BeaconChainControl
protected volatile GossipValidationHelper gossipValidationHelper;
protected volatile KZG kzg;
protected volatile BlobSidecarManager blobSidecarManager;
protected volatile BlobSidecarGossipValidator blobSidecarValidator;
protected volatile Optional<TerminalPowBlockMonitor> terminalPowBlockMonitor = Optional.empty();
protected volatile ProposersDataManager proposersDataManager;
protected volatile KeyValueStore<String, Bytes> keyValueStore;
Expand Down Expand Up @@ -568,7 +569,7 @@ protected void initBlobSidecarManager() {
LimitedMap.createSynchronizedLRU(500);
final MiscHelpersDeneb miscHelpers =
MiscHelpersDeneb.required(spec.forMilestone(SpecMilestone.DENEB).miscHelpers());
final BlobSidecarGossipValidator blobSidecarValidator =
blobSidecarValidator =
BlobSidecarGossipValidator.create(
spec, invalidBlockRoots, gossipValidationHelper, miscHelpers, kzg);
final BlobSidecarManagerImpl blobSidecarManagerImpl =
Expand Down Expand Up @@ -626,6 +627,7 @@ protected void initBlockBlobSidecarsTrackersPool() {
beaconAsyncRunner,
recentChainData,
executionLayer,
() -> blobSidecarValidator,
blobSidecarGossipChannel::publishBlobSidecar);
eventChannels.subscribe(FinalizedCheckpointChannel.class, pool);
blockBlobSidecarsTrackersPool = pool;
Expand Down