Skip to content

Commit

Permalink
Incorporate PR review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Oct 21, 2023
1 parent c691e08 commit 4a21b61
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private void indexDocAndRefresh(BytesReference source, int iterations) throws In
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
refresh(INDEX_NAME);
}
Thread.sleep(100);
Thread.sleep(250);
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -66,6 +67,12 @@ public class RemoteSegmentTransferTracker extends RemoteTransferTracker {
*/
private volatile long remoteRefreshTimeMs;

/**
* This is the time of the first local refresh after the last successful remote refresh. When the remote store is in
* sync with the local refresh, this is reset to -1.
*/
private volatile long remoteRefreshStartTimeMs = -1;

/**
* The refresh time (clock) of the most recent remote refresh.
*/
Expand Down Expand Up @@ -130,13 +137,14 @@ public RemoteSegmentTransferTracker(
long currentTimeMs = currentTimeMsUsingSystemNanos();
localRefreshTimeMs = currentTimeMs;
remoteRefreshTimeMs = currentTimeMs;
remoteRefreshStartTimeMs = currentTimeMs;
localRefreshClockTimeMs = currentClockTimeMs;
remoteRefreshClockTimeMs = currentClockTimeMs;
this.directoryFileTransferTracker = directoryFileTransferTracker;
}

/**
 * Returns the current {@link System#nanoTime()} reading converted to milliseconds.
 *
 * @return the {@code System.nanoTime()} value expressed in milliseconds
 */
public static long currentTimeMsUsingSystemNanos() {
// NOTE(review): diff markers are missing in this view. The first return (manual division by 1_000_000)
// appears to be the removed line and the second (TimeUnit conversion) its replacement - confirm against the commit.
return System.nanoTime() / 1_000_000L;
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}

@Override
Expand Down Expand Up @@ -184,13 +192,17 @@ public void updateLocalRefreshTimeAndSeqNo() {
}

// Visible for testing
// Records a new local refresh time. The new value must not be smaller than the current one (checked by assert).
// If the local and remote refresh times were equal before this update, remoteRefreshStartTimeMs is also set to
// the new local refresh time.
// NOTE(review): diff markers are missing in this view. The two signature lines below appear to be the removed
// (unsynchronized) and the added (synchronized) version of the same method - confirm against the commit.
void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
synchronized void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs="
+ localRefreshTimeMs
+ " < "
+ "currentLocalRefreshTimeMs="
+ this.localRefreshTimeMs;
// Capture the in-sync state before localRefreshTimeMs is overwritten on the next line.
boolean isRemoteInSyncBeforeLocalRefresh = this.localRefreshTimeMs == this.remoteRefreshTimeMs;
this.localRefreshTimeMs = localRefreshTimeMs;
if (isRemoteInSyncBeforeLocalRefresh) {
// This is the first local refresh since local and remote times were equal, so it becomes the start time.
this.remoteRefreshStartTimeMs = localRefreshTimeMs;
}
}

private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
Expand Down Expand Up @@ -219,13 +231,18 @@ long getRemoteRefreshClockTimeMs() {
return remoteRefreshClockTimeMs;
}

// Records a new remote refresh time. The new value must not be smaller than the current one (checked by assert).
// Afterwards remoteRefreshStartTimeMs is set to -1 when the new remote refresh time equals localRefreshTimeMs,
// and to localRefreshTimeMs otherwise.
// NOTE(review): diff markers are missing in this view. The first signature/assert lines (parameter named
// remoteRefreshTimeMs, not synchronized) appear to be the removed version and the lines using refreshTimeMs
// their replacement - confirm against the commit.
public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
assert remoteRefreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
+ remoteRefreshTimeMs
public synchronized void updateRemoteRefreshTimeMs(long refreshTimeMs) {
assert refreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
+ refreshTimeMs
+ " < "
+ "currentRemoteRefreshTimeMs="
+ this.remoteRefreshTimeMs;
this.remoteRefreshTimeMs = remoteRefreshTimeMs;
this.remoteRefreshTimeMs = refreshTimeMs;
// When multiple refreshes have failed, a retry may still be in progress when another refresh is triggered.
// If updateLocalRefreshTimeAndSeqNo runs after the segments have been uploaded but before the line below
// executes, it will already have advanced localRefreshTimeMs. In that case the lag is measured from the
// time of the most recent local refresh.
this.remoteRefreshStartTimeMs = refreshTimeMs == this.localRefreshTimeMs ? -1 : this.localRefreshTimeMs;
}

public void updateRemoteRefreshClockTimeMs(long remoteRefreshClockTimeMs) {
Expand All @@ -242,10 +259,9 @@ public long getRefreshSeqNoLag() {

/**
 * Returns the time lag in milliseconds.
 *
 * @return 0 when the remote and local refresh times are equal; otherwise the difference between the current
 *         {@code currentTimeMsUsingSystemNanos()} value and a reference time (see the note in the body)
 */
public long getTimeMsLag() {
if (remoteRefreshTimeMs == localRefreshTimeMs) {
// NOTE(review): diff markers are missing in this view. This log statement appears to be a removed line.
logger.info("remoteRefreshTimeMs={} localRefreshTimeMs={}", remoteRefreshTimeMs, localRefreshTimeMs);
return 0;
}
// NOTE(review): the first return (relative to localRefreshTimeMs) appears to be the removed line and the
// second (relative to remoteRefreshStartTimeMs) its replacement - confirm against the commit.
return currentTimeMsUsingSystemNanos() - localRefreshTimeMs;
return currentTimeMsUsingSystemNanos() - remoteRefreshStartTimeMs;
}

public long getBytesLag() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,14 @@ private TimeLagValidator(RemoteStorePressureSettings pressureSettings) {
/**
 * Checks the tracker's time lag against a threshold derived from its upload time moving average.
 *
 * NOTE(review): diff markers are missing in this view. The {@code logger.info} calls in the body appear to be
 * lines removed by this commit - confirm against the commit.
 *
 * @param pressureTracker tracker supplying the refresh seq-no lag, time lag and upload time moving average
 * @param shardId         shard identifier; not read in the visible body
 * @return {@code true} when the refresh seq-no lag is at most 1, when the upload time moving average is not
 *         ready, or when the time lag does not exceed the moving average multiplied by the configured
 *         upload time lag variance factor; {@code false} otherwise
 */
@Override
public boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) {
// A refresh seq-no lag of 0 or 1 always passes validation.
if (pressureTracker.getRefreshSeqNoLag() <= 1) {
logger.info("not ready");
return true;
}
// Without a ready moving average no threshold is computed, so validation passes.
if (pressureTracker.isUploadTimeMovingAverageReady() == false) {
logger.info("upload time moving average is not ready");
return true;
}
long timeLag = pressureTracker.getTimeMsLag();
// The threshold is the upload time moving average scaled by the variance factor from the settings.
double dynamicTimeLagThreshold = pressureTracker.getUploadTimeMovingAverage() * pressureSettings
.getUploadTimeLagVarianceFactor();
logger.info("timeLag={} dynamicTimeLagThreshold={}", timeLag, dynamicTimeLagThreshold);
return timeLag <= dynamicTimeLagThreshold;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public void testComputeTimeLagOnUpdate() throws InterruptedException {
Thread.sleep(100);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - 100) <= 20);

transferTracker.updateRemoteRefreshTimeMs(transferTracker.getLocalRefreshTimeMs());
transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
long random = randomIntBetween(50, 200);
Thread.sleep(random);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public void testValidateSegmentUploadLag() throws InterruptedException {
sum.addAndGet(i);
});
double avg = (double) sum.get() / 20;
logger.info("avg={}", avg);

// We run this to ensure that the local and remote refresh times are no longer the same
while (pressureTracker.getLocalRefreshTimeMs() == currentTimeMsUsingSystemNanos()) {
Expand All @@ -108,6 +107,7 @@ public void testValidateSegmentUploadLag() throws InterruptedException {
Matcher matcher = pattern.matcher(e.getMessage());
assertTrue(matcher.matches());

pressureTracker.updateRemoteRefreshTimeMs(pressureTracker.getLocalRefreshTimeMs());
pressureTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
Thread.sleep((long) (2 * avg));
pressureService.validateSegmentsUploadLag(shardId);
Expand Down

0 comments on commit 4a21b61

Please sign in to comment.