From 2011f9f6eaccc110b2e7a062e05e84fe2a411b76 Mon Sep 17 00:00:00 2001 From: wangzhiqiang Date: Sat, 19 Dec 2020 09:42:09 +0800 Subject: [PATCH 1/3] Update to flink 1.12.0 --- pom.xml | 60 +- .../flink/table/gateway/SqlGateway.java | 11 +- .../gateway/context/ExecutionContext.java | 133 ++-- .../gateway/deployment/ProgramDeployer.java | 14 +- .../operation/DescribeTableOperation.java | 16 +- .../gateway/operation/InsertOperation.java | 4 +- .../gateway/operation/SelectOperation.java | 20 +- .../gateway/operation/SqlCommandParser.java | 31 +- .../table/gateway/result/BatchResult.java | 3 +- .../table/gateway/result/ChangelogResult.java | 3 +- .../gateway/sink/CollectBatchTableSink.java | 2 +- .../gateway/sink/CollectStreamTableSink.java | 2 +- .../table/gateway/config/DependencyTest.java | 22 +- .../gateway/config/ExecutionContextTest.java | 692 +++++++++--------- .../gateway/operation/OperationTestBase.java | 4 +- .../operation/OperationWithUserJarTest.java | 8 +- .../sink/TestTableSinkFactoryBase.java | 17 +- .../source/TestTableSourceFactoryBase.java | 23 +- 18 files changed, 489 insertions(+), 576 deletions(-) diff --git a/pom.xml b/pom.xml index 5ebe8c7..557fe55 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ under the License. jar - 1.11.1 + 1.12.0 1.8 1.7.15 1.2.17 @@ -626,35 +626,35 @@ under the License. - - org.apache.maven.plugins - maven-checkstyle-plugin - 2.17 - - - com.puppycrawl.tools - checkstyle - - 8.14 - - - - - validate - validate - - check - - - - - /tools/maven/suppressions.xml - true - /tools/maven/checkstyle.xml - true - true - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/java/com/ververica/flink/table/gateway/SqlGateway.java b/src/main/java/com/ververica/flink/table/gateway/SqlGateway.java index 8246590..f9ee1f5 100644 --- a/src/main/java/com/ververica/flink/table/gateway/SqlGateway.java +++ b/src/main/java/com/ververica/flink/table/gateway/SqlGateway.java @@ -23,13 +23,11 @@ import com.ververica.flink.table.gateway.rest.SqlGatewayEndpoint; import com.ververica.flink.table.gateway.rest.session.SessionManager; import com.ververica.flink.table.gateway.utils.SqlGatewayException; - import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.JarUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,12 +149,9 @@ public static void main(String[] args) { private static void checkFlinkVersion() { String flinkVersion = EnvironmentInformation.getVersion(); - if (!flinkVersion.startsWith("1.11")) { - LOG.error("Only Flink-1.11 is supported now!"); - throw new SqlGatewayException("Only Flink-1.11 is supported now!"); - } else if (flinkVersion.startsWith("1.11.0")) { - LOG.error("Flink-1.11.0 is not supported, please use Flink >= 1.11.1!"); - throw new SqlGatewayException("Flink-1.11.0 is not supported, please use Flink >= 1.11.1!"); + if (!flinkVersion.startsWith("1.12")) { + LOG.error("Only Flink-1.12 is supported now!:)"); + throw new SqlGatewayException("Only Flink-1.12 is supported now!"); } } diff --git a/src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java b/src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java index 33e7972..75b1a85 100644 --- a/src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java +++ b/src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java @@ -19,15 +19,10 @@ package com.ververica.flink.table.gateway.context; import com.ververica.flink.table.gateway.config.Environment; -import com.ververica.flink.table.gateway.config.entries.DeploymentEntry; -import com.ververica.flink.table.gateway.config.entries.ExecutionEntry; -import com.ververica.flink.table.gateway.config.entries.SinkTableEntry; -import com.ververica.flink.table.gateway.config.entries.SourceSinkTableEntry; -import com.ververica.flink.table.gateway.config.entries.SourceTableEntry; -import com.ververica.flink.table.gateway.config.entries.TemporalTableEntry; -import com.ververica.flink.table.gateway.config.entries.ViewEntry; +import com.ververica.flink.table.gateway.config.entries.*; import com.ververica.flink.table.gateway.utils.SqlExecutionException; - +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.java.ExecutionEnvironment; @@ -42,60 +37,33 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.*; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.catalog.*; import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.delegation.PlannerFactory; import org.apache.flink.table.descriptors.CoreModuleDescriptorValidator; -import org.apache.flink.table.factories.BatchTableSinkFactory; -import org.apache.flink.table.factories.BatchTableSourceFactory; -import org.apache.flink.table.factories.CatalogFactory; -import org.apache.flink.table.factories.ComponentFactoryService; -import org.apache.flink.table.factories.ModuleFactory; -import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.factories.TableSinkFactory; -import org.apache.flink.table.factories.TableSourceFactory; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.FunctionDefinition; -import org.apache.flink.table.functions.FunctionService; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.factories.*; +import org.apache.flink.table.functions.*; import org.apache.flink.table.module.Module; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TemporaryClassLoaderContext; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.lang.reflect.Method; import java.net.URL; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -281,8 +249,7 @@ private static Configuration createExecutionConfig( availableCommandLines, activeCommandLine); - Configuration executionConfig = activeCommandLine.applyCommandLineOptionsToConfiguration( - commandLine); + Configuration executionConfig = activeCommandLine.toConfiguration(commandLine); try { final ProgramOptions programOptions = ProgramOptions.create(commandLine); @@ -327,34 +294,73 @@ private Catalog createCatalog(String name, Map catalogProperties return factory.createCatalog(name, catalogProperties); } - private static TableSource createTableSource(ExecutionEntry execution, Map sourceProperties, - ClassLoader classLoader) { - if (execution.isStreamingPlanner()) { + private TableSource createTableSource(String name, Map sourceProperties) { + if (environment.getExecution().isStreamingPlanner()) { final TableSourceFactory factory = (TableSourceFactory) - TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader); - return factory.createTableSource(sourceProperties); - } else if (execution.isBatchPlanner()) { + TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader); + return factory.createTableSource(new TableSourceFactoryContextImpl( + ObjectIdentifier.of( + tableEnv.getCurrentCatalog(), + tableEnv.getCurrentDatabase(), + name), + CatalogTableImpl.fromProperties(sourceProperties), + tableEnv.getConfig().getConfiguration(), true)); + } else if (environment.getExecution().isBatchPlanner()) { final BatchTableSourceFactory factory = (BatchTableSourceFactory) - TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader); + TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader); return factory.createBatchTableSource(sourceProperties); } throw new SqlExecutionException("Unsupported execution type for sources."); } - private static TableSink createTableSink(ExecutionEntry execution, Map sinkProperties, - ClassLoader classLoader) { - if (execution.isStreamingPlanner()) { + private TableSink createTableSink(String name, Map sinkProperties) { + if (environment.getExecution().isStreamingPlanner()) { final TableSinkFactory factory = (TableSinkFactory) - TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader); - return factory.createTableSink(sinkProperties); - } else if (execution.isBatchPlanner()) { + TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader); + return factory.createTableSink(new TableSinkFactoryContextImpl( + ObjectIdentifier.of( + tableEnv.getCurrentCatalog(), + tableEnv.getCurrentDatabase(), + name), + CatalogTableImpl.fromProperties(sinkProperties), + tableEnv.getConfig().getConfiguration(), + !environment.getExecution().inStreamingMode(), true)); + } else if (environment.getExecution().isBatchPlanner()) { final BatchTableSinkFactory factory = (BatchTableSinkFactory) - TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader); + TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader); return factory.createBatchTableSink(sinkProperties); } throw new SqlExecutionException("Unsupported execution type for sinks."); } +// private static TableSource createTableSource(ExecutionEntry execution, Map sourceProperties, +// ClassLoader classLoader) { +// if (execution.isStreamingPlanner()) { +// final TableSourceFactory factory = (TableSourceFactory) +// TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader); +// return factory.createTableSource(sourceProperties); +// } else if (execution.isBatchPlanner()) { +// final BatchTableSourceFactory factory = (BatchTableSourceFactory) +// TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader); +// return factory.createBatchTableSource(sourceProperties); +// } +// throw new SqlExecutionException("Unsupported execution type for sources."); +// } +// +// private static TableSink createTableSink(ExecutionEntry execution, Map sinkProperties, +// ClassLoader classLoader) { +// if (execution.isStreamingPlanner()) { +// final TableSinkFactory factory = (TableSinkFactory) +// TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader); +// return factory.createTableSink(sinkProperties); +// } else if (execution.isBatchPlanner()) { +// final BatchTableSinkFactory factory = (BatchTableSinkFactory) +// TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader); +// return factory.createBatchTableSink(sinkProperties); +// } +// throw new SqlExecutionException("Unsupported execution type for sinks."); +// } + private TableEnvironment createStreamTableEnvironment( StreamExecutionEnvironment env, EnvironmentSettings settings, @@ -514,20 +520,21 @@ private void initializeCatalogs() { //-------------------------------------------------------------------------------------------------------------- // Step.2 create table sources & sinks, and register them. //-------------------------------------------------------------------------------------------------------------- - Map> tableSources = new HashMap<>(); - Map> tableSinks = new HashMap<>(); + Map tableSources = new HashMap<>(); + Map tableSinks = new HashMap<>(); + environment.getTables().forEach((name, entry) -> { if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) { - tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader)); + tableSources.put(name, createTableSource(name, entry.asMap())); } if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) { - tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader)); + tableSinks.put(name, createTableSink(name, entry.asMap())); } }); // register table sources - tableSources.forEach(tableEnv::registerTableSource); + tableSources.forEach(((TableEnvironmentInternal) tableEnv)::registerTableSourceInternal); // register table sinks - tableSinks.forEach(tableEnv::registerTableSink); + tableSinks.forEach(((TableEnvironmentInternal) tableEnv)::registerTableSinkInternal); //-------------------------------------------------------------------------------------------------------------- // Step.4 Register temporal tables. diff --git a/src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java b/src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java index e691535..3dc50a6 100644 --- a/src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java +++ b/src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java @@ -21,12 +21,7 @@ import org.apache.flink.api.dag.Pipeline; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; -import org.apache.flink.core.execution.DefaultExecutorServiceLoader; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.PipelineExecutor; -import org.apache.flink.core.execution.PipelineExecutorFactory; -import org.apache.flink.core.execution.PipelineExecutorServiceLoader; - +import org.apache.flink.core.execution.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +36,7 @@ public class ProgramDeployer { private final Configuration configuration; private final Pipeline pipeline; private final String jobName; + private final ClassLoader userCodeClassloader; /** * Deploys a table program on the cluster. @@ -52,10 +48,12 @@ public class ProgramDeployer { public ProgramDeployer( Configuration configuration, String jobName, - Pipeline pipeline) { + Pipeline pipeline, + ClassLoader userCodeClassloader) { this.configuration = configuration; this.pipeline = pipeline; this.jobName = jobName; + this.userCodeClassloader = userCodeClassloader; } public CompletableFuture deploy() { @@ -78,7 +76,7 @@ public CompletableFuture deploy() { final PipelineExecutor executor = executorFactory.getExecutor(configuration); try { - return executor.execute(pipeline, configuration); + return executor.execute(pipeline, configuration, userCodeClassloader); } catch (Exception e) { throw new RuntimeException("Could not execute program.", e); } diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java b/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java index 30c94e8..39c6763 100644 --- a/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java +++ b/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java @@ -25,18 +25,12 @@ import com.ververica.flink.table.gateway.rest.result.ResultKind; import com.ververica.flink.table.gateway.rest.result.ResultSet; import com.ververica.flink.table.gateway.utils.SqlExecutionException; - -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.WatermarkSpec; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.table.api.*; import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.Row; -import org.apache.commons.lang3.StringUtils; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -90,7 +84,11 @@ public ResultSet execute() { String type = StringUtils.removeEnd(logicalType.toString(), " NOT NULL"); boolean isNullable = logicalType.isNullable(); String key = fieldToPrimaryKey.getOrDefault(column.getName(), null); - String computedColumn = column.getExpr().orElse(null); + + /** + * todo + */ + String computedColumn = ((TableColumn.ComputedColumn)column).getExpression(); String watermark = fieldToWatermark.getOrDefault(column.getName(), null); data.add(Row.of(name, type, isNullable, key, computedColumn, watermark)); diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java b/src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java index bef60b7..246ebb5 100644 --- a/src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java +++ b/src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java @@ -27,7 +27,6 @@ import com.ververica.flink.table.gateway.rest.result.ResultKind; import com.ververica.flink.table.gateway.rest.result.ResultSet; import com.ververica.flink.table.gateway.utils.SqlExecutionException; - import org.apache.flink.api.common.JobID; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.java.tuple.Tuple2; @@ -38,7 +37,6 @@ import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.Row; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -155,7 +153,7 @@ private JobID executeUpdateInternal(ExecutionContext executionContext) { configuration.set(DeploymentOptions.ATTACHED, false); // create execution - final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline); + final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline, executionContext.getClassLoader()); // blocking deployment try { diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java b/src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java index 81addbe..7983999 100644 --- a/src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java +++ b/src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java @@ -26,15 +26,9 @@ import com.ververica.flink.table.gateway.rest.result.ConstantNames; import com.ververica.flink.table.gateway.rest.result.ResultKind; import com.ververica.flink.table.gateway.rest.result.ResultSet; -import com.ververica.flink.table.gateway.result.BatchResult; -import com.ververica.flink.table.gateway.result.ChangelogResult; -import com.ververica.flink.table.gateway.result.Result; -import com.ververica.flink.table.gateway.result.ResultDescriptor; -import com.ververica.flink.table.gateway.result.ResultUtil; -import com.ververica.flink.table.gateway.result.TypedResult; +import com.ververica.flink.table.gateway.result.*; import com.ververica.flink.table.gateway.utils.SqlExecutionException; import com.ververica.flink.table.gateway.utils.SqlGatewayException; - import org.apache.flink.api.common.JobID; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.java.tuple.Tuple2; @@ -45,20 +39,16 @@ import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.types.Row; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.UUID; +import java.util.*; /** * Operation for SELECT command. @@ -211,7 +201,7 @@ private ResultDescriptor executeQueryInternal(ExecutionContext executionC try { // writing to a sink requires an optimization step that might reference UDFs during code compilation executionContext.wrapClassLoader(() -> { - executionContext.getTableEnvironment().registerTableSink(tableName, result.getTableSink()); + ((TableEnvironmentInternal) executionContext.getTableEnvironment()).registerTableSinkInternal(tableName, result.getTableSink()); table.insertInto(tableName); return null; }); @@ -239,7 +229,7 @@ private ResultDescriptor executeQueryInternal(ExecutionContext executionC configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true); // create execution - final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline); + final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline, executionContext.getClassLoader()); JobClient jobClient; // blocking deployment diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java b/src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java index 207622a..8794c7a 100644 --- a/src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java +++ b/src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java @@ -18,34 +18,15 @@ package com.ververica.flink.table.gateway.operation; -import org.apache.flink.sql.parser.ddl.SqlAlterDatabase; -import org.apache.flink.sql.parser.ddl.SqlAlterTable; -import org.apache.flink.sql.parser.ddl.SqlCreateDatabase; -import org.apache.flink.sql.parser.ddl.SqlCreateTable; -import org.apache.flink.sql.parser.ddl.SqlCreateView; -import org.apache.flink.sql.parser.ddl.SqlDropDatabase; -import org.apache.flink.sql.parser.ddl.SqlDropTable; -import org.apache.flink.sql.parser.ddl.SqlDropView; -import org.apache.flink.sql.parser.ddl.SqlUseCatalog; -import org.apache.flink.sql.parser.ddl.SqlUseDatabase; +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.*; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.flink.sql.parser.ddl.*; import org.apache.flink.sql.parser.dml.RichSqlInsert; -import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; -import org.apache.flink.sql.parser.dql.SqlShowCatalogs; -import org.apache.flink.sql.parser.dql.SqlShowDatabases; -import org.apache.flink.sql.parser.dql.SqlShowFunctions; -import org.apache.flink.sql.parser.dql.SqlShowTables; +import org.apache.flink.sql.parser.dql.*; import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; import org.apache.flink.sql.parser.validate.FlinkSqlConformance; -import org.apache.calcite.config.Lex; -import org.apache.calcite.sql.SqlDrop; -import org.apache.calcite.sql.SqlExplain; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlSetOption; -import org.apache.calcite.sql.parser.SqlParser; - import java.lang.reflect.Field; import java.util.Arrays; import java.util.Objects; @@ -194,7 +175,7 @@ private static Optional parseStmt(String stmt, boolean isBlinkPl operands = new String[0]; } else if (node instanceof SqlUseCatalog) { cmd = SqlCommand.USE_CATALOG; - operands = new String[] { ((SqlUseCatalog) node).getCatalogName() }; + operands = new String[] { ((SqlUseCatalog) node).getCatalogName().getSimple() }; } else if (node instanceof SqlUseDatabase) { cmd = SqlCommand.USE; operands = new String[] { ((SqlUseDatabase) node).getDatabaseName().toString() }; diff --git a/src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java b/src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java index 83a7c86..cfeec16 100644 --- a/src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java +++ b/src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java @@ -20,7 +20,6 @@ import com.ververica.flink.table.gateway.sink.CollectBatchTableSink; import com.ververica.flink.table.gateway.utils.SqlExecutionException; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; @@ -70,7 +69,7 @@ public BatchResult( @Override public void startRetrieval(JobClient jobClient) { CompletableFuture.completedFuture(jobClient) - .thenCompose(client -> client.getJobExecutionResult(classLoader)) + .thenCompose(client -> client.getJobExecutionResult()) .thenAccept(new ResultRetrievalHandler()) .whenComplete((unused, throwable) -> { if (throwable != null) { diff --git a/src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java b/src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java index 49281fd..c76d05e 100644 --- a/src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java +++ b/src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java @@ -21,7 +21,6 @@ import com.ververica.flink.table.gateway.sink.CollectStreamTableSink; import com.ververica.flink.table.gateway.utils.SqlExecutionException; import com.ververica.flink.table.gateway.utils.SqlGatewayException; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -103,7 +102,7 @@ public void startRetrieval(JobClient jobClient) { retrievalThread.start(); jobExecutionResultFuture = CompletableFuture.completedFuture(jobClient) - .thenCompose(client -> client.getJobExecutionResult(classLoader)) + .thenCompose(client -> client.getJobExecutionResult()) .whenComplete((unused, throwable) -> { if (throwable != null) { executionException.compareAndSet( diff --git a/src/main/java/com/ververica/flink/table/gateway/sink/CollectBatchTableSink.java b/src/main/java/com/ververica/flink/table/gateway/sink/CollectBatchTableSink.java index de1f6d4..36152f0 100644 --- a/src/main/java/com/ververica/flink/table/gateway/sink/CollectBatchTableSink.java +++ b/src/main/java/com/ververica/flink/table/gateway/sink/CollectBatchTableSink.java @@ -43,7 +43,7 @@ public class CollectBatchTableSink extends OutputFormatTableSink implements public CollectBatchTableSink(String accumulatorName, TypeSerializer serializer, TableSchema tableSchema) { this.accumulatorName = accumulatorName; this.serializer = serializer; - this.tableSchema = TableSchemaUtils.checkNoGeneratedColumns(tableSchema); + this.tableSchema = TableSchemaUtils.checkOnlyPhysicalColumns(tableSchema); } /** diff --git a/src/main/java/com/ververica/flink/table/gateway/sink/CollectStreamTableSink.java b/src/main/java/com/ververica/flink/table/gateway/sink/CollectStreamTableSink.java index 5922195..0814579 100644 --- a/src/main/java/com/ververica/flink/table/gateway/sink/CollectStreamTableSink.java +++ b/src/main/java/com/ververica/flink/table/gateway/sink/CollectStreamTableSink.java @@ -48,7 +48,7 @@ public CollectStreamTableSink(InetAddress targetAddress, int targetPort, this.targetAddress = targetAddress; this.targetPort = targetPort; this.serializer = serializer; - this.tableSchema = TableSchemaUtils.checkNoGeneratedColumns(tableSchema); + this.tableSchema = TableSchemaUtils.checkOnlyPhysicalColumns(tableSchema); } @Override diff --git a/src/test/java/com/ververica/flink/table/gateway/config/DependencyTest.java b/src/test/java/com/ververica/flink/table/gateway/config/DependencyTest.java index 03e0620..b372eb9 100644 --- a/src/test/java/com/ververica/flink/table/gateway/config/DependencyTest.java +++ b/src/test/java/com/ververica/flink/table/gateway/config/DependencyTest.java @@ -29,19 +29,14 @@ import com.ververica.flink.table.gateway.sink.TestTableSinkFactoryBase; import com.ververica.flink.table.gateway.source.TestTableSourceFactoryBase; import com.ververica.flink.table.gateway.utils.EnvironmentFileUtil; - import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.*; import org.apache.flink.table.catalog.config.CatalogConfig; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; @@ -58,20 +53,11 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.types.Row; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; - import org.junit.Test; import java.net.URL; import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; @@ -110,7 +96,7 @@ public void testTableFactoryDiscovery() throws Exception { env, Collections.singletonList(dependency), new Configuration(), - new DefaultCLI(new Configuration()), + new DefaultCLI(), new DefaultClusterClientServiceLoader()); SessionManager sessionManager = new SessionManager(defaultContext); String sessionId = sessionManager.createSession("test", "blink", "streaming", Maps.newConcurrentMap()); diff --git a/src/test/java/com/ververica/flink/table/gateway/config/ExecutionContextTest.java b/src/test/java/com/ververica/flink/table/gateway/config/ExecutionContextTest.java index f26a4dd..dc91850 100644 --- a/src/test/java/com/ververica/flink/table/gateway/config/ExecutionContextTest.java +++ b/src/test/java/com/ververica/flink/table/gateway/config/ExecutionContextTest.java @@ -22,7 +22,7 @@ import com.ververica.flink.table.gateway.context.ExecutionContext; import com.ververica.flink.table.gateway.source.dummy.DummyTableSourceFactory; import com.ververica.flink.table.gateway.utils.EnvironmentFileUtil; - +import org.apache.commons.cli.Options; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.client.cli.DefaultCLI; @@ -38,366 +38,356 @@ import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.util.StringUtils; - -import org.apache.commons.cli.Options; import org.junit.Ignore; import org.junit.Test; import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * Test for {@link ExecutionContext}. */ public class ExecutionContextTest { - private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-gateway-defaults.yaml"; - private static final String MODULES_ENVIRONMENT_FILE = "test-sql-gateway-modules.yaml"; - private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-gateway-catalogs.yaml"; - private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-gateway-streaming.yaml"; - private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-gateway-configuration.yaml"; - - @Test - public void testExecutionConfig() throws Exception { - // test default values defined in ExecutionEntry - final ExecutionContext context = createDefaultExecutionContext(); - final ExecutionConfig config = context.getExecutionConfig(); - - assertEquals(200, config.getAutoWatermarkInterval()); - - final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy(); - assertTrue(restartConfig instanceof RestartStrategies.FallbackRestartStrategyConfiguration); - - // TODO test FailureRateRestartStrategyConfiguration - // final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy = - // (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig; - // assertEquals(10, failureRateStrategy.getMaxFailureRate()); - // assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds()); - // assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds()); - } - - @Test - public void testModules() throws Exception { - final ExecutionContext context = createModuleExecutionContext(); - final TableEnvironment tableEnv = context.getTableEnvironment(); - - Set allModules = new HashSet<>(Arrays.asList(tableEnv.listModules())); - assertEquals(4, allModules.size()); - assertEquals( - new HashSet<>( - Arrays.asList( - "core", - "mymodule", - "myhive", - "myhive2") - ), - allModules - ); - } - - @Test - public void testCatalogs() throws Exception { - final String inmemoryCatalog = "inmemorycatalog"; - final String hiveCatalog = "hivecatalog"; - final String hiveDefaultVersionCatalog = "hivedefaultversion"; - - final ExecutionContext context = createCatalogExecutionContext(); - final TableEnvironment tableEnv = context.getTableEnvironment(); - - assertEquals(inmemoryCatalog, tableEnv.getCurrentCatalog()); - assertEquals("mydatabase", tableEnv.getCurrentDatabase()); - - Catalog catalog = tableEnv.getCatalog(hiveCatalog).orElse(null); - assertNotNull(catalog); - assertTrue(catalog instanceof HiveCatalog); - assertEquals("2.3.4", ((HiveCatalog) catalog).getHiveVersion()); - - catalog = tableEnv.getCatalog(hiveDefaultVersionCatalog).orElse(null); - assertNotNull(catalog); - assertTrue(catalog instanceof HiveCatalog); - // make sure we have assigned a default hive version - assertFalse(StringUtils.isNullOrWhitespaceOnly(((HiveCatalog) catalog).getHiveVersion())); - - tableEnv.useCatalog(hiveCatalog); - - assertEquals(hiveCatalog, tableEnv.getCurrentCatalog()); - - Set allCatalogs = new HashSet<>(Arrays.asList(tableEnv.listCatalogs())); - assertEquals(6, allCatalogs.size()); - assertEquals( - new HashSet<>( - Arrays.asList( - "default_catalog", - inmemoryCatalog, - hiveCatalog, - hiveDefaultVersionCatalog, - "catalog1", - "catalog2") - ), - allCatalogs - ); - } - - @Test - public void testDatabases() throws Exception { - final String hiveCatalog = "hivecatalog"; - - final ExecutionContext context = createCatalogExecutionContext(); - final TableEnvironment tableEnv = context.getTableEnvironment(); - - assertEquals(1, tableEnv.listDatabases().length); - assertEquals("mydatabase", tableEnv.listDatabases()[0]); - - tableEnv.useCatalog(hiveCatalog); - - assertEquals(2, tableEnv.listDatabases().length); - assertEquals( - new HashSet<>( - Arrays.asList( - HiveCatalog.DEFAULT_DB, - DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE) - ), - new HashSet<>(Arrays.asList(tableEnv.listDatabases())) - ); - - tableEnv.useCatalog(hiveCatalog); - - assertEquals(HiveCatalog.DEFAULT_DB, tableEnv.getCurrentDatabase()); - - tableEnv.useDatabase(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE); - - assertEquals(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, tableEnv.getCurrentDatabase()); - } - - @Test - public void testFunctions() throws Exception { - final ExecutionContext context = createDefaultExecutionContext(); - final TableEnvironment tableEnv = context.getTableEnvironment(); - final String[] expected = new String[] { "scalarudf", "tableudf", "aggregateudf" }; - final String[] actual = tableEnv.listUserDefinedFunctions(); - Arrays.sort(expected); - Arrays.sort(actual); - assertArrayEquals(expected, actual); - } - - @Test - public void testTables() throws Exception { - final ExecutionContext context = createDefaultExecutionContext(); - final TableEnvironment tableEnv = context.getTableEnvironment(); - - assertArrayEquals( - new String[] { "TableNumber1", "TableNumber2", "TableSourceSink", "TestView1", "TestView2" }, - tableEnv.listTables()); - } - - @Ignore - @Test - public void testTemporalTables() throws Exception { - final ExecutionContext context = createStreamingExecutionContext(); - final StreamTableEnvironment tableEnv = (StreamTableEnvironment) context.getTableEnvironment(); - - assertArrayEquals( - new String[] { "EnrichmentSource", "HistorySource", "HistoryView", "TemporalTableUsage" }, - tableEnv.listTables()); - - assertArrayEquals( - new String[] { "sourcetemporaltable", "viewtemporaltable" }, - tableEnv.listUserDefinedFunctions()); - - assertArrayEquals( - new String[] { "integerField", "stringField", "rowtimeField", "integerField0", "stringField0", "rowtimeField0" }, - tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames()); - } - - @Test - public void testConfiguration() throws Exception { - final ExecutionContext context = createConfigurationExecutionContext(); - final TableEnvironment tableEnv = context.getTableEnvironment(); - - assertEquals( - 100, - tableEnv.getConfig().getConfiguration().getInteger( - ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT)); - assertTrue( - tableEnv.getConfig().getConfiguration().getBoolean( - ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)); - assertEquals( - "128kb", - tableEnv.getConfig().getConfiguration().getString( - ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)); - - assertTrue( - tableEnv.getConfig().getConfiguration().getBoolean( - OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)); - - // these options are not modified and should be equal to their default value - assertEquals( - ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(), - tableEnv.getConfig().getConfiguration().getBoolean( - ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)); - assertEquals( - ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(), - tableEnv.getConfig().getConfiguration().getString( - ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE)); - assertEquals( - OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(), - tableEnv.getConfig().getConfiguration().getLong( - OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)); - } - - @Test - public void testInitCatalogs() throws Exception { - final Map replaceVars = createDefaultReplaceVars(); - Environment env = EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars); - - Map catalogProps = new HashMap<>(); - catalogProps.put("name", "test"); - catalogProps.put("type", "test_cl_catalog"); - env.getCatalogs().clear(); - env.getCatalogs().put("test", CatalogEntry.create(catalogProps)); - Configuration flinkConfig = new Configuration(); - ExecutionContext.builder(env, - new Environment(), - Collections.emptyList(), - flinkConfig, - new DefaultClusterClientServiceLoader(), - new Options(), - Collections.singletonList(new DefaultCLI(flinkConfig))).build(); - } - - @SuppressWarnings("unchecked") - private ExecutionContext createExecutionContext(String file, Map replaceVars) - throws Exception { - final Environment env = EnvironmentFileUtil.parseModified( - file, - replaceVars); - final Configuration flinkConfig = new Configuration(); - return (ExecutionContext) ExecutionContext.builder( - env, - new Environment(), - Collections.emptyList(), - flinkConfig, - new DefaultClusterClientServiceLoader(), - new Options(), - Collections.singletonList(new DefaultCLI(flinkConfig))) - .build(); - } - - private Map createDefaultReplaceVars() { - Map replaceVars = new HashMap<>(); - replaceVars.put("$VAR_PLANNER", "old"); - replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); - replaceVars.put("$VAR_RESULT_MODE", "changelog"); - replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); - replaceVars.put("$VAR_MAX_ROWS", "100"); - replaceVars.put("$VAR_RESTART_STRATEGY_TYPE", "failure-rate"); - return replaceVars; - } - - private ExecutionContext createDefaultExecutionContext() throws Exception { - final Map replaceVars = createDefaultReplaceVars(); - return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars); - } - - private ExecutionContext createModuleExecutionContext() throws Exception { - final Map replaceVars = new HashMap<>(); - replaceVars.put("$VAR_PLANNER", "old"); - replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); - replaceVars.put("$VAR_RESULT_MODE", "changelog"); - replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); - replaceVars.put("$VAR_MAX_ROWS", "100"); - return createExecutionContext(MODULES_ENVIRONMENT_FILE, replaceVars); - } - - private ExecutionContext createCatalogExecutionContext() throws Exception { - final Map replaceVars = new HashMap<>(); - replaceVars.put("$VAR_PLANNER", "old"); - replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); - replaceVars.put("$VAR_RESULT_MODE", "changelog"); - replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); - replaceVars.put("$VAR_MAX_ROWS", "100"); - return createExecutionContext(CATALOGS_ENVIRONMENT_FILE, replaceVars); - } - - private ExecutionContext createStreamingExecutionContext() throws Exception { - final Map replaceVars = new HashMap<>(); - replaceVars.put("$VAR_CONNECTOR_TYPE", DummyTableSourceFactory.CONNECTOR_TYPE_VALUE); - replaceVars.put("$VAR_CONNECTOR_PROPERTY", DummyTableSourceFactory.TEST_PROPERTY); - replaceVars.put("$VAR_CONNECTOR_PROPERTY_VALUE", ""); - return createExecutionContext(STREAMING_ENVIRONMENT_FILE, replaceVars); - } - - private ExecutionContext createConfigurationExecutionContext() throws Exception { - return createExecutionContext(CONFIGURATION_ENVIRONMENT_FILE, new HashMap<>()); - } - - // a catalog that requires the thread context class loader to be a user code classloader during construction and opening - private static class TestClassLoaderCatalog extends GenericInMemoryCatalog { - - private static final Class parentFirstCL = FlinkUserCodeClassLoaders - .parentFirst( - new URL[0], - TestClassLoaderCatalog.class.getClassLoader(), - NOOP_EXCEPTION_HANDLER) - .getClass(); - private static final Class childFirstCL = FlinkUserCodeClassLoaders - .childFirst( - new URL[0], - TestClassLoaderCatalog.class.getClassLoader(), - new String[0], - NOOP_EXCEPTION_HANDLER) - .getClass(); - - TestClassLoaderCatalog(String name) { - super(name); - verifyUserClassLoader(); - } - - @Override - public void open() { - verifyUserClassLoader(); - super.open(); - } - - private void verifyUserClassLoader() { - ClassLoader contextLoader = Thread.currentThread().getContextClassLoader(); - assertTrue(parentFirstCL.isInstance(contextLoader) || childFirstCL.isInstance(contextLoader)); - } - } - - /** - * Factory to create TestClassLoaderCatalog. - */ - public static class TestClassLoaderCatalogFactory implements CatalogFactory { - - @Override - public Catalog createCatalog(String name, Map properties) { - return new TestClassLoaderCatalog("test_cl"); - } - - @Override - public Map requiredContext() { - Map context = new HashMap<>(); - context.put("type", "test_cl_catalog"); - return context; - } - - @Override - public List supportedProperties() { - return Collections.emptyList(); - } - } + private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-gateway-defaults.yaml"; + private static final String MODULES_ENVIRONMENT_FILE = "test-sql-gateway-modules.yaml"; + private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-gateway-catalogs.yaml"; + private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-gateway-streaming.yaml"; + private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-gateway-configuration.yaml"; + + @Test + public void testExecutionConfig() throws Exception { + // test default values defined in ExecutionEntry + final ExecutionContext context = createDefaultExecutionContext(); + final ExecutionConfig config = context.getExecutionConfig(); + + assertEquals(200, config.getAutoWatermarkInterval()); + + final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy(); + assertTrue(restartConfig instanceof RestartStrategies.FallbackRestartStrategyConfiguration); + + // TODO test FailureRateRestartStrategyConfiguration + // final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy = + // (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig; + // assertEquals(10, failureRateStrategy.getMaxFailureRate()); + // assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds()); + // assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds()); + } + + @Test + public void testModules() throws Exception { + final ExecutionContext context = createModuleExecutionContext(); + final TableEnvironment tableEnv = context.getTableEnvironment(); + + Set allModules = new HashSet<>(Arrays.asList(tableEnv.listModules())); + assertEquals(4, allModules.size()); + assertEquals( + new HashSet<>( + Arrays.asList( + "core", + "mymodule", + "myhive", + "myhive2") + ), + allModules + ); + } + + @Test + public void testCatalogs() throws Exception { + final String inmemoryCatalog = "inmemorycatalog"; + final String hiveCatalog = "hivecatalog"; + final String hiveDefaultVersionCatalog = "hivedefaultversion"; + + final ExecutionContext context = createCatalogExecutionContext(); + final TableEnvironment tableEnv = context.getTableEnvironment(); + + assertEquals(inmemoryCatalog, tableEnv.getCurrentCatalog()); + assertEquals("mydatabase", tableEnv.getCurrentDatabase()); + + Catalog catalog = tableEnv.getCatalog(hiveCatalog).orElse(null); + assertNotNull(catalog); + assertTrue(catalog instanceof HiveCatalog); + assertEquals("2.3.4", ((HiveCatalog) catalog).getHiveVersion()); + + catalog = tableEnv.getCatalog(hiveDefaultVersionCatalog).orElse(null); + assertNotNull(catalog); + assertTrue(catalog instanceof HiveCatalog); + // make sure we have assigned a default hive version + assertFalse(StringUtils.isNullOrWhitespaceOnly(((HiveCatalog) catalog).getHiveVersion())); + + tableEnv.useCatalog(hiveCatalog); + + assertEquals(hiveCatalog, tableEnv.getCurrentCatalog()); + + Set allCatalogs = new HashSet<>(Arrays.asList(tableEnv.listCatalogs())); + assertEquals(6, allCatalogs.size()); + assertEquals( + new HashSet<>( + Arrays.asList( + "default_catalog", + inmemoryCatalog, + hiveCatalog, + hiveDefaultVersionCatalog, + "catalog1", + "catalog2") + ), + allCatalogs + ); + } + + @Test + public void testDatabases() throws Exception { + final String hiveCatalog = "hivecatalog"; + + final ExecutionContext context = createCatalogExecutionContext(); + final TableEnvironment tableEnv = context.getTableEnvironment(); + + assertEquals(1, tableEnv.listDatabases().length); + assertEquals("mydatabase", tableEnv.listDatabases()[0]); + + tableEnv.useCatalog(hiveCatalog); + + assertEquals(2, tableEnv.listDatabases().length); + assertEquals( + new HashSet<>( + Arrays.asList( + HiveCatalog.DEFAULT_DB, + DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE) + ), + new HashSet<>(Arrays.asList(tableEnv.listDatabases())) + ); + + tableEnv.useCatalog(hiveCatalog); + + assertEquals(HiveCatalog.DEFAULT_DB, tableEnv.getCurrentDatabase()); + + tableEnv.useDatabase(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE); + + assertEquals(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, tableEnv.getCurrentDatabase()); + } + + @Test + public void testFunctions() throws Exception { + final ExecutionContext context = createDefaultExecutionContext(); + final TableEnvironment tableEnv = context.getTableEnvironment(); + final String[] expected = new String[]{"scalarudf", "tableudf", "aggregateudf"}; + final String[] actual = tableEnv.listUserDefinedFunctions(); + Arrays.sort(expected); + Arrays.sort(actual); + assertArrayEquals(expected, actual); + } + + @Test + public void testTables() throws Exception { + final ExecutionContext context = createDefaultExecutionContext(); + final TableEnvironment tableEnv = context.getTableEnvironment(); + + assertArrayEquals( + new String[]{"TableNumber1", "TableNumber2", "TableSourceSink", "TestView1", "TestView2"}, + tableEnv.listTables()); + } + + @Ignore + @Test + public void testTemporalTables() throws Exception { + final ExecutionContext context = createStreamingExecutionContext(); + final StreamTableEnvironment tableEnv = (StreamTableEnvironment) context.getTableEnvironment(); + + assertArrayEquals( + new String[]{"EnrichmentSource", "HistorySource", "HistoryView", "TemporalTableUsage"}, + tableEnv.listTables()); + + assertArrayEquals( + new String[]{"sourcetemporaltable", "viewtemporaltable"}, + tableEnv.listUserDefinedFunctions()); + + assertArrayEquals( + new String[]{"integerField", "stringField", "rowtimeField", "integerField0", "stringField0", "rowtimeField0"}, + tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames()); + } + + @Test + public void testConfiguration() throws Exception { + final ExecutionContext context = createConfigurationExecutionContext(); + final TableEnvironment tableEnv = context.getTableEnvironment(); + + assertEquals( + 100, + tableEnv.getConfig().getConfiguration().getInteger( + ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT)); + assertTrue( + tableEnv.getConfig().getConfiguration().getBoolean( + ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)); + assertEquals( + "128kb", + tableEnv.getConfig().getConfiguration().getString( + ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)); + + assertTrue( + tableEnv.getConfig().getConfiguration().getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)); + + // these options are not modified and should be equal to their default value + assertEquals( + ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(), + tableEnv.getConfig().getConfiguration().getBoolean( + ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)); + assertEquals( + ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(), + tableEnv.getConfig().getConfiguration().getString( + ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE)); + assertEquals( + OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(), + tableEnv.getConfig().getConfiguration().getLong( + OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)); + } + + @Test + public void testInitCatalogs() throws Exception { + final Map replaceVars = createDefaultReplaceVars(); + Environment env = EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars); + + Map catalogProps = new HashMap<>(); + catalogProps.put("name", "test"); + catalogProps.put("type", "test_cl_catalog"); + env.getCatalogs().clear(); + env.getCatalogs().put("test", CatalogEntry.create(catalogProps)); + Configuration flinkConfig = new Configuration(); + ExecutionContext.builder(env, + new Environment(), + Collections.emptyList(), + flinkConfig, + new DefaultClusterClientServiceLoader(), + new Options(), + Collections.singletonList(new DefaultCLI())).build(); + } + + @SuppressWarnings("unchecked") + private ExecutionContext createExecutionContext(String file, Map replaceVars) + throws Exception { + final Environment env = EnvironmentFileUtil.parseModified( + file, + replaceVars); + final Configuration flinkConfig = new Configuration(); + return (ExecutionContext) ExecutionContext.builder( + env, + new Environment(), + Collections.emptyList(), + flinkConfig, + new DefaultClusterClientServiceLoader(), + new Options(), + Collections.singletonList(new DefaultCLI())) + .build(); + } + + private Map createDefaultReplaceVars() { + Map replaceVars = new HashMap<>(); + replaceVars.put("$VAR_PLANNER", "old"); + replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); + replaceVars.put("$VAR_RESULT_MODE", "changelog"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); + replaceVars.put("$VAR_MAX_ROWS", "100"); + replaceVars.put("$VAR_RESTART_STRATEGY_TYPE", "failure-rate"); + return replaceVars; + } + + private ExecutionContext createDefaultExecutionContext() throws Exception { + final Map replaceVars = createDefaultReplaceVars(); + return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars); + } + + private ExecutionContext createModuleExecutionContext() throws Exception { + final Map replaceVars = new HashMap<>(); + replaceVars.put("$VAR_PLANNER", "old"); + replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); + replaceVars.put("$VAR_RESULT_MODE", "changelog"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); + replaceVars.put("$VAR_MAX_ROWS", "100"); + return createExecutionContext(MODULES_ENVIRONMENT_FILE, replaceVars); + } + + private ExecutionContext createCatalogExecutionContext() throws Exception { + final Map replaceVars = new HashMap<>(); + replaceVars.put("$VAR_PLANNER", "old"); + replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); + replaceVars.put("$VAR_RESULT_MODE", "changelog"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); + replaceVars.put("$VAR_MAX_ROWS", "100"); + return createExecutionContext(CATALOGS_ENVIRONMENT_FILE, replaceVars); + } + + private ExecutionContext createStreamingExecutionContext() throws Exception { + final Map replaceVars = new HashMap<>(); + replaceVars.put("$VAR_CONNECTOR_TYPE", DummyTableSourceFactory.CONNECTOR_TYPE_VALUE); + replaceVars.put("$VAR_CONNECTOR_PROPERTY", DummyTableSourceFactory.TEST_PROPERTY); + replaceVars.put("$VAR_CONNECTOR_PROPERTY_VALUE", ""); + return createExecutionContext(STREAMING_ENVIRONMENT_FILE, replaceVars); + } + + private ExecutionContext createConfigurationExecutionContext() throws Exception { + return createExecutionContext(CONFIGURATION_ENVIRONMENT_FILE, new HashMap<>()); + } + + // a catalog that requires the thread context class loader to be a user code classloader during construction and opening + private static class TestClassLoaderCatalog extends GenericInMemoryCatalog { + + private static final Class parentFirstCL = FlinkUserCodeClassLoaders + .parentFirst( + new URL[0], + TestClassLoaderCatalog.class.getClassLoader(), + NOOP_EXCEPTION_HANDLER, + true) + .getClass(); + private static final Class childFirstCL = FlinkUserCodeClassLoaders + .childFirst( + new URL[0], + TestClassLoaderCatalog.class.getClassLoader(), + new String[0], + NOOP_EXCEPTION_HANDLER, + true) + .getClass(); + + TestClassLoaderCatalog(String name) { + super(name); + verifyUserClassLoader(); + } + + @Override + public void open() { + verifyUserClassLoader(); + super.open(); + } + + private void verifyUserClassLoader() { + ClassLoader contextLoader = Thread.currentThread().getContextClassLoader(); + assertTrue(parentFirstCL.isInstance(contextLoader) || childFirstCL.isInstance(contextLoader)); + } + } + + /** + * Factory to create TestClassLoaderCatalog. + */ + public static class TestClassLoaderCatalogFactory implements CatalogFactory { + + @Override + public Catalog createCatalog(String name, Map properties) { + return new TestClassLoaderCatalog("test_cl"); + } + + @Override + public Map requiredContext() { + Map context = new HashMap<>(); + context.put("type", "test_cl_catalog"); + return context; + } + + @Override + public List supportedProperties() { + return Collections.emptyList(); + } + } } diff --git a/src/test/java/com/ververica/flink/table/gateway/operation/OperationTestBase.java b/src/test/java/com/ververica/flink/table/gateway/operation/OperationTestBase.java index e78d81f..f2776c6 100644 --- a/src/test/java/com/ververica/flink/table/gateway/operation/OperationTestBase.java +++ b/src/test/java/com/ververica/flink/table/gateway/operation/OperationTestBase.java @@ -23,11 +23,9 @@ import com.ververica.flink.table.gateway.context.DefaultContext; import com.ververica.flink.table.gateway.context.SessionContext; import com.ververica.flink.table.gateway.rest.session.SessionID; - import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.configuration.Configuration; - import org.junit.Before; import java.util.Collections; @@ -55,7 +53,7 @@ protected DefaultContext getDefaultContext() { new Environment(), Collections.emptyList(), new Configuration(), - new DefaultCLI(new Configuration()), + new DefaultCLI(), new DefaultClusterClientServiceLoader()); } diff --git a/src/test/java/com/ververica/flink/table/gateway/operation/OperationWithUserJarTest.java b/src/test/java/com/ververica/flink/table/gateway/operation/OperationWithUserJarTest.java index 5b10e71..b1a9b61 100644 --- a/src/test/java/com/ververica/flink/table/gateway/operation/OperationWithUserJarTest.java +++ b/src/test/java/com/ververica/flink/table/gateway/operation/OperationWithUserJarTest.java @@ -26,14 +26,12 @@ import com.ververica.flink.table.gateway.rest.result.ResultKind; import com.ververica.flink.table.gateway.rest.result.ResultSet; import com.ververica.flink.table.gateway.utils.ResourceFileUtils; - import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.types.Row; - import org.junit.Test; import java.io.File; @@ -43,9 +41,7 @@ import java.util.Collections; import java.util.List; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * Test for operations when user jars are provided. @@ -62,7 +58,7 @@ protected DefaultContext getDefaultContext() { new Environment(), Collections.singletonList(jarUrl), new Configuration(), - new DefaultCLI(new Configuration()), + new DefaultCLI(), new DefaultClusterClientServiceLoader()); } diff --git a/src/test/java/com/ververica/flink/table/gateway/sink/TestTableSinkFactoryBase.java b/src/test/java/com/ververica/flink/table/gateway/sink/TestTableSinkFactoryBase.java index 0fc7359..82422f8 100644 --- a/src/test/java/com/ververica/flink/table/gateway/sink/TestTableSinkFactoryBase.java +++ b/src/test/java/com/ververica/flink/table/gateway/sink/TestTableSinkFactoryBase.java @@ -38,18 +38,9 @@ import java.util.Map; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.DescriptorProperties.*; +import static org.apache.flink.table.descriptors.Rowtime.*; +import static org.apache.flink.table.descriptors.Schema.*; import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; @@ -113,7 +104,7 @@ public static class TestTableSink implements TableSink, AppendStreamTableSi private final String property; public TestTableSink(TableSchema schema, String property) { - this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema); + this.schema = TableSchemaUtils.checkOnlyPhysicalColumns(schema); this.property = property; } diff --git a/src/test/java/com/ververica/flink/table/gateway/source/TestTableSourceFactoryBase.java b/src/test/java/com/ververica/flink/table/gateway/source/TestTableSourceFactoryBase.java index 64bc7cd..02ec8f6 100644 --- a/src/test/java/com/ververica/flink/table/gateway/source/TestTableSourceFactoryBase.java +++ b/src/test/java/com/ververica/flink/table/gateway/source/TestTableSourceFactoryBase.java @@ -33,25 +33,12 @@ import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.Row; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.DescriptorProperties.*; +import static org.apache.flink.table.descriptors.Rowtime.*; +import static org.apache.flink.table.descriptors.Schema.*; import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; @@ -122,7 +109,7 @@ public static class TestTableSource implements StreamTableSource, DefinedRo private final List rowtime; public TestTableSource(TableSchema schema, String property, String proctime, List rowtime) { - this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema); + this.schema = TableSchemaUtils.checkOnlyPhysicalColumns(schema); this.property = property; this.proctime = proctime; this.rowtime = rowtime; From 3d1394abe715b2c379c0bf87864521d39342d93b Mon Sep 17 00:00:00 2001 From: Romain Rigaux Date: Wed, 23 Dec 2020 21:19:28 +0100 Subject: [PATCH 2/3] Bumping version to 0.3 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 557fe55..e28abe0 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ under the License. flink-sql-gateway com.ververica flink-sql-gateway - 0.2-SNAPSHOT + 0.3-SNAPSHOT Flink SQL gateway is a service that allows other applications @@ -658,4 +658,4 @@ under the License. - \ No newline at end of file + From 88f947dfe514851d656121fd17d035607eb7b214 Mon Sep 17 00:00:00 2001 From: wangzhiqiang Date: Mon, 28 Dec 2020 23:58:23 +0800 Subject: [PATCH 3/3] [fix] describe sometimes not work --- .../table/gateway/operation/DescribeTableOperation.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java b/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java index 39c6763..b155b2b 100644 --- a/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java +++ b/src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java @@ -85,10 +85,10 @@ public ResultSet execute() { boolean isNullable = logicalType.isNullable(); String key = fieldToPrimaryKey.getOrDefault(column.getName(), null); - /** - * todo - */ - String computedColumn = ((TableColumn.ComputedColumn)column).getExpression(); + String computedColumn = null; + if (column instanceof TableColumn.ComputedColumn) { + computedColumn = ((TableColumn.ComputedColumn) column).getExpression(); + } String watermark = fieldToWatermark.getOrDefault(column.getName(), null); data.add(Row.of(name, type, isNullable, key, computedColumn, watermark));