Skip to content

Commit

Permalink
Refactor code to move one block's worth of state/storage changes at a…
Browse files Browse the repository at this point in the history
… time. Add more tests

Signed-off-by: Matthew Whitehead <[email protected]>
  • Loading branch information
matthew1001 committed Oct 2, 2024
1 parent 518bb6d commit 5752732
Show file tree
Hide file tree
Showing 3 changed files with 1,198 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogManager;
import org.hyperledger.besu.plugin.services.trielogs.TrieLog;

import java.util.Comparator;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,12 +48,12 @@ public class BonsaiArchiveFreezer implements BlockAddedObserver {
private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage;
private final Blockchain blockchain;
private final Consumer<Runnable> executeAsync;
private static final int PRELOAD_LIMIT = 1000;
private static final int CATCHUP_LIMIT = 1000;
private static final int DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE = 10;
private final TrieLogManager trieLogManager;

private final Multimap<Long, Hash> blocksToMoveToFreezer =
TreeMultimap.create(Comparator.reverseOrder(), Comparator.naturalOrder());
private final Map<Long, Hash> pendingBlocksToArchive =
Collections.synchronizedMap(new TreeMap<>());

public BonsaiArchiveFreezer(
final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage,
Expand All @@ -67,9 +66,7 @@ public BonsaiArchiveFreezer(
this.trieLogManager = trieLogManager;
}

public void initialize() {
// On startup there will be recent blocks whose state and storage hasn't been archived yet.
// Pre-load them ready for freezing state once enough new blocks have been added to the chain.
private void preloadCatchupBlocks() {
Optional<Long> frozenBlocksHead = Optional.empty();

Optional<Long> latestFrozenBlock = rootWorldStateStorage.getLatestArchiveFrozenBlock();
Expand All @@ -86,7 +83,7 @@ public void initialize() {
if (frozenBlocksHead.isPresent()) {
int preLoadedBlocks = 0;
Optional<Block> nextBlock = blockchain.getBlockByNumber(frozenBlocksHead.get());
for (int i = 0; i < PRELOAD_LIMIT; i++) {
for (int i = 0; i < CATCHUP_LIMIT; i++) {
if (nextBlock.isPresent()) {
addToFreezerQueue(
nextBlock.get().getHeader().getNumber(), nextBlock.get().getHeader().getHash());
Expand All @@ -97,13 +94,27 @@ public void initialize() {
}
}
LOG.atInfo()
.setMessage("Preloaded {} blocks to move their state and storage to the archive freezer")
.setMessage(
"Preloaded {} blocks from {} to move their state and storage to the archive freezer")
.addArgument(preLoadedBlocks)
.addArgument(frozenBlocksHead.get())
.log();
}
}

public void initialize() {
// On startup there will be recent blocks whose state and storage hasn't been archived yet.
// Pre-load them ready for freezing state once enough new blocks have been added to the chain.
preloadCatchupBlocks();

// Keep catching up until we move less to the freezer than the catchup limit
while (moveBlockStateToFreezer() == CATCHUP_LIMIT) {
preloadCatchupBlocks();
}
}

// Start processing any backlog on startup - don't wait for a new block to be imported.
moveBlockStateToFreezer();
public int getPendingBlocksCount() {
return pendingBlocksToArchive.size();
}

public synchronized void addToFreezerQueue(final long blockNumber, final Hash blockHash) {
Expand All @@ -113,10 +124,17 @@ public synchronized void addToFreezerQueue(final long blockNumber, final Hash bl
.addArgument(blockNumber)
.addArgument(blockHash)
.log();
blocksToMoveToFreezer.put(blockNumber, blockHash);
pendingBlocksToArchive.put(blockNumber, blockHash);
}

private synchronized void removeArchivedFromQueue(final Map<Long, Hash> archivedBlocks) {
archivedBlocks.keySet().forEach(e -> pendingBlocksToArchive.remove(e));
}

public synchronized int moveBlockStateToFreezer() {
// Move state and storage entries from their primary DB segments to the freezer segments. This is
// intended to maintain good performance for new block imports by keeping the primary DB segments
// to live state only. Returns the number of state and storage entries moved.
public int moveBlockStateToFreezer() {
final long retainAboveThisBlock =
blockchain.getChainHeadBlockNumber() - DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE;

Expand All @@ -135,100 +153,99 @@ public synchronized int moveBlockStateToFreezer() {
.addArgument(retainAboveThisBlock)
.log();

final var accountsToMove =
blocksToMoveToFreezer.asMap().entrySet().stream()
.dropWhile((e) -> e.getKey() > retainAboveThisBlock);
// Typically we will move all storage and state for a single block i.e. when a new block is
// imported, move state for block-N. There are cases where we catch-up and move old state
// for a number of blocks so we may iterate over a number of blocks freezing their state,
// not just a single one.

final Multimap<Long, Hash> accountStateFreezerActionsComplete = ArrayListMultimap.create();
final Multimap<Long, Hash> accountStorageFreezerActionsComplete = ArrayListMultimap.create();
final Map<Long, Hash> blocksToFreeze = new TreeMap<>();
pendingBlocksToArchive.entrySet().stream()
.filter((e) -> e.getKey() <= retainAboveThisBlock)
.forEach(
(e) -> {
blocksToFreeze.put(e.getKey(), e.getValue());
});

// Determine which world state keys have changed in the last N blocks by looking at the
// trie logs for the blocks. Then move the old keys to the freezer segment (if and only if they
// have changed)
accountsToMove
.parallel()
blocksToFreeze
.entrySet()
.forEach(
(block) -> {
for (Hash blockHash : block.getValue()) {
Optional<TrieLog> trieLog = trieLogManager.getTrieLogLayer(blockHash);
if (trieLog.isPresent()) {
trieLog
.get()
.getAccountChanges()
.forEach(
(address, change) -> {
// Move any previous state for this account
frozenAccountStateCount.addAndGet(
rootWorldStateStorage.freezePreviousAccountState(
blockchain.getBlockHeader(
blockchain.getBlockHeader(blockHash).get().getParentHash()),
address.addressHash()));
});
}
accountStateFreezerActionsComplete.put(block.getKey(), blockHash);
if (pendingBlocksToArchive.size() > 0 && pendingBlocksToArchive.size() % 100 == 0) {
// Log progress in case catching up causes there to be a large number of keys
// to move
LOG.atInfo()
.setMessage("state for blocks {} to {} archived")
.addArgument(block.getKey())
.addArgument(block.getKey() + pendingBlocksToArchive.size())
.log();
}
});

final var storageToMove =
blocksToMoveToFreezer.asMap().entrySet().stream()
.dropWhile((e) -> e.getKey() > retainAboveThisBlock);

storageToMove
.parallel()
.forEach(
(block) -> {
for (Hash blockHash : block.getValue()) {
Optional<TrieLog> trieLog = trieLogManager.getTrieLogLayer(blockHash);
if (trieLog.isPresent()) {
trieLog
.get()
.getStorageChanges()
.forEach(
(address, storageSlotKey) -> {
storageSlotKey.forEach(
(slotKey, slotValue) -> {
// Move any previous state for this account
frozenAccountStorageCount.addAndGet(
rootWorldStateStorage.freezePreviousStorageState(
blockchain.getBlockHeader(
blockchain
.getBlockHeader(blockHash)
.get()
.getParentHash()),
Bytes.concatenate(
address.addressHash(), slotKey.getSlotHash())));
});
});
}
accountStorageFreezerActionsComplete.put(block.getKey(), blockHash);
Hash blockHash = block.getValue();
LOG.atDebug()
.setMessage("Freezing all account state for block {}")
.addArgument(block.getKey())
.log();
Optional<TrieLog> trieLog = trieLogManager.getTrieLogLayer(blockHash);
if (trieLog.isPresent()) {
trieLog
.get()
.getAccountChanges()
.forEach(
(address, change) -> {
// Move any previous state for this account
frozenAccountStateCount.addAndGet(
rootWorldStateStorage.freezePreviousAccountState(
blockchain.getBlockHeader(
blockchain.getBlockHeader(blockHash).get().getParentHash()),
address.addressHash()));
});
LOG.atDebug()
.setMessage("Freezing all storage state for block {}")
.addArgument(block.getKey())
.log();
trieLog
.get()
.getStorageChanges()
.forEach(
(address, storageSlotKey) -> {
storageSlotKey.forEach(
(slotKey, slotValue) -> {
// Move any previous state for this account
frozenAccountStorageCount.addAndGet(
rootWorldStateStorage.freezePreviousStorageState(
blockchain.getBlockHeader(
blockchain
.getBlockHeader(blockHash)
.get()
.getParentHash()),
Bytes.concatenate(
address.addressHash(), slotKey.getSlotHash())));
});
});
}
LOG.atDebug()
.setMessage("All account state and storage frozen for block {}")
.addArgument(block.getKey())
.log();
rootWorldStateStorage.setLatestArchiveFrozenBlock(block.getKey());
});

// For us to consider all state and storage changes for a block complete, it must have been
// recorded in both accountState and accountStorage lists. If only one finished we need to try
// freezing state/storage for that block again on the next loop
AtomicInteger frozenBlocksCompleted = new AtomicInteger();
accountStateFreezerActionsComplete
.keySet()
.forEach(
(b) -> {
if (accountStorageFreezerActionsComplete.containsKey(b)) {
frozenBlocksCompleted.getAndIncrement();
rootWorldStateStorage.setLatestArchiveFrozenBlock(b);
blocksToMoveToFreezer.removeAll(b);
}
});
LOG.atDebug()
.setMessage(
"finished moving cold state to freezer storage for range (chainHeadNumber: {} - numberOfBlocksToKeepInWarmStorage: {}) = {}. Froze {} account state entries, {} account storage entries from {} blocks")
.addArgument(blockchain::getChainHeadBlockNumber)
.addArgument(DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE)
.addArgument(retainAboveThisBlock)
.addArgument(frozenAccountStateCount.get())
.addArgument(frozenAccountStorageCount.get())
.addArgument(blocksToFreeze.size())
.log();

if (frozenAccountStateCount.get() > 0 || frozenAccountStorageCount.get() > 0) {
LOG.atDebug()
.setMessage("Froze {} account state entries, {} account storage entries for {} blocks")
.addArgument(frozenAccountStateCount.get())
.addArgument(frozenAccountStorageCount.get())
.addArgument(frozenBlocksCompleted.get())
.log();
}
removeArchivedFromQueue(blocksToFreeze);

return frozenBlocksCompleted.get();
return frozenAccountStateCount.get() + frozenAccountStorageCount.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ public int freezePreviousAccountState(
composedWorldStateStorage
.getNearestBefore(ACCOUNT_INFO_STATE, previousKey)
.filter(
found -> accountHash.commonPrefixLength(found.key()) >= accountHash.size()))
found ->
found.value().isPresent()
&& accountHash.commonPrefixLength(found.key())
>= accountHash.size()))
.isPresent()) {
nextMatch.stream()
.forEach(
Expand All @@ -249,8 +252,18 @@ public int freezePreviousAccountState(
}

if (frozenStateCount.get() == 0) {
// A lot of entries will have no previous history, so use trace to log when no previous
// storage was found
LOG.atTrace()
.setMessage("no previous state found for block {}, address hash {}")
.addArgument(previousBlockHeader.get().getNumber())
.addArgument(accountHash)
.log();
} else {
LOG.atDebug()
.setMessage("no previous state for account {} found to move to cold storage")
.setMessage("{} storage entries frozen for block {}, address hash {}")
.addArgument(frozenStateCount.get())
.addArgument(previousBlockHeader.get().getNumber())
.addArgument(accountHash)
.log();
}
Expand Down Expand Up @@ -294,12 +307,25 @@ public int freezePreviousStorageState(
.getNearestBefore(ACCOUNT_STORAGE_STORAGE, previousKey)
.filter(
found ->
storageSlotKey.commonPrefixLength(found.key())
>= storageSlotKey.size()))
found.value().isPresent()
&& storageSlotKey.commonPrefixLength(found.key())
>= storageSlotKey.size()))
.isPresent()) {
nextMatch.stream()
.forEach(
(nearestKey) -> {
if (frozenStorageCount.get() > 0 && frozenStorageCount.get() % 100 == 0) {
// Log progress in case catching up causes there to be a large number of keys
// to move
LOG.atDebug()
.setMessage(
"{} storage entries frozen for block {}, slot hash {}, latest key {}")
.addArgument(frozenStorageCount.get())
.addArgument(previousBlockHeader.get().getNumber())
.addArgument(storageSlotKey)
.addArgument(nearestKey.key())
.log();
}
moveDBEntry(
ACCOUNT_STORAGE_STORAGE,
ACCOUNT_STORAGE_FREEZER,
Expand All @@ -310,8 +336,18 @@ public int freezePreviousStorageState(
}

if (frozenStorageCount.get() == 0) {
// A lot of entries will have no previous history, so use trace to log when no previous
// storage was found
LOG.atTrace()
.setMessage("no previous storage found for block {}, slot hash {}")
.addArgument(previousBlockHeader.get().getNumber())
.addArgument(storageSlotKey)
.log();
} else {
LOG.atDebug()
.setMessage("no previous state for storage {} found to move to cold storage")
.setMessage("{} storage entries frozen for block {}, slot hash {}")
.addArgument(frozenStorageCount.get())
.addArgument(previousBlockHeader.get().getNumber())
.addArgument(storageSlotKey)
.log();
}
Expand All @@ -337,10 +373,7 @@ private void moveDBEntry(
tx.commit();
break;
} catch (StorageException se) {
if (se.getMessage().contains("RocksDBException: Busy")) {
if (retried) {
break;
}
if (!retried && se.getMessage().contains("RocksDBException: Busy")) {
retried = true;
} else {
break;
Expand Down
Loading

0 comments on commit 5752732

Please sign in to comment.