Skip to content

Commit

Permalink
add array column type support to EXTEND operator (apache#15458)
Browse files: browse the repository at this point in the history
  • Loading branch information
clintropolis authored Dec 7, 2023
1 parent fea53c7 commit 557f3f6
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,16 @@ public int hashCode()
useJsonNodeReader
);
}

/**
 * Renders this input format as {@code JsonInputFormat{...}}, listing each
 * configuration field as a comma-separated {@code name=value} pair.
 */
@Override
public String toString()
{
  // Build the comma-separated field list first, then wrap it in the type name and braces.
  final String fields = "featureSpec=" + featureSpec
                        + ", keepNullColumns=" + keepNullColumns
                        + ", lineSplittable=" + lineSplittable
                        + ", assumeNewlineDelimited=" + assumeNewlineDelimited
                        + ", useJsonNodeReader=" + useJsonNodeReader;
  return "JsonInputFormat{" + fields + '}';
}
}
12 changes: 12 additions & 0 deletions server/src/main/java/org/apache/druid/catalog/model/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public class Columns
// SQL type names for scalar columns.
public static final String BIGINT = "BIGINT";
public static final String FLOAT = "FLOAT";
public static final String DOUBLE = "DOUBLE";
// SQL type names for array columns: the element type name followed by the ARRAY keyword.
public static final String VARCHAR_ARRAY = "VARCHAR ARRAY";
public static final String BIGINT_ARRAY = "BIGINT ARRAY";
public static final String FLOAT_ARRAY = "FLOAT ARRAY";
public static final String DOUBLE_ARRAY = "DOUBLE ARRAY";
// SQL type name for timestamp columns.
public static final String TIMESTAMP = "TIMESTAMP";

public static final Set<String> NUMERIC_TYPES =
Expand All @@ -52,6 +56,10 @@ public class Columns
.put(FLOAT, ColumnType.FLOAT)
.put(DOUBLE, ColumnType.DOUBLE)
.put(VARCHAR, ColumnType.STRING)
.put(VARCHAR_ARRAY, ColumnType.STRING_ARRAY)
.put(BIGINT_ARRAY, ColumnType.LONG_ARRAY)
.put(FLOAT_ARRAY, ColumnType.FLOAT_ARRAY)
.put(DOUBLE_ARRAY, ColumnType.DOUBLE_ARRAY)
.build();

public static final Map<ColumnType, String> DRUID_TO_SQL_TYPES =
Expand All @@ -60,6 +68,10 @@ public class Columns
.put(ColumnType.FLOAT, FLOAT)
.put(ColumnType.DOUBLE, DOUBLE)
.put(ColumnType.STRING, VARCHAR)
.put(ColumnType.STRING_ARRAY, VARCHAR_ARRAY)
.put(ColumnType.LONG_ARRAY, BIGINT_ARRAY)
.put(ColumnType.FLOAT_ARRAY, FLOAT_ARRAY)
.put(ColumnType.DOUBLE_ARRAY, DOUBLE_ARRAY)
.build();

private Columns()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ public class ExtendOperator extends SqlInternalOperator
@Override
public SqlNode rewriteCall(SqlValidator validator, SqlCall call)
{
SqlBasicCall tableOpCall = (SqlBasicCall) call.operand(0);
SqlBasicCall tableOpCall = call.operand(0);
if (!(tableOpCall.getOperator() instanceof SqlCollectionTableOperator)) {
throw new ISE("First argument to EXTEND must be TABLE");
}

// The table function must be a Druid-defined table macro function
// which is aware of the EXTEND schema.
SqlBasicCall tableFnCall = (SqlBasicCall) tableOpCall.operand(0);
SqlBasicCall tableFnCall = tableOpCall.operand(0);
if (!(tableFnCall.getOperator() instanceof SchemaAwareUserDefinedTableMacro)) {
// May be an unresolved function.
throw new IAE(
Expand All @@ -89,7 +89,7 @@ public SqlNode rewriteCall(SqlValidator validator, SqlCall call)
// Move the schema from the second operand of EXTEND into a member
// function of a shim table macro.
SchemaAwareUserDefinedTableMacro tableFn = (SchemaAwareUserDefinedTableMacro) tableFnCall.getOperator();
SqlNodeList schema = (SqlNodeList) call.operand(1);
SqlNodeList schema = call.operand(1);
SqlCall newCall = tableFn.rewriteCall(tableFnCall, schema);

// Create a new TABLE(table_fn) node to replace the EXTEND node. After this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ private static String convertType(String name, SqlDataTypeSpec dataType)
case FLOAT:
case REAL:
return SqlType.FLOAT.name();
case ARRAY:
return convertType(name, dataType.getComponentTypeSpec()) + " " + SqlType.ARRAY.name();
default:
throw unsupportedType(name, dataType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ public static RelDataType toRelDataType(
case DOUBLE:
type = Calcites.createSqlArrayTypeWithNullability(typeFactory, SqlTypeName.DOUBLE, nullNumeric);
break;
case FLOAT:
type = Calcites.createSqlArrayTypeWithNullability(typeFactory, SqlTypeName.FLOAT, nullNumeric);
break;
default:
throw new ISE("valueType[%s] not translatable", columnType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.HttpInputSource;
import org.apache.druid.data.input.impl.HttpInputSourceConfig;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.java.util.common.ISE;
Expand Down Expand Up @@ -398,20 +399,24 @@ public void testHttpJson()
SystemFields.none(),
new HttpInputSourceConfig(null)
),
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
new JsonInputFormat(null, null, null, null, null),
RowSignature.builder()
.add("x", ColumnType.STRING)
.add("y", ColumnType.STRING)
.add("z", ColumnType.NESTED_DATA)
.add("a", ColumnType.STRING_ARRAY)
.add("b", ColumnType.LONG_ARRAY)
.add("c", ColumnType.FLOAT_ARRAY)
.add("d", ColumnType.DOUBLE_ARRAY)
.build()
);
testIngestionQuery()
.sql("INSERT INTO dst SELECT *\n" +
"FROM TABLE(http(userName => 'bob',\n" +
" password => 'secret',\n" +
" uris => ARRAY['http://foo.com/bar.json'],\n" +
" format => 'csv'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z TYPE('COMPLEX<json>'))\n" +
" format => 'json'))\n" +
" EXTEND (x VARCHAR, y VARCHAR, z TYPE('COMPLEX<json>'), a VARCHAR ARRAY, b BIGINT ARRAY, c FLOAT ARRAY, d DOUBLE ARRAY)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", httpDataSource.getSignature())
Expand All @@ -420,7 +425,7 @@ public void testHttpJson()
newScanQueryBuilder()
.dataSource(httpDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.columns("a", "b", "c", "d", "x", "y", "z")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
Expand Down

0 comments on commit 557f3f6

Please sign in to comment.