diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java index 98586b60dcc69..da942fba67bd8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java @@ -56,7 +56,7 @@ public void testWritesRejectedDueToBytesLagBreach() throws Exception { public void testWritesRejectedDueToTimeLagBreach() throws Exception { // Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the // indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections. - validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag"); + validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 3, "time_lag"); } private void validateBackpressure( @@ -133,11 +133,13 @@ private RemoteSegmentTransferTracker.Stats stats() { return matches.get(0).getSegmentStats(); } - private void indexDocAndRefresh(BytesReference source, int iterations) { + private void indexDocAndRefresh(BytesReference source, int iterations) throws InterruptedException { for (int i = 0; i < iterations; i++) { client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get(); refresh(INDEX_NAME); } + Thread.sleep(100); + client().prepareIndex(INDEX_NAME).setSource(source, MediaTypeRegistry.JSON).get(); } /** diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 2a703f17aa953..a12242f17c808 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -76,11 +76,6 @@ public class RemoteSegmentTransferTracker extends RemoteTransferTracker { */ private volatile long refreshSeqNoLag; - /** - * Keeps the time (ms) lag computed so that we do not compute it for every request. - */ - private volatile long timeMsLag; - /** * Keeps track of the total bytes of segment files which were uploaded to remote store during last successful remote refresh */ @@ -132,7 +127,7 @@ public RemoteSegmentTransferTracker( logger = Loggers.getLogger(getClass(), shardId); // Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises. long currentClockTimeMs = System.currentTimeMillis(); - long currentTimeMs = System.nanoTime() / 1_000_000L; + long currentTimeMs = currentTimeMsUsingSystemNanos(); localRefreshTimeMs = currentTimeMs; remoteRefreshTimeMs = currentTimeMs; localRefreshClockTimeMs = currentClockTimeMs; @@ -140,6 +135,10 @@ public RemoteSegmentTransferTracker( this.directoryFileTransferTracker = directoryFileTransferTracker; } + public static long currentTimeMsUsingSystemNanos() { + return System.nanoTime() / 1_000_000L; + } + @Override public void incrementTotalUploadsFailed() { super.incrementTotalUploadsFailed(); @@ -180,7 +179,7 @@ public long getLocalRefreshClockTimeMs() { */ public void updateLocalRefreshTimeAndSeqNo() { updateLocalRefreshClockTimeMs(System.currentTimeMillis()); - updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L); + updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos()); updateLocalRefreshSeqNo(getLocalRefreshSeqNo() + 1); } @@ -192,7 +191,6 @@ void updateLocalRefreshTimeMs(long localRefreshTimeMs) { + "currentLocalRefreshTimeMs=" + this.localRefreshTimeMs; this.localRefreshTimeMs = localRefreshTimeMs; - computeTimeMsLag(); } private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) { @@ -228,7 +226,6 @@ public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { + "currentRemoteRefreshTimeMs=" + this.remoteRefreshTimeMs; this.remoteRefreshTimeMs = remoteRefreshTimeMs; - computeTimeMsLag(); } public void updateRemoteRefreshClockTimeMs(long remoteRefreshClockTimeMs) { @@ -243,12 +240,12 @@ public long getRefreshSeqNoLag() { return refreshSeqNoLag; } - private void computeTimeMsLag() { - timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs; - } - public long getTimeMsLag() { - return timeMsLag; + if (remoteRefreshTimeMs == localRefreshTimeMs) { + logger.info("remoteRefreshTimeMs={} localRefreshTimeMs={}", remoteRefreshTimeMs, localRefreshTimeMs); + return 0; + } + return currentTimeMsUsingSystemNanos() - localRefreshTimeMs; } public long getBytesLag() { @@ -354,7 +351,7 @@ public RemoteSegmentTransferTracker.Stats stats() { shardId, localRefreshClockTimeMs, remoteRefreshClockTimeMs, - timeMsLag, + getTimeMsLag(), localRefreshSeqNo, remoteRefreshSeqNo, uploadBytesStarted.get(), diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index 2920b33921869..cd11c05e1521f 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -177,15 +177,17 @@ private TimeLagValidator(RemoteStorePressureSettings pressureSettings) { @Override public boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) { if (pressureTracker.getRefreshSeqNoLag() <= 1) { + logger.info("not ready"); return true; } if (pressureTracker.isUploadTimeMovingAverageReady() == false) { - logger.trace("upload time moving average is not ready"); + logger.info("upload time moving average is not ready"); return true; } long timeLag = pressureTracker.getTimeMsLag(); double dynamicTimeLagThreshold = pressureTracker.getUploadTimeMovingAverage() * pressureSettings .getUploadTimeLagVarianceFactor(); + logger.info("timeLag={} dynamicTimeLagThreshold={}", timeLag, dynamicTimeLagThreshold); return timeLag <= dynamicTimeLagThreshold; } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java index 0bf00f9e48137..d1c258aedce04 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.Map; +import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos; + public class RemoteSegmentTransferTrackerTests extends OpenSearchTestCase { private RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; private ClusterService clusterService; @@ -92,7 +94,7 @@ public void testUpdateLocalRefreshTimeMs() { directoryFileTransferTracker, remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - long refreshTimeMs = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100); + long refreshTimeMs = currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100); transferTracker.updateLocalRefreshTimeMs(refreshTimeMs); assertEquals(refreshTimeMs, transferTracker.getLocalRefreshTimeMs()); } @@ -103,7 +105,7 @@ public void testUpdateRemoteRefreshTimeMs() { directoryFileTransferTracker, remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - long refreshTimeMs = System.nanoTime() / 1_000_000 + randomIntBetween(10, 100); + long refreshTimeMs = currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100); transferTracker.updateRemoteRefreshTimeMs(refreshTimeMs); assertEquals(refreshTimeMs, transferTracker.getRemoteRefreshTimeMs()); } @@ -133,20 +135,28 @@ public void testComputeSeqNoLagOnUpdate() { assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, transferTracker.getRefreshSeqNoLag()); } - public void testComputeTimeLagOnUpdate() { + public void testComputeTimeLagOnUpdate() throws InterruptedException { transferTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, remoteStoreStatsTrackerFactory.getMovingAverageWindowSize() ); - long currentLocalRefreshTimeMs = transferTracker.getLocalRefreshTimeMs(); - long currentTimeMs = System.nanoTime() / 1_000_000L; - long localRefreshTimeMs = currentTimeMs + randomIntBetween(100, 500); - long remoteRefreshTimeMs = currentTimeMs + randomIntBetween(50, 99); - transferTracker.updateLocalRefreshTimeMs(localRefreshTimeMs); - assertEquals(localRefreshTimeMs - currentLocalRefreshTimeMs, transferTracker.getTimeMsLag()); - transferTracker.updateRemoteRefreshTimeMs(remoteRefreshTimeMs); - assertEquals(localRefreshTimeMs - remoteRefreshTimeMs, transferTracker.getTimeMsLag()); + + // No lag if there is a remote upload corresponding to a local refresh + assertEquals(0, transferTracker.getTimeMsLag()); + + // Set a local refresh time that is higher than remote refresh time + Thread.sleep(1); + transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos()); + + // Sleep for 100ms and then the lag should be within 100ms +/- 20ms + Thread.sleep(100); + assertTrue(Math.abs(transferTracker.getTimeMsLag() - 100) <= 20); + + transferTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos()); + long random = randomIntBetween(50, 200); + Thread.sleep(random); + assertTrue(Math.abs(transferTracker.getTimeMsLag() - random) <= 20); } public void testAddUploadBytesStarted() { @@ -519,7 +529,7 @@ public void testStatsObjectCreation() { transferTracker = constructTracker(); RemoteSegmentTransferTracker.Stats transferTrackerStats = transferTracker.stats(); assertEquals(transferTracker.getShardId(), transferTrackerStats.shardId); - assertEquals(transferTracker.getTimeMsLag(), (int) transferTrackerStats.refreshTimeLagMs); + assertTrue(Math.abs(transferTracker.getTimeMsLag() - transferTrackerStats.refreshTimeLagMs) <= 20); assertEquals(transferTracker.getLocalRefreshSeqNo(), (int) transferTrackerStats.localRefreshNumber); assertEquals(transferTracker.getRemoteRefreshSeqNo(), (int) transferTrackerStats.remoteRefreshNumber); assertEquals(transferTracker.getBytesLag(), (int) transferTrackerStats.bytesLag); @@ -591,9 +601,9 @@ private RemoteSegmentTransferTracker constructTracker() { ); transferTracker.incrementTotalUploadsStarted(); transferTracker.incrementTotalUploadsFailed(); - transferTracker.updateUploadTimeMovingAverage(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); + transferTracker.updateUploadTimeMovingAverage(currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100)); transferTracker.updateUploadBytesMovingAverage(99); - transferTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); + transferTracker.updateRemoteRefreshTimeMs(currentTimeMsUsingSystemNanos() + randomIntBetween(10, 100)); transferTracker.incrementRejectionCount(); transferTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(10); transferTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(10, System.currentTimeMillis()); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index de610083f3327..27ee8fe7c80f4 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -21,8 +21,11 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.IntStream; +import static org.opensearch.index.remote.RemoteSegmentTransferTracker.currentTimeMsUsingSystemNanos; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexShard; public class RemoteStorePressureServiceTests extends OpenSearchTestCase { @@ -68,7 +71,7 @@ public void testIsSegmentsUploadBackpressureEnabled() { assertTrue(pressureService.isSegmentsUploadBackpressureEnabled()); } - public void testValidateSegmentUploadLag() { + public void testValidateSegmentUploadLag() throws InterruptedException { // Create the pressure tracker IndexShard indexShard = createIndexShard(shardId, true); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY); @@ -86,14 +89,27 @@ public void testValidateSegmentUploadLag() { sum.addAndGet(i); }); double avg = (double) sum.get() / 20; - long currentMs = System.nanoTime() / 1_000_000; - pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 12 * avg)); - pressureTracker.updateRemoteRefreshTimeMs(currentMs); + logger.info("avg={}", avg); + + // We run this to ensure that the local and remote refresh time are not same anymore + while (pressureTracker.getLocalRefreshTimeMs() == currentTimeMsUsingSystemNanos()) { + Thread.sleep(10); + } + long localRefreshTimeMs = currentTimeMsUsingSystemNanos(); + pressureTracker.updateLocalRefreshTimeMs(localRefreshTimeMs); + + while (currentTimeMsUsingSystemNanos() - localRefreshTimeMs <= 20 * avg) { + Thread.sleep((long) (4 * avg)); + } Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); - assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("time_lag:114 ms dynamic_time_lag_threshold:95.0 ms")); - - pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg)); + String regex = "^rejected execution on primary shard:\\[index]\\[0] due to remote segments lagging behind " + + "local segments.time_lag:[0-9]{2,3} ms dynamic_time_lag_threshold:95\\.0 ms$"; + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(e.getMessage()); + assertTrue(matcher.matches()); + + pressureTracker.updateLocalRefreshTimeMs(currentTimeMsUsingSystemNanos()); + Thread.sleep((long) (2 * avg)); pressureService.validateSegmentsUploadLag(shardId); // 2. bytes lag more than dynamic threshold