From 49d8d48cc3135a3bcf2d98dcfa898040715ec8b3 Mon Sep 17 00:00:00 2001 From: Mehdi AOUADI Date: Fri, 29 Nov 2024 10:04:59 +0100 Subject: [PATCH] Performance tracker (#8851) * use attestation bits aggregator in performance tracker * add electra tests * fix spotless * fix unit test * always clear objects * refactor bits aggregator or operation * revert bits aggregator or operation return type --- .../DefaultPerformanceTracker.java | 29 ++-- .../DefaultPerformanceTrackerTest.java | 132 ++++++++++++------ 2 files changed, 109 insertions(+), 52 deletions(-) diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/performance/DefaultPerformanceTracker.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/performance/DefaultPerformanceTracker.java index afce203d35f..381229ffca5 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/performance/DefaultPerformanceTracker.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/performance/DefaultPerformanceTracker.java @@ -16,6 +16,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; +import it.unimi.dsi.fastutil.ints.Int2IntMap; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntSet; @@ -42,7 +43,6 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.logging.StatusLogger; import tech.pegasys.teku.infrastructure.metrics.SettableGauge; -import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; @@ -51,6 +51,7 @@ import tech.pegasys.teku.spec.datastructures.operations.Attestation; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.statetransition.attestation.utils.AttestationBitsAggregator; import tech.pegasys.teku.storage.client.CombinedChainDataClient; import tech.pegasys.teku.validator.api.ValidatorPerformanceTrackingMode; import tech.pegasys.teku.validator.coordinator.ActiveValidatorTracker; @@ -169,6 +170,9 @@ private SafeFuture reportBlockPerformance(final UInt64 currentEpoch) { validatorPerformanceMetrics.updateBlockPerformanceMetrics(blockPerformance); } } + }) + .alwaysRun( + () -> { producedBlocksByEpoch.headMap(blockProductionEpoch, true).clear(); blockProductionAttemptsByEpoch.headMap(blockProductionEpoch, true).clear(); }); @@ -208,8 +212,8 @@ private SafeFuture reportAttestationPerformance(final UInt64 currentEpoch) { validatorPerformanceMetrics.updateAttestationPerformanceMetrics( attestationPerformance); } - producedAttestationsByEpoch.headMap(analyzedEpoch, true).clear(); - }); + }) + .alwaysRun(() -> producedAttestationsByEpoch.headMap(analyzedEpoch, true).clear()); } private SafeFuture getBlockPerformanceForEpoch(final UInt64 currentEpoch) { @@ -300,16 +304,23 @@ private AttestationPerformance calculateAttestationPerformance( // Pre-process attestations included on chain to group them by // data hash to inclusion slot to aggregation bitlist - final Map> slotAndBitlistsByAttestationDataHash = - new HashMap<>(); + final Map> + slotAndBitlistsByAttestationDataHash = new HashMap<>(); for (Map.Entry> entry : attestationsIncludedOnChain.entrySet()) { + final Optional committeesSize = + Optional.of(spec.getBeaconCommitteesSize(state, entry.getKey())); for (Attestation attestation : entry.getValue()) { final Bytes32 attestationDataHash = attestation.getData().hashTreeRoot(); - final NavigableMap slotToBitlists = + final NavigableMap slotToBitlists = slotAndBitlistsByAttestationDataHash.computeIfAbsent( attestationDataHash, __ -> new TreeMap<>()); slotToBitlists.merge( - entry.getKey(), attestation.getAggregationBits(), SszBitlist::nullableOr); + entry.getKey(), + AttestationBitsAggregator.of(attestation, committeesSize), + (firstBitsAggregator, secondBitsAggregator) -> { + firstBitsAggregator.or(secondBitsAggregator); + return firstBitsAggregator; + }); } } @@ -319,10 +330,10 @@ private AttestationPerformance calculateAttestationPerformance( if (!slotAndBitlistsByAttestationDataHash.containsKey(sentAttestationDataHash)) { continue; } - final NavigableMap slotAndBitlists = + final NavigableMap slotAndBitlists = slotAndBitlistsByAttestationDataHash.get(sentAttestationDataHash); for (UInt64 slot : slotAndBitlists.keySet()) { - if (slotAndBitlists.get(slot).isSuperSetOf(sentAttestation.getAggregationBits())) { + if (slotAndBitlists.get(slot).isSuperSetOf(sentAttestation)) { inclusionDistances.add(slot.minus(sentAttestationSlot).intValue()); break; } diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/performance/DefaultPerformanceTrackerTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/performance/DefaultPerformanceTrackerTest.java index 147f15568c8..1fee82b765d 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/performance/DefaultPerformanceTrackerTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/performance/DefaultPerformanceTrackerTest.java @@ -24,39 +24,37 @@ import static tech.pegasys.teku.validator.coordinator.performance.DefaultPerformanceTracker.ATTESTATION_INCLUSION_RANGE; import java.util.List; +import java.util.Optional; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import tech.pegasys.infrastructure.logging.LogCaptor; import tech.pegasys.teku.bls.BLSKeyGenerator; import tech.pegasys.teku.bls.BLSKeyPair; -import tech.pegasys.teku.bls.BLSTestUtil; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.logging.StatusLogger; import tech.pegasys.teku.infrastructure.metrics.SettableGauge; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.TestSpecContext; +import tech.pegasys.teku.spec.TestSpecInvocationContextProvider; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; import tech.pegasys.teku.spec.datastructures.operations.Attestation; import tech.pegasys.teku.spec.generator.AttestationGenerator; import tech.pegasys.teku.spec.generator.ChainBuilder; import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.storage.client.ChainHead; import tech.pegasys.teku.storage.client.ChainUpdater; +import tech.pegasys.teku.storage.client.CombinedChainDataClient; import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder; import tech.pegasys.teku.storage.storageSystem.StorageSystem; import tech.pegasys.teku.validator.api.ValidatorPerformanceTrackingMode; import tech.pegasys.teku.validator.coordinator.ActiveValidatorTracker; +@TestSpecContext(milestone = {SpecMilestone.PHASE0, SpecMilestone.ELECTRA}) public class DefaultPerformanceTrackerTest { private static final List VALIDATOR_KEYS = BLSKeyGenerator.generateKeyPairs(64); - private final Spec spec = TestSpecFactory.createMinimalPhase0(); - protected StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec); - protected ChainBuilder chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS); - protected ChainUpdater chainUpdater = - new ChainUpdater(storageSystem.recentChainData(), chainBuilder); - - private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); private final StatusLogger log = mock(StatusLogger.class); private final ActiveValidatorTracker validatorTracker = mock(ActiveValidatorTracker.class); private final SyncCommitteePerformanceTracker syncCommitteePerformanceTracker = @@ -64,19 +62,34 @@ public class DefaultPerformanceTrackerTest { private final ValidatorPerformanceMetrics validatorPerformanceMetrics = mock(ValidatorPerformanceMetrics.class); - private final DefaultPerformanceTracker performanceTracker = - new DefaultPerformanceTracker( - storageSystem.combinedChainDataClient(), - log, - validatorPerformanceMetrics, - ValidatorPerformanceTrackingMode.ALL, - validatorTracker, - syncCommitteePerformanceTracker, - spec, - mock(SettableGauge.class)); + private Spec spec; + private StorageSystem storageSystem; + private ChainBuilder chainBuilder; + private ChainUpdater chainUpdater; + private DataStructureUtil dataStructureUtil; + private DefaultPerformanceTracker performanceTracker; @BeforeEach - void beforeEach() { + void beforeEach(final TestSpecInvocationContextProvider.SpecContext specContext) { + spec = specContext.getSpec(); + dataStructureUtil = specContext.getDataStructureUtil(); + + storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec); + chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS); + + chainUpdater = new ChainUpdater(storageSystem.recentChainData(), chainBuilder, spec); + + performanceTracker = + new DefaultPerformanceTracker( + storageSystem.combinedChainDataClient(), + log, + validatorPerformanceMetrics, + ValidatorPerformanceTrackingMode.ALL, + validatorTracker, + syncCommitteePerformanceTracker, + spec, + mock(SettableGauge.class)); + when(validatorTracker.getNumberOfValidatorsForEpoch(any())).thenReturn(0); when(syncCommitteePerformanceTracker.calculatePerformance(any())) .thenReturn( @@ -85,7 +98,7 @@ void beforeEach() { performanceTracker.start(UInt64.ZERO); } - @Test + @TestTemplate void shouldDisplayPerfectBlockInclusion() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(10)); performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(1))); @@ -99,7 +112,7 @@ void shouldDisplayPerfectBlockInclusion() { verify(log).performance(expectedBlockPerformance.toString()); } - @Test + @TestTemplate void shouldDisplayBlockInclusionWhenProducedBlockIsChainHead() { final UInt64 lastSlot = spec.computeStartSlotAtEpoch(UInt64.ONE); final SignedBlockAndState bestBlock = chainUpdater.advanceChainUntil(2); @@ -111,7 +124,7 @@ void shouldDisplayBlockInclusionWhenProducedBlockIsChainHead() { verify(log).performance(expectedBlockPerformance.toString()); } - @Test + @TestTemplate void shouldDisplayOneMissedBlock() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(10)); performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(1))); @@ -128,7 +141,7 @@ void shouldDisplayOneMissedBlock() { verify(log).performance(expectedBlockPerformance.toString()); } - @Test + @TestTemplate void shouldDisplayPerfectAttestationInclusion() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1)); @@ -151,7 +164,7 @@ void shouldDisplayPerfectAttestationInclusion() { verify(log).performance(expectedAttestationPerformance.toString()); } - @Test + @TestTemplate void shouldDisplayInclusionDistanceOfMax2Min1() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1)); @@ -180,7 +193,7 @@ void shouldDisplayInclusionDistanceOfMax2Min1() { verify(log).performance(expectedAttestationPerformance.toString()); } - @Test + @TestTemplate void shouldDisplayIncorrectTargetRoot() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1)); @@ -219,7 +232,7 @@ void shouldDisplayIncorrectTargetRoot() { verify(log).performance(expectedAttestationPerformance.toString()); } - @Test + @TestTemplate void shouldDisplayIncorrectHeadBlockRoot() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1)); @@ -235,7 +248,9 @@ void shouldDisplayIncorrectHeadBlockRoot() { chainUpdater.saveBlock(blockAndState1); chainUpdater.updateBestBlock(blockAndState1); - SignedBlockAndState blockAndState = chainUpdaterFork.advanceChainUntil(8); + chainUpdaterFork.advanceChainUntil(7); + SignedBlockAndState blockAndState = chainBuilder.getBlockAndStateAtSlot(8); + chainUpdaterFork.updateBestBlock(blockAndState); ChainBuilder.BlockOptions block2Options = ChainBuilder.BlockOptions.create(); AttestationGenerator attestationGenerator = new AttestationGenerator(spec, chainBuilder.getValidatorKeys()); @@ -258,7 +273,7 @@ void shouldDisplayIncorrectHeadBlockRoot() { verify(log).performance(expectedAttestationPerformance.toString()); } - @Test + @TestTemplate void shouldClearOldSentObjects() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(10)); performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(1))); @@ -267,20 +282,51 @@ void shouldClearOldSentObjects() { chainUpdater.chainBuilder.getBlockAtSlot(1).getSlotAndBlockRoot()); performanceTracker.saveProducedBlock( chainUpdater.chainBuilder.getBlockAtSlot(2).getSlotAndBlockRoot()); - performanceTracker.saveProducedAttestation( - spec.getGenesisSchemaDefinitions() - .getAttestationSchema() - .create( - dataStructureUtil.randomBitlist(), - dataStructureUtil.randomAttestationData(UInt64.ONE), - BLSTestUtil.randomSignature(0))); + performanceTracker.saveProducedAttestation(dataStructureUtil.randomAttestation(UInt64.ONE)); + performanceTracker.onSlot(spec.computeStartSlotAtEpoch(UInt64.valueOf(2))); + assertThat(performanceTracker.producedAttestationsByEpoch).isEmpty(); + assertThat(performanceTracker.producedBlocksByEpoch).isEmpty(); + assertThat(performanceTracker.blockProductionAttemptsByEpoch).isEmpty(); + } + + @TestTemplate + void shouldClearObjectsAfterFailure() { + chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(10)); + final CombinedChainDataClient combinedChainDataClientMock = mock(CombinedChainDataClient.class); + // Make the attestation performance calculation fail + when(combinedChainDataClientMock.getBestState()) + .thenReturn(Optional.of(SafeFuture.failedFuture(new IllegalArgumentException("failure")))); + final ChainHead chainHeadMock = mock(ChainHead.class); + // Make the block performance calculation fail + when(chainHeadMock.asStateAndBlockSummary()) + .thenReturn(SafeFuture.failedFuture(new IllegalArgumentException("failure"))); + when(combinedChainDataClientMock.getChainHead()).thenReturn(Optional.of(chainHeadMock)); + performanceTracker = + new DefaultPerformanceTracker( + combinedChainDataClientMock, + log, + validatorPerformanceMetrics, + ValidatorPerformanceTrackingMode.ALL, + validatorTracker, + syncCommitteePerformanceTracker, + spec, + mock(SettableGauge.class)); + performanceTracker.start(UInt64.ZERO); + performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(1))); + performanceTracker.reportBlockProductionAttempt(spec.computeEpochAtSlot(UInt64.valueOf(2))); + performanceTracker.saveProducedBlock( + chainUpdater.chainBuilder.getBlockAtSlot(1).getSlotAndBlockRoot()); + performanceTracker.saveProducedBlock( + chainUpdater.chainBuilder.getBlockAtSlot(2).getSlotAndBlockRoot()); + performanceTracker.saveProducedAttestation(dataStructureUtil.randomAttestation(UInt64.ZERO)); + performanceTracker.saveProducedAttestation(dataStructureUtil.randomAttestation(UInt64.ONE)); performanceTracker.onSlot(spec.computeStartSlotAtEpoch(UInt64.valueOf(2))); assertThat(performanceTracker.producedAttestationsByEpoch).isEmpty(); assertThat(performanceTracker.producedBlocksByEpoch).isEmpty(); assertThat(performanceTracker.blockProductionAttemptsByEpoch).isEmpty(); } - @Test + @TestTemplate void shouldNotCountDuplicateAttestationsIncludedOnChain() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1)); @@ -308,7 +354,7 @@ void shouldNotCountDuplicateAttestationsIncludedOnChain() { verify(log).performance(expectedAttestationPerformance.toString()); } - @Test + @TestTemplate void shouldNotSkipValidationForAttestationsWithSameDataButDifferentBitlists() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1)); @@ -342,7 +388,7 @@ void shouldNotSkipValidationForAttestationsWithSameDataButDifferentBitlists() { verify(log).performance(expectedAttestationPerformance.toString()); } - @Test + @TestTemplate void shouldReportExpectedAttestationOnlyForTheGivenEpoch() { when(validatorTracker.getNumberOfValidatorsForEpoch(UInt64.valueOf(2))).thenReturn(2); when(validatorTracker.getNumberOfValidatorsForEpoch(UInt64.valueOf(3))).thenReturn(1); @@ -354,14 +400,14 @@ void shouldReportExpectedAttestationOnlyForTheGivenEpoch() { verify(log).performance(expectedAttestationPerformance.toString()); } - @Test + @TestTemplate void shouldNotReportAttestationPerformanceIfNoValidatorsInEpoch() { when(validatorTracker.getNumberOfValidatorsForEpoch(UInt64.valueOf(2))).thenReturn(0); performanceTracker.onSlot(spec.computeStartSlotAtEpoch(ATTESTATION_INCLUSION_RANGE.plus(2))); verify(log, never()).performance(anyString()); } - @Test + @TestTemplate void shouldReportSyncCommitteePerformance() { final UInt64 epoch = UInt64.valueOf(2); final SyncCommitteePerformance performance = new SyncCommitteePerformance(epoch, 10, 9, 8, 7); @@ -373,7 +419,7 @@ void shouldReportSyncCommitteePerformance() { verify(validatorPerformanceMetrics).updateSyncCommitteePerformance(performance); } - @Test + @TestTemplate void shouldHandleErrorsWhenReportTasksFail() { chainUpdater.updateBestBlock(chainUpdater.advanceChainUntil(1)); final Attestation attestation = createAttestationForParentBlockOnSlot(1);