Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-4162][flink]Fix set statement is not effective in application mode #4163

Open
wants to merge 2 commits into
base: 1.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,25 @@
package org.dinky.app.flinksql;

import org.dinky.app.db.DBUtil;
import org.dinky.app.model.StatementParam;
import org.dinky.app.model.SysConfig;
import org.dinky.app.util.FlinkAppUtil;
import org.dinky.assertion.Asserts;
import org.dinky.classloader.DinkyClassLoader;
import org.dinky.config.Dialect;
import org.dinky.constant.CustomerConfigureOptions;
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.data.app.AppParamConfig;
import org.dinky.data.app.AppTask;
import org.dinky.data.constant.DirConstant;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.job.JobStatement;
import org.dinky.data.job.SqlType;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.Executor;
import org.dinky.executor.ExecutorConfig;
import org.dinky.executor.ExecutorFactory;
import org.dinky.explainer.Explainer;
import org.dinky.job.JobRunnerFactory;
import org.dinky.job.JobStatementPlan;
import org.dinky.resource.BaseResourceManager;
import org.dinky.trans.Operations;
import org.dinky.trans.dml.ExecuteJarOperation;
Expand All @@ -60,7 +62,6 @@
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;

import java.io.File;
import java.io.IOException;
Expand All @@ -72,7 +73,6 @@
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -141,7 +141,7 @@ public static void submit(AppParamConfig config) throws SQLException {
if (Dialect.FLINK_JAR == appTask.getDialect()) {
jobClient = executeJarJob(appTask.getType(), executor, statements);
} else {
jobClient = executeJob(executor, statements);
jobClient = executeJob(executor, sql);
}
} finally {
log.info("Start Monitor Job");
Expand Down Expand Up @@ -305,85 +305,31 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
return jobClient;
}

public static Optional<JobClient> executeJob(Executor executor, String[] statements) {
public static Optional<JobClient> executeJob(Executor executor, String statements) {
Optional<JobClient> jobClient = Optional.empty();

ExecutorConfig executorConfig = executor.getExecutorConfig();
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
List<StatementParam> execute = new ArrayList<>();

for (String item : statements) {
if (item.isEmpty()) {
continue;
}

SqlType operationType = Operations.getOperationType(item);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
trans.add(new StatementParam(item, operationType));
if (!executorConfig.isUseStatementSet()) {
break;
}
} else if (operationType.equals(SqlType.EXECUTE)) {
execute.add(new StatementParam(item, operationType));
if (!executorConfig.isUseStatementSet()) {
break;
}
} else {
ddl.add(new StatementParam(item, operationType));
}
}

for (StatementParam item : ddl) {
log.info("Executing FlinkSQL: {}", item.getValue());
executor.executeSql(item.getValue());
log.info("Execution succeeded.");
}

if (!trans.isEmpty()) {
if (executorConfig.isUseStatementSet()) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : trans) {
if (item.getType().equals(SqlType.INSERT)) {
inserts.add(item.getValue());
}
}
log.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts));
TableResult tableResult = executor.executeStatementSet(inserts);
jobClient = tableResult.getJobClient();
log.info("Execution succeeded.");
} else {
// UseStatementSet defaults to true, where the logic is never executed
StatementParam item = trans.get(0);
log.info("Executing FlinkSQL: {}", item.getValue());
TableResult tableResult = executor.executeSql(item.getValue());
jobClient = tableResult.getJobClient();
JobStatementPlan jobStatementPlan =
Explainer.build(executor).parseStatementsForApplicationMode(SqlUtil.getStatements(statements));
jobStatementPlan.buildFinalExecutableStatement();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(executor);
String currentSql = "";
try {
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
currentSql = jobStatement.getStatement();
log.info("Executing FlinkSQL: {}", currentSql);
Optional<JobClient> optionalJobClient = jobRunnerFactory
.getJobRunner(jobStatement.getStatementType())
.execute(jobStatement);
log.info("Execution succeeded.");
}
}

if (!execute.isEmpty()) {
List<String> executes = new ArrayList<>();
for (StatementParam item : execute) {
executes.add(item.getValue());
executor.executeSql(item.getValue());
if (!executorConfig.isUseStatementSet()) {
if (optionalJobClient.isPresent()) {
jobClient = optionalJobClient;
break;
}
}

log.info(
"The FlinkSQL statement set is being executed: {}",
String.join(FlinkSQLConstant.SEPARATOR, executes));
try {
JobClient client = executor.executeAsync(executorConfig.getJobName());
jobClient = Optional.of(client);
log.info("The execution was successful");
} catch (Exception e) {
log.error("Execution failed, {}", e.getMessage(), e);
}
} catch (Exception e) {
log.error("Execution failed. Current statement: {} \n Error: {}", currentSql, e.getMessage(), e);
}
log.info("{} The task is successfully submitted", LocalDateTime.now());
log.info("The task is successfully submitted.");
return jobClient;
}
}
22 changes: 14 additions & 8 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,23 @@
public class Explainer {

private Executor executor;
private boolean useStatementSet;
private JobManager jobManager;

public Explainer(Executor executor, boolean useStatementSet, JobManager jobManager) {
public Explainer(Executor executor) {
this.executor = executor;
}

public Explainer(Executor executor, JobManager jobManager) {
this.executor = executor;
this.useStatementSet = useStatementSet;
this.jobManager = jobManager;
}

public static Explainer build(JobManager jobManager) {
return new Explainer(jobManager.getExecutor(), true, jobManager);
public static Explainer build(Executor executor) {
return new Explainer(executor);
}

public static Explainer build(Executor executor, boolean useStatementSet, JobManager jobManager) {
return new Explainer(executor, useStatementSet, jobManager);
public static Explainer build(JobManager jobManager) {
return new Explainer(jobManager.getExecutor(), jobManager);
}

public JobStatementPlan parseStatements(String[] statements) {
Expand All @@ -93,6 +95,10 @@ public JobStatementPlan parseStatements(String[] statements) {
return jobStatementPlanWithMock;
}

/**
 * Builds the statement plan for application mode.
 *
 * <p>The statements are handed to the executor as-is; this method only reads {@code executor}
 * and does not touch {@code jobManager}.
 *
 * @param statements the individual SQL statements to plan
 * @return the plan produced by {@code executor.parseStatementIntoJobStatementPlan(statements)}
 */
public JobStatementPlan parseStatementsForApplicationMode(String[] statements) {
    final JobStatementPlan applicationModePlan = executor.parseStatementIntoJobStatementPlan(statements);
    return applicationModePlan;
}

private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
List<String> udfStatements = new ArrayList<>();
Optional.ofNullable(jobManager.getConfig().getUdfRefer())
Expand Down Expand Up @@ -185,7 +191,7 @@ public List<LineageRel> getLineage(String statement) {
.type(GatewayType.LOCAL.getLongValue())
.useRemote(false)
.fragment(true)
.statementSet(useStatementSet)
.statementSet(false)
.parallelism(1)
.udfRefer(jobManager.getConfig().getUdfRefer())
.configJson(executor.getTableConfig().getConfiguration().toMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,19 @@
public class LineageBuilder {

public static LineageResult getColumnLineageByLogicalPlan(String statement, JobConfig jobConfig) {
JobManager jobManager = JobManager.buildPlanMode(jobConfig);
Explainer explainer = new Explainer(jobManager.getExecutor(), false, jobManager);
return getColumnLineageByLogicalPlan(statement, explainer);
Explainer explainer = Explainer.build(JobManager.buildPlanMode(jobConfig));
return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
}

public static LineageResult getColumnLineageByLogicalPlan(String statement, ExecutorConfig executorConfig) {
JobManager jobManager = JobManager.buildPlanMode(JobConfig.buildPlanConfig());
Executor executor = ExecutorFactory.buildExecutor(executorConfig, jobManager.getDinkyClassLoader());
jobManager.setExecutor(executor);
Explainer explainer = new Explainer(executor, false, jobManager);
return getColumnLineageByLogicalPlan(statement, explainer);
Explainer explainer = Explainer.build(jobManager);
return getColumnLineageByLogicalPlan(explainer.getLineage(statement));
}

public static LineageResult getColumnLineageByLogicalPlan(String statement, Explainer explainer) {
List<LineageRel> lineageRelList = explainer.getLineage(statement);
public static LineageResult getColumnLineageByLogicalPlan(List<LineageRel> lineageRelList) {
List<LineageRelation> relations = new ArrayList<>();
Map<String, LineageTable> tableMap = new HashMap<>();
int tableIndex = 1;
Expand Down
7 changes: 0 additions & 7 deletions dinky-core/src/main/java/org/dinky/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,6 @@ public class JobConfig {
notes = "Flag indicating whether to mock sink function")
private boolean mockSinkFunction;

@ApiModelProperty(
value = "Flag indicating whether to be submission mode",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to be submission mode")
private boolean isSubmissionMode;

@ApiModelProperty(value = "Gateway configuration", dataType = "GatewayConfig", notes = "Gateway configuration")
private GatewayConfig gatewayConfig;

Expand Down
13 changes: 5 additions & 8 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.node.ObjectNode;

Expand Down Expand Up @@ -242,17 +241,14 @@ public boolean close() {

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJarSql(String statement) throws Exception {
List<String> statements = Arrays.stream(SqlUtil.getStatements(statement))
.map(t -> executor.pretreatStatement(t))
.collect(Collectors.toList());
statement = String.join(";\n", statements);
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.buildFinalStatement();
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
ready();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
try {
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.buildFinalStatement();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
setCurrentSql(jobStatement.getStatement());
jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
}
if (job.isFailed()) {
Expand Down Expand Up @@ -284,6 +280,7 @@ public JobResult executeSql(String statement) throws Exception {
jobStatementPlan.buildFinalStatement();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
setCurrentSql(jobStatement.getStatement());
jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
}
job.setEndTime(LocalDateTime.now());
Expand Down
5 changes: 5 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@
import org.dinky.data.job.JobStatement;
import org.dinky.data.result.SqlExplainResult;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.Optional;

public interface JobRunner {

Optional<JobClient> execute(JobStatement jobStatement) throws Exception;

void run(JobStatement jobStatement) throws Exception;

SqlExplainResult explain(JobStatement jobStatement);
Expand Down
13 changes: 13 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.job;

import org.dinky.data.job.JobStatementType;
import org.dinky.executor.Executor;
import org.dinky.job.runner.JobDDLRunner;
import org.dinky.job.runner.JobJarRunner;
import org.dinky.job.runner.JobPipelineRunner;
Expand All @@ -42,6 +43,14 @@ public JobRunnerFactory(JobManager jobManager) {
this.jobJarRunner = new JobJarRunner(jobManager);
}

/**
 * Creates a factory whose SET, SQL, pipeline, DDL and JAR runners are all constructed
 * from the given executor instead of from a {@code JobManager}.
 *
 * <p>NOTE(review): presumably meant for application mode, where only an {@code Executor}
 * is available — confirm against the callers.
 *
 * @param executor the executor passed to every runner constructor
 */
public JobRunnerFactory(Executor executor) {
this.jobSetRunner = new JobSetRunner(executor);
this.jobSqlRunner = new JobSqlRunner(executor);
this.jobPipelineRunner = new JobPipelineRunner(executor);
this.jobDDLRunner = new JobDDLRunner(executor);
this.jobJarRunner = new JobJarRunner(executor);
}

public JobRunner getJobRunner(JobStatementType jobStatementType) {
switch (jobStatementType) {
case SET:
Expand All @@ -58,6 +67,10 @@ public JobRunner getJobRunner(JobStatementType jobStatementType) {
}
}

/**
 * Static factory for an executor-backed {@code JobRunnerFactory}.
 *
 * @param executor the executor forwarded to the {@code JobRunnerFactory(Executor)} constructor
 * @return a newly constructed factory
 */
public static JobRunnerFactory create(Executor executor) {
    final JobRunnerFactory executorBackedFactory = new JobRunnerFactory(executor);
    return executorBackedFactory;
}

/**
 * Static factory for a {@code JobManager}-backed {@code JobRunnerFactory}.
 *
 * @param jobManager the job manager forwarded to the {@code JobRunnerFactory(JobManager)} constructor
 * @return a newly constructed factory
 */
public static JobRunnerFactory create(JobManager jobManager) {
    final JobRunnerFactory managerBackedFactory = new JobRunnerFactory(jobManager);
    return managerBackedFactory;
}
Expand Down
24 changes: 24 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ public void buildFinalStatement() {
}
}

/**
 * Validates the plan and then calls {@code asFinalExecutableStatement()} on at most one
 * statement of {@code jobStatementList}.
 *
 * <p>Selection order:
 * <ol>
 *   <li>the <em>last</em> statement whose SQL type reports {@code isPipeline()};</li>
 *   <li>otherwise the <em>first</em> statement whose statement type is {@code JobStatementType.SQL};</li>
 *   <li>otherwise no statement is selected.</li>
 * </ol>
 */
public void buildFinalExecutableStatement() {
    checkStatement();

    // Scan the whole list without stopping early so that the last pipeline statement wins.
    int lastPipelineIndex = -1;
    int cursor = 0;
    while (cursor < jobStatementList.size()) {
        if (jobStatementList.get(cursor).getSqlType().isPipeline()) {
            lastPipelineIndex = cursor;
        }
        cursor++;
    }
    if (lastPipelineIndex != -1) {
        jobStatementList.get(lastPipelineIndex).asFinalExecutableStatement();
        return;
    }

    // If there is no INSERT/CTAS/RTAS/CALL statement, use the first SELECT/WITH/SHOW/DESC SQL
    // statement as the final statement.
    for (int pos = 0; pos < jobStatementList.size(); pos++) {
        if (!jobStatementList.get(pos).getStatementType().equals(JobStatementType.SQL)) {
            continue;
        }
        jobStatementList.get(pos).asFinalExecutableStatement();
        return;
    }
}

public void checkStatement() {
checkEmptyStatement();
checkPipelineStatement();
Expand Down
Loading
Loading