diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 713de8cdd0fda..455e7301a490d 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -299,33 +299,37 @@ public static boolean assertNotMasterUpdateThread(String reason) { } private void runTasks(TaskInputs taskInputs) { - final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : ""; - final String shortSummary = taskInputs.taskSummaryGenerator.apply(false); + final String summary; + if (logger.isTraceEnabled()) { + summary = taskInputs.taskSummaryGenerator.apply(true); + } else { + summary = taskInputs.taskSummaryGenerator.apply(false); + } if (!lifecycle.started()) { - logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary); + logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary); return; } if (logger.isTraceEnabled()) { - logger.trace("executing cluster state update for [{}]", longSummary); + logger.trace("executing cluster state update for [{}]", summary); } else { - logger.debug("executing cluster state update for [{}]", shortSummary); + logger.debug("executing cluster state update for [{}]", summary); } final ClusterState previousClusterState = state(); if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) { - logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary); + logger.debug("failing [{}]: local node is no longer cluster-manager", summary); taskInputs.onNoLongerClusterManager(); return; } final long computationStartTime = threadPool.preciseRelativeTimeInNanos(); - final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary); + final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary); taskOutputs.notifyFailedTasks(); final TimeValue computationTime = getTimeSince(computationStartTime); - logExecutionTime(computationTime, "compute cluster state update", shortSummary); + logExecutionTime(computationTime, "compute cluster state update", summary); clusterManagerMetrics.recordLatency( clusterManagerMetrics.clusterStateComputeHistogram, @@ -337,17 +341,17 @@ private void runTasks(TaskInputs taskInputs) { final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); taskOutputs.notifySuccessfulTasksOnUnchangedClusterState(); final TimeValue executionTime = getTimeSince(notificationStartTime); - logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary); + logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary); } else { final ClusterState newClusterState = taskOutputs.newClusterState; if (logger.isTraceEnabled()) { - logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState); + logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); } else { - logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary); + logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); } final long publicationStartTime = threadPool.preciseRelativeTimeInNanos(); try { - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState); + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState); // new cluster state, notify all listeners final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { @@ -355,7 +359,7 @@ private void runTasks(TaskInputs taskInputs) { if (nodesDeltaSummary.length() > 0) { logger.info( "{}, term: {}, version: {}, delta: {}", - shortSummary, + summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary @@ -366,7 +370,7 @@ private void runTasks(TaskInputs taskInputs) { logger.debug("publishing cluster state version [{}]", newClusterState.version()); publish(clusterChangedEvent, taskOutputs, publicationStartTime); } catch (Exception e) { - handleException(shortSummary, publicationStartTime, newClusterState, e); + handleException(summary, publicationStartTime, newClusterState, e); } } } diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java index 3513bfffb7157..ac54693b8ad1e 100644 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java @@ -195,16 +195,12 @@ void runIfNotProcessed(BatchedTask updateTask) { if (toExecute.isEmpty() == false) { Function taskSummaryGenerator = (longSummaryRequired) -> { if (longSummaryRequired == null || !longSummaryRequired) { - return buildShortSummary(updateTask.batchingKey, toExecute.size()); + final List sampleTasks = toExecute.stream() + .limit(Math.min(1000, toExecute.size())) + .collect(Collectors.toList()); + return buildShortSummary(updateTask.batchingKey, toExecute.size(), getSummary(updateTask, sampleTasks)); } - final Map> processTasksBySource = new HashMap<>(); - for (final BatchedTask task : toExecute) { - processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); - } - return processTasksBySource.entrySet().stream().map(entry -> { - String tasks = updateTask.describeTasks(entry.getValue()); - return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]"; - }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); + return getSummary(updateTask, toExecute); }; taskBatcherListener.onBeginProcessing(toExecute); run(updateTask.batchingKey, toExecute, taskSummaryGenerator); @@ -212,8 +208,24 @@ void runIfNotProcessed(BatchedTask updateTask) { } } - private String buildShortSummary(final Object batchingKey, final int taskCount) { - return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount; + private String getSummary(final BatchedTask updateTask, final List toExecute) { + final Map> processTasksBySource = new HashMap<>(); + for (final BatchedTask task : toExecute) { + processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); + } + return processTasksBySource.entrySet().stream().map(entry -> { + String tasks = updateTask.describeTasks(entry.getValue()); + return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]"; + }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); + } + + private String buildShortSummary(final Object batchingKey, final int taskCount, final String sampleTasks) { + return "Tasks batched with key: " + + batchingKey.toString().split("\\$")[0] + + ", count:" + + taskCount + + " and sample tasks: " + + sampleTasks; } /** diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index db9abe0310e40..bb9e34d93431f 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -391,7 +391,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test1 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [1s] to compute cluster state update for [test1]" ) ); mockAppender.addExpectation( @@ -399,7 +399,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test1 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [0s] to notify listeners on unchanged cluster state for [test1]" ) ); @@ -416,7 +416,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test2 failure", MasterService.class.getCanonicalName(), Level.TRACE, - "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]*" + "failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*" ) ); mockAppender.addExpectation( @@ -424,7 +424,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test2 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [2s] to compute cluster state update for [test2]" ) ); mockAppender.addExpectation( @@ -432,7 +432,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test2 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [0s] to notify listeners on unchanged cluster state for [test2]" ) ); @@ -449,7 +449,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test3 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [3s] to compute cluster state update for [test3]" ) ); mockAppender.addExpectation( @@ -457,7 +457,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test3 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]" ) ); @@ -548,7 +548,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test1 start", MasterService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]" ) ); mockAppender.addExpectation( @@ -556,7 +556,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test1 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]" ) ); mockAppender.addExpectation( @@ -564,7 +564,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test1 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]" ) ); @@ -573,7 +573,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test2 start", MasterService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( @@ -581,7 +581,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test2 failure", MasterService.class.getCanonicalName(), Level.DEBUG, - "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]*" + "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]*" ) ); mockAppender.addExpectation( @@ -589,7 +589,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test2 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( @@ -597,7 +597,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test2 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" ) ); @@ -606,7 +606,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test3 start", MasterService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( @@ -614,7 +614,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test3 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( @@ -622,7 +622,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test3 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" ) ); @@ -631,7 +631,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test4", MasterService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test4]" ) ); @@ -1238,7 +1238,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test2", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( @@ -1246,7 +1246,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test3", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( @@ -1254,7 +1254,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test4", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test4]" ) ); mockAppender.addExpectation( @@ -1432,7 +1432,7 @@ public void testLongClusterStateUpdateLoggingForFailedPublication() throws Excep "test1 should log due to slow and failing publication", MasterService.class.getCanonicalName(), Level.WARN, - "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]:*" + "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]:*" ) );