diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index dc8957cc1dc9..07f658f99e14 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -215,4 +215,16 @@ public int hashCode() useJsonNodeReader ); } + + @Override + public String toString() + { + return "JsonInputFormat{" + + "featureSpec=" + featureSpec + + ", keepNullColumns=" + keepNullColumns + + ", lineSplittable=" + lineSplittable + + ", assumeNewlineDelimited=" + assumeNewlineDelimited + + ", useJsonNodeReader=" + useJsonNodeReader + + '}'; + } } diff --git a/server/src/main/java/org/apache/druid/catalog/model/Columns.java b/server/src/main/java/org/apache/druid/catalog/model/Columns.java index 0bc0f71bc1b9..e19f0052ed5e 100644 --- a/server/src/main/java/org/apache/druid/catalog/model/Columns.java +++ b/server/src/main/java/org/apache/druid/catalog/model/Columns.java @@ -38,6 +38,10 @@ public class Columns public static final String BIGINT = "BIGINT"; public static final String FLOAT = "FLOAT"; public static final String DOUBLE = "DOUBLE"; + public static final String VARCHAR_ARRAY = "VARCHAR ARRAY"; + public static final String BIGINT_ARRAY = "BIGINT ARRAY"; + public static final String FLOAT_ARRAY = "FLOAT ARRAY"; + public static final String DOUBLE_ARRAY = "DOUBLE ARRAY"; public static final String TIMESTAMP = "TIMESTAMP"; public static final Set NUMERIC_TYPES = @@ -52,6 +56,10 @@ public class Columns .put(FLOAT, ColumnType.FLOAT) .put(DOUBLE, ColumnType.DOUBLE) .put(VARCHAR, ColumnType.STRING) + .put(VARCHAR_ARRAY, ColumnType.STRING_ARRAY) + .put(BIGINT_ARRAY, ColumnType.LONG_ARRAY) + .put(FLOAT_ARRAY, ColumnType.FLOAT_ARRAY) + .put(DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY) .build(); public static final Map DRUID_TO_SQL_TYPES = @@ -60,6 +68,10 @@ public class Columns .put(ColumnType.FLOAT, FLOAT) .put(ColumnType.DOUBLE, DOUBLE) .put(ColumnType.STRING, VARCHAR) + .put(ColumnType.STRING_ARRAY, VARCHAR_ARRAY) + .put(ColumnType.LONG_ARRAY, BIGINT_ARRAY) + .put(ColumnType.FLOAT_ARRAY, FLOAT_ARRAY) + .put(ColumnType.DOUBLE_ARRAY, DOUBLE_ARRAY) .build(); private Columns() diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java index c2162491e5b0..14d5ca63e934 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExtendOperator.java @@ -70,14 +70,14 @@ public class ExtendOperator extends SqlInternalOperator @Override public SqlNode rewriteCall(SqlValidator validator, SqlCall call) { - SqlBasicCall tableOpCall = (SqlBasicCall) call.operand(0); + SqlBasicCall tableOpCall = call.operand(0); if (!(tableOpCall.getOperator() instanceof SqlCollectionTableOperator)) { throw new ISE("First argument to EXTEND must be TABLE"); } // The table function must be a Druid-defined table macro function // which is aware of the EXTEND schema. - SqlBasicCall tableFnCall = (SqlBasicCall) tableOpCall.operand(0); + SqlBasicCall tableFnCall = tableOpCall.operand(0); if (!(tableFnCall.getOperator() instanceof SchemaAwareUserDefinedTableMacro)) { // May be an unresolved function. throw new IAE( @@ -89,7 +89,7 @@ public SqlNode rewriteCall(SqlValidator validator, SqlCall call) // Move the schema from the second operand of EXTEND into a member // function of a shim table macro. SchemaAwareUserDefinedTableMacro tableFn = (SchemaAwareUserDefinedTableMacro) tableFnCall.getOperator(); - SqlNodeList schema = (SqlNodeList) call.operand(1); + SqlNodeList schema = call.operand(1); SqlCall newCall = tableFn.rewriteCall(tableFnCall, schema); // Create a new TABLE(table_fn) node to replace the EXTEND node. After this, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java index 0063253efcd0..598bdf973754 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java @@ -292,6 +292,8 @@ private static String convertType(String name, SqlDataTypeSpec dataType) case FLOAT: case REAL: return SqlType.FLOAT.name(); + case ARRAY: + return convertType(name, dataType.getComponentTypeSpec()) + " " + SqlType.ARRAY.name(); default: throw unsupportedType(name, dataType); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java index 87519c753745..528a0a4a0b56 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/RowSignatures.java @@ -151,6 +151,9 @@ public static RelDataType toRelDataType( case DOUBLE: type = Calcites.createSqlArrayTypeWithNullability(typeFactory, SqlTypeName.DOUBLE, nullNumeric); break; + case FLOAT: + type = Calcites.createSqlArrayTypeWithNullability(typeFactory, SqlTypeName.FLOAT, nullNumeric); + break; default: throw new ISE("valueType[%s] not translatable", columnType); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java index 0b2131197d36..558625a10b36 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.HttpInputSource; import org.apache.druid.data.input.impl.HttpInputSourceConfig; +import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.systemfield.SystemFields; import org.apache.druid.java.util.common.ISE; @@ -398,11 +399,15 @@ public void testHttpJson() SystemFields.none(), new HttpInputSourceConfig(null) ), - new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0), + new JsonInputFormat(null, null, null, null, null), RowSignature.builder() .add("x", ColumnType.STRING) .add("y", ColumnType.STRING) .add("z", ColumnType.NESTED_DATA) + .add("a", ColumnType.STRING_ARRAY) + .add("b", ColumnType.LONG_ARRAY) + .add("c", ColumnType.FLOAT_ARRAY) + .add("d", ColumnType.DOUBLE_ARRAY) .build() ); testIngestionQuery() @@ -410,8 +415,8 @@ public void testHttpJson() "FROM TABLE(http(userName => 'bob',\n" + " password => 'secret',\n" + " uris => ARRAY['http://foo.com/bar.json'],\n" + - " format => 'csv'))\n" + - " EXTEND (x VARCHAR, y VARCHAR, z TYPE('COMPLEX'))\n" + + " format => 'json'))\n" + + " EXTEND (x VARCHAR, y VARCHAR, z TYPE('COMPLEX'), a VARCHAR ARRAY, b BIGINT ARRAY, c FLOAT ARRAY, d DOUBLE ARRAY)\n" + "PARTITIONED BY ALL TIME") .authentication(CalciteTests.SUPER_USER_AUTH_RESULT) .expectTarget("dst", httpDataSource.getSignature()) @@ -420,7 +425,7 @@ public void testHttpJson() newScanQueryBuilder() .dataSource(httpDataSource) .intervals(querySegmentSpec(Filtration.eternity())) - .columns("x", "y", "z") + .columns("a", "b", "c", "d", "x", "y", "z") .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT) .build() )