Skip to content

Commit

Permalink
[Remote Store] Use time elapsed since last successful local refresh f…
Browse files Browse the repository at this point in the history
…or time lag
  • Loading branch information
ashking94 committed Oct 20, 2023
1 parent 1e28738 commit bf70f20
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testWritesRejectedDueToBytesLagBreach() throws Exception {
public void testWritesRejectedDueToTimeLagBreach() throws Exception {
// Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag");
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 3, "time_lag");
}

private void validateBackpressure(
Expand Down Expand Up @@ -133,11 +133,13 @@ private RemoteSegmentTransferTracker.Stats stats() {
return matches.get(0).getSegmentStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
private void indexDocAndRefresh(BytesReference source, int iterations) throws InterruptedException {
for (int i = 0; i < iterations; i++) {
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
refresh(INDEX_NAME);
}
Thread.sleep(100);
client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ public class RemoteSegmentTransferTracker extends RemoteTransferTracker {
*/
private volatile long refreshSeqNoLag;

/**
* Keeps the time (ms) lag computed so that we do not compute it for every request.
*/
private volatile long timeMsLag;

/**
* Keeps track of the total bytes of segment files which were uploaded to remote store during last successful remote refresh
*/
Expand Down Expand Up @@ -132,14 +127,18 @@ public RemoteSegmentTransferTracker(
logger = Loggers.getLogger(getClass(), shardId);
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
long currentClockTimeMs = System.currentTimeMillis();
long currentTimeMs = System.nanoTime() / 1_000_000L;
long currentTimeMs = currentTimeMsUsingSystemNanos();
localRefreshTimeMs = currentTimeMs;
remoteRefreshTimeMs = currentTimeMs;
localRefreshClockTimeMs = currentClockTimeMs;
remoteRefreshClockTimeMs = currentClockTimeMs;
this.directoryFileTransferTracker = directoryFileTransferTracker;
}

public static long currentTimeMsUsingSystemNanos() {
return System.nanoTime() / 1_000_000L;
}

@Override
public void incrementTotalUploadsFailed() {
super.incrementTotalUploadsFailed();
Expand Down Expand Up @@ -180,7 +179,7 @@ public long getLocalRefreshClockTimeMs() {
*/
public void updateLocalRefreshTimeAndSeqNo() {
updateLocalRefreshClockTimeMs(System.currentTimeMillis());
updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L);
updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
updateLocalRefreshSeqNo(getLocalRefreshSeqNo() + 1);
}

Expand All @@ -192,7 +191,6 @@ void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
+ "currentLocalRefreshTimeMs="
+ this.localRefreshTimeMs;
this.localRefreshTimeMs = localRefreshTimeMs;
computeTimeMsLag();
}

private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
Expand Down Expand Up @@ -228,7 +226,6 @@ public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
+ "currentRemoteRefreshTimeMs="
+ this.remoteRefreshTimeMs;
this.remoteRefreshTimeMs = remoteRefreshTimeMs;
computeTimeMsLag();
}

public void updateRemoteRefreshClockTimeMs(long remoteRefreshClockTimeMs) {
Expand All @@ -243,12 +240,11 @@ public long getRefreshSeqNoLag() {
return refreshSeqNoLag;
}

private void computeTimeMsLag() {
timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs;
}

public long getTimeMsLag() {
return timeMsLag;
if (remoteRefreshTimeMs == localRefreshTimeMs) {
return 0;
}
return currentTimeMsUsingSystemNanos() - localRefreshTimeMs;
}

public long getBytesLag() {
Expand Down Expand Up @@ -354,7 +350,7 @@ public RemoteSegmentTransferTracker.Stats stats() {
shardId,
localRefreshClockTimeMs,
remoteRefreshClockTimeMs,
timeMsLag,
getTimeMsLag(),
localRefreshSeqNo,
remoteRefreshSeqNo,
uploadBytesStarted.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos;

public class RemoteSegmentTransferTrackerTests extends OpenSearchTestCase {
private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;
private ClusterService clusterService;
Expand Down Expand Up @@ -92,7 +94,7 @@ public void testUpdateLocalRefreshTimeMs() {
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long refreshTimeMs = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100);
long refreshTimeMs = currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100);
transferTracker.updateLocalRefreshTimeMs(refreshTimeMs);
assertEquals(refreshTimeMs, transferTracker.getLocalRefreshTimeMs());
}
Expand All @@ -103,7 +105,7 @@ public void testUpdateRemoteRefreshTimeMs() {
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long refreshTimeMs = System.nanoTime() / 1_000_000 + randomIntBetween(10, 100);
long refreshTimeMs = currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100);
transferTracker.updateRemoteRefreshTimeMs(refreshTimeMs);
assertEquals(refreshTimeMs, transferTracker.getRemoteRefreshTimeMs());
}
Expand Down Expand Up @@ -133,20 +135,28 @@ public void testComputeSeqNoLagOnUpdate() {
assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, transferTracker.getRefreshSeqNoLag());
}

public void testComputeTimeLagOnUpdate() {
public void testComputeTimeLagOnUpdate() throws InterruptedException {
transferTracker = new RemoteSegmentTransferTracker(
shardId,
directoryFileTransferTracker,
remoteStoreStatsTrackerFactory.getMovingAverageWindowSize()
);
long currentLocalRefreshTimeMs = transferTracker.getLocalRefreshTimeMs();
long currentTimeMs = System.nanoTime() / 1_000_000L;
long localRefreshTimeMs = currentTimeMs + randomIntBetween(100, 500);
long remoteRefreshTimeMs = currentTimeMs + randomIntBetween(50, 99);
transferTracker.updateLocalRefreshTimeMs(localRefreshTimeMs);
assertEquals(localRefreshTimeMs - currentLocalRefreshTimeMs, transferTracker.getTimeMsLag());
transferTracker.updateRemoteRefreshTimeMs(remoteRefreshTimeMs);
assertEquals(localRefreshTimeMs - remoteRefreshTimeMs, transferTracker.getTimeMsLag());

// No lag if there is a remote upload corresponding to a local refresh
assertEquals(0, transferTracker.getTimeMsLag());

// Set a local refresh time that is higher than remote refresh time
Thread.sleep(1);
transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());

// Sleep for 100ms and then the lag should be within 100ms +/- 20ms
Thread.sleep(100);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - 100) <= 20);

transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos());
long random = randomIntBetween(50, 200);
Thread.sleep(random);
assertTrue(Math.abs(transferTracker.getTimeMsLag() - random) <= 20);
}

public void testAddUploadBytesStarted() {
Expand Down Expand Up @@ -591,9 +601,9 @@ private RemoteSegmentTransferTracker constructTracker() {
);
transferTracker.incrementTotalUploadsStarted();
transferTracker.incrementTotalUploadsFailed();
transferTracker.updateUploadTimeMovingAverage(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100));
transferTracker.updateUploadTimeMovingAverage(currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100));
transferTracker.updateUploadBytesMovingAverage(99);
transferTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100));
transferTracker.updateRemoteRefreshTimeMs(currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100));
transferTracker.incrementRejectionCount();
transferTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(10);
transferTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(10, System.currentTimeMillis());
Expand Down

0 comments on commit bf70f20

Please sign in to comment.