scan limited entry logger file for gc #1

Open: wants to merge 3 commits into base: 4.7-master
@@ -73,6 +73,7 @@ public class GarbageCollectorThread extends SafeRunnable {

// This is how often we want to run the Garbage Collector Thread (in milliseconds).
final long gcWaitTime;
final int maxEntryLoggersPerScan;

// Compaction parameters
boolean enableMinorCompaction = false;
@@ -112,6 +113,9 @@ public class GarbageCollectorThread extends SafeRunnable {

// track the last scanned successfully log id
long scannedLogId = 0;
long lastIterationLogId = 0;
boolean moreEntryLoggers = true;
private final boolean verifyMetadataOnGc;

// Boolean to trigger a forced GC.
final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false);
@@ -153,10 +157,13 @@ public GarbageCollectorThread(ServerConfiguration conf,
throws IOException {
this.gcExecutor = gcExecutor;
this.conf = conf;


this.verifyMetadataOnGc = conf.getVerifyMetadataOnGC();
this.entryLogger = ledgerStorage.getEntryLogger();
this.ledgerStorage = ledgerStorage;
this.gcWaitTime = conf.getGcWaitTime();
this.maxEntryLoggersPerScan = conf.getMaxEntryLoggerScanOnGc();

this.garbageCleaner = ledgerId -> {
try {
@@ -340,50 +347,60 @@ public void safeRun() {
}

public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor) {

LOG.info("Garbage collector starting with max-entry-logger scan {} and verify-zk {}", maxEntryLoggersPerScan,
verifyMetadataOnGc);

long threadStart = MathUtils.nowInNano();
if (force) {
LOG.info("Garbage collector thread forced to perform GC before expiry of wait time.");
}
// Recover and clean up previous state if using transactional compaction
compactor.cleanUpAndRecover();

// Extract all of the ledger ID's that comprise all of the entry logs
// (except for the current new one which is still being written to).
entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);
boolean isIteration = false;
do {
// Extract all of the ledger ID's that comprise all of the entry
// logs
// (except for the current new one which is still being written to).
entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap, isIteration);

// gc inactive/deleted ledgers
doGcLedgers();
// gc inactive/deleted ledgers
doGcLedgers();

// gc entry logs
doGcEntryLogs();
// gc entry logs
doGcEntryLogs();

if (suspendMajor) {
LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
}
if (suspendMinor) {
LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
}
if (suspendMajor) {
LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
}
if (suspendMinor) {
LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
}

long curTime = MathUtils.now();
if (enableMajorCompaction && (!suspendMajor)
&& (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
// enter major compaction
LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
doCompactEntryLogs(majorCompactionThreshold);
lastMajorCompactionTime = MathUtils.now();
// and also move minor compaction time
lastMinorCompactionTime = lastMajorCompactionTime;
majorCompactionCounter.inc();
} else if (enableMinorCompaction && (!suspendMinor)
&& (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
// enter minor compaction
LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
doCompactEntryLogs(minorCompactionThreshold);
lastMinorCompactionTime = MathUtils.now();
minorCompactionCounter.inc();
}
this.gcThreadRuntime.registerSuccessfulEvent(
MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
long curTime = MathUtils.now();
if (enableMajorCompaction && (!suspendMajor)
&& (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
// enter major compaction
LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
doCompactEntryLogs(majorCompactionThreshold);
lastMajorCompactionTime = MathUtils.now();
// and also move minor compaction time
lastMinorCompactionTime = lastMajorCompactionTime;
majorCompactionCounter.inc();
} else if (enableMinorCompaction && (!suspendMinor)
&& (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
// enter minor compaction
LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
doCompactEntryLogs(minorCompactionThreshold);
lastMinorCompactionTime = MathUtils.now();
minorCompactionCounter.inc();
}
this.gcThreadRuntime.registerSuccessfulEvent(MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
isIteration = true;
} while (moreEntryLoggers);

LOG.info("GC process completed");
}

/**
@@ -539,13 +556,23 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
* Existing EntryLogs to Meta
* @throws IOException
*/
protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap, boolean isIteration) {
moreEntryLoggers = false;
// Extract it for every entry log except for the current one.
// Entry Log ID's are just a long value that starts at 0 and increments
// by 1 when the log fills up and we roll to a new one.
long curLogId = entryLogger.getLeastUnflushedLogId();
boolean hasExceptionWhenScan = false;
for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
int entryLogFileCount = 0;
long entryLogId = isIteration ? lastIterationLogId : scannedLogId;
for (; entryLogId < curLogId; entryLogId++, entryLogFileCount++) {
if (entryLogFileCount > maxEntryLoggersPerScan && verifyMetadataOnGc) {
lastIterationLogId = entryLogId;
moreEntryLoggers = true;
LOG.info("Reached max-entry-logger {}, so will be perform gc in next iteration starts from {}",
entryLogFileCount, maxEntryLoggersPerScan, entryLogId);
break;
}
// Comb the current entry log file if it has not already been extracted.
if (entryLogMetaMap.containsKey(entryLogId)) {
continue;
@@ -554,6 +581,7 @@ protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLo
// check whether log file exists or not
// if it doesn't exist, this log file might have been garbage collected.
if (!entryLogger.logExists(entryLogId)) {
entryLogFileCount--;
continue;
}

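The changes above cap how many entry log files a single gc pass inspects: extractMetaFromEntryLogs stops after maxEntryLoggersPerScan files, records lastIterationLogId, and sets moreEntryLoggers, while runWithFlags keeps looping until that flag clears. A minimal, self-contained sketch of that batching pattern follows; the BatchedScanSketch class, the scanBatch helper, and the hard-coded log counts are illustrative stand-ins, not code from this patch.

import java.util.ArrayList;
import java.util.List;

public class BatchedScanSketch {
    // Stand-ins for the patch's scannedLogId / lastIterationLogId / moreEntryLoggers fields.
    private long nextLogId = 0;
    private boolean moreEntryLoggers = true;
    private final int maxEntryLoggersPerScan = 100; // mirrors the default of maxEntryLoggerScanOnGc

    // One scan pass: visit at most maxEntryLoggersPerScan logs, remember where to resume.
    List<Long> scanBatch(long curLogId) {
        List<Long> scanned = new ArrayList<>();
        moreEntryLoggers = false;
        for (long logId = nextLogId; logId < curLogId; logId++) {
            if (scanned.size() >= maxEntryLoggersPerScan) {
                nextLogId = logId;      // the next iteration starts here
                moreEntryLoggers = true;
                break;
            }
            scanned.add(logId);         // stand-in for extracting EntryLogMetadata
        }
        return scanned;
    }

    // The driver keeps running passes until no logs remain,
    // mirroring the do/while (moreEntryLoggers) loop added to runWithFlags.
    void runGc(long curLogId) {
        do {
            List<Long> batch = scanBatch(curLogId);
            System.out.println("gc pass scanned " + batch.size() + " entry logs");
        } while (moreEntryLoggers);
    }

    public static void main(String[] args) {
        new BatchedScanSketch().runGc(250); // 250 logs -> passes of 100, 100 and 50
    }
}

Splitting the scan into bounded passes keeps the metadata extracted per pass, and the zk verification load it drives, predictable; the trade-off, visible in the diff, is that doGcLedgers and doGcEntryLogs now run once per batch instead of once per gc cycle.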
@@ -34,6 +34,7 @@
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadata;
@@ -134,6 +135,8 @@ public void gc(GarbageCleaner garbageCleaner) {
return;
}

long startTime = System.currentTimeMillis();
LOG.info("starting gc - zk traversing");
try {
// Get a set of all ledgers on the bookie
NavigableSet<Long> bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0,
@@ -188,15 +191,25 @@ public void gc(GarbageCleaner garbageCleaner) {
metaRC.set(rc);
latch.countDown();
});
latch.await();
if (metaRC.get() != BKException.Code.NoSuchLedgerExistsException) {
boolean timeout = false;
try {
    timeout = !latch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    timeout = true;
    LOG.warn("Interrupted while checking zk metadata for ledger {}: {}", bkLid, e.getMessage());
}

if (timeout || metaRC.get() != BKException.Code.NoSuchLedgerExistsException) {
LOG.warn(
"Ledger {} Missing in metadata list, but ledgerManager returned rc: {}.",
bkLid,
metaRC.get());
continue;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting bkLid {}", bkLid);
}
deletedLedgerCounter.inc();
garbageCleaner.clean(bkLid);
}
@@ -206,6 +219,8 @@ public void gc(GarbageCleaner garbageCleaner) {
// ignore exception, collecting garbage next time
LOG.warn("Exception when iterating over the metadata {}", t);
} finally {
long endTime = System.currentTimeMillis();
LOG.info("Total GC time {}", TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
if (zk != null) {
try {
zk.close();
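The second file in the diff (presumably the scan-and-compare garbage collector) now bounds each per-ledger metadata lookup with a two-second latch wait, so a slow or unreachable metadata store can no longer stall gc; when the wait does not complete, the ledger is skipped and retried on a later run. A small sketch of that bounded-wait pattern follows; checkMetadataAsync and its result codes are hypothetical placeholders for the ledgerManager.readLedgerMetadata callback, not the real API.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class BoundedMetadataCheckSketch {
    // Hypothetical async metadata check that reports a result code to the callback.
    static void checkMetadataAsync(long ledgerId, Consumer<Integer> callback) {
        callback.accept(-7); // pretend the store answered "no such ledger" (placeholder code)
    }

    // Returns true only if the store confirmed, within the timeout, that the
    // ledger no longer exists; that is the only case where gc may delete it.
    static boolean safeToDelete(long ledgerId, int noSuchLedgerCode) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger rc = new AtomicInteger(Integer.MIN_VALUE);
        checkMetadataAsync(ledgerId, code -> {
            rc.set(code);
            latch.countDown();
        });
        boolean completed = latch.await(2, TimeUnit.SECONDS); // bounded wait, as in the patch
        return completed && rc.get() == noSuchLedgerCode;     // on timeout, keep the ledger for now
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(safeToDelete(42L, -7)); // true: callback confirmed "no such ledger"
    }
}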
@@ -55,6 +55,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime";
protected static final String USE_TRANSACTIONAL_COMPACTION = "useTransactionalCompaction";
protected static final String VERIFY_METADATA_ON_GC = "verifyMetadataOnGC";
protected static final String MAX_ENTRY_LOGGER_SCAN_ON_GC = "maxEntryLoggerScanOnGc";
// Sync Parameters
protected static final String FLUSH_INTERVAL = "flushInterval";
protected static final String FLUSH_ENTRYLOG_INTERVAL_BYTES = "flushEntrylogBytes";
@@ -337,6 +338,24 @@ public ServerConfiguration setVerifyMetadataOnGc(boolean verifyMetadataOnGC) {
return this;
}

/**
* Get the maximum number of entry log files to scan in a single gc iteration.
*
* @return max entry log files scanned per gc iteration, default 100
*/
public int getMaxEntryLoggerScanOnGc() {
return this.getInt(MAX_ENTRY_LOGGER_SCAN_ON_GC, 100);
}

/**
* Set the maximum number of entry log files to scan in a single gc iteration.
*
* @param maxEntryLoggerForScan max entry log files scanned per gc iteration
* @return server configuration
*/
public ServerConfiguration setMaxEntryLoggerScanOnGc(int maxEntryLoggerForScan) {
this.setProperty(MAX_ENTRY_LOGGER_SCAN_ON_GC, maxEntryLoggerForScan);
return this;
}

/**
* Get flush interval. Default value is 10 second. It isn't useful to decrease
* this value, since ledger storage only checkpoints when an entry logger file
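Finally, the new knob can be set through the bookie configuration (key maxEntryLoggerScanOnGc, defined above) or programmatically. A short usage sketch, assuming ServerConfiguration lives in its usual org.apache.bookkeeper.conf package:

import org.apache.bookkeeper.conf.ServerConfiguration;

public class GcScanConfigExample {
    public static void main(String[] args) {
        ServerConfiguration conf = new ServerConfiguration();
        // Cap each gc iteration at 200 entry log files instead of the default 100.
        conf.setMaxEntryLoggerScanOnGc(200);
        // The scan cap only applies when metadata verification is enabled (see extractMetaFromEntryLogs).
        conf.setVerifyMetadataOnGc(true);
        System.out.println("maxEntryLoggerScanOnGc = " + conf.getMaxEntryLoggerScanOnGc());
    }
}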