Scan a limited number of entry logger files for GC
rdhabalia committed Feb 6, 2019
1 parent 4372aa5 commit 785426b
Showing 2 changed files with 74 additions and 34 deletions.
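This change caps how many entry log files the garbage collector combs for metadata in a single pass: extractMetaFromEntryLogs now stops after maxEntryLoggersPerScan files (when verifyMetadataOnGC is enabled) and flags that more entry logs remain, while runWithFlags repeats the GC/compaction cycle in a do/while loop until every entry log has been covered. The cap is exposed as a new maxEntryLoggerScanOnGc server setting with a default of 10,000,000.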
GarbageCollectorThread.java
@@ -73,6 +73,7 @@ public class GarbageCollectorThread extends SafeRunnable {
 
     // This is how often we want to run the Garbage Collector Thread (in milliseconds).
     final long gcWaitTime;
+    final int maxEntryLoggersPerScan;
 
     // Compaction parameters
     boolean enableMinorCompaction = false;
@@ -112,7 +113,10 @@ public class GarbageCollectorThread extends SafeRunnable {
 
     // track the last successfully scanned log id
     long scannedLogId = 0;
+    boolean moreEntryLoggers = true;
+    private final boolean verifyMetadataOnGc;
 
+    final AtomicBoolean gcRunning = new AtomicBoolean(false);
     // Boolean to trigger a forced GC.
     final AtomicBoolean forceGarbageCollection = new AtomicBoolean(false);
     // Boolean to disable major compaction, when disk is almost full
@@ -153,10 +157,13 @@ public GarbageCollectorThread(ServerConfiguration conf,
             throws IOException {
         this.gcExecutor = gcExecutor;
         this.conf = conf;
 
+        this.verifyMetadataOnGc = conf.getVerifyMetadataOnGC();
         this.entryLogger = ledgerStorage.getEntryLogger();
         this.ledgerStorage = ledgerStorage;
         this.gcWaitTime = conf.getGcWaitTime();
+        this.maxEntryLoggersPerScan = conf.getMaxEntryLoggerScanOnGc();
 
         this.garbageCleaner = ledgerId -> {
             try {
@@ -340,50 +347,58 @@ public void safeRun() {
     }
 
     public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor) {
+        if (!gcRunning.compareAndSet(false, true)) {
+            LOG.info("Garbage collector is already running.");
+            return;
+        }
+
         long threadStart = MathUtils.nowInNano();
         if (force) {
             LOG.info("Garbage collector thread forced to perform GC before expiry of wait time.");
         }
         // Recover and clean up previous state if using transactional compaction
         compactor.cleanUpAndRecover();
 
-        // Extract all of the ledger ID's that comprise all of the entry logs
-        // (except for the current new one which is still being written to).
-        entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);
+        do {
+            // Extract all of the ledger ID's that comprise all of the entry
+            // logs (except for the current new one which is still being
+            // written to).
+            entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);
 
-        // gc inactive/deleted ledgers
-        doGcLedgers();
+            // gc inactive/deleted ledgers
+            doGcLedgers();
 
-        // gc entry logs
-        doGcEntryLogs();
+            // gc entry logs
+            doGcEntryLogs();
 
-        if (suspendMajor) {
-            LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
-        }
-        if (suspendMinor) {
-            LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
-        }
+            if (suspendMajor) {
+                LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
+            }
+            if (suspendMinor) {
+                LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
+            }
 
-        long curTime = MathUtils.now();
-        if (enableMajorCompaction && (!suspendMajor)
-            && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
-            // enter major compaction
-            LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
-            doCompactEntryLogs(majorCompactionThreshold);
-            lastMajorCompactionTime = MathUtils.now();
-            // and also move minor compaction time
-            lastMinorCompactionTime = lastMajorCompactionTime;
-            majorCompactionCounter.inc();
-        } else if (enableMinorCompaction && (!suspendMinor)
-            && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
-            // enter minor compaction
-            LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
-            doCompactEntryLogs(minorCompactionThreshold);
-            lastMinorCompactionTime = MathUtils.now();
-            minorCompactionCounter.inc();
-        }
-        this.gcThreadRuntime.registerSuccessfulEvent(
-            MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
+            long curTime = MathUtils.now();
+            if (enableMajorCompaction && (!suspendMajor)
+                && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
+                // enter major compaction
+                LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
+                doCompactEntryLogs(majorCompactionThreshold);
+                lastMajorCompactionTime = MathUtils.now();
+                // and also move minor compaction time
+                lastMinorCompactionTime = lastMajorCompactionTime;
+                majorCompactionCounter.inc();
+            } else if (enableMinorCompaction && (!suspendMinor)
+                && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
+                // enter minor compaction
+                LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
+                doCompactEntryLogs(minorCompactionThreshold);
+                lastMinorCompactionTime = MathUtils.now();
+                minorCompactionCounter.inc();
+            }
+            this.gcThreadRuntime.registerSuccessfulEvent(MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
+        } while (moreEntryLoggers);
+        // Reset the flag so subsequent GC runs are not skipped.
+        gcRunning.set(false);
     }
 
     /**
@@ -540,12 +555,18 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
      * @throws IOException
      */
     protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
+        moreEntryLoggers = false;
         // Extract it for every entry log except for the current one.
         // Entry Log ID's are just a long value that starts at 0 and increments
         // by 1 when the log fills up and we roll to a new one.
         long curLogId = entryLogger.getLeastUnflushedLogId();
         boolean hasExceptionWhenScan = false;
-        for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
+        int entryLogFileCount = 0;
+        for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++, entryLogFileCount++) {
+            if (entryLogFileCount > maxEntryLoggersPerScan && verifyMetadataOnGc) {
+                moreEntryLoggers = true;
+                break;
+            }
             // Comb the current entry log file if it has not already been extracted.
             if (entryLogMetaMap.containsKey(entryLogId)) {
                 continue;
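Taken together, the hunks above implement a batched scan: each GC pass combs at most maxEntryLoggersPerScan entry log files, flags that more remain, and runWithFlags loops until the flag clears. The standalone sketch below illustrates that pattern under simplified assumptions; the class, field, and method names are illustrative stand-ins rather than BookKeeper's actual API, the verifyMetadataOnGc check is omitted, and scannedLogId advances here explicitly where the real code updates it in the elided body of the loop.

import java.util.HashMap;
import java.util.Map;

public class BatchedScanSketch {
    static final int MAX_LOGS_PER_SCAN = 3; // stand-in for maxEntryLoggersPerScan

    static long scannedLogId = 0;     // last log id we finished combing
    static boolean moreEntryLoggers;  // set when a pass hits the cap

    // Pretend log ids in [scannedLogId, curLogId) are on disk and need metadata.
    static Map<Long, String> extractMeta(Map<Long, String> metaMap, long curLogId) {
        moreEntryLoggers = false;
        int scannedThisPass = 0;
        for (long id = scannedLogId; id < curLogId; id++, scannedThisPass++) {
            if (scannedThisPass > MAX_LOGS_PER_SCAN) {
                moreEntryLoggers = true; // a later pass picks up the rest
                break;
            }
            metaMap.put(id, "meta-" + id); // stand-in for real metadata extraction
            scannedLogId = id + 1;
        }
        return metaMap;
    }

    public static void main(String[] args) {
        long curLogId = 10; // stand-in for entryLogger.getLeastUnflushedLogId()
        Map<Long, String> metaMap = new HashMap<>();
        int pass = 0;
        do {
            metaMap = extractMeta(metaMap, curLogId);
            System.out.printf("pass %d: scanned up to log %d, more=%b%n",
                    ++pass, scannedLogId, moreEntryLoggers);
            // ... doGcLedgers() / doGcEntryLogs() / compaction would run here ...
        } while (moreEntryLoggers);
    }
}

Because the GC and compaction steps run once per batch, a smaller cap trades a single long scan for several shorter GC cycles over the same set of entry logs.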
ServerConfiguration.java
@@ -55,6 +55,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfiguration> {
     protected static final String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime";
     protected static final String USE_TRANSACTIONAL_COMPACTION = "useTransactionalCompaction";
     protected static final String VERIFY_METADATA_ON_GC = "verifyMetadataOnGC";
+    protected static final String MAX_ENTRY_LOGGER_SCAN_ON_GC = "maxEntryLoggerScanOnGc";
     // Sync Parameters
     protected static final String FLUSH_INTERVAL = "flushInterval";
     protected static final String FLUSH_ENTRYLOG_INTERVAL_BYTES = "flushEntrylogBytes";
@@ -337,6 +338,24 @@ public ServerConfiguration setVerifyMetadataOnGc(boolean verifyMetadataOnGC) {
         return this;
     }
 
+    /**
+     * Get the maximum number of entry log files that a single GC pass may scan.
+     * @return max entry log files scanned per GC pass
+     */
+    public int getMaxEntryLoggerScanOnGc() {
+        return this.getInt(MAX_ENTRY_LOGGER_SCAN_ON_GC, 10_000_000);
+    }
+
+    /**
+     * Set the maximum number of entry log files that a single GC pass may scan.
+     * @param maxEntryLoggerForScan max entry log files to scan per GC pass
+     * @return server configuration
+     */
+    public ServerConfiguration setMaxEntryLoggerScanOnGc(int maxEntryLoggerForScan) {
+        this.setProperty(MAX_ENTRY_LOGGER_SCAN_ON_GC, maxEntryLoggerForScan);
+        return this;
+    }
+
     /**
      * Get flush interval. Default value is 10 seconds. It isn't useful to decrease
      * this value, since ledger storage only checkpoints when an entry logger file
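For operators, the new cap pairs with the existing verifyMetadataOnGC flag, since the limit in extractMetaFromEntryLogs only takes effect while metadata verification runs during GC. A minimal usage sketch follows; the wrapper class and the chosen value of 1,000 are illustrative, not part of this commit.

import org.apache.bookkeeper.conf.ServerConfiguration;

public class GcScanConfigExample {
    public static void main(String[] args) {
        ServerConfiguration conf = new ServerConfiguration();
        // The scan cap only applies when metadata is verified during GC.
        conf.setVerifyMetadataOnGc(true);
        // Comb at most 1,000 entry log files per GC pass (default: 10,000,000).
        conf.setMaxEntryLoggerScanOnGc(1_000);
    }
}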
