Skip to content

Commit

Permalink
Move blobs availability check to on_execution_payload
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Oct 9, 2024
1 parent c26d303 commit 94440f3
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadEnvelope;
import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment;
import tech.pegasys.teku.spec.logic.versions.deneb.blobs.BlobSidecarsAndValidationResult;
import tech.pegasys.teku.spec.logic.versions.deneb.blobs.BlobSidecarsAvailabilityChecker;
Expand Down Expand Up @@ -125,6 +126,12 @@ private BlobSidecarsAndValidationResult validateImmediately(
};
}

@Override
public BlobSidecarsAvailabilityChecker createAvailabilityChecker(
final SignedBeaconBlock block, final ExecutionPayloadEnvelope executionPayloadEnvelope) {
throw new UnsupportedOperationException("Not yet implemented");
}

@Override
public BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmediately(
final SignedBeaconBlock block, final List<BlobSidecar> blobSidecars) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadEnvelope;
import tech.pegasys.teku.spec.logic.versions.deneb.blobs.BlobSidecarsAndValidationResult;
import tech.pegasys.teku.spec.logic.versions.deneb.blobs.BlobSidecarsAvailabilityChecker;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
Expand Down Expand Up @@ -52,6 +53,13 @@ public BlobSidecarsAvailabilityChecker createAvailabilityChecker(
return BlobSidecarsAvailabilityChecker.NOOP;
}

@Override
public BlobSidecarsAvailabilityChecker createAvailabilityChecker(
final SignedBeaconBlock block,
final ExecutionPayloadEnvelope executionPayloadEnvelope) {
return BlobSidecarsAvailabilityChecker.NOOP;
}

@Override
public BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmediately(
final SignedBeaconBlock block, final List<BlobSidecar> blobSidecars) {
Expand All @@ -70,6 +78,10 @@ SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(

BlobSidecarsAvailabilityChecker createAvailabilityChecker(SignedBeaconBlock block);

// ePBS
BlobSidecarsAvailabilityChecker createAvailabilityChecker(
SignedBeaconBlock block, ExecutionPayloadEnvelope executionPayloadEnvelope);

BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmediately(
SignedBeaconBlock block, List<BlobSidecar> blobSidecars);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadEnvelope;
import tech.pegasys.teku.spec.logic.versions.deneb.blobs.BlobSidecarsAndValidationResult;
import tech.pegasys.teku.spec.logic.versions.deneb.blobs.BlobSidecarsAvailabilityChecker;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceBlobSidecarsAvailabilityChecker;
Expand Down Expand Up @@ -132,6 +133,17 @@ public BlobSidecarsAvailabilityChecker createAvailabilityChecker(final SignedBea
spec, asyncRunner, recentChainData, blockBlobSidecarsTracker, kzg);
}

@Override
public BlobSidecarsAvailabilityChecker createAvailabilityChecker(
final SignedBeaconBlock block, final ExecutionPayloadEnvelope executionPayloadEnvelope) {
final BlockBlobSidecarsTracker blockBlobSidecarsTracker =
blockBlobSidecarsTrackersPool.getOrCreateBlockBlobSidecarsTracker(
block, executionPayloadEnvelope);

return new ForkChoiceBlobSidecarsAvailabilityChecker(
spec, asyncRunner, recentChainData, blockBlobSidecarsTracker, kzg);
}

@Override
public BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmediately(
final SignedBeaconBlock block, final List<BlobSidecar> blobSidecars) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadEnvelope;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;

Expand All @@ -52,6 +53,9 @@ public class BlockBlobSidecarsTracker {
private final AtomicReference<Optional<SignedBeaconBlock>> block =
new AtomicReference<>(Optional.empty());

private final AtomicReference<Optional<ExecutionPayloadEnvelope>> executionPayloadEnvelope =
new AtomicReference<>(Optional.empty());

private final AtomicBoolean blockImportOnCompletionEnabled = new AtomicBoolean(false);

private final NavigableMap<UInt64, BlobSidecar> blobSidecars = new ConcurrentSkipListMap<>();
Expand Down Expand Up @@ -101,6 +105,10 @@ public Optional<SignedBeaconBlock> getBlock() {
return block.get();
}

public Optional<ExecutionPayloadEnvelope> getExecutionPayloadEnvelope() {
return executionPayloadEnvelope.get();
}

public boolean containsBlobSidecar(final BlobIdentifier blobIdentifier) {
return Optional.ofNullable(blobSidecars.get(blobIdentifier.getIndex()))
.map(blobSidecar -> blobSidecar.getBlockRoot().equals(blobIdentifier.getBlockRoot()))
Expand Down Expand Up @@ -193,6 +201,29 @@ public boolean setBlock(final SignedBeaconBlock block) {
return true;
}

public boolean setBlockAndExecutionPayloadEnvelope(
final SignedBeaconBlock block, final ExecutionPayloadEnvelope executionPayloadEnvelope) {
checkArgument(block.getSlotAndBlockRoot().equals(slotAndBlockRoot), "Wrong block");
final Optional<SignedBeaconBlock> oldBlock = this.block.getAndSet(Optional.of(block));
final Optional<ExecutionPayloadEnvelope> oldExecutionPayloadEnvelope =
this.executionPayloadEnvelope.getAndSet(Optional.of(executionPayloadEnvelope));
if (oldBlock.isPresent() || oldExecutionPayloadEnvelope.isPresent()) {
return false;
}

LOG.debug(
"Block and execution payload envelope received for {}", slotAndBlockRoot::toLogString);
maybeDebugTimings.ifPresent(
debugTimings -> {
debugTimings.put(BLOCK_ARRIVAL_TIMING_IDX, System.currentTimeMillis());
});

pruneExcessiveBlobSidecars();
checkCompletion();

return true;
}

public void enableBlockImportOnCompletion(final BlockImportChannel blockImportChannel) {
final boolean alreadyEnabled = blockImportOnCompletionEnabled.getAndSet(true);
if (alreadyEnabled) {
Expand Down Expand Up @@ -279,7 +310,13 @@ private Optional<Integer> getBlobKzgCommitmentsCount() {
b ->
BeaconBlockBodyDeneb.required(b.getMessage().getBody())
.getOptionalBlobKzgCommitments()
.map(SszList::size));
.map(SszList::size))
// ePBS
.or(
() ->
executionPayloadEnvelope
.get()
.map(envelope -> envelope.getBlobKzgCommitments().size()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadEnvelope;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin;

Expand Down Expand Up @@ -73,6 +74,13 @@ public BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker(
throw new UnsupportedOperationException();
}

@Override
public BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker(
final SignedBeaconBlock block,
final ExecutionPayloadEnvelope executionPayloadEnvelope) {
throw new UnsupportedOperationException();
}

@Override
public Optional<BlockBlobSidecarsTracker> getBlockBlobSidecarsTracker(
final SignedBeaconBlock block) {
Expand Down Expand Up @@ -127,6 +135,10 @@ public void subscribeNewBlobSidecar(NewBlobSidecarSubscriber newBlobSidecarSubsc

BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker(SignedBeaconBlock block);

// ePBS
BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker(
SignedBeaconBlock block, ExecutionPayloadEnvelope executionPayloadEnvelope);

Optional<BlockBlobSidecarsTracker> getBlockBlobSidecarsTracker(SignedBeaconBlock block);

void enableBlockImportOnCompletion(SignedBeaconBlock block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ private SafeFuture<Void> onExecutionPayload(
final ForkChoicePayloadExecutorEip7732 payloadExecutor =
ForkChoicePayloadExecutorEip7732.create(executionLayer, block.getSlot());
final BlobSidecarsAvailabilityChecker blobSidecarsAvailabilityChecker =
blobSidecarManager.createAvailabilityChecker(block);
blobSidecarManager.createAvailabilityChecker(block, signedEnvelope.getMessage());

blobSidecarsAvailabilityChecker.initiateDataAvailabilityCheck();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,15 @@ static Supplier<List<KZGCommitment>> createLazyKzgCommitmentsSupplier(
.flatMap(SignedBeaconBlock::getBeaconBlock)
.map(BeaconBlock::getBody)
.orElseThrow())
.getBlobKzgCommitments()
.getOptionalBlobKzgCommitments()
// ePBS
.orElseGet(
() ->
availabilityChecker
.blockBlobSidecarsTracker
.getExecutionPayloadEnvelope()
.orElseThrow()
.getBlobKzgCommitments())
.stream()
.map(SszKZGCommitment::getKZGCommitment)
.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb;
import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadEnvelope;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment;
import tech.pegasys.teku.spec.datastructures.type.SszKZGProof;
Expand All @@ -72,6 +73,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
private static final Logger LOG = LogManager.getLogger();

static final String COUNTER_BLOCK_TYPE = "block";
static final String COUNTER_EXECUTION_PAYLOAD_ENVELOPE_TYPE = "execution_payload_envelope";
static final String COUNTER_SIDECAR_TYPE = "blob_sidecar";

static final String COUNTER_GOSSIP_SUBTYPE = "gossip";
Expand Down Expand Up @@ -290,6 +292,13 @@ public synchronized BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker
return internalOnNewBlock(block, Optional.empty());
}

@Override
public BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker(
final SignedBeaconBlock block, final ExecutionPayloadEnvelope executionPayloadEnvelope) {
return internalOnNewBlockAndExecutionPayloadEnvelope(
block, executionPayloadEnvelope, Optional.empty());
}

@Override
public synchronized Optional<BlockBlobSidecarsTracker> getBlockBlobSidecarsTracker(
final SignedBeaconBlock block) {
Expand Down Expand Up @@ -511,6 +520,62 @@ private BlockBlobSidecarsTracker internalOnNewBlock(
return tracker;
}

private BlockBlobSidecarsTracker internalOnNewBlockAndExecutionPayloadEnvelope(
final SignedBeaconBlock block,
final ExecutionPayloadEnvelope executionPayloadEnvelope,
final Optional<RemoteOrigin> remoteOrigin) {
final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot();

final BlockBlobSidecarsTracker tracker =
getOrCreateBlobSidecarsTracker(
slotAndBlockRoot,
newTracker -> {
newTracker.setBlockAndExecutionPayloadEnvelope(block, executionPayloadEnvelope);
countBlock(remoteOrigin);
onFirstSeen(slotAndBlockRoot);
},
existingTracker -> {
if (!existingTracker.setBlockAndExecutionPayloadEnvelope(
block, executionPayloadEnvelope)) {
// block and execution envelope were already set
countDuplicateBlock(remoteOrigin);
return;
}

countBlock(remoteOrigin);

if (existingTracker.isRpcFetchTriggered()) {
// block has been set for the first time and we previously triggered fetching of
// missing blobSidecars. So we may have requested to fetch more sidecars
// than the block actually requires. Let's drop them.
existingTracker
.getUnusedBlobSidecarsForBlock()
.forEach(
blobIdentifier ->
requiredBlobSidecarDroppedSubscribers.deliver(
RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped,
blobIdentifier));

// if we attempted to fetch via RPC, we missed the opportunity to complete the
// tracker via local EL (local El fetch requires the block to be known)
// Let's try now
if (!existingTracker.isLocalElFetchTriggered() && !existingTracker.isCompleted()) {
fetchMissingContentFromLocalEL(slotAndBlockRoot)
.finish(
error ->
LOG.error(
"An occurred while attempting to fetch blobs via local EL"));
}
}
});

if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) {
sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL);
}

return tracker;
}

private void countBlock(final Optional<RemoteOrigin> maybeRemoteOrigin) {
maybeRemoteOrigin.ifPresent(
remoteOrigin -> {
Expand Down Expand Up @@ -571,13 +636,13 @@ private void onFirstSeen(final SlotAndBlockRoot slotAndBlockRoot) {
asyncRunner
.runAfterDelay(
() ->
this.fetchMissingContentFromLocalEL(slotAndBlockRoot)
fetchMissingContentFromLocalEL(slotAndBlockRoot)
.handleException(
error ->
LOG.warn(
"Local EL blobs lookup failed: {}",
ExceptionUtils.getMessage(error)))
.thenRun(() -> this.fetchMissingContentFromRemotePeers(slotAndBlockRoot)),
.thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot)),
fetchDelay)
.finish(
error ->
Expand Down Expand Up @@ -639,19 +704,26 @@ private synchronized SafeFuture<Void> fetchMissingContentFromLocalEL(
specVersion.getSchemaDefinitions().toVersionDeneb().orElseThrow().getBlobSidecarSchema();
final MiscHelpersDeneb miscHelpersDeneb =
specVersion.miscHelpers().toVersionDeneb().orElseThrow();
final SignedBeaconBlockHeader signedBeaconBlockHeader =
blockBlobSidecarsTracker.getBlock().get().asHeader();
final BeaconBlockBodyDeneb beaconBlockBodyDeneb =
final BeaconBlockBodyDeneb beaconBlockBody =
blockBlobSidecarsTracker
.getBlock()
.get()
.getMessage()
.getBody()
.toVersionDeneb()
.orElseThrow();
final SignedBeaconBlockHeader signedBeaconBlockHeader =
blockBlobSidecarsTracker.getBlock().get().asHeader();

final SszList<SszKZGCommitment> sszKZGCommitments =
beaconBlockBodyDeneb.getBlobKzgCommitments();
beaconBlockBody
.getOptionalBlobKzgCommitments()
.orElseGet(
() ->
blockBlobSidecarsTracker
.getExecutionPayloadEnvelope()
.orElseThrow()
.getBlobKzgCommitments());

final List<BlobIdentifier> missingBlobsIdentifiers =
blockBlobSidecarsTracker.getMissingBlobSidecars().toList();
Expand Down Expand Up @@ -687,13 +759,15 @@ private synchronized SafeFuture<Void> fetchMissingContentFromLocalEL(
continue;
}

final BlobIdentifier blobIdentifier = missingBlobsIdentifiers.get(index);
final BlobSidecar blobSidecar =
createBlobSidecarFromBlobAndProof(
blobSidecarSchema,
miscHelpersDeneb,
missingBlobsIdentifiers.get(index),
blobIdentifier,
blobAndProof.get(),
beaconBlockBodyDeneb,
sszKZGCommitments.get(blobIdentifier.getIndex().intValue()),
beaconBlockBody,
signedBeaconBlockHeader);

onNewBlobSidecar(blobSidecar, LOCAL_EL);
Expand All @@ -706,12 +780,10 @@ private BlobSidecar createBlobSidecarFromBlobAndProof(
final MiscHelpersDeneb miscHelpersDeneb,
final BlobIdentifier blobIdentifier,
final BlobAndProof blobAndProof,
final SszKZGCommitment sszKZGCommitment,
final BeaconBlockBodyDeneb beaconBlockBodyDeneb,
final SignedBeaconBlockHeader signedBeaconBlockHeader) {

final SszKZGCommitment sszKZGCommitment =
beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue());

return blobSidecarSchema.create(
blobIdentifier.getIndex(),
blobAndProof.blob(),
Expand Down

0 comments on commit 94440f3

Please sign in to comment.