Skip to content

Commit

Permalink
Add logic in master service to optimize performance and retain detail…
Browse files Browse the repository at this point in the history
…ed logging for critical cluster operations.

Signed-off-by: Sumit Bansal <[email protected]>
  • Loading branch information
sumitasr committed Oct 22, 2024
1 parent ebcf5e3 commit d8d10ef
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,15 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);
final String longSummary;
final String shortSummary;
if (taskInputs.updateTasks.size() <= 10000) {
longSummary = taskInputs.taskSummaryGenerator.apply(true);
shortSummary = longSummary;
} else {
longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
shortSummary = taskInputs.taskSummaryGenerator.apply(false);
}

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,37 @@ void runIfNotProcessed(BatchedTask updateTask) {
if (toExecute.isEmpty() == false) {
Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
if (longSummaryRequired == null || !longSummaryRequired) {
return buildShortSummary(updateTask.batchingKey, toExecute.size());
final List<BatchedTask> sampleTasks = toExecute.stream()
.limit(Math.min(10, toExecute.size()))
.collect(Collectors.toList());
return buildShortSummary(updateTask.batchingKey, toExecute.size(), getSummary(updateTask, sampleTasks));
}
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
return getSummary(updateTask, toExecute);
};
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
}
}
}

private String buildShortSummary(final Object batchingKey, final int taskCount) {
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
private String getSummary(final BatchedTask updateTask, final List<BatchedTask> toExecute) {
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}

private String buildShortSummary(final Object batchingKey, final int taskCount, final String sampleTasks) {
return "Tasks batched with key: "
+ batchingKey.toString().split("\\$")[0]
+ ", count:"
+ taskCount
+ " and sample tasks: "
+ sampleTasks;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,15 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception {
"test1 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [1s] to compute cluster state update for [test1]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test1 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [0s] to notify listeners on unchanged cluster state for [test1]"
)
);

Expand All @@ -416,23 +416,23 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception {
"test2 failure",
MasterService.class.getCanonicalName(),
Level.TRACE,
"failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]*"
"failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test2 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [2s] to compute cluster state update for [test2]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test2 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [0s] to notify listeners on unchanged cluster state for [test2]"
)
);

Expand All @@ -449,15 +449,15 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception {
"test3 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [3s] to compute cluster state update for [test3]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test3 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]"
)
);

Expand Down Expand Up @@ -548,23 +548,23 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception {
"test1 start",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"executing cluster state update for [test1]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test1 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [1s] to compute cluster state update for [test1]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test1 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [0s] to notify listeners on unchanged cluster state for [test1]"
)
);

Expand All @@ -573,31 +573,31 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception {
"test2 start",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"executing cluster state update for [test2]"
)
);
mockAppender.addExpectation(
new MockLogAppender.UnseenEventExpectation(
"test2 failure",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]*"
"failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test2 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [2s] to compute cluster state update for [test2]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test2 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [0s] to notify listeners on unchanged cluster state for [test2]"
)
);

Expand All @@ -606,23 +606,23 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception {
"test3 start",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"executing cluster state update for [test3]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test3 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [3s] to compute cluster state update for [test3]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test3 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]"
)
);

Expand All @@ -631,7 +631,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception {
"test4",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"executing cluster state update for [test4]"
)
);

Expand Down Expand Up @@ -1238,23 +1238,23 @@ public void testLongClusterStateUpdateLogging() throws Exception {
"test2",
MasterService.class.getCanonicalName(),
Level.WARN,
"*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"*took [*], which is over [10s], to compute cluster state update for [test2]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test3",
MasterService.class.getCanonicalName(),
Level.WARN,
"*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"*took [*], which is over [10s], to compute cluster state update for [test3]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test4",
MasterService.class.getCanonicalName(),
Level.WARN,
"*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]"
"*took [*], which is over [10s], to compute cluster state update for [test4]"
)
);
mockAppender.addExpectation(
Expand Down Expand Up @@ -1432,7 +1432,7 @@ public void testLongClusterStateUpdateLoggingForFailedPublication() throws Excep
"test1 should log due to slow and failing publication",
MasterService.class.getCanonicalName(),
Level.WARN,
"took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]:*"
"took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [test1]:*"
)
);

Expand Down

0 comments on commit d8d10ef

Please sign in to comment.