From 979493f80e940f70ff0cbcf51306055a9583120a Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Thu, 19 Dec 2024 12:22:00 +0100 Subject: [PATCH] Fix possible incomplete txpool restore from dump file (#7991) Signed-off-by: Fabio Di Fabio --- CHANGELOG.md | 1 + .../eth/transactions/TransactionPool.java | 129 +++++++++++++----- 2 files changed, 96 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5202ffb9c8a..cfef1d8e50d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ ### Bug fixes - Fix serialization of state overrides when `movePrecompileToAddress` is present [#8204](https://github.com/hyperledger/besu/pull/8024) - Revise the approach for setting level_compaction_dynamic_level_bytes RocksDB configuration option [#8037](https://github.com/hyperledger/besu/pull/8037) +- Fix possible incomplete txpool restore from dump file [#7991](https://github.com/hyperledger/besu/pull/7991) ## 24.12.2 Hotfix diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index 3acf49894a3..1adc5977bb6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -54,11 +54,11 @@ import java.io.IOException; import java.math.BigInteger; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.IntSummaryStatistics; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -73,6 +73,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -645,15 +646,16 @@ public CompletableFuture setDisabled() { isPoolEnabled.set(false); subscribeConnectId.ifPresent(ethContext.getEthPeers()::unsubscribeConnect); pendingTransactionsListenersProxy.unsubscribe(); - final PendingTransactions pendingTransactionsToSave = pendingTransactions; + final CompletableFuture saveOperation = + saveRestoreManager + .saveToDisk(pendingTransactions) + .exceptionally( + t -> { + LOG.error("Error while saving transaction pool to disk", t); + return null; + }); pendingTransactions = new DisabledPendingTransactions(); - return saveRestoreManager - .saveToDisk(pendingTransactionsToSave) - .exceptionally( - t -> { - LOG.error("Error while saving transaction pool to disk", t); - return null; - }); + return saveOperation; } return CompletableFuture.completedFuture(null); } @@ -750,6 +752,7 @@ class SaveRestoreManager { private final AtomicBoolean isCancelled = new AtomicBoolean(false); CompletableFuture saveToDisk(final PendingTransactions pendingTransactionsToSave) { + cancelInProgressReadOperation(); return serializeAndDedupOperation( () -> executeSaveToDisk(pendingTransactionsToSave), writeInProgress); } @@ -758,20 +761,31 @@ CompletableFuture loadFromDisk() { return serializeAndDedupOperation(this::executeLoadFromDisk, readInProgress); } + private void cancelInProgressReadOperation() { + if (!readInProgress.get().isDone()) { + LOG.debug("Cancelling in progress read operation"); + isCancelled.set(true); + try { + waitUntilReadOperationIsCancelled(); + LOG.debug("In progress read operation cancelled"); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Error while cancelling in progress read operation", e); + throw new RuntimeException(e); + } + } + } + + private void waitUntilReadOperationIsCancelled() + throws InterruptedException, ExecutionException { + readInProgress.get().get(); + } + private CompletableFuture serializeAndDedupOperation( final Runnable operation, final AtomicReference> operationInProgress) { if (configuration.getEnableSaveRestore()) { try { if (diskAccessLock.tryAcquire(1, TimeUnit.MINUTES)) { - if (!operationInProgress.get().isDone()) { - isCancelled.set(true); - try { - operationInProgress.get().get(); - } catch (ExecutionException ee) { - // nothing to do - } - } isCancelled.set(false); operationInProgress.set( @@ -791,12 +805,17 @@ private CompletableFuture serializeAndDedupOperation( private void executeSaveToDisk(final PendingTransactions pendingTransactionsToSave) { final File saveFile = configuration.getSaveFile(); + final boolean appending = saveFile.exists(); try (final BufferedWriter bw = - new BufferedWriter(new FileWriter(saveFile, StandardCharsets.US_ASCII))) { + new BufferedWriter(new FileWriter(saveFile, StandardCharsets.US_ASCII, appending))) { final var allTxs = pendingTransactionsToSave.getPendingTransactions(); - LOG.info("Saving {} transactions to file {}", allTxs.size(), saveFile); + LOG.info( + "{} {} transactions to file {}", + appending ? "Appending" : "Saving", + allTxs.size(), + saveFile); - final long savedTxs = + final long processedTxCount = allTxs.parallelStream() .takeWhile(unused -> !isCancelled.get()) .map( @@ -819,13 +838,19 @@ private void executeSaveToDisk(final PendingTransactions pendingTransactionsToSa return 1; }) .sum(); + if (isCancelled.get()) { LOG.info( - "Saved {} transactions to file {}, before operation was cancelled", - savedTxs, + "{} {} transactions to file {}, before operation was cancelled", + appending ? "Appended" : "Saved", + processedTxCount, saveFile); } else { - LOG.info("Saved {} transactions to file {}", savedTxs, saveFile); + LOG.info( + "{} {} transactions to file {}", + appending ? "Appended" : "Saved", + processedTxCount, + saveFile); } } catch (IOException e) { LOG.error("Error while saving txpool content to disk", e); @@ -839,10 +864,10 @@ private void executeLoadFromDisk() { LOG.info("Loading transaction pool content from file {}", saveFile); try (final BufferedReader br = new BufferedReader(new FileReader(saveFile, StandardCharsets.US_ASCII))) { - final IntSummaryStatistics stats = + final Map stats = br.lines() .takeWhile(unused -> !isCancelled.get()) - .mapToInt( + .map( line -> { final boolean isLocal = line.charAt(0) == 'l'; final Transaction tx = @@ -850,30 +875,66 @@ private void executeLoadFromDisk() { final ValidationResult result = addTransaction(tx, isLocal); - - return result.isValid() ? 1 : 0; + return result.isValid() ? "OK" : result.getInvalidReason().name(); }) - .summaryStatistics(); + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + br.close(); + + final var added = stats.getOrDefault("OK", 0L); + final var processedLines = stats.values().stream().mapToLong(Long::longValue).sum(); + + LOG.debug("Restored transactions stats {}", stats); if (isCancelled.get()) { LOG.info( "Added {} transactions of {} loaded from file {}, before operation was cancelled", - stats.getSum(), - stats.getCount(), + added, + processedLines, saveFile); + removeProcessedLines(saveFile, processedLines); } else { LOG.info( - "Added {} transactions of {} loaded from file {}", - stats.getSum(), - stats.getCount(), + "Added {} transactions of {} loaded from file {}, deleting file", + added, + processedLines, saveFile); + saveFile.delete(); } } catch (IOException e) { LOG.error("Error while saving txpool content to disk", e); } } - saveFile.delete(); } } + + private void removeProcessedLines(final File saveFile, final long processedLines) + throws IOException { + + LOG.debug("Removing processed lines from save file"); + + final var tmp = File.createTempFile(saveFile.getName(), ".tmp"); + + try (final BufferedReader reader = + Files.newBufferedReader(saveFile.toPath(), StandardCharsets.US_ASCII); + final BufferedWriter writer = + Files.newBufferedWriter(tmp.toPath(), StandardCharsets.US_ASCII)) { + reader + .lines() + .skip(processedLines) + .forEach( + line -> { + try { + writer.write(line); + writer.newLine(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + saveFile.delete(); + Files.move(tmp.toPath(), saveFile.toPath()); + } } }