From bd1774bb6bdf5bff693c118196675af087468ae9 Mon Sep 17 00:00:00 2001
From: rdhabalia
Date: Tue, 5 Feb 2019 15:57:09 -0800
Subject: [PATCH 1/3] scan limited entry logger file for gc

---
 .../bookie/GarbageCollectorThread.java        | 89 ++++++++++++-------
 .../bookkeeper/conf/ServerConfiguration.java  | 19 ++++
 2 files changed, 74 insertions(+), 34 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 4b1c834443a..01a93489e86 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -73,6 +73,7 @@ public class GarbageCollectorThread extends SafeRunnable {
     // This is how often we want to run the Garbage Collector Thread (in milliseconds).
     final long gcWaitTime;
+    final int maxEntryLoggersPerScan;
     // Compaction parameters
     boolean enableMinorCompaction = false;
@@ -112,6 +113,8 @@ public class GarbageCollectorThread extends SafeRunnable {
     // track the last scanned successfully log id
     long scannedLogId = 0;
+    boolean moreEntryLoggers = true;
+
     private final boolean verifyMetadataOnGc;
     // Boolean to trigger a forced GC.
     final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false);
@@ -153,10 +156,13 @@ public GarbageCollectorThread(ServerConfiguration conf,
             throws IOException {
         this.gcExecutor = gcExecutor;
         this.conf = conf;
+
+        this.verifyMetadataOnGc = conf.getVerifyMetadataOnGC();
         this.entryLogger = ledgerStorage.getEntryLogger();
         this.ledgerStorage = ledgerStorage;
         this.gcWaitTime = conf.getGcWaitTime();
+        this.maxEntryLoggersPerScan = conf.getMaxEntryLoggerScanOnGc();
         this.garbageCleaner = ledgerId -> {
             try {
@@ -340,6 +346,9 @@ public void safeRun() {
     }

     public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor) {
+
+        LOG.info("Garbage collector starting with max-entry-logger scan {}", maxEntryLoggersPerScan);
+
         long threadStart = MathUtils.nowInNano();
         if (force) {
             LOG.info("Garbage collector thread forced to perform GC before expiry of wait time.");
@@ -347,43 +356,47 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
         // Recover and clean up previous state if using transactional compaction
         compactor.cleanUpAndRecover();

-        // Extract all of the ledger ID's that comprise all of the entry logs
-        // (except for the current new one which is still being written to).
-        entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);
+        do {
+            // Extract all of the ledger ID's that comprise all of the entry
+            // logs
+            // (except for the current new one which is still being written to).
+            entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);

-        // gc inactive/deleted ledgers
-        doGcLedgers();
+            // gc inactive/deleted ledgers
+            doGcLedgers();

-        // gc entry logs
-        doGcEntryLogs();
+            // gc entry logs
+            doGcEntryLogs();

-        if (suspendMajor) {
-            LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
-        }
-        if (suspendMinor) {
-            LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
-        }
+            if (suspendMajor) {
+                LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
+            }
+            if (suspendMinor) {
+                LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
+            }

-        long curTime = MathUtils.now();
-        if (enableMajorCompaction && (!suspendMajor)
-            && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
-            // enter major compaction
-            LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
-            doCompactEntryLogs(majorCompactionThreshold);
-            lastMajorCompactionTime = MathUtils.now();
-            // and also move minor compaction time
-            lastMinorCompactionTime = lastMajorCompactionTime;
-            majorCompactionCounter.inc();
-        } else if (enableMinorCompaction && (!suspendMinor)
-            && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
-            // enter minor compaction
-            LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
-            doCompactEntryLogs(minorCompactionThreshold);
-            lastMinorCompactionTime = MathUtils.now();
-            minorCompactionCounter.inc();
-        }
-        this.gcThreadRuntime.registerSuccessfulEvent(
-            MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
+            long curTime = MathUtils.now();
+            if (enableMajorCompaction && (!suspendMajor)
+                && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
+                // enter major compaction
+                LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
+                doCompactEntryLogs(majorCompactionThreshold);
+                lastMajorCompactionTime = MathUtils.now();
+                // and also move minor compaction time
+                lastMinorCompactionTime = lastMajorCompactionTime;
+                majorCompactionCounter.inc();
+            } else if (enableMinorCompaction && (!suspendMinor)
+                && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
+                // enter minor compaction
+                LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
+                doCompactEntryLogs(minorCompactionThreshold);
+                lastMinorCompactionTime = MathUtils.now();
+                minorCompactionCounter.inc();
+            }
+            this.gcThreadRuntime.registerSuccessfulEvent(MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
+        } while (moreEntryLoggers);
+
+        LOG.info("GC process completed");
     }

     /**
@@ -540,12 +553,20 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
      *          Existing EntryLogs to Meta
      * @throws IOException
      */
     protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
+        moreEntryLoggers = false;
         // Extract it for every entry log except for the current one.
         // Entry Log ID's are just a long value that starts at 0 and increments
         // by 1 when the log fills up and we roll to a new one.
         long curLogId = entryLogger.getLeastUnflushedLogId();
         boolean hasExceptionWhenScan = false;
-        for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
+        int entryLogFileCount = 0;
+        for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++, entryLogFileCount++) {
+            if (entryLogFileCount > maxEntryLoggersPerScan && verifyMetadataOnGc) {
+                moreEntryLoggers = true;
+                LOG.info("Reached max-entry-logger {}, so will be perform gc in next iteration",
+                        maxEntryLoggersPerScan);
+                break;
+            }
             // Comb the current entry log file if it has not already been extracted.
             if (entryLogMetaMap.containsKey(entryLogId)) {
                 continue;

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 3e4749c6bbd..b48772d4f9b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -55,6 +55,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfiguration>

Date: Tue, 5 Feb 2019 17:57:28 -0800
Subject: [PATCH 2/3] start iteration from last entry-log
---
 .../bookie/GarbageCollectorThread.java        | 19 +++++++++++++------
 .../ScanAndCompareGarbageCollector.java       | 15 +++++++++++++--
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 01a93489e86..8016be87f29 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -113,6 +113,7 @@ public class GarbageCollectorThread extends SafeRunnable {
     // track the last scanned successfully log id
     long scannedLogId = 0;
+    long lastIterationLogId = 0;

     boolean moreEntryLoggers = true;

     private final boolean verifyMetadataOnGc;
@@ -347,7 +348,8 @@ public void safeRun() {

     public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor) {

-        LOG.info("Garbage collector starting with max-entry-logger scan {}", maxEntryLoggersPerScan);
+        LOG.info("Garbage collector starting with max-entry-logger scan {} and verify-zk {}", maxEntryLoggersPerScan,
+                verifyMetadataOnGc);

         long threadStart = MathUtils.nowInNano();
         if (force) {
@@ -356,11 +358,12 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
         // Recover and clean up previous state if using transactional compaction
         compactor.cleanUpAndRecover();

+        boolean isIteration = false;
         do {
             // Extract all of the ledger ID's that comprise all of the entry
             // logs
             // (except for the current new one which is still being written to).
- entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap); + entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap, isIteration); // gc inactive/deleted ledgers doGcLedgers(); @@ -394,6 +397,7 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin minorCompactionCounter.inc(); } this.gcThreadRuntime.registerSuccessfulEvent(MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS); + isIteration = true; } while (moreEntryLoggers); LOG.info("GC process completed"); @@ -552,7 +556,7 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) { * Existing EntryLogs to Meta * @throws IOException */ - protected Map extractMetaFromEntryLogs(Map entryLogMetaMap) { + protected Map extractMetaFromEntryLogs(Map entryLogMetaMap, boolean isIteration) { moreEntryLoggers = false; // Extract it for every entry log except for the current one. // Entry Log ID's are just a long value that starts at 0 and increments @@ -560,11 +564,13 @@ protected Map extractMetaFromEntryLogs(Map maxEntryLoggersPerScan && verifyMetadataOnGc) { + lastIterationLogId = entryLogId; moreEntryLoggers = true; - LOG.info("Reached max-entry-logger {}, so will be perform gc in next iteration", - maxEntryLoggersPerScan); + LOG.info("Reached max-entry-logger {}, so will be perform gc in next iteration starts from {}", + entryLogFileCount, maxEntryLoggersPerScan, scannedLogId); break; } // Comb the current entry log file if it has not already been extracted. @@ -575,6 +581,7 @@ protected Map extractMetaFromEntryLogs(Map Date: Tue, 5 Feb 2019 19:58:02 -0800 Subject: [PATCH 3/3] add debug logs --- .../org/apache/bookkeeper/bookie/GarbageCollectorThread.java | 2 +- .../bookkeeper/bookie/ScanAndCompareGarbageCollector.java | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 8016be87f29..f14afad8f9c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -570,7 +570,7 @@ protected Map extractMetaFromEntryLogs(Map bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, @@ -217,6 +219,8 @@ public void gc(GarbageCleaner garbageCleaner) { // ignore exception, collecting garbage next time LOG.warn("Exception when iterating over the metadata {}", t); } finally { + long endTime = System.currentTimeMillis(); + LOG.info("Total GC time {}", TimeUnit.MILLISECONDS.toSeconds(endTime - startTime)); if (zk != null) { try { zk.close();