Skip to content

Commit

Permalink
[Remote Store] Use time elapsed since last successful local refresh f…
Browse files Browse the repository at this point in the history
…or time lag

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Oct 21, 2023
1 parent 1e28738 commit c691e08
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testWritesRejectedDueToBytesLagBreach() throws Exception {
public void testWritesRejectedDueToTimeLagBreach() throws Exception {
// Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag");
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 3, "time_lag");
}

private void validateBackpressure(
Expand Down Expand Up @@ -133,11 +133,13 @@ private RemoteSegmentTransferTracker.Stats stats() {
return matches.get(0).getSegmentStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
private void indexDocAndRefresh(BytesReference source, int iterations) throws InterruptedException {
for (int i = 0; i < iterations; i++) {
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
refresh(INDEX_NAME);
}
Thread.sleep(100);
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ public class RemoteSegmentTransferTracker extends RemoteTransferTracker {
*/
private volatile long refreshSeqNoLag;

/**
* Keeps the time (ms) lag computed so that we do not compute it for every request.
*/
private volatile long timeMsLag;

/**
* Keeps track of the total bytes of segment files which were uploaded to remote store during last successful remote refresh
*/
Expand Down Expand Up @@ -132,14 +127,18 @@ public RemoteSegmentTransferTracker(
logger = Loggers.getLogger(getClass(), shardId);
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
long currentClockTimeMs = System.currentTimeMillis();
long currentTimeMs = System.nanoTime() / 1_000_000L;
long currentTimeMs = currentTimeMsUsingSystemNanos();
localRefreshTimeMs = currentTimeMs;
remoteRefreshTimeMs = currentTimeMs;
localRefreshClockTimeMs = currentClockTimeMs;
remoteRefreshClockTimeMs = currentClockTimeMs;
this.directoryFileTransferTracker = directoryFileTransferTracker;
}

/**
 * Returns a millisecond reading derived from {@link System#nanoTime()}.
 *
 * <p>The nano-time counter is divided by one million using integer division, so the result is truncated.
 * Because it is based on {@code System.nanoTime()}, the value is intended for measuring elapsed time between
 * two readings of this method rather than for comparison with wall-clock time.
 *
 * @return the current {@code System.nanoTime()} value expressed in milliseconds
 */
public static long currentTimeMsUsingSystemNanos() {
    final long nanosNow = System.nanoTime();
    return nanosNow / 1_000_000L;
}

@Override
public void incrementTotalUploadsFailed() {
super.incrementTotalUploadsFailed();
Expand Down Expand Up @@ -180,7 +179,7 @@ public long getLocalRefreshClockTimeMs() {
*/
public void updateLocalRefreshTimeAndSeqNo() {
updateLocalRefreshClockTimeMs(System.currentTimeMillis());
updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L);
updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
updateLocalRefreshSeqNo(getLocalRefreshSeqNo() + 1);
}

Expand All @@ -192,7 +191,6 @@ void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
+ "currentLocalRefreshTimeMs="
+ this.localRefreshTimeMs;
this.localRefreshTimeMs = localRefreshTimeMs;
computeTimeMsLag();
}

private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
Expand Down Expand Up @@ -228,7 +226,6 @@ public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
+ "currentRemoteRefreshTimeMs="
+ this.remoteRefreshTimeMs;
this.remoteRefreshTimeMs = remoteRefreshTimeMs;
computeTimeMsLag();
}

public void updateRemoteRefreshClockTimeMs(long remoteRefreshClockTimeMs) {
Expand All @@ -243,12 +240,12 @@ public long getRefreshSeqNoLag() {
return refreshSeqNoLag;
}

private void computeTimeMsLag() {
timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs;
}

/**
 * Returns the current refresh time lag in milliseconds.
 *
 * <p>When the remote refresh time equals the local refresh time the lag is reported as 0. Otherwise the lag is
 * the time elapsed between the recorded local refresh time and now, measured with
 * {@link #currentTimeMsUsingSystemNanos()}; the value is computed on every call instead of being cached.
 *
 * @return 0 when the local and remote refresh times match, otherwise the milliseconds elapsed since the
 *         recorded local refresh time
 */
public long getTimeMsLag() {
    // Read each field once so a concurrent update cannot change the local refresh time between the
    // equality check and the subtraction below.
    final long remoteTimeMs = remoteRefreshTimeMs;
    final long localTimeMs = localRefreshTimeMs;
    if (remoteTimeMs == localTimeMs) {
        // Trace rather than info: this getter is invoked by the time lag validator and by stats(),
        // so an info-level line here would be emitted on every such call.
        logger.trace("remoteRefreshTimeMs={} localRefreshTimeMs={}", remoteTimeMs, localTimeMs);
        return 0;
    }
    return currentTimeMsUsingSystemNanos() - localTimeMs;
}

public long getBytesLag() {
Expand Down Expand Up @@ -354,7 +351,7 @@ public RemoteSegmentTransferTracker.Stats stats() {
shardId,
localRefreshClockTimeMs,
remoteRefreshClockTimeMs,
timeMsLag,
getTimeMsLag(),
localRefreshSeqNo,
remoteRefreshSeqNo,
uploadBytesStarted.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,17 @@ private TimeLagValidator(RemoteStorePressureSettings pressureSettings) {
/**
 * Checks whether the tracker's time lag is within the dynamic time lag threshold.
 *
 * <p>The check passes without comparing anything when the refresh sequence number lag is at most 1, or when
 * the upload time moving average is not ready yet. Otherwise the tracker's time lag must not exceed the upload
 * time moving average multiplied by the configured upload time lag variance factor.
 *
 * @param pressureTracker tracker that supplies the seq no lag, the time lag and the upload time moving average
 * @param shardId         shard being validated; not read by this check
 * @return {@code true} if the check passes, {@code false} if the time lag exceeds the dynamic threshold
 */
@Override
public boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) {
    // All diagnostics below are logged at trace level: this method runs as part of request validation,
    // so info-level output here would be written for every validated request.
    if (pressureTracker.getRefreshSeqNoLag() <= 1) {
        logger.trace("not ready");
        return true;
    }
    if (pressureTracker.isUploadTimeMovingAverageReady() == false) {
        // Logged once; previously the same message was emitted at both trace and info level.
        logger.trace("upload time moving average is not ready");
        return true;
    }
    long timeLag = pressureTracker.getTimeMsLag();
    double dynamicTimeLagThreshold = pressureTracker.getUploadTimeMovingAverage() * pressureSettings
        .getUploadTimeLagVarianceFactor();
    logger.trace("timeLag={} dynamicTimeLagThreshold={}", timeLag, dynamicTimeLagThreshold);
    return timeLag <= dynamicTimeLagThreshold;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos;

public class RemoteSegmentTransferTrackerTests extends OpenSearchTestCase {
private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;
private ClusterService clusterService;
Expand Down Expand Up @@ -92,7 +94,7 @@ public void testUpdateLocalRefreshTimeMs() {
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long refreshTimeMs = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100);
long refreshTimeMs = currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100);
transferTracker.updateLocalRefreshTimeMs(refreshTimeMs);
assertEquals(refreshTimeMs, transferTracker.getLocalRefreshTimeMs());
}
Expand All @@ -103,7 +105,7 @@ public void testUpdateRemoteRefreshTimeMs() {
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long refreshTimeMs = System.nanoTime() / 1_000_000 + randomIntBetween(10, 100);
long refreshTimeMs = currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100);
transferTracker.updateRemoteRefreshTimeMs(refreshTimeMs);
assertEquals(refreshTimeMs, transferTracker.getRemoteRefreshTimeMs());
}
Expand Down Expand Up @@ -133,20 +135,28 @@ public void testComputeSeqNoLagOnUpdate() {
assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, transferTracker.getRefreshSeqNoLag());
}

public void testComputeTimeLagOnUpdate() {
public void testComputeTimeLagOnUpdate() throws InterruptedException {
transferTracker = new RemoteSegmentTransferTracker(
shardId,
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long currentLocalRefreshTimeMs = transferTracker.getLocalRefreshTimeMs();
long currentTimeMs = System.nanoTime() / 1_000_000L;
long localRefreshTimeMs = currentTimeMs + randomIntBetween(100, 500);
long remoteRefreshTimeMs = currentTimeMs + randomIntBetween(50, 99);
transferTracker.updateLocalRefreshTimeMs(localRefreshTimeMs);
assertEquals(localRefreshTimeMs - currentLocalRefreshTimeMs, transferTracker.getTimeMsLag());
transferTracker.updateRemoteRefreshTimeMs(remoteRefreshTimeMs);
assertEquals(localRefreshTimeMs - remoteRefreshTimeMs, transferTracker.getTimeMsLag());

// No lag if there is a remote upload corresponding to a local refresh
assertEquals(0, transferTracker.getTimeMsLag());

// Set a local refresh time that is higher than remote refresh time
Thread.sleep(1);
transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());

// Sleep for 100ms and then the lag should be within 100ms +/- 20ms
Thread.sleep(100);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - 100) <= 20);

transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
long random = randomIntBetween(50, 200);
Thread.sleep(random);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - random) <= 20);
}

public void testAddUploadBytesStarted() {
Expand Down Expand Up @@ -519,7 +529,7 @@ public void testStatsObjectCreation() {
transferTracker = constructTracker();
RemoteSegmentTransferTracker.Stats transferTrackerStats = transferTracker.stats();
assertEquals(transferTracker.getShardId(), transferTrackerStats.shardId);
assertEquals(transferTracker.getTimeMsLag(), (int) transferTrackerStats.refreshTimeLagMs);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - transferTrackerStats.refreshTimeLagMs) <= 20);
assertEquals(transferTracker.getLocalRefreshSeqNo(), (int) transferTrackerStats.localRefreshNumber);
assertEquals(transferTracker.getRemoteRefreshSeqNo(), (int) transferTrackerStats.remoteRefreshNumber);
assertEquals(transferTracker.getBytesLag(), (int) transferTrackerStats.bytesLag);
Expand Down Expand Up @@ -591,9 +601,9 @@ private RemoteSegmentTransferTracker constructTracker() {
);
transferTracker.incrementTotalUploadsStarted();
transferTracker.incrementTotalUploadsFailed();
transferTracker.updateUploadTimeMovingAverage(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100));
transferTracker.updateUploadTimeMovingAverage(currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100));
transferTracker.updateUploadBytesMovingAverage(99);
transferTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100));
transferTracker.updateRemoteRefreshTimeMs(currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100));
transferTracker.incrementRejectionCount();
transferTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(10);
transferTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(10, System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.IntStream;

import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos;
import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexShard;

public class RemoteStorePressureServiceTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -68,7 +71,7 @@ public void testIsSegmentsUploadBackpressureEnabled() {
assertTrue(pressureService.isSegmentsUploadBackpressureEnabled());
}

public void testValidateSegmentUploadLag() {
public void testValidateSegmentUploadLag() throws InterruptedException {
// Create the pressure tracker
IndexShard indexShard = createIndexShard(shardId, true);
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY);
Expand All @@ -86,14 +89,27 @@ public void testValidateSegmentUploadLag() {
sum.addAndGet(i);
});
double avg = (double) sum.get() / 20;
long currentMs = System.nanoTime() / 1_000_000;
pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 12 * avg));
pressureTracker.updateRemoteRefreshTimeMs(currentMs);
logger.info("avg={}", avg);

// We run this to ensure that the local and remote refresh time are not same anymore
while (pressureTracker.getLocalRefreshTimeMs() == currentTimeMsUsingSystemNanos()) {
Thread.sleep(10);
}
long localRefreshTimeMs = currentTimeMsUsingSystemNanos();
pressureTracker.updateLocalRefreshTimeMs(localRefreshTimeMs);

while (currentTimeMsUsingSystemNanos() - localRefreshTimeMs <= 20 * avg) {
Thread.sleep((long) (4 * avg));
}
Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId));
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments"));
assertTrue(e.getMessage().contains("time_lag:114 ms dynamic_time_lag_threshold:95.0 ms"));

pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg));
String regex = "^rejected execution on primary shard:\\[index]\\[0] due to remote segments lagging behind "
+ "local segments.time_lag:[0-9]{2,3} ms dynamic_time_lag_threshold:95\\.0 ms$";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(e.getMessage());
assertTrue(matcher.matches());

pressureTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
Thread.sleep((long) (2 * avg));
pressureService.validateSegmentsUploadLag(shardId);

// 2. bytes lag more than dynamic threshold
Expand Down

0 comments on commit c691e08

Please sign in to comment.