From 74e7874e40bc29a58fc74f9523f2dfbe446e624a Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 31 Mar 2024 13:10:13 +0800 Subject: [PATCH 1/2] Optimize the logic of getSeaTunnelServer() --- .../seatunnel/engine/server/SeaTunnelServer.java | 14 ++++++++++++-- .../server/rest/RestHttpGetCommandProcessor.java | 6 +++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index e9dcdca779f..d74fe3cba93 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.core.classloader.DefaultClassLoaderService; import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.rest.RestHttpGetCommandProcessor; import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService; import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService; import org.apache.seatunnel.engine.server.service.slot.SlotService; @@ -251,16 +252,25 @@ public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) { return taskState != null && ((ExecutionState) taskState).isEndState(); } + public static void main(String[] args) { + // + } + + + + + + public boolean isMasterNode() { // must retry until the cluster have master node try { - return RetryUtils.retryWithException( + return Boolean.TRUE.equals(RetryUtils.retryWithException( () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()), new RetryUtils.RetryMaterial( Constant.OPERATION_RETRY_TIME, true, exception -> exception instanceof NullPointerException && isRunning, - Constant.OPERATION_RETRY_SLEEP)); + Constant.OPERATION_RETRY_SLEEP))); } catch (InterruptedException e) { LOGGER.info("master node check interrupted"); return false; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 5c79838bf0b..33e6627854d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -377,6 +377,7 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) { SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true); String jobMetrics; JobStatus jobStatus; + ClassLoaderService classLoaderService; if (seaTunnelServer == null) { jobMetrics = (String) @@ -390,15 +391,14 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) { getNode().nodeEngine, new GetJobStatusOperation(jobId)) .join()]; - seaTunnelServer = getSeaTunnelServer(false); + classLoaderService = getSeaTunnelServer(false).getClassLoaderService(); } else { jobMetrics = seaTunnelServer.getCoordinatorService().getJobMetrics(jobId).toJsonString(); jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(jobId); + classLoaderService = seaTunnelServer.getClassLoaderService(); } - - ClassLoaderService classLoaderService = seaTunnelServer.getClassLoaderService(); ClassLoader classLoader = classLoaderService.getClassLoader( jobId, jobImmutableInformation.getPluginJarsUrls()); From f26afe66f6c3ad16580c9189a5063c753d141ad3 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 31 Mar 2024 13:10:59 +0800 Subject: [PATCH 2/2] Optimize the logic of getSeaTunnelServer() --- .../engine/server/SeaTunnelServer.java | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index d74fe3cba93..4e256d2491f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -25,7 +25,6 @@ import org.apache.seatunnel.engine.core.classloader.DefaultClassLoaderService; import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; -import org.apache.seatunnel.engine.server.rest.RestHttpGetCommandProcessor; import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService; import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService; import org.apache.seatunnel.engine.server.service.slot.SlotService; @@ -252,25 +251,18 @@ public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) { return taskState != null && ((ExecutionState) taskState).isEndState(); } - public static void main(String[] args) { - // - } - - - - - - public boolean isMasterNode() { // must retry until the cluster have master node try { - return Boolean.TRUE.equals(RetryUtils.retryWithException( - () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()), - new RetryUtils.RetryMaterial( - Constant.OPERATION_RETRY_TIME, - true, - exception -> exception instanceof NullPointerException && isRunning, - Constant.OPERATION_RETRY_SLEEP))); + return Boolean.TRUE.equals( + RetryUtils.retryWithException( + () -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()), + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> + exception instanceof NullPointerException && isRunning, + Constant.OPERATION_RETRY_SLEEP))); } catch (InterruptedException e) { LOGGER.info("master node check interrupted"); return false;