diff --git a/pom.xml b/pom.xml
index 5ebe8c7..e28abe0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@ under the License.
<name>flink-sql-gateway</name>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-gateway</artifactId>
- <version>0.2-SNAPSHOT</version>
+ <version>0.3-SNAPSHOT</version>
<description>Flink SQL gateway is a service that allows other applications
@@ -34,7 +34,7 @@ under the License.
<packaging>jar</packaging>
<properties>
- <flink.version>1.11.1</flink.version>
+ <flink.version>1.12.0</flink.version>
<java.version>1.8</java.version>
<slf4j.version>1.7.15</slf4j.version>
<log4j.version>1.2.17</log4j.version>
@@ -626,36 +626,36 @@ under the License.
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.17</version>
- <dependencies>
- <dependency>
- <groupId>com.puppycrawl.tools</groupId>
- <artifactId>checkstyle</artifactId>
- <version>8.14</version>
- </dependency>
- </dependencies>
- <executions>
- <execution>
- <id>validate</id>
- <phase>validate</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <configLocation>/tools/maven/checkstyle.xml</configLocation>
- <logViolationsToConsole>true</logViolationsToConsole>
- <failOnViolation>true</failOnViolation>
- </configuration>
- </plugin>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-</project>
\ No newline at end of file
+</project>
diff --git a/src/main/java/com/ververica/flink/table/gateway/SqlGateway.java b/src/main/java/com/ververica/flink/table/gateway/SqlGateway.java
index 8246590..f9ee1f5 100644
--- a/src/main/java/com/ververica/flink/table/gateway/SqlGateway.java
+++ b/src/main/java/com/ververica/flink/table/gateway/SqlGateway.java
@@ -23,13 +23,11 @@
import com.ververica.flink.table.gateway.rest.SqlGatewayEndpoint;
import com.ververica.flink.table.gateway.rest.session.SessionManager;
import com.ververica.flink.table.gateway.utils.SqlGatewayException;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.JarUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,12 +149,9 @@ public static void main(String[] args) {
private static void checkFlinkVersion() {
String flinkVersion = EnvironmentInformation.getVersion();
- if (!flinkVersion.startsWith("1.11")) {
- LOG.error("Only Flink-1.11 is supported now!");
- throw new SqlGatewayException("Only Flink-1.11 is supported now!");
- } else if (flinkVersion.startsWith("1.11.0")) {
- LOG.error("Flink-1.11.0 is not supported, please use Flink >= 1.11.1!");
- throw new SqlGatewayException("Flink-1.11.0 is not supported, please use Flink >= 1.11.1!");
+ if (!flinkVersion.startsWith("1.12")) {
+ LOG.error("Only Flink-1.12 is supported now!:)");
+ throw new SqlGatewayException("Only Flink-1.12 is supported now!");
}
}
diff --git a/src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java b/src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java
index 33e7972..75b1a85 100644
--- a/src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java
+++ b/src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java
@@ -19,15 +19,10 @@
package com.ververica.flink.table.gateway.context;
import com.ververica.flink.table.gateway.config.Environment;
-import com.ververica.flink.table.gateway.config.entries.DeploymentEntry;
-import com.ververica.flink.table.gateway.config.entries.ExecutionEntry;
-import com.ververica.flink.table.gateway.config.entries.SinkTableEntry;
-import com.ververica.flink.table.gateway.config.entries.SourceSinkTableEntry;
-import com.ververica.flink.table.gateway.config.entries.SourceTableEntry;
-import com.ververica.flink.table.gateway.config.entries.TemporalTableEntry;
-import com.ververica.flink.table.gateway.config.entries.ViewEntry;
+import com.ververica.flink.table.gateway.config.entries.*;
import com.ververica.flink.table.gateway.utils.SqlExecutionException;
-
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -42,60 +37,33 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.*;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.descriptors.CoreModuleDescriptorValidator;
-import org.apache.flink.table.factories.BatchTableSinkFactory;
-import org.apache.flink.table.factories.BatchTableSourceFactory;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.factories.ModuleFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.FunctionService;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.factories.*;
+import org.apache.flink.table.functions.*;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TemporaryClassLoaderContext;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-
import java.lang.reflect.Method;
import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -281,8 +249,7 @@ private static Configuration createExecutionConfig(
availableCommandLines,
activeCommandLine);
- Configuration executionConfig = activeCommandLine.applyCommandLineOptionsToConfiguration(
- commandLine);
+ Configuration executionConfig = activeCommandLine.toConfiguration(commandLine);
try {
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
@@ -327,34 +294,73 @@ private Catalog createCatalog(String name, Map catalogProperties
return factory.createCatalog(name, catalogProperties);
}
- private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties,
- ClassLoader classLoader) {
- if (execution.isStreamingPlanner()) {
+ private TableSource<?> createTableSource(String name, Map<String, String> sourceProperties) {
+ if (environment.getExecution().isStreamingPlanner()) {
final TableSourceFactory<?> factory = (TableSourceFactory<?>)
- TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
- return factory.createTableSource(sourceProperties);
- } else if (execution.isBatchPlanner()) {
+ TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
+ return factory.createTableSource(new TableSourceFactoryContextImpl(
+ ObjectIdentifier.of(
+ tableEnv.getCurrentCatalog(),
+ tableEnv.getCurrentDatabase(),
+ name),
+ CatalogTableImpl.fromProperties(sourceProperties),
+ tableEnv.getConfig().getConfiguration(), true));
+ } else if (environment.getExecution().isBatchPlanner()) {
final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
- TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
+ TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
return factory.createBatchTableSource(sourceProperties);
}
throw new SqlExecutionException("Unsupported execution type for sources.");
}
- private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties,
- ClassLoader classLoader) {
- if (execution.isStreamingPlanner()) {
+ private TableSink<?> createTableSink(String name, Map<String, String> sinkProperties) {
+ if (environment.getExecution().isStreamingPlanner()) {
final TableSinkFactory<?> factory = (TableSinkFactory<?>)
- TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
- return factory.createTableSink(sinkProperties);
- } else if (execution.isBatchPlanner()) {
+ TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
+ return factory.createTableSink(new TableSinkFactoryContextImpl(
+ ObjectIdentifier.of(
+ tableEnv.getCurrentCatalog(),
+ tableEnv.getCurrentDatabase(),
+ name),
+ CatalogTableImpl.fromProperties(sinkProperties),
+ tableEnv.getConfig().getConfiguration(),
+ !environment.getExecution().inStreamingMode(), true));
+ } else if (environment.getExecution().isBatchPlanner()) {
final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
- TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
+ TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
return factory.createBatchTableSink(sinkProperties);
}
throw new SqlExecutionException("Unsupported execution type for sinks.");
}
+// private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties,
+// ClassLoader classLoader) {
+// if (execution.isStreamingPlanner()) {
+// final TableSourceFactory<?> factory = (TableSourceFactory<?>)
+// TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
+// return factory.createTableSource(sourceProperties);
+// } else if (execution.isBatchPlanner()) {
+// final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
+// TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
+// return factory.createBatchTableSource(sourceProperties);
+// }
+// throw new SqlExecutionException("Unsupported execution type for sources.");
+// }
+//
+// private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties,
+// ClassLoader classLoader) {
+// if (execution.isStreamingPlanner()) {
+// final TableSinkFactory<?> factory = (TableSinkFactory<?>)
+// TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
+// return factory.createTableSink(sinkProperties);
+// } else if (execution.isBatchPlanner()) {
+// final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
+// TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
+// return factory.createBatchTableSink(sinkProperties);
+// }
+// throw new SqlExecutionException("Unsupported execution type for sinks.");
+// }
+
private TableEnvironment createStreamTableEnvironment(
StreamExecutionEnvironment env,
EnvironmentSettings settings,
@@ -514,20 +520,21 @@ private void initializeCatalogs() {
//--------------------------------------------------------------------------------------------------------------
// Step.2 create table sources & sinks, and register them.
//--------------------------------------------------------------------------------------------------------------
- Map> tableSources = new HashMap<>();
- Map> tableSinks = new HashMap<>();
+ Map tableSources = new HashMap<>();
+ Map tableSinks = new HashMap<>();
+
environment.getTables().forEach((name, entry) -> {
if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
- tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader));
+ tableSources.put(name, createTableSource(name, entry.asMap()));
}
if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) {
- tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader));
+ tableSinks.put(name, createTableSink(name, entry.asMap()));
}
});
// register table sources
- tableSources.forEach(tableEnv::registerTableSource);
+ tableSources.forEach(((TableEnvironmentInternal) tableEnv)::registerTableSourceInternal);
// register table sinks
- tableSinks.forEach(tableEnv::registerTableSink);
+ tableSinks.forEach(((TableEnvironmentInternal) tableEnv)::registerTableSinkInternal);
//--------------------------------------------------------------------------------------------------------------
// Step.4 Register temporal tables.
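
Note on the ExecutionContext hunk above: with Flink 1.12 the legacy connector factories are still discovered via TableFactoryService, but createTableSource/createTableSink now go through a factory Context that carries the table's ObjectIdentifier and CatalogTable, and the patch switches registration to the internal TableEnvironmentInternal API, apparently because the deprecated registerTableSource/registerTableSink variants are no longer usable on the public TableEnvironment. A minimal sketch of the source path, assuming tableEnv, name, props and classLoader as in the surrounding code:

    // Build the identifier for the table being registered.
    ObjectIdentifier identifier = ObjectIdentifier.of(
            tableEnv.getCurrentCatalog(), tableEnv.getCurrentDatabase(), name);
    // Discover the legacy factory and create the source through the 1.12 factory context.
    TableSourceFactory<?> factory = (TableSourceFactory<?>)
            TableFactoryService.find(TableSourceFactory.class, props, classLoader);
    TableSource<?> source = factory.createTableSource(new TableSourceFactoryContextImpl(
            identifier,
            CatalogTableImpl.fromProperties(props),     // properties -> CatalogTable
            tableEnv.getConfig().getConfiguration(),
            true));                                     // isTemporary
    // Registration moved behind the internal API.
    ((TableEnvironmentInternal) tableEnv).registerTableSourceInternal(name, source);

The sink path is symmetric via TableSinkFactoryContextImpl, which additionally takes an isBounded flag, derived here from whether the execution runs in streaming mode.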
diff --git a/src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java b/src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java
index e691535..3dc50a6 100644
--- a/src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java
+++ b/src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java
@@ -21,12 +21,7 @@
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.PipelineExecutor;
-import org.apache.flink.core.execution.PipelineExecutorFactory;
-import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
-
+import org.apache.flink.core.execution.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +36,7 @@ public class ProgramDeployer {
private final Configuration configuration;
private final Pipeline pipeline;
private final String jobName;
+ private final ClassLoader userCodeClassloader;
/**
* Deploys a table program on the cluster.
@@ -52,10 +48,12 @@ public class ProgramDeployer {
public ProgramDeployer(
Configuration configuration,
String jobName,
- Pipeline pipeline) {
+ Pipeline pipeline,
+ ClassLoader userCodeClassloader) {
this.configuration = configuration;
this.pipeline = pipeline;
this.jobName = jobName;
+ this.userCodeClassloader = userCodeClassloader;
}
public CompletableFuture<JobClient> deploy() {
@@ -78,7 +76,7 @@ public CompletableFuture<JobClient> deploy() {
final PipelineExecutor executor = executorFactory.getExecutor(configuration);
try {
- return executor.execute(pipeline, configuration);
+ return executor.execute(pipeline, configuration, userCodeClassloader);
} catch (Exception e) {
throw new RuntimeException("Could not execute program.", e);
}
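
The extra constructor argument reflects a Flink 1.12 signature change: PipelineExecutor.execute now also takes the user-code ClassLoader when submitting the pipeline, so ProgramDeployer has to be handed the ExecutionContext's class loader up front. A rough sketch of the call inside deploy(), with names taken from this class:

    // 1.12: the executor is resolved from the configuration and needs the user-code class loader.
    final PipelineExecutor executor = executorFactory.getExecutor(configuration);
    try {
        return executor.execute(pipeline, configuration, userCodeClassloader);
    } catch (Exception e) {
        throw new RuntimeException("Could not execute program.", e);
    }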
diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java b/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java
index 30c94e8..b155b2b 100644
--- a/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java
+++ b/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java
@@ -25,18 +25,12 @@
import com.ververica.flink.table.gateway.rest.result.ResultKind;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import com.ververica.flink.table.gateway.utils.SqlExecutionException;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.api.*;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.Row;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -90,7 +84,11 @@ public ResultSet execute() {
String type = StringUtils.removeEnd(logicalType.toString(), " NOT NULL");
boolean isNullable = logicalType.isNullable();
String key = fieldToPrimaryKey.getOrDefault(column.getName(), null);
- String computedColumn = column.getExpr().orElse(null);
+
+ String computedColumn = null;
+ if (column instanceof TableColumn.ComputedColumn) {
+ computedColumn = ((TableColumn.ComputedColumn) column).getExpression();
+ }
String watermark = fieldToWatermark.getOrDefault(column.getName(), null);
data.add(Row.of(name, type, isNullable, key, computedColumn, watermark));
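
In Flink 1.12 the Optional-based getExpr() accessor used before is no longer available on TableColumn; columns are modeled as subclasses, and only computed columns carry an expression, hence the instanceof check above. A small sketch, assuming a TableSchema named schema as in this operation:

    // Only TableColumn.ComputedColumn exposes an expression in the 1.12 API.
    for (TableColumn column : schema.getTableColumns()) {
        String expr = column instanceof TableColumn.ComputedColumn
                ? ((TableColumn.ComputedColumn) column).getExpression()
                : null;   // physical columns carry no expression
    }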
diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java b/src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java
index bef60b7..246ebb5 100644
--- a/src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java
+++ b/src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java
@@ -27,7 +27,6 @@
import com.ververica.flink.table.gateway.rest.result.ResultKind;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import com.ververica.flink.table.gateway.utils.SqlExecutionException;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,7 +37,6 @@
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,7 +153,7 @@ private JobID executeUpdateInternal(ExecutionContext executionContext) {
configuration.set(DeploymentOptions.ATTACHED, false);
// create execution
- final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
+ final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline, executionContext.getClassLoader());
// blocking deployment
try {
diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java b/src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java
index 81addbe..7983999 100644
--- a/src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java
+++ b/src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java
@@ -26,15 +26,9 @@
import com.ververica.flink.table.gateway.rest.result.ConstantNames;
import com.ververica.flink.table.gateway.rest.result.ResultKind;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
-import com.ververica.flink.table.gateway.result.BatchResult;
-import com.ververica.flink.table.gateway.result.ChangelogResult;
-import com.ververica.flink.table.gateway.result.Result;
-import com.ververica.flink.table.gateway.result.ResultDescriptor;
-import com.ververica.flink.table.gateway.result.ResultUtil;
-import com.ververica.flink.table.gateway.result.TypedResult;
+import com.ververica.flink.table.gateway.result.*;
import com.ververica.flink.table.gateway.utils.SqlExecutionException;
import com.ververica.flink.table.gateway.utils.SqlGatewayException;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,20 +39,16 @@
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.Row;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
/**
* Operation for SELECT command.
@@ -211,7 +201,7 @@ private ResultDescriptor executeQueryInternal(ExecutionContext executionC
try {
// writing to a sink requires an optimization step that might reference UDFs during code compilation
executionContext.wrapClassLoader(() -> {
- executionContext.getTableEnvironment().registerTableSink(tableName, result.getTableSink());
+ ((TableEnvironmentInternal) executionContext.getTableEnvironment()).registerTableSinkInternal(tableName, result.getTableSink());
table.insertInto(tableName);
return null;
});
@@ -239,7 +229,7 @@ private ResultDescriptor executeQueryInternal(ExecutionContext executionC
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
// create execution
- final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
+ final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline, executionContext.getClassLoader());
JobClient jobClient;
// blocking deployment
diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java b/src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java
index 207622a..8794c7a 100644
--- a/src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java
+++ b/src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java
@@ -18,34 +18,15 @@
package com.ververica.flink.table.gateway.operation;
-import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
-import org.apache.flink.sql.parser.ddl.SqlAlterTable;
-import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.sql.parser.ddl.SqlCreateView;
-import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
-import org.apache.flink.sql.parser.ddl.SqlDropTable;
-import org.apache.flink.sql.parser.ddl.SqlDropView;
-import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
-import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.flink.sql.parser.ddl.*;
import org.apache.flink.sql.parser.dml.RichSqlInsert;
-import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
-import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
-import org.apache.flink.sql.parser.dql.SqlShowDatabases;
-import org.apache.flink.sql.parser.dql.SqlShowFunctions;
-import org.apache.flink.sql.parser.dql.SqlShowTables;
+import org.apache.flink.sql.parser.dql.*;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.SqlDrop;
-import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSetOption;
-import org.apache.calcite.sql.parser.SqlParser;
-
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Objects;
@@ -194,7 +175,7 @@ private static Optional parseStmt(String stmt, boolean isBlinkPl
operands = new String[0];
} else if (node instanceof SqlUseCatalog) {
cmd = SqlCommand.USE_CATALOG;
- operands = new String[] { ((SqlUseCatalog) node).getCatalogName() };
+ operands = new String[] { ((SqlUseCatalog) node).getCatalogName().getSimple() };
} else if (node instanceof SqlUseDatabase) {
cmd = SqlCommand.USE;
operands = new String[] { ((SqlUseDatabase) node).getDatabaseName().toString() };
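
The USE CATALOG operand changes because, judging from this hunk, SqlUseCatalog.getCatalogName() now returns an identifier node rather than a plain String, so the simple name has to be unwrapped explicitly. Sketch of the assumed shape:

    // Assumed 1.12 parser behavior: the catalog name is an identifier, not a String.
    if (node instanceof SqlUseCatalog) {
        String catalogName = ((SqlUseCatalog) node).getCatalogName().getSimple();
    }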
diff --git a/src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java b/src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java
index 83a7c86..cfeec16 100644
--- a/src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java
+++ b/src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java
@@ -20,7 +20,6 @@
import com.ververica.flink.table.gateway.sink.CollectBatchTableSink;
import com.ververica.flink.table.gateway.utils.SqlExecutionException;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
@@ -70,7 +69,7 @@ public BatchResult(
@Override
public void startRetrieval(JobClient jobClient) {
CompletableFuture.completedFuture(jobClient)
- .thenCompose(client -> client.getJobExecutionResult(classLoader))
+ .thenCompose(client -> client.getJobExecutionResult())
.thenAccept(new ResultRetrievalHandler())
.whenComplete((unused, throwable) -> {
if (throwable != null) {
diff --git a/src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java b/src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java
index 49281fd..c76d05e 100644
--- a/src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java
+++ b/src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java
@@ -21,7 +21,6 @@
import com.ververica.flink.table.gateway.sink.CollectStreamTableSink;
import com.ververica.flink.table.gateway.utils.SqlExecutionException;
import com.ververica.flink.table.gateway.utils.SqlGatewayException;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -103,7 +102,7 @@ public void startRetrieval(JobClient jobClient) {
retrievalThread.start();
jobExecutionResultFuture = CompletableFuture.completedFuture(jobClient)
- .thenCompose(client -> client.getJobExecutionResult(classLoader))
+ .thenCompose(client -> client.getJobExecutionResult())
.whenComplete((unused, throwable) -> {
if (throwable != null) {
executionException.compareAndSet(
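
Both BatchResult and ChangelogResult adapt to another 1.12 change: JobClient.getJobExecutionResult() no longer takes the user ClassLoader. A sketch of the retrieval chain used in both classes, with the completion handling elided:

    // 1.12: no ClassLoader argument; the future completes with the JobExecutionResult.
    CompletableFuture.completedFuture(jobClient)
            .thenCompose(client -> client.getJobExecutionResult())
            .thenAccept(result -> {
                // hand the result (rows / accumulators) to the retrieval handler
            });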
diff --git a/src/main/java/com/ververica/flink/table/gateway/sink/CollectBatchTableSink.java b/src/main/java/com/ververica/flink/table/gateway/sink/CollectBatchTableSink.java
index de1f6d4..36152f0 100644
--- a/src/main/java/com/ververica/flink/table/gateway/sink/CollectBatchTableSink.java
+++ b/src/main/java/com/ververica/flink/table/gateway/sink/CollectBatchTableSink.java
@@ -43,7 +43,7 @@ public class CollectBatchTableSink extends OutputFormatTableSink implements
public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer, TableSchema tableSchema) {
this.accumulatorName = accumulatorName;
this.serializer = serializer;
- this.tableSchema = TableSchemaUtils.checkNoGeneratedColumns(tableSchema);
+ this.tableSchema = TableSchemaUtils.checkOnlyPhysicalColumns(tableSchema);
}
/**
diff --git a/src/main/java/com/ververica/flink/table/gateway/sink/CollectStreamTableSink.java b/src/main/java/com/ververica/flink/table/gateway/sink/CollectStreamTableSink.java
index 5922195..0814579 100644
--- a/src/main/java/com/ververica/flink/table/gateway/sink/CollectStreamTableSink.java
+++ b/src/main/java/com/ververica/flink/table/gateway/sink/CollectStreamTableSink.java
@@ -48,7 +48,7 @@ public CollectStreamTableSink(InetAddress targetAddress, int targetPort,
this.targetAddress = targetAddress;
this.targetPort = targetPort;
this.serializer = serializer;
- this.tableSchema = TableSchemaUtils.checkNoGeneratedColumns(tableSchema);
+ this.tableSchema = TableSchemaUtils.checkOnlyPhysicalColumns(tableSchema);
}
@Override
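
Both collect sinks now validate their schema with TableSchemaUtils.checkOnlyPhysicalColumns, the 1.12 utility that rejects schemas containing computed or otherwise non-physical columns (checkNoGeneratedColumns is what the 1.11-based code used). A one-line sketch, assuming a TableSchema named schema:

    // Fails fast if the schema contains anything other than physical columns.
    this.tableSchema = TableSchemaUtils.checkOnlyPhysicalColumns(schema);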
diff --git a/src/test/java/com/ververica/flink/table/gateway/config/DependencyTest.java b/src/test/java/com/ververica/flink/table/gateway/config/DependencyTest.java
index 03e0620..b372eb9 100644
--- a/src/test/java/com/ververica/flink/table/gateway/config/DependencyTest.java
+++ b/src/test/java/com/ververica/flink/table/gateway/config/DependencyTest.java
@@ -29,19 +29,14 @@
import com.ververica.flink.table.gateway.sink.TestTableSinkFactoryBase;
import com.ververica.flink.table.gateway.source.TestTableSourceFactoryBase;
import com.ververica.flink.table.gateway.utils.EnvironmentFileUtil;
-
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -58,20 +53,11 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.types.Row;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
-
import org.junit.Test;
import java.net.URL;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
@@ -110,7 +96,7 @@ public void testTableFactoryDiscovery() throws Exception {
env,
Collections.singletonList(dependency),
new Configuration(),
- new DefaultCLI(new Configuration()),
+ new DefaultCLI(),
new DefaultClusterClientServiceLoader());
SessionManager sessionManager = new SessionManager(defaultContext);
String sessionId = sessionManager.createSession("test", "blink", "streaming", Maps.newConcurrentMap());
diff --git a/src/test/java/com/ververica/flink/table/gateway/config/ExecutionContextTest.java b/src/test/java/com/ververica/flink/table/gateway/config/ExecutionContextTest.java
index f26a4dd..dc91850 100644
--- a/src/test/java/com/ververica/flink/table/gateway/config/ExecutionContextTest.java
+++ b/src/test/java/com/ververica/flink/table/gateway/config/ExecutionContextTest.java
@@ -22,7 +22,7 @@
import com.ververica.flink.table.gateway.context.ExecutionContext;
import com.ververica.flink.table.gateway.source.dummy.DummyTableSourceFactory;
import com.ververica.flink.table.gateway.utils.EnvironmentFileUtil;
-
+import org.apache.commons.cli.Options;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.client.cli.DefaultCLI;
@@ -38,366 +38,356 @@
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.util.StringUtils;
-
-import org.apache.commons.cli.Options;
import org.junit.Ignore;
import org.junit.Test;
import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* Test for {@link ExecutionContext}.
*/
public class ExecutionContextTest {
- private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-gateway-defaults.yaml";
- private static final String MODULES_ENVIRONMENT_FILE = "test-sql-gateway-modules.yaml";
- private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-gateway-catalogs.yaml";
- private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-gateway-streaming.yaml";
- private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-gateway-configuration.yaml";
-
- @Test
- public void testExecutionConfig() throws Exception {
- // test default values defined in ExecutionEntry
- final ExecutionContext> context = createDefaultExecutionContext();
- final ExecutionConfig config = context.getExecutionConfig();
-
- assertEquals(200, config.getAutoWatermarkInterval());
-
- final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy();
- assertTrue(restartConfig instanceof RestartStrategies.FallbackRestartStrategyConfiguration);
-
- // TODO test FailureRateRestartStrategyConfiguration
- // final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy =
- // (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig;
- // assertEquals(10, failureRateStrategy.getMaxFailureRate());
- // assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds());
- // assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
- }
-
- @Test
- public void testModules() throws Exception {
- final ExecutionContext> context = createModuleExecutionContext();
- final TableEnvironment tableEnv = context.getTableEnvironment();
-
- Set allModules = new HashSet<>(Arrays.asList(tableEnv.listModules()));
- assertEquals(4, allModules.size());
- assertEquals(
- new HashSet<>(
- Arrays.asList(
- "core",
- "mymodule",
- "myhive",
- "myhive2")
- ),
- allModules
- );
- }
-
- @Test
- public void testCatalogs() throws Exception {
- final String inmemoryCatalog = "inmemorycatalog";
- final String hiveCatalog = "hivecatalog";
- final String hiveDefaultVersionCatalog = "hivedefaultversion";
-
- final ExecutionContext> context = createCatalogExecutionContext();
- final TableEnvironment tableEnv = context.getTableEnvironment();
-
- assertEquals(inmemoryCatalog, tableEnv.getCurrentCatalog());
- assertEquals("mydatabase", tableEnv.getCurrentDatabase());
-
- Catalog catalog = tableEnv.getCatalog(hiveCatalog).orElse(null);
- assertNotNull(catalog);
- assertTrue(catalog instanceof HiveCatalog);
- assertEquals("2.3.4", ((HiveCatalog) catalog).getHiveVersion());
-
- catalog = tableEnv.getCatalog(hiveDefaultVersionCatalog).orElse(null);
- assertNotNull(catalog);
- assertTrue(catalog instanceof HiveCatalog);
- // make sure we have assigned a default hive version
- assertFalse(StringUtils.isNullOrWhitespaceOnly(((HiveCatalog) catalog).getHiveVersion()));
-
- tableEnv.useCatalog(hiveCatalog);
-
- assertEquals(hiveCatalog, tableEnv.getCurrentCatalog());
-
- Set allCatalogs = new HashSet<>(Arrays.asList(tableEnv.listCatalogs()));
- assertEquals(6, allCatalogs.size());
- assertEquals(
- new HashSet<>(
- Arrays.asList(
- "default_catalog",
- inmemoryCatalog,
- hiveCatalog,
- hiveDefaultVersionCatalog,
- "catalog1",
- "catalog2")
- ),
- allCatalogs
- );
- }
-
- @Test
- public void testDatabases() throws Exception {
- final String hiveCatalog = "hivecatalog";
-
- final ExecutionContext> context = createCatalogExecutionContext();
- final TableEnvironment tableEnv = context.getTableEnvironment();
-
- assertEquals(1, tableEnv.listDatabases().length);
- assertEquals("mydatabase", tableEnv.listDatabases()[0]);
-
- tableEnv.useCatalog(hiveCatalog);
-
- assertEquals(2, tableEnv.listDatabases().length);
- assertEquals(
- new HashSet<>(
- Arrays.asList(
- HiveCatalog.DEFAULT_DB,
- DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE)
- ),
- new HashSet<>(Arrays.asList(tableEnv.listDatabases()))
- );
-
- tableEnv.useCatalog(hiveCatalog);
-
- assertEquals(HiveCatalog.DEFAULT_DB, tableEnv.getCurrentDatabase());
-
- tableEnv.useDatabase(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE);
-
- assertEquals(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, tableEnv.getCurrentDatabase());
- }
-
- @Test
- public void testFunctions() throws Exception {
- final ExecutionContext> context = createDefaultExecutionContext();
- final TableEnvironment tableEnv = context.getTableEnvironment();
- final String[] expected = new String[] { "scalarudf", "tableudf", "aggregateudf" };
- final String[] actual = tableEnv.listUserDefinedFunctions();
- Arrays.sort(expected);
- Arrays.sort(actual);
- assertArrayEquals(expected, actual);
- }
-
- @Test
- public void testTables() throws Exception {
- final ExecutionContext> context = createDefaultExecutionContext();
- final TableEnvironment tableEnv = context.getTableEnvironment();
-
- assertArrayEquals(
- new String[] { "TableNumber1", "TableNumber2", "TableSourceSink", "TestView1", "TestView2" },
- tableEnv.listTables());
- }
-
- @Ignore
- @Test
- public void testTemporalTables() throws Exception {
- final ExecutionContext> context = createStreamingExecutionContext();
- final StreamTableEnvironment tableEnv = (StreamTableEnvironment) context.getTableEnvironment();
-
- assertArrayEquals(
- new String[] { "EnrichmentSource", "HistorySource", "HistoryView", "TemporalTableUsage" },
- tableEnv.listTables());
-
- assertArrayEquals(
- new String[] { "sourcetemporaltable", "viewtemporaltable" },
- tableEnv.listUserDefinedFunctions());
-
- assertArrayEquals(
- new String[] { "integerField", "stringField", "rowtimeField", "integerField0", "stringField0", "rowtimeField0" },
- tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames());
- }
-
- @Test
- public void testConfiguration() throws Exception {
- final ExecutionContext> context = createConfigurationExecutionContext();
- final TableEnvironment tableEnv = context.getTableEnvironment();
-
- assertEquals(
- 100,
- tableEnv.getConfig().getConfiguration().getInteger(
- ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT));
- assertTrue(
- tableEnv.getConfig().getConfiguration().getBoolean(
- ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED));
- assertEquals(
- "128kb",
- tableEnv.getConfig().getConfiguration().getString(
- ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
-
- assertTrue(
- tableEnv.getConfig().getConfiguration().getBoolean(
- OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED));
-
- // these options are not modified and should be equal to their default value
- assertEquals(
- ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(),
- tableEnv.getConfig().getConfiguration().getBoolean(
- ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
- assertEquals(
- ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(),
- tableEnv.getConfig().getConfiguration().getString(
- ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE));
- assertEquals(
- OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(),
- tableEnv.getConfig().getConfiguration().getLong(
- OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD));
- }
-
- @Test
- public void testInitCatalogs() throws Exception {
- final Map replaceVars = createDefaultReplaceVars();
- Environment env = EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
-
- Map catalogProps = new HashMap<>();
- catalogProps.put("name", "test");
- catalogProps.put("type", "test_cl_catalog");
- env.getCatalogs().clear();
- env.getCatalogs().put("test", CatalogEntry.create(catalogProps));
- Configuration flinkConfig = new Configuration();
- ExecutionContext.builder(env,
- new Environment(),
- Collections.emptyList(),
- flinkConfig,
- new DefaultClusterClientServiceLoader(),
- new Options(),
- Collections.singletonList(new DefaultCLI(flinkConfig))).build();
- }
-
- @SuppressWarnings("unchecked")
- private ExecutionContext createExecutionContext(String file, Map replaceVars)
- throws Exception {
- final Environment env = EnvironmentFileUtil.parseModified(
- file,
- replaceVars);
- final Configuration flinkConfig = new Configuration();
- return (ExecutionContext) ExecutionContext.builder(
- env,
- new Environment(),
- Collections.emptyList(),
- flinkConfig,
- new DefaultClusterClientServiceLoader(),
- new Options(),
- Collections.singletonList(new DefaultCLI(flinkConfig)))
- .build();
- }
-
- private Map createDefaultReplaceVars() {
- Map replaceVars = new HashMap<>();
- replaceVars.put("$VAR_PLANNER", "old");
- replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
- replaceVars.put("$VAR_RESULT_MODE", "changelog");
- replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
- replaceVars.put("$VAR_MAX_ROWS", "100");
- replaceVars.put("$VAR_RESTART_STRATEGY_TYPE", "failure-rate");
- return replaceVars;
- }
-
- private ExecutionContext createDefaultExecutionContext() throws Exception {
- final Map replaceVars = createDefaultReplaceVars();
- return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
- }
-
- private ExecutionContext createModuleExecutionContext() throws Exception {
- final Map replaceVars = new HashMap<>();
- replaceVars.put("$VAR_PLANNER", "old");
- replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
- replaceVars.put("$VAR_RESULT_MODE", "changelog");
- replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
- replaceVars.put("$VAR_MAX_ROWS", "100");
- return createExecutionContext(MODULES_ENVIRONMENT_FILE, replaceVars);
- }
-
- private ExecutionContext createCatalogExecutionContext() throws Exception {
- final Map replaceVars = new HashMap<>();
- replaceVars.put("$VAR_PLANNER", "old");
- replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
- replaceVars.put("$VAR_RESULT_MODE", "changelog");
- replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
- replaceVars.put("$VAR_MAX_ROWS", "100");
- return createExecutionContext(CATALOGS_ENVIRONMENT_FILE, replaceVars);
- }
-
- private ExecutionContext createStreamingExecutionContext() throws Exception {
- final Map replaceVars = new HashMap<>();
- replaceVars.put("$VAR_CONNECTOR_TYPE", DummyTableSourceFactory.CONNECTOR_TYPE_VALUE);
- replaceVars.put("$VAR_CONNECTOR_PROPERTY", DummyTableSourceFactory.TEST_PROPERTY);
- replaceVars.put("$VAR_CONNECTOR_PROPERTY_VALUE", "");
- return createExecutionContext(STREAMING_ENVIRONMENT_FILE, replaceVars);
- }
-
- private ExecutionContext createConfigurationExecutionContext() throws Exception {
- return createExecutionContext(CONFIGURATION_ENVIRONMENT_FILE, new HashMap<>());
- }
-
- // a catalog that requires the thread context class loader to be a user code classloader during construction and opening
- private static class TestClassLoaderCatalog extends GenericInMemoryCatalog {
-
- private static final Class parentFirstCL = FlinkUserCodeClassLoaders
- .parentFirst(
- new URL[0],
- TestClassLoaderCatalog.class.getClassLoader(),
- NOOP_EXCEPTION_HANDLER)
- .getClass();
- private static final Class childFirstCL = FlinkUserCodeClassLoaders
- .childFirst(
- new URL[0],
- TestClassLoaderCatalog.class.getClassLoader(),
- new String[0],
- NOOP_EXCEPTION_HANDLER)
- .getClass();
-
- TestClassLoaderCatalog(String name) {
- super(name);
- verifyUserClassLoader();
- }
-
- @Override
- public void open() {
- verifyUserClassLoader();
- super.open();
- }
-
- private void verifyUserClassLoader() {
- ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
- assertTrue(parentFirstCL.isInstance(contextLoader) || childFirstCL.isInstance(contextLoader));
- }
- }
-
- /**
- * Factory to create TestClassLoaderCatalog.
- */
- public static class TestClassLoaderCatalogFactory implements CatalogFactory {
-
- @Override
- public Catalog createCatalog(String name, Map properties) {
- return new TestClassLoaderCatalog("test_cl");
- }
-
- @Override
- public Map requiredContext() {
- Map context = new HashMap<>();
- context.put("type", "test_cl_catalog");
- return context;
- }
-
- @Override
- public List supportedProperties() {
- return Collections.emptyList();
- }
- }
+ private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-gateway-defaults.yaml";
+ private static final String MODULES_ENVIRONMENT_FILE = "test-sql-gateway-modules.yaml";
+ private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-gateway-catalogs.yaml";
+ private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-gateway-streaming.yaml";
+ private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-gateway-configuration.yaml";
+
+ @Test
+ public void testExecutionConfig() throws Exception {
+ // test default values defined in ExecutionEntry
+ final ExecutionContext> context = createDefaultExecutionContext();
+ final ExecutionConfig config = context.getExecutionConfig();
+
+ assertEquals(200, config.getAutoWatermarkInterval());
+
+ final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy();
+ assertTrue(restartConfig instanceof RestartStrategies.FallbackRestartStrategyConfiguration);
+
+ // TODO test FailureRateRestartStrategyConfiguration
+ // final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy =
+ // (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig;
+ // assertEquals(10, failureRateStrategy.getMaxFailureRate());
+ // assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds());
+ // assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
+ }
+
+ @Test
+ public void testModules() throws Exception {
+ final ExecutionContext> context = createModuleExecutionContext();
+ final TableEnvironment tableEnv = context.getTableEnvironment();
+
+ Set<String> allModules = new HashSet<>(Arrays.asList(tableEnv.listModules()));
+ assertEquals(4, allModules.size());
+ assertEquals(
+ new HashSet<>(
+ Arrays.asList(
+ "core",
+ "mymodule",
+ "myhive",
+ "myhive2")
+ ),
+ allModules
+ );
+ }
+
+ @Test
+ public void testCatalogs() throws Exception {
+ final String inmemoryCatalog = "inmemorycatalog";
+ final String hiveCatalog = "hivecatalog";
+ final String hiveDefaultVersionCatalog = "hivedefaultversion";
+
+ final ExecutionContext> context = createCatalogExecutionContext();
+ final TableEnvironment tableEnv = context.getTableEnvironment();
+
+ assertEquals(inmemoryCatalog, tableEnv.getCurrentCatalog());
+ assertEquals("mydatabase", tableEnv.getCurrentDatabase());
+
+ Catalog catalog = tableEnv.getCatalog(hiveCatalog).orElse(null);
+ assertNotNull(catalog);
+ assertTrue(catalog instanceof HiveCatalog);
+ assertEquals("2.3.4", ((HiveCatalog) catalog).getHiveVersion());
+
+ catalog = tableEnv.getCatalog(hiveDefaultVersionCatalog).orElse(null);
+ assertNotNull(catalog);
+ assertTrue(catalog instanceof HiveCatalog);
+ // make sure we have assigned a default hive version
+ assertFalse(StringUtils.isNullOrWhitespaceOnly(((HiveCatalog) catalog).getHiveVersion()));
+
+ tableEnv.useCatalog(hiveCatalog);
+
+ assertEquals(hiveCatalog, tableEnv.getCurrentCatalog());
+
+ Set<String> allCatalogs = new HashSet<>(Arrays.asList(tableEnv.listCatalogs()));
+ assertEquals(6, allCatalogs.size());
+ assertEquals(
+ new HashSet<>(
+ Arrays.asList(
+ "default_catalog",
+ inmemoryCatalog,
+ hiveCatalog,
+ hiveDefaultVersionCatalog,
+ "catalog1",
+ "catalog2")
+ ),
+ allCatalogs
+ );
+ }
+
+ @Test
+ public void testDatabases() throws Exception {
+ final String hiveCatalog = "hivecatalog";
+
+ final ExecutionContext> context = createCatalogExecutionContext();
+ final TableEnvironment tableEnv = context.getTableEnvironment();
+
+ assertEquals(1, tableEnv.listDatabases().length);
+ assertEquals("mydatabase", tableEnv.listDatabases()[0]);
+
+ tableEnv.useCatalog(hiveCatalog);
+
+ assertEquals(2, tableEnv.listDatabases().length);
+ assertEquals(
+ new HashSet<>(
+ Arrays.asList(
+ HiveCatalog.DEFAULT_DB,
+ DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE)
+ ),
+ new HashSet<>(Arrays.asList(tableEnv.listDatabases()))
+ );
+
+ tableEnv.useCatalog(hiveCatalog);
+
+ assertEquals(HiveCatalog.DEFAULT_DB, tableEnv.getCurrentDatabase());
+
+ tableEnv.useDatabase(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE);
+
+ assertEquals(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, tableEnv.getCurrentDatabase());
+ }
+
+ @Test
+ public void testFunctions() throws Exception {
+ final ExecutionContext> context = createDefaultExecutionContext();
+ final TableEnvironment tableEnv = context.getTableEnvironment();
+ final String[] expected = new String[]{"scalarudf", "tableudf", "aggregateudf"};
+ final String[] actual = tableEnv.listUserDefinedFunctions();
+ Arrays.sort(expected);
+ Arrays.sort(actual);
+ assertArrayEquals(expected, actual);
+ }
+
+ @Test
+ public void testTables() throws Exception {
+ final ExecutionContext> context = createDefaultExecutionContext();
+ final TableEnvironment tableEnv = context.getTableEnvironment();
+
+ assertArrayEquals(
+ new String[]{"TableNumber1", "TableNumber2", "TableSourceSink", "TestView1", "TestView2"},
+ tableEnv.listTables());
+ }
+
+ @Ignore
+ @Test
+ public void testTemporalTables() throws Exception {
+ final ExecutionContext> context = createStreamingExecutionContext();
+ final StreamTableEnvironment tableEnv = (StreamTableEnvironment) context.getTableEnvironment();
+
+ assertArrayEquals(
+ new String[]{"EnrichmentSource", "HistorySource", "HistoryView", "TemporalTableUsage"},
+ tableEnv.listTables());
+
+ assertArrayEquals(
+ new String[]{"sourcetemporaltable", "viewtemporaltable"},
+ tableEnv.listUserDefinedFunctions());
+
+ assertArrayEquals(
+ new String[]{"integerField", "stringField", "rowtimeField", "integerField0", "stringField0", "rowtimeField0"},
+ tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames());
+ }
+
+ @Test
+ public void testConfiguration() throws Exception {
+ final ExecutionContext> context = createConfigurationExecutionContext();
+ final TableEnvironment tableEnv = context.getTableEnvironment();
+
+ assertEquals(
+ 100,
+ tableEnv.getConfig().getConfiguration().getInteger(
+ ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT));
+ assertTrue(
+ tableEnv.getConfig().getConfiguration().getBoolean(
+ ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED));
+ assertEquals(
+ "128kb",
+ tableEnv.getConfig().getConfiguration().getString(
+ ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
+
+ assertTrue(
+ tableEnv.getConfig().getConfiguration().getBoolean(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED));
+
+ // these options are not modified and should be equal to their default value
+ assertEquals(
+ ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(),
+ tableEnv.getConfig().getConfiguration().getBoolean(
+ ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
+ assertEquals(
+ ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(),
+ tableEnv.getConfig().getConfiguration().getString(
+ ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE));
+ assertEquals(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(),
+ tableEnv.getConfig().getConfiguration().getLong(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD));
+ }
+
+ @Test
+ public void testInitCatalogs() throws Exception {
+ final Map<String, String> replaceVars = createDefaultReplaceVars();
+ Environment env = EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
+
+ Map<String, String> catalogProps = new HashMap<>();
+ catalogProps.put("name", "test");
+ catalogProps.put("type", "test_cl_catalog");
+ env.getCatalogs().clear();
+ env.getCatalogs().put("test", CatalogEntry.create(catalogProps));
+ Configuration flinkConfig = new Configuration();
+ ExecutionContext.builder(env,
+ new Environment(),
+ Collections.emptyList(),
+ flinkConfig,
+ new DefaultClusterClientServiceLoader(),
+ new Options(),
+ Collections.singletonList(new DefaultCLI())).build();
+ }
+
+ @SuppressWarnings("unchecked")
+ private ExecutionContext createExecutionContext(String file, Map replaceVars)
+ throws Exception {
+ final Environment env = EnvironmentFileUtil.parseModified(
+ file,
+ replaceVars);
+ final Configuration flinkConfig = new Configuration();
+ return (ExecutionContext) ExecutionContext.builder(
+ env,
+ new Environment(),
+ Collections.emptyList(),
+ flinkConfig,
+ new DefaultClusterClientServiceLoader(),
+ new Options(),
+ Collections.singletonList(new DefaultCLI()))
+ .build();
+ }
+
+ private Map<String, String> createDefaultReplaceVars() {
+ Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", "old");
+ replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+ replaceVars.put("$VAR_RESULT_MODE", "changelog");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+ replaceVars.put("$VAR_RESTART_STRATEGY_TYPE", "failure-rate");
+ return replaceVars;
+ }
+
+ private ExecutionContext createDefaultExecutionContext() throws Exception {
+ final Map<String, String> replaceVars = createDefaultReplaceVars();
+ return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
+ }
+
+ private ExecutionContext createModuleExecutionContext() throws Exception {
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", "old");
+ replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+ replaceVars.put("$VAR_RESULT_MODE", "changelog");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+ return createExecutionContext(MODULES_ENVIRONMENT_FILE, replaceVars);
+ }
+
+ private ExecutionContext createCatalogExecutionContext() throws Exception {
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", "old");
+ replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+ replaceVars.put("$VAR_RESULT_MODE", "changelog");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+ return createExecutionContext(CATALOGS_ENVIRONMENT_FILE, replaceVars);
+ }
+
+ private ExecutionContext createStreamingExecutionContext() throws Exception {
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_CONNECTOR_TYPE", DummyTableSourceFactory.CONNECTOR_TYPE_VALUE);
+ replaceVars.put("$VAR_CONNECTOR_PROPERTY", DummyTableSourceFactory.TEST_PROPERTY);
+ replaceVars.put("$VAR_CONNECTOR_PROPERTY_VALUE", "");
+ return createExecutionContext(STREAMING_ENVIRONMENT_FILE, replaceVars);
+ }
+
+ private ExecutionContext createConfigurationExecutionContext() throws Exception {
+ return createExecutionContext(CONFIGURATION_ENVIRONMENT_FILE, new HashMap<>());
+ }
+
+ // a catalog that requires the thread context class loader to be a user code classloader during construction and opening
+ private static class TestClassLoaderCatalog extends GenericInMemoryCatalog {
+
+ private static final Class<?> parentFirstCL = FlinkUserCodeClassLoaders
+ .parentFirst(
+ new URL[0],
+ TestClassLoaderCatalog.class.getClassLoader(),
+ NOOP_EXCEPTION_HANDLER,
+ true)
+ .getClass();
+ private static final Class<?> childFirstCL = FlinkUserCodeClassLoaders
+ .childFirst(
+ new URL[0],
+ TestClassLoaderCatalog.class.getClassLoader(),
+ new String[0],
+ NOOP_EXCEPTION_HANDLER,
+ true)
+ .getClass();
+
+ TestClassLoaderCatalog(String name) {
+ super(name);
+ verifyUserClassLoader();
+ }
+
+ @Override
+ public void open() {
+ verifyUserClassLoader();
+ super.open();
+ }
+
+ private void verifyUserClassLoader() {
+ ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+ assertTrue(parentFirstCL.isInstance(contextLoader) || childFirstCL.isInstance(contextLoader));
+ }
+ }
+
+ /**
+ * Factory to create TestClassLoaderCatalog.
+ */
+ public static class TestClassLoaderCatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(String name, Map<String, String> properties) {
+ return new TestClassLoaderCatalog("test_cl");
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put("type", "test_cl_catalog");
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ return Collections.emptyList();
+ }
+ }
}
diff --git a/src/test/java/com/ververica/flink/table/gateway/operation/OperationTestBase.java b/src/test/java/com/ververica/flink/table/gateway/operation/OperationTestBase.java
index e78d81f..f2776c6 100644
--- a/src/test/java/com/ververica/flink/table/gateway/operation/OperationTestBase.java
+++ b/src/test/java/com/ververica/flink/table/gateway/operation/OperationTestBase.java
@@ -23,11 +23,9 @@
import com.ververica.flink.table.gateway.context.DefaultContext;
import com.ververica.flink.table.gateway.context.SessionContext;
import com.ververica.flink.table.gateway.rest.session.SessionID;
-
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
-
import org.junit.Before;
import java.util.Collections;
@@ -55,7 +53,7 @@ protected DefaultContext getDefaultContext() {
new Environment(),
Collections.emptyList(),
new Configuration(),
- new DefaultCLI(new Configuration()),
+ new DefaultCLI(),
new DefaultClusterClientServiceLoader());
}
diff --git a/src/test/java/com/ververica/flink/table/gateway/operation/OperationWithUserJarTest.java b/src/test/java/com/ververica/flink/table/gateway/operation/OperationWithUserJarTest.java
index 5b10e71..b1a9b61 100644
--- a/src/test/java/com/ververica/flink/table/gateway/operation/OperationWithUserJarTest.java
+++ b/src/test/java/com/ververica/flink/table/gateway/operation/OperationWithUserJarTest.java
@@ -26,14 +26,12 @@
import com.ververica.flink.table.gateway.rest.result.ResultKind;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import com.ververica.flink.table.gateway.utils.ResourceFileUtils;
-
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.types.Row;
-
import org.junit.Test;
import java.io.File;
@@ -43,9 +41,7 @@
import java.util.Collections;
import java.util.List;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* Test for operations when user jars are provided.
@@ -62,7 +58,7 @@ protected DefaultContext getDefaultContext() {
new Environment(),
Collections.singletonList(jarUrl),
new Configuration(),
- new DefaultCLI(new Configuration()),
+ new DefaultCLI(),
new DefaultClusterClientServiceLoader());
}
diff --git a/src/test/java/com/ververica/flink/table/gateway/sink/TestTableSinkFactoryBase.java b/src/test/java/com/ververica/flink/table/gateway/sink/TestTableSinkFactoryBase.java
index 0fc7359..82422f8 100644
--- a/src/test/java/com/ververica/flink/table/gateway/sink/TestTableSinkFactoryBase.java
+++ b/src/test/java/com/ververica/flink/table/gateway/sink/TestTableSinkFactoryBase.java
@@ -38,18 +38,9 @@
import java.util.Map;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.*;
+import static org.apache.flink.table.descriptors.Rowtime.*;
+import static org.apache.flink.table.descriptors.Schema.*;
import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
@@ -113,7 +104,7 @@ public static class TestTableSink implements TableSink, AppendStreamTableSi
private final String property;
public TestTableSink(TableSchema schema, String property) {
- this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
+ this.schema = TableSchemaUtils.checkOnlyPhysicalColumns(schema);
this.property = property;
}
diff --git a/src/test/java/com/ververica/flink/table/gateway/source/TestTableSourceFactoryBase.java b/src/test/java/com/ververica/flink/table/gateway/source/TestTableSourceFactoryBase.java
index 64bc7cd..02ec8f6 100644
--- a/src/test/java/com/ververica/flink/table/gateway/source/TestTableSourceFactoryBase.java
+++ b/src/test/java/com/ververica/flink/table/gateway/source/TestTableSourceFactoryBase.java
@@ -33,25 +33,12 @@
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.*;
+import static org.apache.flink.table.descriptors.Rowtime.*;
+import static org.apache.flink.table.descriptors.Schema.*;
import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
@@ -122,7 +109,7 @@ public static class TestTableSource implements StreamTableSource, DefinedRo
private final List<RowtimeAttributeDescriptor> rowtime;
public TestTableSource(TableSchema schema, String property, String proctime, List<RowtimeAttributeDescriptor> rowtime) {
- this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
+ this.schema = TableSchemaUtils.checkOnlyPhysicalColumns(schema);
this.property = property;
this.proctime = proctime;
this.rowtime = rowtime;