diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 4b1c834443a..f14afad8f9c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -73,6 +73,7 @@ public class GarbageCollectorThread extends SafeRunnable { // This is how often we want to run the Garbage Collector Thread (in milliseconds). final long gcWaitTime; + final int maxEntryLoggersPerScan; // Compaction parameters boolean enableMinorCompaction = false; @@ -112,6 +113,9 @@ public class GarbageCollectorThread extends SafeRunnable { // track the last scanned successfully log id long scannedLogId = 0; + long lastIterationLogId = 0; + boolean moreEntryLoggers = true; + private final boolean verifyMetadataOnGc; // Boolean to trigger a forced GC. final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false); @@ -153,10 +157,13 @@ public GarbageCollectorThread(ServerConfiguration conf, throws IOException { this.gcExecutor = gcExecutor; this.conf = conf; + + this.verifyMetadataOnGc = conf.getVerifyMetadataOnGC(); this.entryLogger = ledgerStorage.getEntryLogger(); this.ledgerStorage = ledgerStorage; this.gcWaitTime = conf.getGcWaitTime(); + this.maxEntryLoggersPerScan = conf.getMaxEntryLoggerScanOnGc(); this.garbageCleaner = ledgerId -> { try { @@ -340,6 +347,10 @@ public void safeRun() { } public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor) { + + LOG.info("Garbage collector starting with max-entry-logger scan {} and verify-zk {}", maxEntryLoggersPerScan, + verifyMetadataOnGc); + long threadStart = MathUtils.nowInNano(); if (force) { LOG.info("Garbage collector thread forced to perform GC before expiry of wait time."); @@ -347,43 +358,49 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin // Recover and clean up previous state if using transactional compaction compactor.cleanUpAndRecover(); - // Extract all of the ledger ID's that comprise all of the entry logs - // (except for the current new one which is still being written to). - entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap); + boolean isIteration = false; + do { + // Extract all of the ledger ID's that comprise all of the entry + // logs + // (except for the current new one which is still being written to). + entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap, isIteration); - // gc inactive/deleted ledgers - doGcLedgers(); + // gc inactive/deleted ledgers + doGcLedgers(); - // gc entry logs - doGcEntryLogs(); + // gc entry logs + doGcEntryLogs(); - if (suspendMajor) { - LOG.info("Disk almost full, suspend major compaction to slow down filling disk."); - } - if (suspendMinor) { - LOG.info("Disk full, suspend minor compaction to slow down filling disk."); - } + if (suspendMajor) { + LOG.info("Disk almost full, suspend major compaction to slow down filling disk."); + } + if (suspendMinor) { + LOG.info("Disk full, suspend minor compaction to slow down filling disk."); + } - long curTime = MathUtils.now(); - if (enableMajorCompaction && (!suspendMajor) - && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) { - // enter major compaction - LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); - doCompactEntryLogs(majorCompactionThreshold); - lastMajorCompactionTime = MathUtils.now(); - // and also move minor compaction time - lastMinorCompactionTime = lastMajorCompactionTime; - majorCompactionCounter.inc(); - } else if (enableMinorCompaction && (!suspendMinor) - && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) { - // enter minor compaction - LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); - doCompactEntryLogs(minorCompactionThreshold); - lastMinorCompactionTime = MathUtils.now(); - minorCompactionCounter.inc(); - } - this.gcThreadRuntime.registerSuccessfulEvent( - MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS); + long curTime = MathUtils.now(); + if (enableMajorCompaction && (!suspendMajor) + && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) { + // enter major compaction + LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); + doCompactEntryLogs(majorCompactionThreshold); + lastMajorCompactionTime = MathUtils.now(); + // and also move minor compaction time + lastMinorCompactionTime = lastMajorCompactionTime; + majorCompactionCounter.inc(); + } else if (enableMinorCompaction && (!suspendMinor) + && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) { + // enter minor compaction + LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); + doCompactEntryLogs(minorCompactionThreshold); + lastMinorCompactionTime = MathUtils.now(); + minorCompactionCounter.inc(); + } + this.gcThreadRuntime.registerSuccessfulEvent(MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS); + isIteration = true; + } while (moreEntryLoggers); + + LOG.info("GC process completed"); } /** @@ -539,13 +556,23 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) { * Existing EntryLogs to Meta * @throws IOException */ - protected Map extractMetaFromEntryLogs(Map entryLogMetaMap) { + protected Map extractMetaFromEntryLogs(Map entryLogMetaMap, boolean isIteration) { + moreEntryLoggers = false; // Extract it for every entry log except for the current one. // Entry Log ID's are just a long value that starts at 0 and increments // by 1 when the log fills up and we roll to a new one. long curLogId = entryLogger.getLeastUnflushedLogId(); boolean hasExceptionWhenScan = false; - for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) { + int entryLogFileCount = 0; + long entryLogId = isIteration ? lastIterationLogId : scannedLogId; + for (; entryLogId < curLogId; entryLogId++, entryLogFileCount++) { + if (entryLogFileCount > maxEntryLoggersPerScan && verifyMetadataOnGc) { + lastIterationLogId = entryLogId; + moreEntryLoggers = true; + LOG.info("Reached max-entry-logger {}, so will be perform gc in next iteration starts from {}", + entryLogFileCount, maxEntryLoggersPerScan, entryLogId); + break; + } // Comb the current entry log file if it has not already been extracted. if (entryLogMetaMap.containsKey(entryLogId)) { continue; @@ -554,6 +581,7 @@ protected Map extractMetaFromEntryLogs(Map bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, @@ -188,8 +191,15 @@ public void gc(GarbageCleaner garbageCleaner) { metaRC.set(rc); latch.countDown(); }); - latch.await(); - if (metaRC.get() != BKException.Code.NoSuchLedgerExistsException) { + boolean timeout = false; + try { + latch.await(2, TimeUnit.SECONDS); + } catch (Exception e) { + timeout = true; + LOG.warn("Failed to check zk metadata {}", e.getMessage()); + } + + if (timeout || metaRC.get() != BKException.Code.NoSuchLedgerExistsException) { LOG.warn( "Ledger {} Missing in metadata list, but ledgerManager returned rc: {}.", bkLid, @@ -197,6 +207,9 @@ public void gc(GarbageCleaner garbageCleaner) { continue; } } + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting bkLid {}", bkLid); + } deletedLedgerCounter.inc(); garbageCleaner.clean(bkLid); } @@ -206,6 +219,8 @@ public void gc(GarbageCleaner garbageCleaner) { // ignore exception, collecting garbage next time LOG.warn("Exception when iterating over the metadata {}", t); } finally { + long endTime = System.currentTimeMillis(); + LOG.info("Total GC time {}", TimeUnit.MILLISECONDS.toSeconds(endTime - startTime)); if (zk != null) { try { zk.close(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 3e4749c6bbd..b48772d4f9b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -55,6 +55,7 @@ public class ServerConfiguration extends AbstractConfiguration