From a5428e75ff92a4aad1ae9a3d4a613b8dccb6857c Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 16 Apr 2024 17:20:35 -0400 Subject: [PATCH] INSERT/REPLACE complex target column types are validated against source input expressions (#16223) * * fix * * fix * * address review comments * * fix * * simplify tests * * fix complex type nullability issue * * address review comments * * address test review comments * * fix checkstyle --- .../druid/catalog/sql/CatalogInsertTest.java | 13 +- .../druid/catalog/sql/CatalogReplaceTest.java | 7 +- .../calcite/planner/DruidSqlValidator.java | 60 +- .../CalciteCatalogIngestionDmlTest.java | 522 +++++++++++++++++- .../sql/calcite/CalciteCatalogInsertTest.java | 294 +--------- .../calcite/CalciteCatalogReplaceTest.java | 290 +--------- 6 files changed, 575 insertions(+), 611 deletions(-) diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java index d4a97e666ed4..49ebf3f11f72 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogInsertTest.java @@ -31,7 +31,6 @@ import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5; import org.apache.druid.sql.calcite.CalciteCatalogInsertTest; import org.apache.druid.sql.calcite.planner.CatalogResolver; -import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.util.SqlTestFramework; import org.junit.jupiter.api.extension.RegisterExtension; @@ -69,8 +68,8 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework) public void buildDatasources() { - resolvedTables.forEach((datasourceName, datasourceTable) -> { - DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata(); + RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> { + DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( columnFacade -> { @@ -92,14 +91,6 @@ public void buildDatasources() createTableMetadata(tableBuilder.build()); }); - DatasourceFacade catalogMetadata = - ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata(); - TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString()); - catalogMetadata.columnFacades().forEach( - columnFacade -> { - tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType()); - } - ); } private void createTableMetadata(TableMetadata table) diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java index 34011fb2205f..31e0a34112cf 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/sql/CatalogReplaceTest.java @@ -31,7 +31,6 @@ import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5; import org.apache.druid.sql.calcite.CalciteCatalogReplaceTest; import org.apache.druid.sql.calcite.planner.CatalogResolver; -import 
org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.util.SqlTestFramework; import org.junit.jupiter.api.extension.RegisterExtension; @@ -68,8 +67,8 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework) public void buildDatasources() { - resolvedTables.forEach((datasourceName, datasourceTable) -> { - DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata(); + RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> { + DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( columnFacade -> { @@ -92,7 +91,7 @@ public void buildDatasources() createTableMetadata(tableBuilder.build()); }); DatasourceFacade catalogMetadata = - ((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata(); + RESOLVED_TABLES.get("foo").effectiveMetadata().catalogMetadata(); TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString()); catalogMetadata.columnFacades().forEach( columnFacade -> { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 0d3045cd7fdc..03dbbd4b7a7d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -41,7 +41,6 @@ import org.apache.calcite.sql.SqlWindow; import org.apache.calcite.sql.SqlWith; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.sql.validate.IdentifierNamespace; @@ -54,19 +53,21 @@ import org.apache.calcite.util.Static; import org.apache.calcite.util.Util; import org.apache.druid.catalog.model.facade.DatasourceFacade; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier; import org.apache.druid.sql.calcite.run.EngineFeature; import org.apache.druid.sql.calcite.table.DatasourceTable; +import org.apache.druid.sql.calcite.table.RowSignatures; import org.checkerframework.checker.nullness.qual.Nullable; import java.util.ArrayList; @@ -474,19 +475,11 @@ private RelDataType validateTargetType( fields.add(Pair.of(colName, sourceField.getType())); continue; } - SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType()); - RelDataType relType = typeFactory.createSqlType(sqlTypeName); - if (NullHandling.replaceWithDefault() && !SqlTypeFamily.STRING.contains(relType)) { - fields.add(Pair.of( - colName, - relType - )); - } else { - fields.add(Pair.of( - 
colName, - typeFactory.createTypeWithNullability(relType, true) - )); - } + RelDataType relType = computeTypeForDefinedCol(definedCol, sourceField); + fields.add(Pair.of( + colName, + typeFactory.createTypeWithNullability(relType, sourceField.getType().isNullable()) + )); } // Perform the SQL-standard check: that the SELECT column can be @@ -516,8 +509,14 @@ protected void checkTypeAssignment( RelDataType targetFieldRelDataType = targetFields.get(i).getType(); ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFielRelDataType); ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType); - - if (targetFieldColumnType != ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)) { + try { + if (!Objects.equals( + targetFieldColumnType, + ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType))) { + throw new Types.IncompatibleTypeException(targetFieldColumnType, sourceFieldColumnType); + } + } + catch (Types.IncompatibleTypeException e) { SqlNode node = getNthExpr(query, i, sourceCount); String targetTypeString; String sourceTypeString; @@ -534,12 +533,39 @@ protected void checkTypeAssignment( Static.RESOURCE.typeNotAssignable( targetFields.get(i).getName(), targetTypeString, sourceFields.get(i).getName(), sourceTypeString)); + } } // the call to base class definition will insert implicit casts / coercions where needed. super.checkTypeAssignment(sourceScope, table, sourceRowType, targetRowType, query); } + protected RelDataType computeTypeForDefinedCol( + final DatasourceFacade.ColumnFacade definedCol, + final RelDataTypeField sourceField + ) + { + SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType()); + RelDataType relType; + if (sqlTypeName != null) { + relType = typeFactory.createSqlType(sqlTypeName); + } else { + ColumnType columnType = ColumnType.fromString(definedCol.sqlStorageType()); + if (columnType != null && columnType.getType().equals(ValueType.COMPLEX)) { + relType = RowSignatures.makeComplexType(typeFactory, columnType, sourceField.getType().isNullable()); + } else { + relType = RowSignatures.columnTypeToRelDataType( + typeFactory, + columnType, + // this nullability is ignored for complex types for some reason, hence the check for complex above. + sourceField.getType().isNullable() + ); + } + } + + return relType; + } + /** * Locates the n'th expression in an INSERT or UPDATE query. 
* diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java index 2d3fb5d7f114..4715096e6bb6 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java @@ -29,16 +29,44 @@ import org.apache.druid.catalog.model.TableSpec; import org.apache.druid.catalog.model.facade.DatasourceFacade; import org.apache.druid.catalog.model.table.DatasourceDefn; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.error.DruidException; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.external.ExternalDataSource; +import org.apache.druid.sql.calcite.external.Externals; +import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.table.DruidTable; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.junit.jupiter.api.Test; -public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest +public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest { - public ImmutableMap resolvedTables = ImmutableMap.of( + private final String operationName; + private final String dmlPrefixPattern; + + public CalciteCatalogIngestionDmlTest() + { + this.operationName = getOperationName(); + this.dmlPrefixPattern = getDmlPrefixPattern(); + } + + public abstract String getOperationName(); + public abstract String getDmlPrefixPattern(); + + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + public static ImmutableMap RESOLVED_TABLES = ImmutableMap.of( "hourDs", new DatasourceTable( RowSignature.builder().addTimeColumn().build(), new DatasourceTable.PhysicalDatasourceMetadata( @@ -152,7 +180,8 @@ public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest new ColumnSpec("dim3", Columns.STRING, null), new ColumnSpec("cnt", Columns.LONG, null), new ColumnSpec("m1", Columns.FLOAT, null), - new ColumnSpec("m2", Columns.DOUBLE, null) + new ColumnSpec("m2", Columns.DOUBLE, null), + new ColumnSpec("unique_dim1", HyperUniquesAggregatorFactory.TYPE.asTypeString(), null) ) ), MAPPER @@ -198,8 +227,6 @@ public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest ) ); - private static final ObjectMapper MAPPER = new DefaultObjectMapper(); - @Override public CatalogResolver createCatalogResolver() { @@ -210,11 +237,492 @@ public DruidTable resolveDatasource( final DatasourceTable.PhysicalDatasourceMetadata dsMetadata ) { - if (resolvedTables.get(tableName) != null) { - return resolvedTables.get(tableName); + if (RESOLVED_TABLES.get(tableName) != null) { + return 
RESOLVED_TABLES.get(tableName); } return dsMetadata == null ? null : new DatasourceTable(dsMetadata); } }; } + + /** + * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the + * value from the catalog. + */ + @Test + public void testInsertHourGrainPartitonedByFromCatalog() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.HOUR)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is given in the catalog, and also by PARTITIONED BY, then + * the query value is used. + */ + @Test + public void testInsertHourGrainWithDayPartitonedByFromQuery() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "hourDs") + "\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("hourDs", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } + + /** + * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then + * validation error. + */ + @Test + public void testInsertNoPartitonedByFromCatalog() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" + + "SELECT * FROM foo") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectValidationError( + DruidException.class, + StringUtils.format("Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.", operationName) + ) + .verify(); + } + + /** + * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then + * the query value is used. + */ + @Test + public void testInsertNoPartitonedByWithDayPartitonedByFromQuery() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "noPartitonedBy") + "\n" + + "SELECT * FROM foo\n" + + "PARTITIONED BY day") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE) + .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo")) + .expectQuery( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Filtration.eternity())) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. 
+ .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .context(queryContextWithGranularity(Granularities.DAY)) + .build() + ) + .verify(); + } + + /** + * Adding a new column during ingestion that is not defined in a non-sealed table should succeed. + */ + @Test + public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .build(); + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG), + expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), + expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("b", "e", "v0", "v1", "v2", "v3") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. 
+ */ + @Test + public void testGroupByInsertAddNonDefinedColumnIntoNonSealedCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("extra4_complex", ColumnType.LONG) + .build(); + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3,\n" + + " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "GROUP BY 1,2,3,4,5,6\n" + + "PARTITIONED BY ALL TIME" + ) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + GroupByQuery.builder() + .setDataSource(externalDataSource) + .setGranularity(Granularities.ALL) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setVirtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) + ) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), + new DefaultDimensionSpec("b", "d1", ColumnType.STRING), + new DefaultDimensionSpec("c", "d3", ColumnType.LONG), + new DefaultDimensionSpec("d", "d4", ColumnType.LONG), + new DefaultDimensionSpec("e", "d5", ColumnType.STRING) + ) + ) + .setAggregatorSpecs( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec( + "c", + "c", + ColumnType.LONG + ) + ), + false, + true + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "1", ColumnType.LONG), + expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Adding a new column during ingestion that is not defined in a sealed table should fail with + * proper validation error. 
+ */ + @Test + public void testInsertAddNonDefinedColumnIntoSealedCatalogTable() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "fooSealed") + "\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectValidationError( + DruidException.class, + "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema" + ) + .verify(); + } + + + /** + * Inserting into a catalog table with a WITH source succeeds + */ + @Test + public void testInsertWithSourceIntoCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .build(); + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "WITH \"ext\" AS (\n" + + " SELECT *\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3\n" + + "FROM \"ext\"\n" + + "PARTITIONED BY ALL TIME") + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + newScanQueryBuilder() + .dataSource(externalDataSource) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), + expressionVirtualColumn("v1", "1", ColumnType.LONG), + expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), + expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. + .columns("b", "e", "v0", "v1", "v2", "v3") + .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + /** + * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed. 
+ */ + @Test + public void testGroupByInsertWithSourceIntoCatalogTable() + { + ExternalDataSource externalDataSource = new ExternalDataSource( + new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), + new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), + RowSignature.builder() + .add("a", ColumnType.STRING) + .add("b", ColumnType.STRING) + .add("c", ColumnType.LONG) + .add("d", ColumnType.STRING) + .add("e", ColumnType.STRING) + .build() + ); + final RowSignature signature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG) + .add("m2", ColumnType.DOUBLE) + .add("extra2", ColumnType.LONG) + .add("extra3", ColumnType.STRING) + .add("extra4_complex", ColumnType.LONG) + .build(); + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "WITH \"ext\" AS (\n" + + " SELECT *\n" + + "FROM TABLE(inline(\n" + + " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + + " format => 'csv'))\n" + + " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + + ")\n" + + "SELECT\n" + + " TIME_PARSE(a) AS __time,\n" + + " b AS dim1,\n" + + " 1 AS cnt,\n" + + " c AS m2,\n" + + " CAST(d AS BIGINT) AS extra2,\n" + + " e AS extra3,\n" + + " APPROX_COUNT_DISTINCT_BUILTIN(c) as extra4_complex\n" + + "FROM \"ext\"\n" + + "GROUP BY 1,2,3,4,5,6\n" + + "PARTITIONED BY ALL TIME" + ) + .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) + .expectTarget("foo", signature) + .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) + .expectQuery( + GroupByQuery.builder() + .setDataSource(externalDataSource) + .setGranularity(Granularities.ALL) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setVirtualColumns( + expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG) + ) + .setDimensions( + dimensions( + new DefaultDimensionSpec("v0", "d0", ColumnType.LONG), + new DefaultDimensionSpec("b", "d1", ColumnType.STRING), + new DefaultDimensionSpec("c", "d3", ColumnType.LONG), + new DefaultDimensionSpec("d", "d4", ColumnType.LONG), + new DefaultDimensionSpec("e", "d5", ColumnType.STRING) + ) + ) + .setAggregatorSpecs( + new CardinalityAggregatorFactory( + "a0", + null, + ImmutableList.of( + new DefaultDimensionSpec( + "c", + "c", + ColumnType.LONG + ) + ), + false, + true + ) + ) + .setPostAggregatorSpecs( + expressionPostAgg("p0", "1", ColumnType.LONG), + expressionPostAgg("p1", "CAST(\"d3\", 'DOUBLE')", ColumnType.DOUBLE) + ) + // Scan query lists columns in alphabetical order independent of the + // SQL project list or the defined schema. 
+ .setContext(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) + .build() + ) + .verify(); + } + + @Test + public void testInsertIntoExistingStrictNoDefinedSchema() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "strictTableWithNoDefinedSchema") + " SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema") + .verify(); + } + + @Test + public void testInsertIntoExistingWithIncompatibleTypeAssignment() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") + .verify(); + } + + @Test + public void testGroupByInsertIntoExistingWithIncompatibleTypeAssignment() + { + testIngestionQuery() + .sql(StringUtils.format(dmlPrefixPattern, "foo") + "\n" + + "SELECT\n" + + " __time AS __time,\n" + + " ARRAY[dim1] AS unique_dim1\n" + + "FROM foo\n" + + "PARTITIONED BY ALL TIME") + .expectValidationError( + DruidException.class, + "Cannot assign to target field 'unique_dim1' of type COMPLEX from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])") + .verify(); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java index af45896011c6..fff5ca9bc58d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogInsertTest.java @@ -19,302 +19,20 @@ package org.apache.druid.sql.calcite; -import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.impl.CsvInputFormat; -import org.apache.druid.data.input.impl.InlineInputSource; -import org.apache.druid.error.DruidException; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.external.ExternalDataSource; -import org.apache.druid.sql.calcite.external.Externals; -import org.apache.druid.sql.calcite.filtration.Filtration; -import org.apache.druid.sql.calcite.util.CalciteTests; -import org.junit.jupiter.api.Test; - /** * Test for INSERT DML statements for tables defined in catalog. */ public class CalciteCatalogInsertTest extends CalciteCatalogIngestionDmlTest { - /** - * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the - * value from the catalog. - */ - @Test - public void testInsertHourGrainPartitonedByFromCatalog() - { - testIngestionQuery() - .sql("INSERT INTO hourDs\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. 
- .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.HOUR)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is given in the catalog, and also by PARTITIONED BY, then - * the query value is used. - */ - @Test - public void testInsertHourGrainWithDayPartitonedByFromQuery() - { - testIngestionQuery() - .sql("INSERT INTO hourDs\n" + - "SELECT * FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then - * validation error. - */ - @Test - public void testInsertNoPartitonedByFromCatalog() - { - testIngestionQuery() - .sql("INSERT INTO noPartitonedBy\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectValidationError( - DruidException.class, - "Operation [INSERT] requires a PARTITIONED BY to be explicitly defined, but none was found." - ) - .verify(); - } - - /** - * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then - * the query value is used. - */ - @Test - public void testInsertNoPartitonedByWithDayPartitonedByFromQuery() - { - testIngestionQuery() - .sql("INSERT INTO noPartitonedBy\n" + - "SELECT * FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } - - /** - * Adding a new column during ingestion that is not defined in a non-sealed table should succeed. 
- */ - @Test - public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .build(); - testIngestionQuery() - .sql("INSERT INTO foo\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - newScanQueryBuilder() - .dataSource(externalDataSource) - .intervals(querySegmentSpec(Filtration.eternity())) - .virtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), - expressionVirtualColumn("v1", "1", ColumnType.LONG), - expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), - expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. - .columns("b", "e", "v0", "v1", "v2", "v3") - .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - /** - * Adding a new column during ingestion that is not defined in a sealed table should fail with - * proper validation error. 
- */ - @Test - public void testInsertAddNonDefinedColumnIntoSealedCatalogTable() - { - testIngestionQuery() - .sql("INSERT INTO fooSealed\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectValidationError( - DruidException.class, - "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema" - ) - .verify(); - } - - - /** - * Inserting into a catalog table with a WITH source succeeds - */ - @Test - public void testInsertWithSourceIntoCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .build(); - testIngestionQuery() - .sql("INSERT INTO \"foo\"\n" + - "WITH \"ext\" AS (\n" + - " SELECT *\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - ")\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3\n" + - "FROM \"ext\"\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - newScanQueryBuilder() - .dataSource(externalDataSource) - .intervals(querySegmentSpec(Filtration.eternity())) - .virtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), - expressionVirtualColumn("v1", "1", ColumnType.LONG), - expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), - expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. 
- .columns("b", "e", "v0", "v1", "v2", "v3") - .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - @Test - public void testInsertIntoExistingStrictNoDefinedSchema() + @Override + public String getOperationName() { - testIngestionQuery() - .sql("INSERT INTO strictTableWithNoDefinedSchema SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema") - .verify(); + return "INSERT"; } - @Test - public void testInsertIntoExistingWithIncompatibleTypeAssignment() + @Override + public String getDmlPrefixPattern() { - testIngestionQuery() - .sql("INSERT INTO foo\n" - + "SELECT\n" - + " __time AS __time,\n" - + " ARRAY[dim1] AS dim1\n" - + "FROM foo\n" - + "PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") - .verify(); + return "INSERT INTO \"%s\""; } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java index f4c6a908ca7d..aad22693c93d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogReplaceTest.java @@ -19,298 +19,20 @@ package org.apache.druid.sql.calcite; -import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.impl.CsvInputFormat; -import org.apache.druid.data.input.impl.InlineInputSource; -import org.apache.druid.error.DruidException; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.external.ExternalDataSource; -import org.apache.druid.sql.calcite.external.Externals; -import org.apache.druid.sql.calcite.filtration.Filtration; -import org.apache.druid.sql.calcite.util.CalciteTests; -import org.junit.jupiter.api.Test; - /** * Test for REPLACE DML statements for tables defined in catalog. */ public class CalciteCatalogReplaceTest extends CalciteCatalogIngestionDmlTest { - /** - * If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the - * value from the catalog. - */ - @Test - public void testReplaceHourGrainPartitonedByFromCatalog() - { - testIngestionQuery() - .sql("REPLACE INTO hourDs OVERWRITE ALL\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.HOUR)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is given in the catalog, and also by PARTITIONED BY, then - * the query value is used. 
- */ - @Test - public void testReplaceHourGrainWithDayPartitonedByFromQuery() - { - testIngestionQuery() - .sql("REPLACE INTO hourDs OVERWRITE ALL\n" + - "SELECT *FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("hourDs", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } - - /** - * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then - * validation error. - */ - @Test - public void testInsertNoPartitonedByFromCatalog() - { - testIngestionQuery() - .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" + - "SELECT * FROM foo") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectValidationError( - DruidException.class, - "Operation [REPLACE] requires a PARTITIONED BY to be explicitly defined, but none was found." - ) - .verify(); - } - - /** - * If the segment grain is absent in the catalog, but given by PARTITIONED BY, then - * the query value is used. - */ - @Test - public void testInsertNoPartitonedByWithDayPartitonedByFromQuery() - { - testIngestionQuery() - .sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" + - "SELECT * FROM foo\n" + - "PARTITIONED BY day") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE) - .expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo")) - .expectQuery( - newScanQueryBuilder() - .dataSource("foo") - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .context(queryContextWithGranularity(Granularities.DAY)) - .build() - ) - .verify(); - } - - /** - * Adding a new column during ingestion that is not defined in a non-sealed table should succeed. 
- */ - @Test - public void testReplaceAddNonDefinedColumnIntoNonSealedCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .build(); - testIngestionQuery() - .sql("REPLACE INTO foo OVERWRITE ALL\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - newScanQueryBuilder() - .dataSource(externalDataSource) - .intervals(querySegmentSpec(Filtration.eternity())) - .virtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), - expressionVirtualColumn("v1", "1", ColumnType.LONG), - expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), - expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. Here we just check that the - // set of columns is correct, but not their order. - .columns("b", "e", "v0", "v1", "v2", "v3") - .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - /** - * Adding a new column during ingestion that is not defined in a sealed table should fail with - * proper validation error. 
- */ - @Test - public void testReplaceAddNonDefinedColumnIntoSealedCatalogTable() - { - testIngestionQuery() - .sql("REPLACE INTO fooSealed OVERWRITE ALL\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectValidationError( - DruidException.class, - "Column [extra2] is not defined in the target table [druid.fooSealed] strict schema" - ) - .verify(); - } - - - /** - * Replacing into a catalog table with a WITH source succeeds - */ - @Test - public void testReplaceWithSourceIntoCatalogTable() - { - ExternalDataSource externalDataSource = new ExternalDataSource( - new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"), - new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0), - RowSignature.builder() - .add("a", ColumnType.STRING) - .add("b", ColumnType.STRING) - .add("c", ColumnType.LONG) - .add("d", ColumnType.STRING) - .add("e", ColumnType.STRING) - .build() - ); - final RowSignature signature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG) - .add("m2", ColumnType.DOUBLE) - .add("extra2", ColumnType.LONG) - .add("extra3", ColumnType.STRING) - .build(); - testIngestionQuery() - .sql("REPLACE INTO \"foo\" OVERWRITE ALL\n" + - "WITH \"ext\" AS (\n" + - " SELECT *\n" + - "FROM TABLE(inline(\n" + - " data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" + - " format => 'csv'))\n" + - " (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" + - ")\n" + - "SELECT\n" + - " TIME_PARSE(a) AS __time,\n" + - " b AS dim1,\n" + - " 1 AS cnt,\n" + - " c AS m2,\n" + - " CAST(d AS BIGINT) AS extra2,\n" + - " e AS extra3\n" + - "FROM \"ext\"\n" + - "PARTITIONED BY ALL TIME") - .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) - .expectTarget("foo", signature) - .expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL")) - .expectQuery( - newScanQueryBuilder() - .dataSource(externalDataSource) - .intervals(querySegmentSpec(Filtration.eternity())) - .virtualColumns( - expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG), - expressionVirtualColumn("v1", "1", ColumnType.LONG), - expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE), - expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG) - ) - // Scan query lists columns in alphabetical order independent of the - // SQL project list or the defined schema. Here we just check that the - // set of columns is correct, but not their order. 
- .columns("b", "e", "v0", "v1", "v2", "v3") - .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) - .build() - ) - .verify(); - } - - @Test - public void testReplaceIntoExistingStrictNoDefinedSchema() + @Override + public String getOperationName() { - testIngestionQuery() - .sql("REPLACE INTO strictTableWithNoDefinedSchema OVERWRITE ALL SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema") - .verify(); + return "REPLACE"; } - @Test - public void testReplaceIntoExistingWithIncompatibleTypeAssignment() + @Override + public String getDmlPrefixPattern() { - testIngestionQuery() - .sql("REPLACE INTO foo OVERWRITE ALL\n" - + "SELECT\n" - + " __time AS __time,\n" - + " ARRAY[dim1] AS dim1\n" - + "FROM foo\n" - + "PARTITIONED BY ALL TIME") - .expectValidationError( - DruidException.class, - "Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])") - .verify(); + return "REPLACE INTO \"%s\" OVERWRITE ALL"; } }
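Note on the rule this patch adds (illustrative sketch, not part of the patch): the new
checkTypeAssignment() in DruidSqlValidator treats a source column as assignable to a
declared target column only when the least restrictive common type of the pair is the
target type itself. A standalone restatement of that rule, assuming the imports the diff
adds (java.util.Objects, org.apache.druid.segment.column.ColumnType and .Types); the
isAssignable() helper name is hypothetical, not a method in the patch:

    // A target/source pair is compatible iff widening the pair does not move
    // the result away from the declared target type. leastRestrictiveType()
    // throws Types.IncompatibleTypeException when no common type exists
    // (e.g. COMPLEX<hyperUnique> vs. VARCHAR ARRAY).
    static boolean isAssignable(final ColumnType target, final ColumnType source)
    {
      try {
        return Objects.equals(target, ColumnType.leastRestrictiveType(target, source));
      }
      catch (Types.IncompatibleTypeException e) {
        return false;
      }
    }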
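End-to-end effect, using the catalog fixtures from CalciteCatalogIngestionDmlTest above
(the "foo" fixture declares unique_dim1 with the hyperUnique complex type); this is the
query exercised by testGroupByInsertIntoExistingWithIncompatibleTypeAssignment:

    INSERT INTO foo
    SELECT
      __time AS __time,
      ARRAY[dim1] AS unique_dim1
    FROM foo
    PARTITIONED BY ALL TIME
    -- fails planning with:
    -- Cannot assign to target field 'unique_dim1' of type COMPLEX
    -- from source field 'unique_dim1' of type VARCHAR ARRAY (line [4], column [3])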