diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java deleted file mode 100644 index aa3e0f9b8528..000000000000 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogIngestionTest.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.catalog.sql; - -import org.apache.druid.catalog.CatalogException; -import org.apache.druid.catalog.model.Columns; -import org.apache.druid.catalog.model.TableMetadata; -import org.apache.druid.catalog.model.table.TableBuilder; -import org.apache.druid.catalog.storage.CatalogStorage; -import org.apache.druid.catalog.storage.CatalogTests; -import org.apache.druid.catalog.sync.CachedMetadataCatalog; -import org.apache.druid.catalog.sync.MetadataCatalog; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.CalciteIngestionDmlTest; -import org.apache.druid.sql.calcite.filtration.Filtration; -import org.apache.druid.sql.calcite.planner.CatalogResolver; -import org.apache.druid.sql.calcite.util.CalciteTests; -import org.apache.druid.sql.calcite.util.SqlTestFramework; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import java.util.Arrays; - -import static org.junit.Assert.fail; - -/** - * Test the use of catalog specs to drive MSQ ingestion. - */ -public class CatalogIngestionTest extends CalciteIngestionDmlTest -{ - @RegisterExtension - public static final DerbyConnectorRule5 DERBY_CONNECTION_RULE = new DerbyConnectorRule5(); - - /** - * Signature for the foo datasource after applying catalog metadata. - */ - private static final RowSignature FOO_SIGNATURE = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("extra1", ColumnType.STRING) - .add("dim2", ColumnType.STRING) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m1", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .add("m2", ColumnType.DOUBLE) - .build(); - - private static CatalogStorage storage; - - @Override - public CatalogResolver createCatalogResolver() - { - CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE); - storage = dbFixture.storage; - MetadataCatalog catalog = new CachedMetadataCatalog( - storage, - storage.schemaRegistry(), - storage.jsonMapper() - ); - return new LiveCatalogResolver(catalog); - } - - @Override - public void finalizeTestFramework(SqlTestFramework sqlTestFramework) - { - super.finalizeTestFramework(sqlTestFramework); - buildTargetDatasources(); - buildFooDatasource(); - } - - private void buildTargetDatasources() - { - TableMetadata spec = TableBuilder.datasource("hourDs", "PT1H") - .build(); - createTableMetadata(spec); - } - - public void buildFooDatasource() - { - TableMetadata spec = TableBuilder.datasource("foo", "ALL") - .timeColumn() - .column("extra1", null) - .column("dim2", null) - .column("dim1", null) - .column("cnt", null) - .column("m1", Columns.DOUBLE) - .column("extra2", Columns.LONG) - .column("extra3", Columns.STRING) - .hiddenColumns(Arrays.asList("dim3", "unique_dim1")) - .sealed(true) - .build(); - createTableMetadata(spec); - } - - private void createTableMetadata(TableMetadata table) - { - try { - storage.tables().create(table); - } - catch (CatalogException e) { - fail(e.getMessage()); - } - } - - /** - * If the segment grain is given in the catalog then use this value is used. - */ - @Test - public void testInsertHourGrain() - { - testIngestionQuery() - .sql("INSERT INTO hourDs\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2") - .context(queryContextWithGranularity(Granularities.HOUR)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is given in the catalog, and also by PARTITIONED BY, then - * the query value is used. - */ - @Test - public void testInsertHourGrainWithDay() - { - testIngestionQuery() - .sql("INSERT INTO hourDs\n" + - "SELECT * FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } -} diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java new file mode 100644 index 000000000000..d4a97e666ed4 --- /dev/null +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sql; + +import org.apache.druid.catalog.CatalogException; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.facade.DatasourceFacade; +import org.apache.druid.catalog.model.table.ClusterKeySpec; +import org.apache.druid.catalog.model.table.TableBuilder; +import org.apache.druid.catalog.storage.CatalogStorage; +import org.apache.druid.catalog.storage.CatalogTests; +import org.apache.druid.catalog.sync.CachedMetadataCatalog; +import org.apache.druid.catalog.sync.MetadataCatalog; +import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5; +import org.apache.druid.sql.calcite.CalciteCatalogInsertTest; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.jupiter.api.extension.RegisterExtension; + +import static org.junit.Assert.fail; + +/** + * Test the use of catalog specs to drive MSQ ingestion. + */ +public class CatalogInsertTest extends CalciteCatalogInsertTest +{ + @RegisterExtension + public static final DerbyConnectorRule5 DERBY_CONNECTION_RULE = new DerbyConnectorRule5(); + + private static CatalogStorage storage; + + @Override + public CatalogResolver createCatalogResolver() + { + CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE); + storage = dbFixture.storage; + MetadataCatalog catalog = new CachedMetadataCatalog( + storage, + storage.schemaRegistry(), + storage.jsonMapper() + ); + return new LiveCatalogResolver(catalog); + } + + @Override + public void finalizeTestFramework(SqlTestFramework sqlTestFramework) + { + super.finalizeTestFramework(sqlTestFramework); + buildDatasources(); + } + + public void buildDatasources() + { + resolvedTables.forEach((datasourceName, datasourceTable) -> { + DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata(); + TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString()); + catalogMetadata.columnFacades().forEach( + columnFacade -> { + tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType()); + } + ); + + if (catalogMetadata.hiddenColumns() != null && !catalogMetadata.hiddenColumns().isEmpty()) { + tableBuilder.hiddenColumns(catalogMetadata.hiddenColumns()); + } + + if (catalogMetadata.isSealed()) { + tableBuilder.sealed(true); + } + + if (catalogMetadata.clusterKeys() != null && !catalogMetadata.clusterKeys().isEmpty()) { + tableBuilder.clusterColumns(catalogMetadata.clusterKeys().toArray(new ClusterKeySpec[0])); + } + + createTableMetadata(tableBuilder.build()); + }); + DatasourceFacade catalogMetadata = + ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata(); + TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString()); + catalogMetadata.columnFacades().forEach( + columnFacade -> { + tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType()); + } + ); + } + + private void createTableMetadata(TableMetadata table) + { + try { + storage.tables().create(table); + } + catch (CatalogException e) { + fail(e.getMessage()); + } + } +} diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java new file mode 100644 index 000000000000..34011fb2205f --- /dev/null +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.sql; + +import org.apache.druid.catalog.CatalogException; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.facade.DatasourceFacade; +import org.apache.druid.catalog.model.table.ClusterKeySpec; +import org.apache.druid.catalog.model.table.TableBuilder; +import org.apache.druid.catalog.storage.CatalogStorage; +import org.apache.druid.catalog.storage.CatalogTests; +import org.apache.druid.catalog.sync.CachedMetadataCatalog; +import org.apache.druid.catalog.sync.MetadataCatalog; +import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5; +import org.apache.druid.sql.calcite.CalciteCatalogReplaceTest; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.junit.jupiter.api.extension.RegisterExtension; + +import static org.junit.Assert.fail; + +/** + * Test the use of catalog specs to drive MSQ ingestion. + */ +public class CatalogReplaceTest extends CalciteCatalogReplaceTest +{ + @RegisterExtension + public static final DerbyConnectorRule5 DERBY_CONNECTION_RULE = new DerbyConnectorRule5(); + private static CatalogStorage storage; + + @Override + public CatalogResolver createCatalogResolver() + { + CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE); + storage = dbFixture.storage; + MetadataCatalog catalog = new CachedMetadataCatalog( + storage, + storage.schemaRegistry(), + storage.jsonMapper() + ); + return new LiveCatalogResolver(catalog); + } + + @Override + public void finalizeTestFramework(SqlTestFramework sqlTestFramework) + { + super.finalizeTestFramework(sqlTestFramework); + buildDatasources(); + } + + public void buildDatasources() + { + resolvedTables.forEach((datasourceName, datasourceTable) -> { + DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata(); + TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString()); + catalogMetadata.columnFacades().forEach( + columnFacade -> { + tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType()); + } + ); + + if (catalogMetadata.hiddenColumns() != null && !catalogMetadata.hiddenColumns().isEmpty()) { + tableBuilder.hiddenColumns(catalogMetadata.hiddenColumns()); + } + + if (catalogMetadata.isSealed()) { + tableBuilder.sealed(true); + } + + if (catalogMetadata.clusterKeys() != null && !catalogMetadata.clusterKeys().isEmpty()) { + tableBuilder.clusterColumns(catalogMetadata.clusterKeys().toArray(new ClusterKeySpec[0])); + } + + createTableMetadata(tableBuilder.build()); + }); + DatasourceFacade catalogMetadata = + ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata(); + TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString()); + catalogMetadata.columnFacades().forEach( + columnFacade -> { + tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType()); + } + ); + } + + private void createTableMetadata(TableMetadata table) + { + try { + storage.tables().create(table); + } + catch (CatalogException e) { + fail(e.getMessage()); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java index 5abc866dcbf2..d81907ef1443 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java @@ -241,18 +241,7 @@ public static Granularity convertSqlNodeToGranularity(SqlNode sqlNode) private static Granularity convertSqlLiteralCharToGranularity(SqlLiteral literal) { - String value = literal.getValueAs(String.class); - try { - return Granularity.fromString(value); - } - catch (IllegalArgumentException e) { - try { - return new PeriodGranularity(new Period(value), null, null); - } - catch (Exception e2) { - throw makeInvalidPartitionByException(literal); - } - } + return convertStringToGranularity(literal.getValueAs(String.class), literal); } private static Granularity convertSqlIdentiferToGranularity(SqlIdentifier identifier) @@ -260,7 +249,11 @@ private static Granularity convertSqlIdentiferToGranularity(SqlIdentifier identi if (identifier.names.isEmpty()) { throw makeInvalidPartitionByException(identifier); } - String value = identifier.names.get(0); + return convertStringToGranularity(identifier.names.get(0), identifier); + } + + private static Granularity convertStringToGranularity(String value, SqlNode node) + { try { return Granularity.fromString(value); } @@ -269,7 +262,7 @@ private static Granularity convertSqlIdentiferToGranularity(SqlIdentifier identi return new PeriodGranularity(new Period(value), null, null); } catch (Exception e2) { - throw makeInvalidPartitionByException(identifier); + throw makeInvalidPartitionByException(node); } } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index fa7d7b2fe8b6..7b1e5742d3d9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -20,7 +20,6 @@ package org.apache.druid.sql.calcite.planner; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.prepare.BaseDruidSqlValidator; import org.apache.calcite.prepare.CalciteCatalogReader; @@ -34,25 +33,34 @@ import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlUpdate; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWindow; +import org.apache.calcite.sql.SqlWith; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.sql.validate.IdentifierNamespace; +import org.apache.calcite.sql.validate.SqlNonNullableAccessors; import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.calcite.sql.validate.SqlValidatorNamespace; import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.sql.validate.SqlValidatorTable; import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Static; import org.apache.calcite.util.Util; import org.apache.druid.catalog.model.facade.DatasourceFacade; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.QueryContexts; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; @@ -64,6 +72,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.regex.Pattern; /** @@ -77,14 +86,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator // Copied here from MSQE since that extension is not visible here. public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment"; - public interface ValidatorContext - { - Map queryContextMap(); - CatalogResolver catalog(); - String druidSchemaName(); - ObjectMapper jsonMapper(); - } - private final PlannerContext plannerContext; protected DruidSqlValidator( @@ -153,6 +154,12 @@ public void validateWindow(SqlNode windowOrId, SqlValidatorScope scope, @Nullabl super.validateWindow(windowOrId, scope, call); } + /** + * Most of the implementation here is copied over from {@link org.apache.calcite.sql.validate.SqlValidator#validateInsert(SqlInsert)} + * we've extended, refactored, and extracted methods, to fit out needs, and added comments where appropriate. + * + * @param insert INSERT statement + */ @Override public void validateInsert(final SqlInsert insert) { @@ -173,7 +180,10 @@ public void validateInsert(final SqlInsert insert) } // The target namespace is both the target table ID and the row type for that table. - final SqlValidatorNamespace targetNamespace = getNamespace(insert); + final SqlValidatorNamespace targetNamespace = Objects.requireNonNull( + getNamespace(insert), + () -> "namespace for " + insert + ); final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace; // The target is a new or existing datasource. final DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName); @@ -226,6 +236,20 @@ public void validateInsert(final SqlInsert insert) // Determine the output (target) schema. final RelDataType targetType = validateTargetType(scope, insertNs, insert, sourceType, tableMetadata); + // WITH node type is computed to be the type of the body recursively in + // org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery}. If this computed type + // is different than the type validated and stored for the node in memory a nasty relational + // algebra error will occur in org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType. + // During the validateTargetType call above, the WITH body node validated type may be updated + // with any coercions applied. We update the validated node type of the WITH node here so + // that they are consistent. + if (source instanceof SqlWith) { + final RelDataType withBodyType = getValidatedNodeTypeIfKnown(((SqlWith) source).body); + if (withBodyType != null) { + setValidatedNodeType(source, withBodyType); + } + } + // Set the type for the INSERT/REPLACE node setValidatedNodeType(insert, targetType); @@ -379,10 +403,11 @@ private RelDataType validateTargetType( for (final RelDataTypeField sourceField : sourceFields) { // Check that there are no unnamed columns in the insert. if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) { - throw InvalidSqlInput.exception( + throw buildCalciteContextException( "Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually " + "the result of applying a function without having an AS clause, please ensure that all function calls" - + "are named with an AS clause as in \"func(X) as myColumn\"." + + "are named with an AS clause as in \"func(X) as myColumn\".", + getSqlNodeFor(insert, sourceFields.indexOf(sourceField)) ); } } @@ -424,19 +449,26 @@ private RelDataType validateTargetType( // extensions which are not loaded. Those details are not known at the time // of this code so we are not yet in a position to make the right decision. // This is a task to be revisited when we have more information. - final String sqlTypeName = definedCol.sqlStorageType(); - if (sqlTypeName == null) { + if (definedCol.sqlStorageType() == null) { // Don't know the storage type. Just skip this one: Druid types are // fluid so let Druid sort out what to store. This is probably not the right // answer, but should avoid problems until full type system support is completed. fields.add(Pair.of(colName, sourceField.getType())); continue; } - RelDataType relType = typeFactory.createSqlType(SqlTypeName.get(sqlTypeName)); - fields.add(Pair.of( - colName, - typeFactory.createTypeWithNullability(relType, true) - )); + SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType()); + RelDataType relType = typeFactory.createSqlType(sqlTypeName); + if (NullHandling.replaceWithDefault() && !SqlTypeFamily.STRING.contains(relType)) { + fields.add(Pair.of( + colName, + relType + )); + } else { + fields.add(Pair.of( + colName, + typeFactory.createTypeWithNullability(relType, true) + )); + } } // Perform the SQL-standard check: that the SELECT column can be @@ -449,6 +481,89 @@ private RelDataType validateTargetType( return targetType; } + @Override + protected void checkTypeAssignment( + @Nullable SqlValidatorScope sourceScope, + SqlValidatorTable table, + RelDataType sourceRowType, + RelDataType targetRowType, + final SqlNode query + ) + { + final List sourceFields = sourceRowType.getFieldList(); + List targetFields = targetRowType.getFieldList(); + final int sourceCount = sourceFields.size(); + for (int i = 0; i < sourceCount; ++i) { + RelDataType sourceFielRelDataType = sourceFields.get(i).getType(); + RelDataType targetFieldRelDataType = targetFields.get(i).getType(); + ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFielRelDataType); + ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType); + + if (targetFieldColumnType != ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)) { + SqlNode node = getNthExpr(query, i, sourceCount); + String targetTypeString; + String sourceTypeString; + if (SqlTypeUtil.areCharacterSetsMismatched( + sourceFielRelDataType, + targetFieldRelDataType)) { + sourceTypeString = sourceFielRelDataType.getFullTypeString(); + targetTypeString = targetFieldRelDataType.getFullTypeString(); + } else { + sourceTypeString = sourceFielRelDataType.toString(); + targetTypeString = targetFieldRelDataType.toString(); + } + throw newValidationError(node, + Static.RESOURCE.typeNotAssignable( + targetFields.get(i).getName(), targetTypeString, + sourceFields.get(i).getName(), sourceTypeString)); + } + } + // the call to base class definition will insert implicit casts / coercions where needed. + super.checkTypeAssignment(sourceScope, table, sourceRowType, targetRowType, query); + } + + /** + * Locates the n'th expression in an INSERT or UPDATE query. + * + * @param query Query + * @param ordinal Ordinal of expression + * @param sourceCount Number of expressions + * @return Ordinal'th expression, never null + */ + private static SqlNode getNthExpr(SqlNode query, int ordinal, int sourceCount) + { + if (query instanceof SqlInsert) { + SqlInsert insert = (SqlInsert) query; + if (insert.getTargetColumnList() != null) { + return insert.getTargetColumnList().get(ordinal); + } else { + return getNthExpr( + insert.getSource(), + ordinal, + sourceCount); + } + } else if (query instanceof SqlUpdate) { + SqlUpdate update = (SqlUpdate) query; + if (update.getSourceExpressionList() != null) { + return update.getSourceExpressionList().get(ordinal); + } else { + return getNthExpr( + SqlNonNullableAccessors.getSourceSelect(update), + ordinal, sourceCount); + } + } else if (query instanceof SqlSelect) { + SqlSelect select = (SqlSelect) query; + SqlNodeList selectList = SqlNonNullableAccessors.getSelectList(select); + if (selectList.size() == sourceCount) { + return selectList.get(ordinal); + } else { + return query; // give up + } + } else { + return query; // give up + } + } + private boolean isPrecedingOrFollowing(@Nullable SqlNode bound) { if (bound == null) { @@ -530,4 +645,17 @@ public static CalciteContextException buildCalciteContextException(Throwable t, pos.getEndColumnNum() ); } + + private SqlNode getSqlNodeFor(SqlInsert insert, int idx) + { + SqlNode src = insert.getSource(); + if (src instanceof SqlSelect) { + SqlSelect sqlSelect = (SqlSelect) src; + SqlNodeList selectList = sqlSelect.getSelectList(); + if (idx < selectList.size()) { + return selectList.get(idx); + } + } + return src; + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java index d936d64ea881..dfd3d7260607 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DatasourceTable.java @@ -175,7 +175,7 @@ public EffectiveMetadata( this.columns = columns; } - private static Map toEffectiveColumns(RowSignature rowSignature) + public static Map toEffectiveColumns(RowSignature rowSignature) { Map columns = new HashMap<>(); for (int i = 0; i < rowSignature.size(); i++) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java new file mode 100644 index 000000000000..2d3fb5d7f114 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.catalog.model.ColumnSpec; +import org.apache.druid.catalog.model.Columns; +import org.apache.druid.catalog.model.ResolvedTable; +import org.apache.druid.catalog.model.TableDefn; +import org.apache.druid.catalog.model.TableSpec; +import org.apache.druid.catalog.model.facade.DatasourceFacade; +import org.apache.druid.catalog.model.table.DatasourceDefn; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.sql.calcite.table.DruidTable; + +public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest +{ + public ImmutableMap resolvedTables = ImmutableMap.of( + "hourDs", new DatasourceTable( + RowSignature.builder().addTimeColumn().build(), + new DatasourceTable.PhysicalDatasourceMetadata( + new TableDataSource("hourDs"), + RowSignature.builder().addTimeColumn().build(), + false, + false + ), + new DatasourceTable.EffectiveMetadata( + new DatasourceFacade(new ResolvedTable( + new TableDefn( + "foo", + DatasourceDefn.TABLE_TYPE, + null, + null + ), + new TableSpec( + DatasourceDefn.TABLE_TYPE, + ImmutableMap.of(DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "PT1H"), + ImmutableList.of( + new ColumnSpec("__time", Columns.TIME_COLUMN, null) + ) + ), + MAPPER + )), + DatasourceTable.EffectiveMetadata.toEffectiveColumns( + RowSignature.builder() + .addTimeColumn() + .build()), + false + ) + ), + "noPartitonedBy", new DatasourceTable( + RowSignature.builder().addTimeColumn().build(), + new DatasourceTable.PhysicalDatasourceMetadata( + new TableDataSource("hourDs"), + RowSignature.builder().addTimeColumn().build(), + false, + false + ), + new DatasourceTable.EffectiveMetadata( + new DatasourceFacade(new ResolvedTable( + new TableDefn( + "foo", + DatasourceDefn.TABLE_TYPE, + null, + null + ), + new TableSpec( + DatasourceDefn.TABLE_TYPE, + ImmutableMap.of(), + ImmutableList.of( + new ColumnSpec("__time", Columns.TIME_COLUMN, null) + ) + ), + MAPPER + )), + DatasourceTable.EffectiveMetadata.toEffectiveColumns( + RowSignature.builder() + .addTimeColumn() + .build()), + false + ) + ), + "strictTableWithNoDefinedSchema", new DatasourceTable( + RowSignature.builder().build(), + new DatasourceTable.PhysicalDatasourceMetadata( + new TableDataSource("strictTableWithNoDefinedSchema"), + RowSignature.builder().build(), + false, + false + ), + new DatasourceTable.EffectiveMetadata( + new DatasourceFacade(new ResolvedTable( + new TableDefn( + "strictTableWithNoDefinedSchema", + DatasourceDefn.TABLE_TYPE, + null, + null + ), + new TableSpec(DatasourceDefn.TABLE_TYPE, ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true), null), + MAPPER + )), + DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder().build()), + false + ) + ), + "foo", new DatasourceTable( + FOO_TABLE_SIGNATURE, + new DatasourceTable.PhysicalDatasourceMetadata( + new TableDataSource("foo"), + FOO_TABLE_SIGNATURE, + false, + false + ), + new DatasourceTable.EffectiveMetadata( + new DatasourceFacade(new ResolvedTable( + new TableDefn( + "foo", + DatasourceDefn.TABLE_TYPE, + null, + null + ), + new TableSpec( + DatasourceDefn.TABLE_TYPE, + ImmutableMap.of(), + ImmutableList.of( + new ColumnSpec("__time", Columns.TIME_COLUMN, null), + new ColumnSpec("dim1", Columns.STRING, null), + new ColumnSpec("dim2", Columns.STRING, null), + new ColumnSpec("dim3", Columns.STRING, null), + new ColumnSpec("cnt", Columns.LONG, null), + new ColumnSpec("m1", Columns.FLOAT, null), + new ColumnSpec("m2", Columns.DOUBLE, null) + ) + ), + MAPPER + )), + DatasourceTable.EffectiveMetadata.toEffectiveColumns(FOO_TABLE_SIGNATURE), + false + ) + ), + "fooSealed", new DatasourceTable( + FOO_TABLE_SIGNATURE, + new DatasourceTable.PhysicalDatasourceMetadata( + new TableDataSource("foo"), + FOO_TABLE_SIGNATURE, + false, + false + ), + new DatasourceTable.EffectiveMetadata( + new DatasourceFacade(new ResolvedTable( + new TableDefn( + "foo", + DatasourceDefn.TABLE_TYPE, + null, + null + ), + new TableSpec( + DatasourceDefn.TABLE_TYPE, + ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true), + ImmutableList.of( + new ColumnSpec("__time", Columns.TIME_COLUMN, null), + new ColumnSpec("dim1", Columns.STRING, null), + new ColumnSpec("dim2", Columns.STRING, null), + new ColumnSpec("dim3", Columns.STRING, null), + new ColumnSpec("cnt", Columns.LONG, null), + new ColumnSpec("m1", Columns.FLOAT, null), + new ColumnSpec("m2", Columns.DOUBLE, null) + ) + ), + MAPPER + )), + DatasourceTable.EffectiveMetadata.toEffectiveColumns(FOO_TABLE_SIGNATURE), + false + ) + ) + ); + + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Override + public CatalogResolver createCatalogResolver() + { + return new CatalogResolver.NullCatalogResolver() { + @Override + public DruidTable resolveDatasource( + final String tableName, + final DatasourceTable.PhysicalDatasourceMetadata dsMetadata + ) + { + if (resolvedTables.get(tableName) != null) { + return resolvedTables.get(tableName); + } + return dsMetadata == null ? null : new DatasourceTable(dsMetadata); + } + }; + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java new file mode 100644 index 000000000000..af45896011c6 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.external.ExternalDataSource; +import org.apache.druid.sql.calcite.external.Externals; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.junit.jupiter.api.Test; + +/** + * Test for INSERT DML statements for tables defined in catalog. + */ +public class CalciteCatalogInsertTest extends CalciteCatalogIngestionDmlTest +{ + /** + * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the + * value from the catalog. + */ + @Test + public void testInsertHourGrainPartitonedByFromCatalog() + { + testIngestionQuery() + .sql("INSERT INTO hourDs\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.HOUR)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is given in the catalog, and also by PARTITIONED BY, then + * the query value is used. + */ + @Test + public void testInsertHourGrainWithDayPartitonedByFromQuery() + { + testIngestionQuery() + .sql("INSERT INTO hourDs\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then + * validation error. + */ + @Test + public void testInsertNoPartitonedByFromCatalog() + { + testIngestionQuery() + .sql("INSERT INTO noPartitonedBy\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectValidationError( + DruidException.class, + "Operation [INSERT] requires a PARTITIONED BY to be explicitly defined, but none was found." + ) + .verify(); + } + + /** + * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then + * the query value is used. + */ + @Test + public void testInsertNoPartitonedByWithDayPartitonedByFromQuery() + { + testIngestionQuery() + .sql("INSERT INTO noPartitonedBy\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } + + /** + * Adding a new column during ingestion that is not defined in a non-sealed table should succeed. + */ + @Test + public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .build(); + testIngestionQuery() + .sql("INSERT INTO foo\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG), + expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), + expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("b", "e", "v0", "v1", "v2", "v3") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Adding a new column during ingestion that is not defined in a sealed table should fail with + * proper validation error. + */ + @Test + public void testInsertAddNonDefinedColumnIntoSealedCatalogTable() + { + testIngestionQuery() + .sql("INSERT INTO fooSealed\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectValidationError( + DruidException.class, + "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema" + ) + .verify(); + } + + + /** + * Inserting into a catalog table with a WITH source succeeds + */ + @Test + public void testInsertWithSourceIntoCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .build(); + testIngestionQuery() + .sql("INSERT INTO \"foo\"\n" + + "WITH \"ext\" AS (\n" + + " SELECT *\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3\n" + + "FROM \"ext\"\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG), + expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), + expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("b", "e", "v0", "v1", "v2", "v3") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + @Test + public void testInsertIntoExistingStrictNoDefinedSchema() + { + testIngestionQuery() + .sql("INSERT INTO strictTableWithNoDefinedSchema SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema") + .verify(); + } + + @Test + public void testInsertIntoExistingWithIncompatibleTypeAssignment() + { + testIngestionQuery() + .sql("INSERT INTO foo\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") + .verify(); + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java new file mode 100644 index 000000000000..f4c6a908ca7d --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.external.ExternalDataSource; +import org.apache.druid.sql.calcite.external.Externals; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.junit.jupiter.api.Test; + +/** + * Test for REPLACE DML statements for tables defined in catalog. + */ +public class CalciteCatalogReplaceTest extends CalciteCatalogIngestionDmlTest +{ + /** + * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the + * value from the catalog. + */ + @Test + public void testReplaceHourGrainPartitonedByFromCatalog() + { + testIngestionQuery() + .sql("REPLACE INTO hourDs OVERWRITE ALL\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.HOUR)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is given in the catalog, and also by PARTITIONED BY, then + * the query value is used. + */ + @Test + public void testReplaceHourGrainWithDayPartitonedByFromQuery() + { + testIngestionQuery() + .sql("REPLACE INTO hourDs OVERWRITE ALL\n" + + "SELECT *FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then + * validation error. + */ + @Test + public void testInsertNoPartitonedByFromCatalog() + { + testIngestionQuery() + .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectValidationError( + DruidException.class, + "Operation [REPLACE] requires a PARTITIONED BY to be explicitly defined, but none was found." + ) + .verify(); + } + + /** + * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then + * the query value is used. + */ + @Test + public void testInsertNoPartitonedByWithDayPartitonedByFromQuery() + { + testIngestionQuery() + .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } + + /** + * Adding a new column during ingestion that is not defined in a non-sealed table should succeed. + */ + @Test + public void testReplaceAddNonDefinedColumnIntoNonSealedCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .build(); + testIngestionQuery() + .sql("REPLACE INTO foo OVERWRITE ALL\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG), + expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), + expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. Here we just check that the + // set of columns is correct, but not their order. + .columns("b", "e", "v0", "v1", "v2", "v3") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Adding a new column during ingestion that is not defined in a sealed table should fail with + * proper validation error. + */ + @Test + public void testReplaceAddNonDefinedColumnIntoSealedCatalogTable() + { + testIngestionQuery() + .sql("REPLACE INTO fooSealed OVERWRITE ALL\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectValidationError( + DruidException.class, + "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema" + ) + .verify(); + } + + + /** + * Replacing into a catalog table with a WITH source succeeds + */ + @Test + public void testReplaceWithSourceIntoCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .build(); + testIngestionQuery() + .sql("REPLACE INTO \"foo\" OVERWRITE ALL\n" + + "WITH \"ext\" AS (\n" + + " SELECT *\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3\n" + + "FROM \"ext\"\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG), + expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), + expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. Here we just check that the + // set of columns is correct, but not their order. + .columns("b", "e", "v0", "v1", "v2", "v3") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + @Test + public void testReplaceIntoExistingStrictNoDefinedSchema() + { + testIngestionQuery() + .sql("REPLACE INTO strictTableWithNoDefinedSchema OVERWRITE ALL SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema") + .verify(); + } + + @Test + public void testReplaceIntoExistingWithIncompatibleTypeAssignment() + { + testIngestionQuery() + .sql("REPLACE INTO foo OVERWRITE ALL\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") + .verify(); + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index b420c4357374..8b62bd668053 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1644,6 +1644,15 @@ public void testInsertWithInvalidColumnNameInIngest() .verify(); } + @Test + public void testInsertWithInvalidColumnName2InIngest() + { + testIngestionQuery() + .sql("INSERT INTO t SELECT __time, 1+1 FROM foo PARTITIONED BY ALL") + .expectValidationError(invalidSqlContains("Insertion requires columns to be named")) + .verify(); + } + @Test public void testInsertWithUnnamedColumnInNestedSelectStatement() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteStrictInsertTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteStrictInsertTest.java new file mode 100644 index 000000000000..03045ebcaf52 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteStrictInsertTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite; + +import org.apache.druid.error.DruidException; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.planner.CatalogResolver.NullCatalogResolver; +import org.junit.jupiter.api.Test; + +/** + * Test for the "strict" feature of the catalog which can restrict INSERT statements + * to only work with existing datasources. The strict option is a config option which + * we enable only for this one test. + */ +public class CalciteStrictInsertTest extends CalciteIngestionDmlTest +{ + @Override + public CatalogResolver createCatalogResolver() + { + return new NullCatalogResolver() { + @Override + public boolean ingestRequiresExistingTable() + { + return true; + } + }; + } + + @Test + public void testInsertIntoNewTable() + { + testIngestionQuery() + .sql("INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME") + .expectValidationError(DruidException.class, "Cannot INSERT into [dst] because it does not exist") + .verify(); + } + + @Test + public void testInsertIntoExisting() + { + testIngestionQuery() + .sql("INSERT INTO druid.numfoo SELECT * FROM foo PARTITIONED BY ALL TIME") + .expectTarget("numfoo", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceRead("foo"), dataSourceWrite("numfoo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } +}