From 621432dbcf0520c35e113b6812245a87e379e967 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 23 Jul 2024 02:13:20 +0530 Subject: [PATCH] Reduce logging in DEBUG for MasterService:run (#14795) (#14864) * Reduce logging in DEBUG for MasteService:run by introducing short and long summary in Taskbatcher Signed-off-by: Sumit Bansal (cherry picked from commit b35690c886f42d2ca01fa3081e80cb4ba4aa19d9) Signed-off-by: Sumit Bansal Signed-off-by: kkewwei --- CHANGELOG.md | 1 + .../cluster/service/MasterService.java | 55 ++-- .../cluster/service/TaskBatcher.java | 28 +- .../cluster/service/MasterServiceTests.java | 285 ++++++++++++++++-- .../cluster/service/TaskBatcherTests.java | 3 +- 5 files changed, 309 insertions(+), 63 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d470c334576b..c6138af7bbd4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800))) - Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749)) - Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668)) +- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795)) ### Dependencies - Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576)) diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 686e9793a8fd3..4ab8255df7658 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -84,6 +84,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -221,10 +222,10 @@ protected void onTimeout(List tasks, TimeValue timeout) { } @Override - protected void run(Object batchingKey, List tasks, String tasksSummary) { + protected void run(Object batchingKey, List tasks, Function taskSummaryGenerator) { ClusterStateTaskExecutor taskExecutor = (ClusterStateTaskExecutor) batchingKey; List updateTasks = (List) tasks; - runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary)); + runTasks(new TaskInputs(taskExecutor, updateTasks, taskSummaryGenerator)); } class UpdateTask extends BatchedTask { @@ -297,26 +298,33 @@ public static boolean assertNotMasterUpdateThread(String reason) { } private void runTasks(TaskInputs taskInputs) { - final String summary = taskInputs.summary; + final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : ""; + final String shortSummary = taskInputs.taskSummaryGenerator.apply(false); + if (!lifecycle.started()) { - logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary); + logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary); return; } - logger.debug("executing cluster state update for [{}]", summary); + if (logger.isTraceEnabled()) { + logger.trace("executing cluster state update for [{}]", longSummary); + } else { + logger.debug("executing cluster state update for [{}]", shortSummary); + } + final ClusterState previousClusterState = state(); if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) { - logger.debug("failing [{}]: local node is no longer cluster-manager", summary); + logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary); taskInputs.onNoLongerClusterManager(); return; } final long computationStartTime = threadPool.preciseRelativeTimeInNanos(); - final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState); + final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary); taskOutputs.notifyFailedTasks(); final TimeValue computationTime = getTimeSince(computationStartTime); - logExecutionTime(computationTime, "compute cluster state update", summary); + logExecutionTime(computationTime, "compute cluster state update", shortSummary); clusterManagerMetrics.recordLatency( clusterManagerMetrics.clusterStateComputeHistogram, @@ -328,17 +336,17 @@ private void runTasks(TaskInputs taskInputs) { final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); taskOutputs.notifySuccessfulTasksOnUnchangedClusterState(); final TimeValue executionTime = getTimeSince(notificationStartTime); - logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary); + logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary); } else { final ClusterState newClusterState = taskOutputs.newClusterState; if (logger.isTraceEnabled()) { - logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); + logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState); } else { - logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); + logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary); } final long publicationStartTime = threadPool.preciseRelativeTimeInNanos(); try { - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState); + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState); // new cluster state, notify all listeners final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { @@ -346,7 +354,7 @@ private void runTasks(TaskInputs taskInputs) { if (nodesDeltaSummary.length() > 0) { logger.info( "{}, term: {}, version: {}, delta: {}", - summary, + shortSummary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary @@ -357,7 +365,7 @@ private void runTasks(TaskInputs taskInputs) { logger.debug("publishing cluster state version [{}]", newClusterState.version()); publish(clusterChangedEvent, taskOutputs, publicationStartTime); } catch (Exception e) { - handleException(summary, publicationStartTime, newClusterState, e); + handleException(shortSummary, publicationStartTime, newClusterState, e); } } } @@ -452,8 +460,8 @@ private void handleException(String summary, long startTimeMillis, ClusterState // TODO: do we want to call updateTask.onFailure here? } - private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) { - ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, previousClusterState); + private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) { + ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary); ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult); return new TaskOutputs( taskInputs, @@ -897,7 +905,7 @@ public void onTimeout() { } } - private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) { + private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) { ClusterTasksResult clusterTasksResult; try { List inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); @@ -913,7 +921,7 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterSt "failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}", previousClusterState.version(), previousClusterState.stateUUID(), - taskInputs.summary, + taskSummary, previousClusterState.nodes(), previousClusterState.routingTable(), previousClusterState.getRoutingNodes() @@ -955,14 +963,19 @@ private List getNonFailedTasks(TaskInputs taskInputs, Cluste * Represents a set of tasks to be processed together with their executor */ private class TaskInputs { - final String summary; + final List updateTasks; final ClusterStateTaskExecutor executor; + final Function taskSummaryGenerator; - TaskInputs(ClusterStateTaskExecutor executor, List updateTasks, String summary) { - this.summary = summary; + TaskInputs( + ClusterStateTaskExecutor executor, + List updateTasks, + final Function taskSummaryGenerator + ) { this.executor = executor; this.updateTasks = updateTasks; + this.taskSummaryGenerator = taskSummaryGenerator; } boolean runOnlyWhenClusterManager() { diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java index 5e58f495a16fb..3513bfffb7157 100644 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java @@ -177,7 +177,6 @@ void runIfNotProcessed(BatchedTask updateTask) { // to give other tasks with different batching key a chance to execute. if (updateTask.processed.get() == false) { final List toExecute = new ArrayList<>(); - final Map> processTasksBySource = new HashMap<>(); // While removing task, need to remove task first from taskMap and then remove identity from identityMap. // Changing this order might lead to duplicate task during submission. LinkedHashSet pending = tasksPerBatchingKey.remove(updateTask.batchingKey); @@ -187,7 +186,6 @@ void runIfNotProcessed(BatchedTask updateTask) { if (task.processed.getAndSet(true) == false) { logger.trace("will process {}", task); toExecute.add(task); - processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); } else { logger.trace("skipping {}, already processed", task); } @@ -195,22 +193,34 @@ void runIfNotProcessed(BatchedTask updateTask) { } if (toExecute.isEmpty() == false) { - final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> { - String tasks = updateTask.describeTasks(entry.getValue()); - return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]"; - }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); - + Function taskSummaryGenerator = (longSummaryRequired) -> { + if (longSummaryRequired == null || !longSummaryRequired) { + return buildShortSummary(updateTask.batchingKey, toExecute.size()); + } + final Map> processTasksBySource = new HashMap<>(); + for (final BatchedTask task : toExecute) { + processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); + } + return processTasksBySource.entrySet().stream().map(entry -> { + String tasks = updateTask.describeTasks(entry.getValue()); + return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]"; + }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); + }; taskBatcherListener.onBeginProcessing(toExecute); - run(updateTask.batchingKey, toExecute, tasksSummary); + run(updateTask.batchingKey, toExecute, taskSummaryGenerator); } } } + private String buildShortSummary(final Object batchingKey, final int taskCount) { + return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount; + } + /** * Action to be implemented by the specific batching implementation * All tasks have the given batching key. */ - protected abstract void run(Object batchingKey, List tasks, String tasksSummary); + protected abstract void run(Object batchingKey, List tasks, Function taskSummaryGenerator); /** * Represents a runnable task that supports batching. diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 8c84ac365dfd1..7562dfc2e9d33 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -376,13 +376,13 @@ public void onFailure(String source, Exception e) {} } @TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level") - public void testClusterStateUpdateLogging() throws Exception { + public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 start", MasterService.class.getCanonicalName(), - Level.DEBUG, + Level.TRACE, "executing cluster state update for [test1]" ) ); @@ -391,7 +391,7 @@ public void testClusterStateUpdateLogging() throws Exception { "test1 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [1s] to compute cluster state update for [test1]" + "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); mockAppender.addExpectation( @@ -399,7 +399,7 @@ public void testClusterStateUpdateLogging() throws Exception { "test1 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [test1]" + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); @@ -407,7 +407,7 @@ public void testClusterStateUpdateLogging() throws Exception { new MockLogAppender.SeenEventExpectation( "test2 start", MasterService.class.getCanonicalName(), - Level.DEBUG, + Level.TRACE, "executing cluster state update for [test2]" ) ); @@ -416,7 +416,7 @@ public void testClusterStateUpdateLogging() throws Exception { "test2 failure", MasterService.class.getCanonicalName(), Level.TRACE, - "failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*" + "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]*" ) ); mockAppender.addExpectation( @@ -424,7 +424,7 @@ public void testClusterStateUpdateLogging() throws Exception { "test2 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [2s] to compute cluster state update for [test2]" + "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); mockAppender.addExpectation( @@ -432,7 +432,7 @@ public void testClusterStateUpdateLogging() throws Exception { "test2 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [test2]" + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); @@ -440,7 +440,7 @@ public void testClusterStateUpdateLogging() throws Exception { new MockLogAppender.SeenEventExpectation( "test3 start", MasterService.class.getCanonicalName(), - Level.DEBUG, + Level.TRACE, "executing cluster state update for [test3]" ) ); @@ -449,7 +449,7 @@ public void testClusterStateUpdateLogging() throws Exception { "test3 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [3s] to compute cluster state update for [test3]" + "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); mockAppender.addExpectation( @@ -457,7 +457,7 @@ public void testClusterStateUpdateLogging() throws Exception { "test3 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]" + "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); @@ -465,7 +465,7 @@ public void testClusterStateUpdateLogging() throws Exception { new MockLogAppender.SeenEventExpectation( "test4", MasterService.class.getCanonicalName(), - Level.DEBUG, + Level.TRACE, "executing cluster state update for [test4]" ) ); @@ -540,6 +540,171 @@ public void onFailure(String source, Exception e) { } } + @TestLogging(value = "org.opensearch.cluster.service:DEBUG", reason = "to ensure that we log cluster state events on DEBUG level") + public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test1 start", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test1 computation", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test1 notification", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test2 start", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + mockAppender.addExpectation( + new MockLogAppender.UnseenEventExpectation( + "test2 failure", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]*" + ) + ); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test2 computation", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test2 notification", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test3 start", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test3 computation", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test3 notification", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test4", + MasterService.class.getCanonicalName(), + Level.DEBUG, + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + ) + ); + + try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) { + clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + timeDiffInMillis += TimeValue.timeValueSeconds(1).millis(); + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {} + + @Override + public void onFailure(String source, Exception e) { + fail(); + } + }); + clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + timeDiffInMillis += TimeValue.timeValueSeconds(2).millis(); + throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + fail(); + } + + @Override + public void onFailure(String source, Exception e) {} + }); + clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + timeDiffInMillis += TimeValue.timeValueSeconds(3).millis(); + return ClusterState.builder(currentState).incrementVersion().build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + timeDiffInMillis += TimeValue.timeValueSeconds(4).millis(); + } + + @Override + public void onFailure(String source, Exception e) { + fail(); + } + }); + clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {} + + @Override + public void onFailure(String source, Exception e) { + fail(); + } + }); + assertBusy(mockAppender::assertAllExpectationsMatched); + // verify stats values after state is published + assertEquals(1, clusterManagerService.getClusterStateStats().getUpdateSuccess()); + assertEquals(0, clusterManagerService.getClusterStateStats().getUpdateFailed()); + } + } + } + public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException { AtomicInteger counter = new AtomicInteger(); class Task { @@ -1073,7 +1238,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test2", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [test2]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); mockAppender.addExpectation( @@ -1081,7 +1246,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test3", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [test3]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); mockAppender.addExpectation( @@ -1089,7 +1254,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test4", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [test4]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" ) ); mockAppender.addExpectation( @@ -1100,14 +1265,6 @@ public void testLongClusterStateUpdateLogging() throws Exception { "*took*test5*" ) ); - mockAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "test6 should log due to slow and failing publication", - MasterService.class.getCanonicalName(), - Level.WARN, - "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [test6]:*" - ) - ); try ( ClusterManagerService clusterManagerService = new ClusterManagerService( @@ -1139,19 +1296,13 @@ public void testLongClusterStateUpdateLogging() throws Exception { Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); } - if (event.source().contains("test6")) { - timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( - Settings.EMPTY - ).millis() + randomLongBetween(1, 1000000); - throw new OpenSearchException("simulated error during slow publication which should trigger logging"); - } clusterStateRef.set(event.state()); publishListener.onResponse(null); }); clusterManagerService.setClusterStateSupplier(clusterStateRef::get); clusterManagerService.start(); - final CountDownLatch latch = new CountDownLatch(6); + final CountDownLatch latch = new CountDownLatch(5); final CountDownLatch processedFirstTask = new CountDownLatch(1); clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override @@ -1249,7 +1400,77 @@ public void onFailure(String source, Exception e) { fail(); } }); + // Additional update task to make sure all previous logging made it to the loggerName + // We don't check logging for this on since there is no guarantee that it will occur before our check clusterManagerService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Exception e) { + fail(); + } + }); + latch.await(); + } + mockAppender.assertAllExpectationsMatched(); + } + } + + @TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log failed cluster state events on WARN level") + public void testLongClusterStateUpdateLoggingForFailedPublication() throws Exception { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test1 should log due to slow and failing publication", + MasterService.class.getCanonicalName(), + Level.WARN, + "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]:*" + ) + ); + + try ( + ClusterManagerService clusterManagerService = new ClusterManagerService( + Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(Node.NODE_NAME_SETTING.getKey(), "test_node") + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool, + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ) + ) { + + final DiscoveryNode localNode = new DiscoveryNode( + "node1", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ); + final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) + .build(); + final AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); + clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { + timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + Settings.EMPTY + ).millis() + randomLongBetween(1, 1000000); + throw new OpenSearchException("simulated error during slow publication which should trigger logging"); + }); + clusterManagerService.setClusterStateSupplier(clusterStateRef::get); + clusterManagerService.start(); + + final CountDownLatch latch = new CountDownLatch(1); + clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).incrementVersion().build(); @@ -1262,12 +1483,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS @Override public void onFailure(String source, Exception e) { - fail(); // maybe we should notify here? + fail(); } }); // Additional update task to make sure all previous logging made it to the loggerName // We don't check logging for this on since there is no guarantee that it will occur before our check - clusterManagerService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return currentState; diff --git a/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java b/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java index b0916ce9236f7..0ebcb51b557ae 100644 --- a/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java @@ -55,6 +55,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Semaphore; +import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; @@ -78,7 +79,7 @@ static class TestTaskBatcher extends TaskBatcher { } @Override - protected void run(Object batchingKey, List tasks, String tasksSummary) { + protected void run(Object batchingKey, List tasks, Function taskSummaryGenerator) { List updateTasks = (List) tasks; ((TestExecutor) batchingKey).execute(updateTasks.stream().map(t -> t.task).collect(Collectors.toList())); updateTasks.forEach(updateTask -> updateTask.listener.processed(updateTask.source));