From 785426b13ff750cb0931c79cb385aa9e1f19dc5b Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Tue, 5 Feb 2019 15:57:09 -0800 Subject: [PATCH] scan limited entry logger file for gc --- .../bookie/GarbageCollectorThread.java | 89 ++++++++++++------- .../bookkeeper/conf/ServerConfiguration.java | 19 ++++ 2 files changed, 74 insertions(+), 34 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 4b1c834443a..032f535f5e4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -73,6 +73,7 @@ public class GarbageCollectorThread extends SafeRunnable { // This is how often we want to run the Garbage Collector Thread (in milliseconds). final long gcWaitTime; + final int maxEntryLoggersPerScan; // Compaction parameters boolean enableMinorCompaction = false; @@ -112,7 +113,10 @@ public class GarbageCollectorThread extends SafeRunnable { // track the last scanned successfully log id long scannedLogId = 0; + boolean moreEntryLoggers = true; + private final boolean verifyMetadataOnGc; + final AtomicBoolean gcRunning = new AtomicBoolean(false); // Boolean to trigger a forced GC. final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false); // Boolean to disable major compaction, when disk is almost full @@ -153,10 +157,13 @@ public GarbageCollectorThread(ServerConfiguration conf, throws IOException { this.gcExecutor = gcExecutor; this.conf = conf; + + this.verifyMetadataOnGc = conf.getVerifyMetadataOnGC(); this.entryLogger = ledgerStorage.getEntryLogger(); this.ledgerStorage = ledgerStorage; this.gcWaitTime = conf.getGcWaitTime(); + this.maxEntryLoggersPerScan = conf.getMaxEntryLoggerScanOnGc(); this.garbageCleaner = ledgerId -> { try { @@ -340,6 +347,12 @@ public void safeRun() { } public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor) { + + if(gcRunning.compareAndSet(false, true)) { + LOG.info("Garbage collector is already running."); + return; + } + long threadStart = MathUtils.nowInNano(); if (force) { LOG.info("Garbage collector thread forced to perform GC before expiry of wait time."); @@ -347,43 +360,45 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin // Recover and clean up previous state if using transactional compaction compactor.cleanUpAndRecover(); - // Extract all of the ledger ID's that comprise all of the entry logs - // (except for the current new one which is still being written to). - entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap); + do { + // Extract all of the ledger ID's that comprise all of the entry + // logs + // (except for the current new one which is still being written to). + entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap); - // gc inactive/deleted ledgers - doGcLedgers(); + // gc inactive/deleted ledgers + doGcLedgers(); - // gc entry logs - doGcEntryLogs(); + // gc entry logs + doGcEntryLogs(); - if (suspendMajor) { - LOG.info("Disk almost full, suspend major compaction to slow down filling disk."); - } - if (suspendMinor) { - LOG.info("Disk full, suspend minor compaction to slow down filling disk."); - } + if (suspendMajor) { + LOG.info("Disk almost full, suspend major compaction to slow down filling disk."); + } + if (suspendMinor) { + LOG.info("Disk full, suspend minor compaction to slow down filling disk."); + } - long curTime = MathUtils.now(); - if (enableMajorCompaction && (!suspendMajor) - && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) { - // enter major compaction - LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); - doCompactEntryLogs(majorCompactionThreshold); - lastMajorCompactionTime = MathUtils.now(); - // and also move minor compaction time - lastMinorCompactionTime = lastMajorCompactionTime; - majorCompactionCounter.inc(); - } else if (enableMinorCompaction && (!suspendMinor) - && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) { - // enter minor compaction - LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); - doCompactEntryLogs(minorCompactionThreshold); - lastMinorCompactionTime = MathUtils.now(); - minorCompactionCounter.inc(); - } - this.gcThreadRuntime.registerSuccessfulEvent( - MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS); + long curTime = MathUtils.now(); + if (enableMajorCompaction && (!suspendMajor) + && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) { + // enter major compaction + LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); + doCompactEntryLogs(majorCompactionThreshold); + lastMajorCompactionTime = MathUtils.now(); + // and also move minor compaction time + lastMinorCompactionTime = lastMajorCompactionTime; + majorCompactionCounter.inc(); + } else if (enableMinorCompaction && (!suspendMinor) + && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) { + // enter minor compaction + LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); + doCompactEntryLogs(minorCompactionThreshold); + lastMinorCompactionTime = MathUtils.now(); + minorCompactionCounter.inc(); + } + this.gcThreadRuntime.registerSuccessfulEvent(MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS); + } while (moreEntryLoggers); } /** @@ -540,12 +555,18 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) { * @throws IOException */ protected Map extractMetaFromEntryLogs(Map entryLogMetaMap) { + moreEntryLoggers = false; // Extract it for every entry log except for the current one. // Entry Log ID's are just a long value that starts at 0 and increments // by 1 when the log fills up and we roll to a new one. long curLogId = entryLogger.getLeastUnflushedLogId(); boolean hasExceptionWhenScan = false; - for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) { + int entryLogFileCount = 0; + for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++, entryLogFileCount++) { + if (entryLogFileCount > maxEntryLoggersPerScan && verifyMetadataOnGc) { + moreEntryLoggers = true; + break; + } // Comb the current entry log file if it has not already been extracted. if (entryLogMetaMap.containsKey(entryLogId)) { continue; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 3e4749c6bbd..f325642bca0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -55,6 +55,7 @@ public class ServerConfiguration extends AbstractConfiguration