Skip to content

Commit

Permalink
Change BlobSidecarGossipManager to use the new BlobSidecar type
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Nov 13, 2023
1 parent 657cc3f commit 6e081fc
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.validator.coordinator.publisher;

import java.util.List;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
Expand Down Expand Up @@ -62,7 +63,8 @@ protected SafeFuture<BlockImportAndBroadcastValidationResults> importBlock(

@Override
void publishBlock(final SignedBlockContainer blockContainer) {
blockContainer.getSignedBlobSidecars().ifPresent(blobSidecarGossipChannel::publishBlobSidecars);
// TODO: publish blob sidecars with inclusion proof
blobSidecarGossipChannel.publishBlobSidecars(List.of());
blockGossipChannel.publishBlock(blockContainer.getSignedBlock());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,8 @@ public void sendSignedBlock_shouldConvertBlockContentsSuccessfulResult() {
final SafeFuture<SendSignedBlockResult> result =
validatorApiHandler.sendSignedBlock(blockContents, NOT_REQUIRED);

verify(blobSidecarGossipChannel).publishBlobSidecars(blobSidecars);
// TODO: fix assertion for blob sidecars
verify(blobSidecarGossipChannel).publishBlobSidecars(List.of());
verify(blobSidecarPool).onCompletedBlockAndSignedBlobSidecars(block, blobSidecars);
verify(blockGossipChannel).publishBlock(block);
verify(blockImportChannel).importBlock(block, NOT_REQUIRED);
Expand All @@ -861,7 +862,8 @@ public void sendSignedBlock_shouldConvertBlockContentsFailedResult() {
final SafeFuture<SendSignedBlockResult> result =
validatorApiHandler.sendSignedBlock(blockContents, NOT_REQUIRED);

verify(blobSidecarGossipChannel).publishBlobSidecars(blobSidecars);
// TODO: fix assertion for blob sidecars
verify(blobSidecarGossipChannel).publishBlobSidecars(List.of());
verify(blobSidecarPool).onCompletedBlockAndSignedBlobSidecars(block, blobSidecars);
verify(blockGossipChannel).publishBlock(block);
verify(blockImportChannel).importBlock(block, NOT_REQUIRED);
Expand All @@ -884,7 +886,8 @@ public void sendSignedBlockForDeneb_shouldConvertBlockContentsKnownBlockResult()
final SafeFuture<SendSignedBlockResult> result =
validatorApiHandler.sendSignedBlock(blockContents, NOT_REQUIRED);

verify(blobSidecarGossipChannel).publishBlobSidecars(blobSidecars);
// TODO: fix assertion for blob sidecars
verify(blobSidecarGossipChannel).publishBlobSidecars(List.of());
verify(blobSidecarPool).onCompletedBlockAndSignedBlobSidecars(block, blobSidecars);
verify(blockGossipChannel).publishBlock(block);
verify(blockImportChannel).importBlock(block, NOT_REQUIRED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.kzg.KZGCommitment;
import tech.pegasys.teku.kzg.KZGProof;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
Expand Down Expand Up @@ -60,6 +61,13 @@ public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
new UnsupportedOperationException("Not available in fork choice reference tests"));
}

@Override
public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
final BlobSidecar blobSidecar, final Optional<UInt64> arrivalTimestamp) {
return SafeFuture.failedFuture(
new UnsupportedOperationException("Not available in fork choice reference tests"));
}

@Override
public void prepareForBlockImport(final BlobSidecarOld blobSidecar) {
// NOOP
Expand Down
11 changes: 4 additions & 7 deletions ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import tech.pegasys.teku.spec.constants.Domain;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlockHeader;
Expand Down Expand Up @@ -940,13 +940,10 @@ public Optional<Integer> getMaxBlobsPerBlock(final UInt64 slot) {
return getSpecConfigDeneb(slot).map(SpecConfigDeneb::getMaxBlobsPerBlock);
}

public UInt64 computeSubnetForBlobSidecar(final SignedBlobSidecarOld signedBlobSidecar) {
final SpecConfig config = atSlot(signedBlobSidecar.getSlot()).getConfig();
public UInt64 computeSubnetForBlobSidecar(final BlobSidecar blobSidecar) {
final SpecConfig config = atSlot(blobSidecar.getSlot()).getConfig();
final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config);
return signedBlobSidecar
.getBlobSidecar()
.getIndex()
.mod(specConfigDeneb.getBlobSidecarSubnetCount());
return blobSidecar.getIndex().mod(specConfigDeneb.getBlobSidecarSubnetCount());
}

public Optional<UInt64> computeFirstSlotWithBlobSupport() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2288,10 +2288,6 @@ public BlockContents randomBlockContents(final UInt64 slot) {
.create(beaconBlock, blobSidecarList);
}

public SignedBlobSidecarOld randomSignedBlobSidecar(final UInt64 index) {
return new RandomBlobSidecarOldBuilder().index(index).buildSigned();
}

public RandomBlobSidecarOldBuilder createRandomBlobSidecarBuilderOld() {
return new RandomBlobSidecarOldBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
Expand All @@ -34,6 +35,12 @@ public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
return SafeFuture.completedFuture(InternalValidationResult.ACCEPT);
}

@Override
public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
final BlobSidecar blobSidecar, final Optional<UInt64> arrivalTimestamp) {
return SafeFuture.completedFuture(InternalValidationResult.ACCEPT);
}

@Override
public void prepareForBlockImport(final BlobSidecarOld blobSidecar) {}

Expand All @@ -59,9 +66,13 @@ public BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmed
}
};

@Deprecated
SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
SignedBlobSidecarOld signedBlobSidecar, Optional<UInt64> arrivalTimestamp);

SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
BlobSidecar blobSidecar, Optional<UInt64> arrivalTimestamp);

void prepareForBlockImport(BlobSidecarOld blobSidecar);

void subscribeToReceivedBlobSidecar(ReceivedBlobSidecarListener receivedBlobSidecarListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.kzg.KZG;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
Expand Down Expand Up @@ -106,6 +107,12 @@ public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
return validationResult;
}

@Override
public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
final BlobSidecar blobSidecar, final Optional<UInt64> arrivalTimestamp) {
throw new UnsupportedOperationException("Not yet implemented");
}

@Override
public void prepareForBlockImport(final BlobSidecarOld blobSidecar) {
blobSidecarPool.onNewBlobSidecar(blobSidecar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.Constants;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
Expand Down Expand Up @@ -96,7 +96,7 @@ public class Eth2P2PNetworkBuilder {
protected EventChannels eventChannels;
protected CombinedChainDataClient combinedChainDataClient;
protected OperationProcessor<SignedBeaconBlock> gossipedBlockProcessor;
protected OperationProcessor<SignedBlobSidecarOld> gossipedBlobSidecarProcessor;
protected OperationProcessor<BlobSidecar> gossipedBlobSidecarProcessor;
protected OperationProcessor<ValidatableAttestation> gossipedAttestationConsumer;
protected OperationProcessor<ValidatableAttestation> gossipedAggregateProcessor;
protected OperationProcessor<AttesterSlashing> gossipedAttesterSlashingConsumer;
Expand Down Expand Up @@ -444,7 +444,7 @@ public Eth2P2PNetworkBuilder gossipedBlockProcessor(
}

public Eth2P2PNetworkBuilder gossipedBlobSidecarProcessor(
final OperationProcessor<SignedBlobSidecarOld> blobSidecarProcessor) {
final OperationProcessor<BlobSidecar> blobSidecarProcessor) {
checkNotNull(blobSidecarProcessor);
this.gossipedBlobSidecarProcessor = blobSidecarProcessor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@

import java.util.List;
import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;

public interface BlobSidecarGossipChannel extends VoidReturningChannelInterface {

BlobSidecarGossipChannel NOOP = blobSidecar -> {};

default void publishBlobSidecars(final List<SignedBlobSidecarOld> blobSidecars) {
default void publishBlobSidecars(final List<BlobSidecar> blobSidecars) {
blobSidecars.forEach(this::publishBlobSidecar);
}

void publishBlobSidecar(SignedBlobSidecarOld blobSidecar);
void publishBlobSidecar(BlobSidecar blobSidecar);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarSchemaOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema;
import tech.pegasys.teku.spec.datastructures.state.ForkInfo;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
Expand All @@ -43,7 +43,7 @@ public class BlobSidecarGossipManager implements GossipManager {
private final Spec spec;
private final GossipNetwork gossipNetwork;
private final GossipEncoding gossipEncoding;
private final Int2ObjectMap<Eth2TopicHandler<SignedBlobSidecarOld>> subnetIdToTopicHandler;
private final Int2ObjectMap<Eth2TopicHandler<BlobSidecar>> subnetIdToTopicHandler;

private final Int2ObjectMap<TopicChannel> subnetIdToChannel = new Int2ObjectOpenHashMap<>();

Expand All @@ -54,18 +54,18 @@ public static BlobSidecarGossipManager create(
final GossipNetwork gossipNetwork,
final GossipEncoding gossipEncoding,
final ForkInfo forkInfo,
final OperationProcessor<SignedBlobSidecarOld> processor) {
final OperationProcessor<BlobSidecar> processor) {
final SpecVersion forkSpecVersion = spec.atEpoch(forkInfo.getFork().getEpoch());
final SignedBlobSidecarSchemaOld gossipType =
final BlobSidecarSchema gossipType =
SchemaDefinitionsDeneb.required(forkSpecVersion.getSchemaDefinitions())
.getSignedBlobSidecarOldSchema();
final Int2ObjectMap<Eth2TopicHandler<SignedBlobSidecarOld>> subnetIdToTopicHandler =
.getBlobSidecarSchema();
final Int2ObjectMap<Eth2TopicHandler<BlobSidecar>> subnetIdToTopicHandler =
new Int2ObjectOpenHashMap<>();
final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(forkSpecVersion.getConfig());
IntStream.range(0, specConfigDeneb.getBlobSidecarSubnetCount())
.forEach(
subnetId -> {
final Eth2TopicHandler<SignedBlobSidecarOld> topicHandler =
final Eth2TopicHandler<BlobSidecar> topicHandler =
createBlobSidecarTopicHandler(
subnetId,
recentChainData,
Expand All @@ -85,21 +85,21 @@ private BlobSidecarGossipManager(
final Spec spec,
final GossipNetwork gossipNetwork,
final GossipEncoding gossipEncoding,
final Int2ObjectMap<Eth2TopicHandler<SignedBlobSidecarOld>> subnetIdToTopicHandler) {
final Int2ObjectMap<Eth2TopicHandler<BlobSidecar>> subnetIdToTopicHandler) {
this.spec = spec;
this.gossipNetwork = gossipNetwork;
this.gossipEncoding = gossipEncoding;
this.subnetIdToTopicHandler = subnetIdToTopicHandler;
}

public void publishBlobSidecar(final SignedBlobSidecarOld message) {
public void publishBlobSidecar(final BlobSidecar message) {
final int subnetId = spec.computeSubnetForBlobSidecar(message).intValue();
Optional.ofNullable(subnetIdToChannel.get(subnetId))
.ifPresent(channel -> channel.gossip(gossipEncoding.encode(message)));
}

@VisibleForTesting
Eth2TopicHandler<SignedBlobSidecarOld> getTopicHandler(final int subnetId) {
Eth2TopicHandler<BlobSidecar> getTopicHandler(final int subnetId) {
return subnetIdToTopicHandler.get(subnetId);
}

Expand All @@ -109,7 +109,7 @@ public void subscribe() {
.int2ObjectEntrySet()
.forEach(
entry -> {
final Eth2TopicHandler<SignedBlobSidecarOld> topicHandler = entry.getValue();
final Eth2TopicHandler<BlobSidecar> topicHandler = entry.getValue();
final TopicChannel channel =
gossipNetwork.subscribe(topicHandler.getTopic(), topicHandler);
subnetIdToChannel.put(entry.getIntKey(), channel);
Expand All @@ -127,15 +127,15 @@ public boolean isEnabledDuringOptimisticSync() {
return true;
}

private static Eth2TopicHandler<SignedBlobSidecarOld> createBlobSidecarTopicHandler(
private static Eth2TopicHandler<BlobSidecar> createBlobSidecarTopicHandler(
final int subnetId,
final RecentChainData recentChainData,
final Spec spec,
final AsyncRunner asyncRunner,
final OperationProcessor<SignedBlobSidecarOld> processor,
final OperationProcessor<BlobSidecar> processor,
final GossipEncoding gossipEncoding,
final ForkInfo forkInfo,
final SignedBlobSidecarSchemaOld gossipType) {
final BlobSidecarSchema gossipType) {
return new Eth2TopicHandler<>(
recentChainData,
asyncRunner,
Expand All @@ -146,30 +146,18 @@ private static Eth2TopicHandler<SignedBlobSidecarOld> createBlobSidecarTopicHand
new OperationMilestoneValidator<>(
spec,
forkInfo.getFork(),
blobSidecar -> spec.computeEpochAtSlot(blobSidecar.getBlobSidecar().getSlot())),
blobSidecar -> spec.computeEpochAtSlot(blobSidecar.getSlot())),
gossipType,
spec.getNetworkingConfig());
}

private static class TopicSubnetIdAwareOperationProcessor
implements OperationProcessor<SignedBlobSidecarOld> {

private final Spec spec;
private final int subnetId;
private final OperationProcessor<SignedBlobSidecarOld> delegate;

private TopicSubnetIdAwareOperationProcessor(
final Spec spec,
final int subnetId,
final OperationProcessor<SignedBlobSidecarOld> delegate) {
this.spec = spec;
this.subnetId = subnetId;
this.delegate = delegate;
}
private record TopicSubnetIdAwareOperationProcessor(
Spec spec, int subnetId, OperationProcessor<BlobSidecar> delegate)
implements OperationProcessor<BlobSidecar> {

@Override
public SafeFuture<InternalValidationResult> process(
final SignedBlobSidecarOld blobSidecar, final Optional<UInt64> arrivalTimestamp) {
final BlobSidecar blobSidecar, final Optional<UInt64> arrivalTimestamp) {
final int blobSidecarSubnet = spec.computeSubnetForBlobSidecar(blobSidecar).intValue();
if (blobSidecarSubnet != subnetId) {
return SafeFuture.completedFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
Expand Down Expand Up @@ -163,9 +163,9 @@ public synchronized void publishBlock(final SignedBeaconBlock block) {
publishMessage(block.getSlot(), block, "block", GossipForkSubscriptions::publishBlock);
}

public synchronized void publishBlobSidecar(final SignedBlobSidecarOld blobSidecar) {
public synchronized void publishBlobSidecar(final BlobSidecar blobSidecar) {
publishMessage(
blobSidecar.getBlobSidecar().getSlot(),
blobSidecar.getSlot(),
blobSidecar,
"blob sidecar",
GossipForkSubscriptions::publishBlobSidecar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
Expand All @@ -39,7 +39,7 @@ public interface GossipForkSubscriptions {

void publishBlock(SignedBeaconBlock block);

default void publishBlobSidecar(SignedBlobSidecarOld blobSidecar) {
default void publishBlobSidecar(BlobSidecar blobSidecar) {
// since Deneb
}

Expand Down
Loading

0 comments on commit 6e081fc

Please sign in to comment.