diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
index b37ba9afc7..d05e4f0ba6 100644
--- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
+++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
@@ -20,23 +20,25 @@
 package org.dinky.app.flinksql;
 
 import org.dinky.app.db.DBUtil;
-import org.dinky.app.model.StatementParam;
 import org.dinky.app.model.SysConfig;
 import org.dinky.app.util.FlinkAppUtil;
 import org.dinky.assertion.Asserts;
 import org.dinky.classloader.DinkyClassLoader;
 import org.dinky.config.Dialect;
 import org.dinky.constant.CustomerConfigureOptions;
-import org.dinky.constant.FlinkSQLConstant;
 import org.dinky.data.app.AppParamConfig;
 import org.dinky.data.app.AppTask;
 import org.dinky.data.constant.DirConstant;
 import org.dinky.data.enums.GatewayType;
+import org.dinky.data.job.JobStatement;
 import org.dinky.data.job.SqlType;
 import org.dinky.data.model.SystemConfiguration;
 import org.dinky.executor.Executor;
 import org.dinky.executor.ExecutorConfig;
 import org.dinky.executor.ExecutorFactory;
+import org.dinky.explainer.Explainer;
+import org.dinky.job.JobRunnerFactory;
+import org.dinky.job.JobStatementPlan;
 import org.dinky.resource.BaseResourceManager;
 import org.dinky.trans.Operations;
 import org.dinky.trans.dml.ExecuteJarOperation;
@@ -60,7 +62,6 @@
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.table.api.TableResult;
 
 import java.io.File;
 import java.io.IOException;
@@ -72,7 +73,6 @@
 import java.nio.charset.Charset;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -141,7 +141,7 @@ public static void submit(AppParamConfig config) throws SQLException {
             if (Dialect.FLINK_JAR == appTask.getDialect()) {
                 jobClient = executeJarJob(appTask.getType(), executor, statements);
             } else {
-                jobClient = executeJob(executor, statements);
+                jobClient = executeJob(executor, sql);
             }
         } finally {
             log.info("Start Monitor Job");
@@ -305,85 +305,31 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
         return jobClient;
     }
 
-    public static Optional<JobClient> executeJob(Executor executor, String[] statements) {
+    public static Optional<JobClient> executeJob(Executor executor, String statements) {
         Optional<JobClient> jobClient = Optional.empty();
-        ExecutorConfig executorConfig = executor.getExecutorConfig();
-        List<StatementParam> ddl = new ArrayList<>();
-        List<StatementParam> trans = new ArrayList<>();
-        List<StatementParam> execute = new ArrayList<>();
-
-        for (String item : statements) {
-            if (item.isEmpty()) {
-                continue;
-            }
-
-            SqlType operationType = Operations.getOperationType(item);
-            if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
-                trans.add(new StatementParam(item, operationType));
-                if (!executorConfig.isUseStatementSet()) {
-                    break;
-                }
-            } else if (operationType.equals(SqlType.EXECUTE)) {
-                execute.add(new StatementParam(item, operationType));
-                if (!executorConfig.isUseStatementSet()) {
-                    break;
-                }
-            } else {
-                ddl.add(new StatementParam(item, operationType));
-            }
-        }
-
-        for (StatementParam item : ddl) {
-            log.info("Executing FlinkSQL: {}", item.getValue());
-            executor.executeSql(item.getValue());
-            log.info("Execution succeeded.");
-        }
-
-        if (!trans.isEmpty()) {
-            if (executorConfig.isUseStatementSet()) {
-                List<String> inserts = new ArrayList<>();
-                for (StatementParam item : trans) {
-                    if (item.getType().equals(SqlType.INSERT)) {
-                        inserts.add(item.getValue());
-                    }
-                }
-                log.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts));
-                TableResult tableResult = executor.executeStatementSet(inserts);
-                jobClient = tableResult.getJobClient();
-                log.info("Execution succeeded.");
-            } else {
-                // UseStatementSet defaults to true, where the logic is never executed
-                StatementParam item = trans.get(0);
-                log.info("Executing FlinkSQL: {}", item.getValue());
-                TableResult tableResult = executor.executeSql(item.getValue());
-                jobClient = tableResult.getJobClient();
+        JobStatementPlan jobStatementPlan =
+                Explainer.build(executor).parseStatementsForApplicationMode(SqlUtil.getStatements(statements));
+        jobStatementPlan.buildFinalExecutableStatement();
+        JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(executor);
+        String currentSql = "";
+        try {
+            for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
+                currentSql = jobStatement.getStatement();
+                log.info("Executing FlinkSQL: {}", currentSql);
+                Optional<JobClient> optionalJobClient = jobRunnerFactory
+                        .getJobRunner(jobStatement.getStatementType())
+                        .execute(jobStatement);
                 log.info("Execution succeeded.");
-            }
-        }
-
-        if (!execute.isEmpty()) {
-            List<String> executes = new ArrayList<>();
-            for (StatementParam item : execute) {
-                executes.add(item.getValue());
-                executor.executeSql(item.getValue());
-                if (!executorConfig.isUseStatementSet()) {
+                if (optionalJobClient.isPresent()) {
+                    jobClient = optionalJobClient;
                     break;
                 }
             }
-
-            log.info(
-                    "The FlinkSQL statement set is being executed: {}",
-                    String.join(FlinkSQLConstant.SEPARATOR, executes));
-            try {
-                JobClient client = executor.executeAsync(executorConfig.getJobName());
-                jobClient = Optional.of(client);
-                log.info("The execution was successful");
-            } catch (Exception e) {
-                log.error("Execution failed, {}", e.getMessage(), e);
-            }
+        } catch (Exception e) {
+            log.error("Execution failed. Current statement: {} \n Error: {}", currentSql, e.getMessage(), e);
         }
-        log.info("{} The task is successfully submitted", LocalDateTime.now());
+        log.info("The task is successfully submitted.");
        return jobClient;
     }
 }
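
Note: application mode now reuses the same statement-plan/runner pipeline as the server side instead of hand-sorting statements into ddl/trans/execute buckets. A minimal sketch of the new control flow in executeJob, assuming the executor and the task's SQL text have been prepared by submit() as shown above:

    // Parse the script, mark the final executable statement, then dispatch each
    // statement to its runner; the first runner that yields a JobClient ends the loop.
    JobStatementPlan plan = Explainer.build(executor)
            .parseStatementsForApplicationMode(SqlUtil.getStatements(sql));
    plan.buildFinalExecutableStatement();
    JobRunnerFactory runnerFactory = JobRunnerFactory.create(executor);
    for (JobStatement statement : plan.getJobStatementList()) {
        Optional<JobClient> client = runnerFactory
                .getJobRunner(statement.getStatementType())
                .execute(statement);
        if (client.isPresent()) {
            return client; // the submitted job's client
        }
    }
    return Optional.empty();
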
diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java
index 7b6e78583e..7f08810ef6 100644
--- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java
+++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -62,21 +62,23 @@ public class Explainer {
 
     private Executor executor;
-    private boolean useStatementSet;
     private JobManager jobManager;
 
-    public Explainer(Executor executor, boolean useStatementSet, JobManager jobManager) {
+    public Explainer(Executor executor) {
+        this.executor = executor;
+    }
+
+    public Explainer(Executor executor, JobManager jobManager) {
         this.executor = executor;
-        this.useStatementSet = useStatementSet;
         this.jobManager = jobManager;
     }
 
-    public static Explainer build(JobManager jobManager) {
-        return new Explainer(jobManager.getExecutor(), true, jobManager);
+    public static Explainer build(Executor executor) {
+        return new Explainer(executor);
     }
 
-    public static Explainer build(Executor executor, boolean useStatementSet, JobManager jobManager) {
-        return new Explainer(executor, useStatementSet, jobManager);
+    public static Explainer build(JobManager jobManager) {
+        return new Explainer(jobManager.getExecutor(), jobManager);
     }
 
     public JobStatementPlan parseStatements(String[] statements) {
@@ -93,6 +95,10 @@ public JobStatementPlan parseStatements(String[] statements) {
         return jobStatementPlanWithMock;
     }
 
+    public JobStatementPlan parseStatementsForApplicationMode(String[] statements) {
+        return executor.parseStatementIntoJobStatementPlan(statements);
+    }
+
     private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
         List<String> udfStatements = new ArrayList<>();
         Optional.ofNullable(jobManager.getConfig().getUdfRefer())
@@ -185,7 +191,7 @@ public List<LineageRel> getLineage(String statement) {
                 .type(GatewayType.LOCAL.getLongValue())
                 .useRemote(false)
                 .fragment(true)
-                .statementSet(useStatementSet)
+                .statementSet(false)
                 .parallelism(1)
                 .udfRefer(jobManager.getConfig().getUdfRefer())
                 .configJson(executor.getTableConfig().getConfiguration().toMap())
diff --git a/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java b/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
index 8dcb03e069..85b234df19 100644
--- a/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
+++ b/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
@@ -40,21 +40,19 @@ public class LineageBuilder {
 
     public static LineageResult getColumnLineageByLogicalPlan(String statement, JobConfig jobConfig) {
-        JobManager jobManager = JobManager.buildPlanMode(jobConfig);
-        Explainer explainer = new Explainer(jobManager.getExecutor(), false, jobManager);
-        return getColumnLineageByLogicalPlan(statement, explainer);
+        Explainer explainer = Explainer.build(JobManager.buildPlanMode(jobConfig));
+        return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
     }
 
     public static LineageResult getColumnLineageByLogicalPlan(String statement, ExecutorConfig executorConfig) {
         JobManager jobManager = JobManager.buildPlanMode(JobConfig.buildPlanConfig());
         Executor executor = ExecutorFactory.buildExecutor(executorConfig, jobManager.getDinkyClassLoader());
         jobManager.setExecutor(executor);
-        Explainer explainer = new Explainer(executor, false, jobManager);
-        return getColumnLineageByLogicalPlan(statement, explainer);
+        Explainer explainer = Explainer.build(jobManager);
+        return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
     }
 
-    public static LineageResult getColumnLineageByLogicalPlan(String statement, Explainer explainer) {
-        List<LineageRel> lineageRelList = explainer.getLineage(statement);
+    public static LineageResult getColumnLineageByLogicalPlan(List<LineageRel> lineageRelList) {
         List<LineageRelation> relations = new ArrayList<>();
         Map<String, LineageTable> tableMap = new HashMap<>();
         int tableIndex = 1;
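
Note: the useStatementSet flag is gone from Explainer. build(Executor) yields a manager-less instance for the application-mode path, build(JobManager) remains for server-side parsing, and lineage extraction now always runs with statementSet(false). A sketch of the resulting call sites, assuming an executor and a jobManager are in scope:

    Explainer appMode = Explainer.build(executor);      // application mode, no JobManager attached
    Explainer serverSide = Explainer.build(jobManager); // keeps the JobManager for parseStatements()

    // Lineage callers now pass the computed relations instead of a statement plus an Explainer:
    LineageResult result =
            LineageBuilder.getColumnLineageByLogicalPlan(serverSide.getLineage(statement));
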
diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java
index e3456a25da..80d41b1596 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java
@@ -190,13 +190,6 @@ public class JobConfig {
             notes = "Flag indicating whether to mock sink function")
     private boolean mockSinkFunction;
 
-    @ApiModelProperty(
-            value = "Flag indicating whether to be submission mode",
-            dataType = "boolean",
-            example = "true",
-            notes = "Flag indicating whether to be submission mode")
-    private boolean isSubmissionMode;
-
     @ApiModelProperty(value = "Gateway configuration", dataType = "GatewayConfig", notes = "Gateway configuration")
     private GatewayConfig gatewayConfig;
diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java
index 12d9c06e05..a11b258523 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobManager.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -79,7 +79,6 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -242,17 +241,14 @@ public boolean close() {
 
     @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
     public JobResult executeJarSql(String statement) throws Exception {
-        List<String> statements = Arrays.stream(SqlUtil.getStatements(statement))
-                .map(t -> executor.pretreatStatement(t))
-                .collect(Collectors.toList());
-        statement = String.join(";\n", statements);
-        jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
-        jobStatementPlan.buildFinalStatement();
         job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
         ready();
-        JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
         try {
+            jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
+            jobStatementPlan.buildFinalStatement();
+            JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
             for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
+                setCurrentSql(jobStatement.getStatement());
                 jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
             }
             if (job.isFailed()) {
@@ -284,6 +280,7 @@ public JobResult executeSql(String statement) throws Exception {
             jobStatementPlan.buildFinalStatement();
             JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
             for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
+                setCurrentSql(jobStatement.getStatement());
                 jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
             }
             job.setEndTime(LocalDateTime.now());
diff --git a/dinky-core/src/main/java/org/dinky/job/JobRunner.java b/dinky-core/src/main/java/org/dinky/job/JobRunner.java
index 6adf173f9b..0318721cc5 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobRunner.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobRunner.java
@@ -22,11 +22,16 @@
 import org.dinky.data.job.JobStatement;
 import org.dinky.data.result.SqlExplainResult;
 
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import java.util.Optional;
+
 public interface JobRunner {
 
+    Optional<JobClient> execute(JobStatement jobStatement) throws Exception;
+
     void run(JobStatement jobStatement) throws Exception;
 
     SqlExplainResult explain(JobStatement jobStatement);
diff --git a/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java b/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
index bad0a8c456..0354c2c882 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
@@ -20,6 +20,7 @@
 package org.dinky.job;
 
 import org.dinky.data.job.JobStatementType;
+import org.dinky.executor.Executor;
 import org.dinky.job.runner.JobDDLRunner;
 import org.dinky.job.runner.JobJarRunner;
 import org.dinky.job.runner.JobPipelineRunner;
@@ -42,6 +43,14 @@ public JobRunnerFactory(JobManager jobManager) {
         this.jobJarRunner = new JobJarRunner(jobManager);
     }
 
+    public JobRunnerFactory(Executor executor) {
+        this.jobSetRunner = new JobSetRunner(executor);
+        this.jobSqlRunner = new JobSqlRunner(executor);
+        this.jobPipelineRunner = new JobPipelineRunner(executor);
+        this.jobDDLRunner = new JobDDLRunner(executor);
+        this.jobJarRunner = new JobJarRunner(executor);
+    }
+
     public JobRunner getJobRunner(JobStatementType jobStatementType) {
         switch (jobStatementType) {
             case SET:
@@ -58,6 +67,10 @@ public JobRunner getJobRunner(JobStatementType jobStatementType) {
         }
     }
 
+    public static JobRunnerFactory create(Executor executor) {
+        return new JobRunnerFactory(executor);
+    }
+
     public static JobRunnerFactory create(JobManager jobManager) {
         return new JobRunnerFactory(jobManager);
     }
diff --git a/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java b/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
index 6cd1f6aba5..5d1a4ddb7b 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
@@ -83,6 +83,30 @@ public void buildFinalStatement() {
         }
     }
 
+    public void buildFinalExecutableStatement() {
+        checkStatement();
+
+        int executableIndex = -1;
+        for (int i = 0; i < jobStatementList.size(); i++) {
+            if (jobStatementList.get(i).getSqlType().isPipeline()) {
+                executableIndex = i;
+            }
+        }
+        if (executableIndex >= 0) {
+            jobStatementList.get(executableIndex).asFinalExecutableStatement();
+        } else {
+            // If there is no INSERT/CTAS/RTAS/CALL statement,
+            // use the first SELECT/WITH/SHOW/DESC SQL statement
+            // as the final statement.
+            for (int i = 0; i < jobStatementList.size(); i++) {
+                if (jobStatementList.get(i).getStatementType().equals(JobStatementType.SQL)) {
+                    jobStatementList.get(i).asFinalExecutableStatement();
+                    break;
+                }
+            }
+        }
+    }
+
     public void checkStatement() {
         checkEmptyStatement();
         checkPipelineStatement();
diff --git a/dinky-core/src/main/java/org/dinky/job/runner/AbstractJobRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/AbstractJobRunner.java
index 1348d18e1a..755596a3c0 100644
--- a/dinky-core/src/main/java/org/dinky/job/runner/AbstractJobRunner.java
+++ b/dinky-core/src/main/java/org/dinky/job/runner/AbstractJobRunner.java
@@ -21,15 +21,18 @@
 
 import org.dinky.data.job.JobStatement;
 import org.dinky.data.result.SqlExplainResult;
+import org.dinky.executor.Executor;
 import org.dinky.job.JobManager;
 import org.dinky.job.JobRunner;
 import org.dinky.utils.LogUtil;
 import org.dinky.utils.SqlUtil;
 
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import java.time.LocalDateTime;
+import java.util.Optional;
 
 import cn.hutool.core.text.StrFormatter;
 import lombok.extern.slf4j.Slf4j;
@@ -38,6 +41,12 @@ public abstract class AbstractJobRunner implements JobRunner {
 
     protected JobManager jobManager;
+    protected Executor executor;
+
+    public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {
+        run(jobStatement);
+        return Optional.empty();
+    }
 
     public SqlExplainResult explain(JobStatement jobStatement) {
         SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder();
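
Note: buildFinalExecutableStatement() scans the whole plan and marks the last pipeline statement (INSERT/CTAS/RTAS/CALL) as the one that actually submits a job, falling back to the first plain SQL statement (SELECT/WITH/SHOW/DESC) only when no pipeline statement exists. Combined with the new JobRunner.execute(), whose default implementation in AbstractJobRunner simply delegates to run() and returns Optional.empty(), this is what lets the application-mode loop in Submitter stop at the first JobClient. A sketch under those assumptions:

    // Given: SET ...; CREATE TABLE src ...; CREATE TABLE dst ...;
    //        INSERT INTO dst SELECT * FROM src;
    // the INSERT is the last pipeline statement, so it is marked as final;
    // the earlier statements pass through execute() yielding Optional.empty().
    JobStatementPlan plan = executor.parseStatementIntoJobStatementPlan(statements);
    plan.buildFinalExecutableStatement();
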
diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java
index 19eb7b2a5f..2d10deb8d2 100644
--- a/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java
+++ b/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java
@@ -26,6 +26,7 @@
 import org.dinky.data.model.SystemConfiguration;
 import org.dinky.data.result.SqlExplainResult;
 import org.dinky.executor.CustomTableEnvironment;
+import org.dinky.executor.Executor;
 import org.dinky.function.constant.PathConstant;
 import org.dinky.function.data.model.UDF;
 import org.dinky.function.util.UDFUtil;
@@ -38,6 +39,7 @@
 import org.dinky.utils.URLUtils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.table.catalog.FunctionLanguage;
 
@@ -47,6 +49,7 @@
 import java.nio.file.Path;
 import java.time.LocalDateTime;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.Set;
 
 import cn.hutool.core.collection.CollUtil;
@@ -58,13 +61,32 @@
 @Slf4j
 public class JobDDLRunner extends AbstractJobRunner {
 
+    public JobDDLRunner(Executor executor) {
+        this.executor = executor;
+    }
+
     public JobDDLRunner(JobManager jobManager) {
         this.jobManager = jobManager;
+        this.executor = jobManager.getExecutor();
+    }
+
+    @Override
+    public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {
+        switch (jobStatement.getSqlType()) {
+            case ADD:
+            case ADD_FILE:
+                break;
+            case ADD_JAR:
+                Configuration combinationConfig = getCombinationConfig();
+                FileSystem.initialize(combinationConfig, null);
+            default:
+                executor.executeSql(jobStatement.getStatement());
+        }
+        return Optional.empty();
     }
 
     @Override
     public void run(JobStatement jobStatement) throws Exception {
-        jobManager.setCurrentSql(jobStatement.getStatement());
         switch (jobStatement.getSqlType()) {
             case ADD:
                 executeAdd(jobStatement.getStatement());
@@ -81,7 +103,7 @@ public void run(JobStatement jobStatement) throws Exception {
                 break;
             }
             default:
-                jobManager.getExecutor().executeSql(jobStatement.getStatement());
+                executor.executeSql(jobStatement.getStatement());
         }
     }
 
@@ -103,7 +125,7 @@ public SqlExplainResult explain(JobStatement jobStatement) {
             case CREATE:
                 if (UDFUtil.isUdfStatement(jobStatement.getStatement())) {
                     executeCreateFunction(jobStatement.getStatement());
-                    recordResult = jobManager.getExecutor().explainSqlRecord(jobStatement.getStatement());
+                    recordResult = executor.explainSqlRecord(jobStatement.getStatement());
                     break;
                 }
             default:
@@ -136,34 +158,34 @@ public SqlExplainResult explain(JobStatement jobStatement) {
 
     private void executeAdd(String statement) {
         Set<File> allFilePath = AddJarSqlParseStrategy.getAllFilePath(statement);
         allFilePath.forEach(t -> jobManager.getUdfPathContextHolder().addOtherPlugins(t));
-        (jobManager.getExecutor().getDinkyClassLoader())
+        (executor.getDinkyClassLoader())
                 .addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getOtherPluginsFiles()));
-        jobManager.getExecutor().addJar(ArrayUtil.toArray(allFilePath, File.class));
+        executor.addJar(ArrayUtil.toArray(allFilePath, File.class));
     }
 
     private void executeAddFile(String statement) {
         Set<File> allFilePath = AddFileSqlParseStrategy.getAllFilePath(statement);
         allFilePath.forEach(t -> jobManager.getUdfPathContextHolder().addFile(t));
-        (jobManager.getExecutor().getDinkyClassLoader())
+        (executor.getDinkyClassLoader())
                 .addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getFiles()));
-        jobManager.getExecutor().addJar(ArrayUtil.toArray(allFilePath, File.class));
+        executor.addJar(ArrayUtil.toArray(allFilePath, File.class));
     }
 
     private void executeAddJar(String statement) {
         Set<File> allFilePath = AddFileSqlParseStrategy.getAllFilePath(statement);
         Configuration combinationConfig = getCombinationConfig();
         FileSystem.initialize(combinationConfig, null);
-        jobManager.getExecutor().addJar(ArrayUtil.toArray(allFilePath, File.class));
-        jobManager.getExecutor().executeSql(statement);
+        executor.addJar(ArrayUtil.toArray(allFilePath, File.class));
+        executor.executeSql(statement);
     }
 
     private void executeCreateFunction(String udfStatement) {
-        UDF udf = toUDF(udfStatement, jobManager.getExecutor().getDinkyClassLoader());
+        UDF udf = toUDF(udfStatement, executor.getDinkyClassLoader());
         if (udf != null) {
             // Create a shortcut link for the UDF file path
             copyUdfFileLinkAndAddToClassloader(udf, udf.getName());
         }
-        jobManager.getExecutor().executeSql(udfStatement);
+        executor.executeSql(udfStatement);
     }
 
     /**
@@ -198,7 +220,7 @@ private void copyUdfFileLinkAndAddToClassloader(UDF udf, String udfName) {
         } else {
             jobManager.getUdfPathContextHolder().addUdfPath(udfLinkFile);
             jobManager.getDinkyClassLoader().addURLs(CollUtil.newArrayList(udfLinkFile));
-            jobManager.getExecutor().addJar(udfLinkFile);
+            executor.addJar(udfLinkFile);
         }
     }
 
@@ -219,19 +241,19 @@ private SqlExplainResult explainAddFile(String statement) {
     }
 
     private SqlExplainResult explainAddJar(String statement) {
-        SqlExplainResult sqlExplainResult = jobManager.getExecutor().explainSqlRecord(statement);
+        SqlExplainResult sqlExplainResult = executor.explainSqlRecord(statement);
         executeAddJar(statement);
         return sqlExplainResult;
     }
 
     private SqlExplainResult explainOtherDDL(String statement) {
-        SqlExplainResult sqlExplainResult = jobManager.getExecutor().explainSqlRecord(statement);
-        jobManager.getExecutor().executeSql(statement);
+        SqlExplainResult sqlExplainResult = executor.explainSqlRecord(statement);
+        executor.executeSql(statement);
         return sqlExplainResult;
     }
 
     private Configuration getCombinationConfig() {
-        CustomTableEnvironment cte = jobManager.getExecutor().getCustomTableEnvironment();
+        CustomTableEnvironment cte = executor.getCustomTableEnvironment();
         Configuration rootConfig = cte.getRootConfiguration();
         Configuration config = cte.getConfig().getConfiguration();
         Configuration combinationConfig = new Configuration();
diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java
index 8df3fd519d..dc456dd859 100644
--- a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java
+++ b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java
@@ -27,6 +27,7 @@
 import org.dinky.data.model.JarSubmitParam;
 import org.dinky.data.result.InsertResult;
 import org.dinky.data.result.SqlExplainResult;
+import org.dinky.executor.Executor;
 import org.dinky.gateway.Gateway;
 import org.dinky.gateway.config.GatewayConfig;
 import org.dinky.gateway.result.GatewayResult;
@@ -61,6 +62,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import cn.hutool.core.lang.Assert;
@@ -71,10 +73,22 @@ public class JobJarRunner extends AbstractJobRunner {
 
     private final Configuration configuration;
 
+    public JobJarRunner(Executor executor) {
+        this.executor = executor;
+        configuration = executor.getCustomTableEnvironment().getConfig().getConfiguration();
+    }
+
     public JobJarRunner(JobManager jobManager) {
         this.jobManager = jobManager;
-        configuration =
-                jobManager.getExecutor().getCustomTableEnvironment().getConfig().getConfiguration();
+        this.executor = jobManager.getExecutor();
+        configuration = executor.getCustomTableEnvironment().getConfig().getConfiguration();
+    }
+
+    @Override
+    public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {
+        JobClient jobClient = FlinkStreamEnvironmentUtil.executeAsync(
+                getPipeline(jobStatement), executor.getCustomTableEnvironment());
+        return Optional.ofNullable(jobClient);
     }
 
     @Override
@@ -175,7 +189,7 @@ private Pipeline getPipeline(JobStatement jobStatement) {
 
     private void submitNormal(JobStatement jobStatement) throws Exception {
         JobClient jobClient = FlinkStreamEnvironmentUtil.executeAsync(
-                getPipeline(jobStatement), jobManager.getExecutor().getCustomTableEnvironment());
+                getPipeline(jobStatement), executor.getCustomTableEnvironment());
         if (Asserts.isNotNull(jobClient)) {
             jobManager.getJob().setJobId(jobClient.getJobID().toHexString());
             jobManager
@@ -192,7 +206,7 @@ public Pipeline getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) {
         String[] statements = SqlUtil.getStatements(statement);
         ExecuteJarOperation executeJarOperation = null;
         for (String sql : statements) {
-            String sqlStatement = jobManager.getExecutor().pretreatStatement(sql);
+            String sqlStatement = executor.pretreatStatement(sql);
             if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) {
                 executeJarOperation = new ExecuteJarOperation(sqlStatement);
                 break;
@@ -200,27 +214,27 @@ public Pipeline getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) {
             SqlType operationType = Operations.getOperationType(sqlStatement);
             if (operationType.equals(SqlType.SET) && SetSqlParseStrategy.INSTANCE.match(sqlStatement)) {
                 CustomSetOperation customSetOperation = new CustomSetOperation(sqlStatement);
-                customSetOperation.execute(jobManager.getExecutor().getCustomTableEnvironment());
+                customSetOperation.execute(executor.getCustomTableEnvironment());
             } else if (operationType.equals(SqlType.ADD)) {
                 Set<File> files = AddJarSqlParseStrategy.getAllFilePath(sqlStatement);
-                files.forEach(jobManager.getExecutor()::addJar);
+                files.forEach(executor::addJar);
                 files.forEach(jobManager.getUdfPathContextHolder()::addOtherPlugins);
             } else if (operationType.equals(SqlType.ADD_FILE)) {
                 Set<File> files = AddFileSqlParseStrategy.getAllFilePath(sqlStatement);
-                files.forEach(jobManager.getExecutor()::addJar);
+                files.forEach(executor::addJar);
                 files.forEach(jobManager.getUdfPathContextHolder()::addFile);
             }
         }
         Assert.notNull(executeJarOperation, () -> new DinkyException("Not found execute jar operation."));
         List<URL> urLs = jobManager.getAllFileSet();
-        return executeJarOperation.explain(jobManager.getExecutor().getCustomTableEnvironment(), urLs);
+        return executeJarOperation.explain(executor.getCustomTableEnvironment(), urLs);
     }
 
     public List<String> getUris(String statement) {
         String[] statements = SqlUtil.getStatements(statement);
         List<String> uriList = new ArrayList<>();
         for (String sql : statements) {
-            String sqlStatement = jobManager.getExecutor().pretreatStatement(sql);
+            String sqlStatement = executor.pretreatStatement(sql);
             if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) {
                 uriList.add(JarSubmitParam.getInfo(statement).getUri());
                 break;
diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java
index 15cbb8e689..15e341bc2c 100644
--- a/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java
+++ b/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java
@@ -51,6 +51,7 @@
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -60,11 +61,34 @@ public class JobPipelineRunner extends AbstractJobRunner {
 
     private List<JobStatement> statements;
     private TableResult tableResult;
 
+    public JobPipelineRunner(Executor executor) {
+        this.executor = executor;
+        this.statements = new ArrayList<>();
+    }
+
     public JobPipelineRunner(JobManager jobManager) {
         this.jobManager = jobManager;
+        this.executor = jobManager.getExecutor();
         this.statements = new ArrayList<>();
     }
 
+    @Override
+    public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {
+        if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
+            JobJarRunner jobJarRunner = new JobJarRunner(jobManager);
+            return jobJarRunner.execute(jobStatement);
+        }
+        statements.add(jobStatement);
+        tableResult = executor.executeSql(jobStatement.getStatement());
+        if (statements.size() == 1) {
+            JobClient jobClient =
+                    executor.executeAsync(executor.getExecutorConfig().getJobName());
+            return Optional.ofNullable(jobClient);
+        }
+        log.error("Only one pipeline job is executed. The statement has been skipped: " + jobStatement.getStatement());
+        return Optional.empty();
+    }
+
     @Override
     public void run(JobStatement jobStatement) throws Exception {
         if (ExecuteJarParseStrategy.INSTANCE.match(jobStatement.getStatement())) {
@@ -73,7 +97,7 @@ public void run(JobStatement jobStatement) throws Exception {
             return;
         }
         statements.add(jobStatement);
-        tableResult = jobManager.getExecutor().executeSql(jobStatement.getStatement());
+        tableResult = executor.executeSql(jobStatement.getStatement());
         if (statements.size() == 1) {
             if (jobManager.isUseGateway()) {
                 processWithGateway();
@@ -83,7 +107,6 @@ public void run(JobStatement jobStatement) throws Exception {
         } else {
             log.error(
                     "Only one pipeline job is executed. The statement has be skipped: " + jobStatement.getStatement());
-            return;
         }
     }
 
@@ -96,12 +119,11 @@ public SqlExplainResult explain(JobStatement jobStatement) {
         SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder();
         statements.add(jobStatement);
         // pipeline job execute to generate stream graph.
-        jobManager.getExecutor().executeSql(jobStatement.getStatement());
+        executor.executeSql(jobStatement.getStatement());
         if (statements.size() == 1) {
             try {
                 resultBuilder
-                        .explain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(
-                                jobManager.getExecutor().getStreamGraph()))
+                        .explain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(executor.getStreamGraph()))
                         .type(jobStatement.getSqlType().getType())
                         .parseTrue(true)
                         .explainTrue(true)
@@ -147,9 +169,9 @@ public StreamGraph getStreamGraph(JobStatement jobStatement) {
         }
         statements.add(jobStatement);
         // pipeline job execute to generate stream graph.
-        jobManager.getExecutor().executeSql(jobStatement.getStatement());
+        executor.executeSql(jobStatement.getStatement());
         if (statements.size() == 1) {
-            return jobManager.getExecutor().getStreamGraph();
+            return executor.getStreamGraph();
         } else {
             throw new DinkyException(
                     "Only one pipeline job is explained. The statement has be skipped: " + jobStatement.getStatement());
@@ -164,9 +186,9 @@ public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) {
         }
         statements.add(jobStatement);
         // pipeline job execute to generate stream graph.
-        jobManager.getExecutor().executeSql(jobStatement.getStatement());
+        executor.executeSql(jobStatement.getStatement());
         if (statements.size() == 1) {
-            return jobManager.getExecutor().getJobPlanInfo();
+            return executor.getJobPlanInfo();
         } else {
             throw new DinkyException(
                     "Only one pipeline job is explained. The statement has be skipped: " + jobStatement.getStatement());
@@ -174,7 +196,6 @@ public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) {
     }
 
     private void processWithGateway() throws Exception {
-        Executor executor = jobManager.getExecutor();
         JobConfig config = jobManager.getConfig();
         Job job = jobManager.getJob();
         config.addGatewayConfig(executor.getSetConfig());
@@ -206,7 +227,6 @@ private void processWithGateway() throws Exception {
     }
 
     private void processWithoutGateway() throws Exception {
-        Executor executor = jobManager.getExecutor();
         JobConfig config = jobManager.getConfig();
         Job job = jobManager.getJob();
         JobClient jobClient = executor.executeAsync(config.getJobName());
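
Note on JobDDLRunner.execute(): ADD and ADD_FILE become no-ops on this path (presumably because the referenced files already ship inside the application image), and the absent break after ADD_JAR reads as intentional fall-through, since the ADD JAR statement itself must still reach executor.executeSql() once the FileSystem has been initialized:

    switch (jobStatement.getSqlType()) {
        case ADD:
        case ADD_FILE:
            break; // no-op in application mode
        case ADD_JAR:
            FileSystem.initialize(getCombinationConfig(), null);
            // intentional fall-through: the statement itself still has to execute
        default:
            executor.executeSql(jobStatement.getStatement());
    }

JobPipelineRunner.execute() keeps the existing single-pipeline contract: the first pipeline statement is submitted via executeAsync() and its JobClient returned, while any further pipeline statements are logged and skipped. One point worth double-checking: on the Executor-built path jobManager is null, so the EXECUTE JAR branch, which constructs new JobJarRunner(jobManager), would throw a NullPointerException if an EXECUTE JAR statement ever reached application mode through this runner.
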
diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobSetRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobSetRunner.java
index 03e95f158a..4264e5f299 100644
--- a/dinky-core/src/main/java/org/dinky/job/runner/JobSetRunner.java
+++ b/dinky-core/src/main/java/org/dinky/job/runner/JobSetRunner.java
@@ -22,6 +22,7 @@
 import org.dinky.data.job.JobStatement;
 import org.dinky.data.job.SqlType;
 import org.dinky.data.result.SqlExplainResult;
+import org.dinky.executor.Executor;
 import org.dinky.job.JobManager;
 import org.dinky.trans.ddl.CustomSetOperation;
 import org.dinky.utils.LogUtil;
@@ -34,15 +35,20 @@
 @Slf4j
 public class JobSetRunner extends AbstractJobRunner {
 
+    public JobSetRunner(Executor executor) {
+        this.executor = executor;
+    }
+
     public JobSetRunner(JobManager jobManager) {
         this.jobManager = jobManager;
+        this.executor = jobManager.getExecutor();
     }
 
     @Override
     public void run(JobStatement jobStatement) throws Exception {
         if (SqlType.SET.equals(jobStatement.getSqlType())) {
             CustomSetOperation customSetOperation = new CustomSetOperation(jobStatement.getStatement());
-            customSetOperation.execute(jobManager.getExecutor().getCustomTableEnvironment());
+            customSetOperation.execute(executor.getCustomTableEnvironment());
         } else if (SqlType.RESET.equals(jobStatement.getSqlType())) {
             // todo: reset
             throw new RuntimeException("Not support reset operation.");
@@ -54,8 +60,8 @@ public SqlExplainResult explain(JobStatement jobStatement) {
         SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder();
         try {
             CustomSetOperation customSetOperation = new CustomSetOperation(jobStatement.getStatement());
-            String explain = customSetOperation.explain(jobManager.getExecutor().getCustomTableEnvironment());
-            customSetOperation.execute(jobManager.getExecutor().getCustomTableEnvironment());
+            String explain = customSetOperation.explain(executor.getCustomTableEnvironment());
+            customSetOperation.execute(executor.getCustomTableEnvironment());
             resultBuilder
                     .explain(explain)
                     .parseTrue(true)
diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java
index d2a6785ee6..af9b864525 100644
--- a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java
+++ b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java
@@ -42,6 +42,7 @@
 import org.dinky.utils.URLUtils;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
@@ -56,6 +57,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -66,11 +68,39 @@ public class JobSqlRunner extends AbstractJobRunner {
 
     private List<JobStatement> statements;
 
+    public JobSqlRunner(Executor executor) {
+        this.executor = executor;
+        this.statements = new ArrayList<>();
+    }
+
     public JobSqlRunner(JobManager jobManager) {
         this.jobManager = jobManager;
+        this.executor = jobManager.getExecutor();
         this.statements = new ArrayList<>();
     }
 
+    @Override
+    public Optional<JobClient> execute(JobStatement jobStatement) throws Exception {
+        statements.add(jobStatement);
+        if (jobStatement.isFinalExecutableStatement()) {
+            if (inferStatementSet()) {
+                TableResult tableResult = executor.executeStatements(statements);
+                return tableResult.getJobClient();
+            } else {
+                FlinkInterceptorResult flinkInterceptorResult =
+                        FlinkInterceptor.build(executor, statements.get(0).getStatement());
+                if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
+                    return flinkInterceptorResult.getTableResult().getJobClient();
+                } else if (!flinkInterceptorResult.isNoExecute()) {
+                    TableResult tableResult =
+                            executor.executeSql(statements.get(0).getStatement());
+                    return tableResult.getJobClient();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
     @Override
     public void run(JobStatement jobStatement) throws Exception {
         statements.add(jobStatement);
@@ -89,8 +119,7 @@ public SqlExplainResult explain(JobStatement jobStatement) {
         // show and desc
         if (!jobStatement.getSqlType().isPipeline()) {
             try {
-                resultBuilder = SqlExplainResult.newBuilder(
-                        jobManager.getExecutor().explainSqlRecord(jobStatement.getStatement()));
+                resultBuilder = SqlExplainResult.newBuilder(executor.explainSqlRecord(jobStatement.getStatement()));
                 resultBuilder.parseTrue(true).explainTrue(true);
             } catch (Exception e) {
                 String error = LogUtil.getError(
@@ -122,8 +151,7 @@ public SqlExplainResult explain(JobStatement jobStatement) {
         if (!inserts.isEmpty()) {
             String sqlSet = StringUtils.join(inserts, FlinkSQLConstant.SEPARATOR);
             try {
-                resultBuilder =
-                        SqlExplainResult.newBuilder(jobManager.getExecutor().explainStatementSet(statements));
+                resultBuilder = SqlExplainResult.newBuilder(executor.explainStatementSet(statements));
             } catch (Exception e) {
                 String error = LogUtil.getError(
                         "Exception in explaining FlinkSQL:\n" + SqlUtil.addLineNumber(jobStatement.getStatement()),
@@ -149,8 +177,7 @@ public SqlExplainResult explain(JobStatement jobStatement) {
             return resultBuilder.invalid().build();
         } else {
             try {
-                resultBuilder = SqlExplainResult.newBuilder(
-                        jobManager.getExecutor().explainSqlRecord(jobStatement.getStatement()));
+                resultBuilder = SqlExplainResult.newBuilder(executor.explainSqlRecord(jobStatement.getStatement()));
                 resultBuilder.parseTrue(true).explainTrue(true);
             } catch (Exception e) {
                 String error = LogUtil.getError(
@@ -181,7 +208,7 @@ public StreamGraph getStreamGraph(JobStatement jobStatement) {
             return null;
         }
         if (!statements.isEmpty()) {
-            return jobManager.getExecutor().getStreamGraphFromStatement(statements);
+            return executor.getStreamGraphFromStatement(statements);
         }
         throw new DinkyException("None jobs in statement.");
     }
@@ -193,7 +220,7 @@ public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) {
             return null;
         }
         if (!statements.isEmpty()) {
-            return jobManager.getExecutor().getJobPlanInfoFromStatements(statements);
+            return executor.getJobPlanInfoFromStatements(statements);
         }
         throw new DinkyException("None jobs in statement.");
     }
@@ -226,19 +253,13 @@ private void handleNonStatementSet() throws Exception {
     }
 
     private void processWithGateway() {
-        List<String> inserts =
-                statements.stream().map(JobStatement::getStatement).collect(Collectors.toList());
-        jobManager.setCurrentSql(StringUtils.join(inserts, FlinkSQLConstant.SEPARATOR));
         GatewayResult gatewayResult = submitByGateway(statements);
         setJobResultFromGatewayResult(gatewayResult);
     }
 
     private void processWithoutGateway() {
         if (!statements.isEmpty()) {
-            List<String> inserts =
-                    statements.stream().map(JobStatement::getStatement).collect(Collectors.toList());
-            jobManager.setCurrentSql(StringUtils.join(inserts, FlinkSQLConstant.SEPARATOR));
-            TableResult tableResult = jobManager.getExecutor().executeStatements(statements);
+            TableResult tableResult = executor.executeStatements(statements);
             updateJobWithTableResult(tableResult);
         }
     }
 
@@ -246,7 +267,6 @@ private void processWithoutGateway() {
     private void processSingleInsertWithGateway() {
         List<JobStatement> singleInsert = Collections.singletonList(statements.get(0));
         jobManager.getJob().setPipeline(statements.get(0).getSqlType().isPipeline());
-        jobManager.setCurrentSql(statements.get(0).getStatement());
         GatewayResult gatewayResult = submitByGateway(singleInsert);
         setJobResultFromGatewayResult(gatewayResult);
     }
@@ -258,17 +278,15 @@ private void processFirstStatement() throws Exception {
         // Only process the first statement when not using statement set
         JobStatement item = statements.get(0);
         jobManager.getJob().setPipeline(item.getSqlType().isPipeline());
-        jobManager.setCurrentSql(item.getStatement());
         processSingleStatement(item);
     }
 
     private void processSingleStatement(JobStatement item) {
-        FlinkInterceptorResult flinkInterceptorResult =
-                FlinkInterceptor.build(jobManager.getExecutor(), item.getStatement());
+        FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getStatement());
         if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
             updateJobWithTableResult(flinkInterceptorResult.getTableResult(), item.getSqlType());
         } else if (!flinkInterceptorResult.isNoExecute()) {
-            TableResult tableResult = jobManager.getExecutor().executeSql(item.getStatement());
+            TableResult tableResult = executor.executeSql(item.getStatement());
             updateJobWithTableResult(tableResult, item.getSqlType());
         }
     }
@@ -322,7 +340,7 @@ private void updateJobWithTableResult(TableResult tableResult, SqlType sqlType) {
                         jobManager.getConfig().getMaxRowNum(),
                         jobManager.getConfig().isUseChangeLog(),
                         jobManager.getConfig().isUseAutoCancel(),
-                        jobManager.getExecutor().getTimeZone(),
+                        executor.getTimeZone(),
                         jobManager.getConfig().isMockSinkFunction())
                 .getResultWithPersistence(tableResult, jobManager.getHandler());
         jobManager.getJob().setResult(result);
@@ -332,7 +350,6 @@ private void updateJobWithTableResult(TableResult tableResult, SqlType sqlType) {
 
     private GatewayResult submitByGateway(List<JobStatement> inserts) {
         JobConfig config = jobManager.getConfig();
         GatewayType runMode = jobManager.getRunMode();
-        Executor executor = jobManager.getExecutor();
         GatewayResult gatewayResult = null;
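
Note: JobSqlRunner.execute() buffers every SQL statement and submits nothing until the statement marked by buildFinalExecutableStatement() arrives; it then submits either the whole buffer as a statement set or only the first statement, which makes this runner the place where application mode actually obtains the JobClient for INSERT/SELECT jobs. A condensed view, with the FlinkInterceptor branch elided and inferStatementSet() being the existing helper that decides between the two paths:

    statements.add(jobStatement);
    if (jobStatement.isFinalExecutableStatement()) {
        if (inferStatementSet()) {
            // all buffered statements go out together as one statement set
            return executor.executeStatements(statements).getJobClient();
        }
        // single-statement path (interceptor handling elided)
        return executor.executeSql(statements.get(0).getStatement()).getJobClient();
    }
    return Optional.empty(); // keep buffering until the final statement arrives
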