Skip to content

Commit

Permalink
Fix possible incomplete txpool restore from dump file (#7991)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored and garyschulte committed Dec 20, 2024
1 parent 366341e commit 979493f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
### Bug fixes
- Fix serialization of state overrides when `movePrecompileToAddress` is present [#8204](https://github.com/hyperledger/besu/pull/8024)
- Revise the approach for setting level_compaction_dynamic_level_bytes RocksDB configuration option [#8037](https://github.com/hyperledger/besu/pull/8037)
- Fix possible incomplete txpool restore from dump file [#7991](https://github.com/hyperledger/besu/pull/7991)

## 24.12.2 Hotfix

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand All @@ -73,6 +73,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -645,15 +646,16 @@ public CompletableFuture<Void> setDisabled() {
isPoolEnabled.set(false);
subscribeConnectId.ifPresent(ethContext.getEthPeers()::unsubscribeConnect);
pendingTransactionsListenersProxy.unsubscribe();
final PendingTransactions pendingTransactionsToSave = pendingTransactions;
final CompletableFuture<Void> saveOperation =
saveRestoreManager
.saveToDisk(pendingTransactions)
.exceptionally(
t -> {
LOG.error("Error while saving transaction pool to disk", t);
return null;
});
pendingTransactions = new DisabledPendingTransactions();
return saveRestoreManager
.saveToDisk(pendingTransactionsToSave)
.exceptionally(
t -> {
LOG.error("Error while saving transaction pool to disk", t);
return null;
});
return saveOperation;
}
return CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -750,6 +752,7 @@ class SaveRestoreManager {
private final AtomicBoolean isCancelled = new AtomicBoolean(false);

CompletableFuture<Void> saveToDisk(final PendingTransactions pendingTransactionsToSave) {
cancelInProgressReadOperation();
return serializeAndDedupOperation(
() -> executeSaveToDisk(pendingTransactionsToSave), writeInProgress);
}
Expand All @@ -758,20 +761,31 @@ CompletableFuture<Void> loadFromDisk() {
return serializeAndDedupOperation(this::executeLoadFromDisk, readInProgress);
}

private void cancelInProgressReadOperation() {
if (!readInProgress.get().isDone()) {
LOG.debug("Cancelling in progress read operation");
isCancelled.set(true);
try {
waitUntilReadOperationIsCancelled();
LOG.debug("In progress read operation cancelled");
} catch (InterruptedException | ExecutionException e) {
LOG.warn("Error while cancelling in progress read operation", e);
throw new RuntimeException(e);
}
}
}

private void waitUntilReadOperationIsCancelled()
throws InterruptedException, ExecutionException {
readInProgress.get().get();
}

private CompletableFuture<Void> serializeAndDedupOperation(
final Runnable operation,
final AtomicReference<CompletableFuture<Void>> operationInProgress) {
if (configuration.getEnableSaveRestore()) {
try {
if (diskAccessLock.tryAcquire(1, TimeUnit.MINUTES)) {
if (!operationInProgress.get().isDone()) {
isCancelled.set(true);
try {
operationInProgress.get().get();
} catch (ExecutionException ee) {
// nothing to do
}
}

isCancelled.set(false);
operationInProgress.set(
Expand All @@ -791,12 +805,17 @@ private CompletableFuture<Void> serializeAndDedupOperation(

private void executeSaveToDisk(final PendingTransactions pendingTransactionsToSave) {
final File saveFile = configuration.getSaveFile();
final boolean appending = saveFile.exists();
try (final BufferedWriter bw =
new BufferedWriter(new FileWriter(saveFile, StandardCharsets.US_ASCII))) {
new BufferedWriter(new FileWriter(saveFile, StandardCharsets.US_ASCII, appending))) {
final var allTxs = pendingTransactionsToSave.getPendingTransactions();
LOG.info("Saving {} transactions to file {}", allTxs.size(), saveFile);
LOG.info(
"{} {} transactions to file {}",
appending ? "Appending" : "Saving",
allTxs.size(),
saveFile);

final long savedTxs =
final long processedTxCount =
allTxs.parallelStream()
.takeWhile(unused -> !isCancelled.get())
.map(
Expand All @@ -819,13 +838,19 @@ private void executeSaveToDisk(final PendingTransactions pendingTransactionsToSa
return 1;
})
.sum();

if (isCancelled.get()) {
LOG.info(
"Saved {} transactions to file {}, before operation was cancelled",
savedTxs,
"{} {} transactions to file {}, before operation was cancelled",
appending ? "Appended" : "Saved",
processedTxCount,
saveFile);
} else {
LOG.info("Saved {} transactions to file {}", savedTxs, saveFile);
LOG.info(
"{} {} transactions to file {}",
appending ? "Appended" : "Saved",
processedTxCount,
saveFile);
}
} catch (IOException e) {
LOG.error("Error while saving txpool content to disk", e);
Expand All @@ -839,41 +864,77 @@ private void executeLoadFromDisk() {
LOG.info("Loading transaction pool content from file {}", saveFile);
try (final BufferedReader br =
new BufferedReader(new FileReader(saveFile, StandardCharsets.US_ASCII))) {
final IntSummaryStatistics stats =
final Map<String, Long> stats =
br.lines()
.takeWhile(unused -> !isCancelled.get())
.mapToInt(
.map(
line -> {
final boolean isLocal = line.charAt(0) == 'l';
final Transaction tx =
Transaction.readFrom(Bytes.fromBase64String(line.substring(1)));

final ValidationResult<TransactionInvalidReason> result =
addTransaction(tx, isLocal);

return result.isValid() ? 1 : 0;
return result.isValid() ? "OK" : result.getInvalidReason().name();
})
.summaryStatistics();
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

br.close();

final var added = stats.getOrDefault("OK", 0L);
final var processedLines = stats.values().stream().mapToLong(Long::longValue).sum();

LOG.debug("Restored transactions stats {}", stats);

if (isCancelled.get()) {
LOG.info(
"Added {} transactions of {} loaded from file {}, before operation was cancelled",
stats.getSum(),
stats.getCount(),
added,
processedLines,
saveFile);
removeProcessedLines(saveFile, processedLines);
} else {
LOG.info(
"Added {} transactions of {} loaded from file {}",
stats.getSum(),
stats.getCount(),
"Added {} transactions of {} loaded from file {}, deleting file",
added,
processedLines,
saveFile);
saveFile.delete();
}
} catch (IOException e) {
LOG.error("Error while saving txpool content to disk", e);
}
}
saveFile.delete();
}
}

private void removeProcessedLines(final File saveFile, final long processedLines)
throws IOException {

LOG.debug("Removing processed lines from save file");

final var tmp = File.createTempFile(saveFile.getName(), ".tmp");

try (final BufferedReader reader =
Files.newBufferedReader(saveFile.toPath(), StandardCharsets.US_ASCII);
final BufferedWriter writer =
Files.newBufferedWriter(tmp.toPath(), StandardCharsets.US_ASCII)) {
reader
.lines()
.skip(processedLines)
.forEach(
line -> {
try {
writer.write(line);
writer.newLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

saveFile.delete();
Files.move(tmp.toPath(), saveFile.toPath());
}
}
}

0 comments on commit 979493f

Please sign in to comment.