From 57412657e0f3821921b206fecca0ce9ee3bb9cf1 Mon Sep 17 00:00:00 2001 From: Vladyslav Lyutenko Date: Mon, 16 Sep 2024 11:39:35 +0200 Subject: [PATCH] Add projection push down for STRUCT field in big query connector --- .../BigQueryArrowToPageConverter.java | 20 +- .../trino/plugin/bigquery/BigQueryClient.java | 14 +- .../plugin/bigquery/BigQueryColumnHandle.java | 13 ++ .../trino/plugin/bigquery/BigQueryConfig.java | 14 ++ .../plugin/bigquery/BigQueryMetadata.java | 167 ++++++++++++++-- .../bigquery/BigQueryPageSourceProvider.java | 12 +- .../plugin/bigquery/BigQueryPseudoColumn.java | 1 + .../bigquery/BigQueryQueryPageSource.java | 16 +- .../bigquery/BigQuerySessionProperties.java | 11 ++ .../plugin/bigquery/BigQuerySplitManager.java | 8 +- .../BigQueryStorageAvroPageSource.java | 19 +- .../plugin/bigquery/BigQueryTypeManager.java | 3 +- .../plugin/bigquery/ReadSessionCreator.java | 2 +- .../bigquery/BaseBigQueryConnectorTest.java | 1 - .../TestBigQueryAvroConnectorTest.java | 186 ++++++++++++++++++ .../plugin/bigquery/TestBigQueryConfig.java | 3 + .../plugin/bigquery/TestBigQueryMetadata.java | 93 +++++++++ .../bigquery/TestBigQuerySplitManager.java | 2 +- 18 files changed, 551 insertions(+), 34 deletions(-) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java index 9b6550d260ef..24697d91c9c6 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java @@ -103,16 +103,32 @@ public void convert(PageBuilder pageBuilder, ArrowRecordBatch batch) for (int column = 0; column < columns.size(); column++) { BigQueryColumnHandle columnHandle = columns.get(column); + FieldVector fieldVector = getFieldVector(root, columnHandle); convertType(pageBuilder.getBlockBuilder(column), columnHandle.trinoType(), - root.getVector(toBigQueryColumnName(columnHandle.name())), + fieldVector, 0, - root.getVector(toBigQueryColumnName(columnHandle.name())).getValueCount()); + fieldVector.getValueCount()); } root.clear(); } + private static FieldVector getFieldVector(VectorSchemaRoot root, BigQueryColumnHandle columnHandle) + { + FieldVector fieldVector = root.getVector(toBigQueryColumnName(columnHandle.name())); + + for (String dereferenceName : columnHandle.dereferenceNames()) { + for (FieldVector child : fieldVector.getChildrenFromFields()) { + if (child.getField().getName().equals(dereferenceName)) { + fieldVector = child; + break; + } + } + } + return fieldVector; + } + private void convertType(BlockBuilder output, Type type, FieldVector vector, int offset, int length) { Class javaType = type.getJavaType(); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java index 36e98f20d476..3b6203e30188 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java @@ -33,6 +33,7 @@ import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.TableResult; import com.google.cloud.http.BaseHttpServiceException; +import com.google.common.base.Joiner; import com.google.common.cache.Cache; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -468,8 +469,17 @@ public TableId getDestinationTable(String sql) public static String selectSql(TableId table, List requiredColumns, Optional filter) { - String columns = requiredColumns.stream().map(column -> format("`%s`", column.name())).collect(joining(",")); - return selectSql(table, columns, filter); + return selectSql(table, + requiredColumns.stream() + .map(column -> Joiner.on('.') + .join(ImmutableList.builder() + .add(format("`%s`", column.name())) + .addAll(column.dereferenceNames().stream() + .map(dereferenceName -> format("`%s`", dereferenceName)) + .collect(toImmutableList())) + .build())) + .collect(joining(",")), + filter); } public static String selectSql(TableId table, String formattedColumns, Optional filter) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java index e529e0581379..699aaf6b40e9 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; @@ -30,6 +31,7 @@ public record BigQueryColumnHandle( String name, + List dereferenceNames, Type trinoType, StandardSQLTypeName bigqueryType, boolean isPushdownSupported, @@ -44,6 +46,7 @@ public record BigQueryColumnHandle( public BigQueryColumnHandle { requireNonNull(name, "name is null"); + dereferenceNames = ImmutableList.copyOf(requireNonNull(dereferenceNames, "dereferenceNames is null")); requireNonNull(trinoType, "trinoType is null"); requireNonNull(bigqueryType, "bigqueryType is null"); requireNonNull(mode, "mode is null"); @@ -62,6 +65,16 @@ public ColumnMetadata getColumnMetadata() .build(); } + @JsonIgnore + public String getQualifiedName() + { + return Joiner.on('.') + .join(ImmutableList.builder() + .add(name) + .addAll(dereferenceNames) + .build()); + } + @JsonIgnore public long getRetainedSizeInBytes() { diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java index f97bada6ec4a..2e7d34c3ad2e 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java @@ -63,6 +63,7 @@ public class BigQueryConfig private String queryLabelName; private String queryLabelFormat; private boolean proxyEnabled; + private boolean projectionPushDownEnabled = true; private int metadataParallelism = 2; public Optional getProjectId() @@ -342,6 +343,19 @@ public BigQueryConfig setProxyEnabled(boolean proxyEnabled) return this; } + public boolean isProjectionPushdownEnabled() + { + return projectionPushDownEnabled; + } + + @Config("bigquery.projection-pushdown-enabled") + @ConfigDescription("Dereference push down for ROW type") + public BigQueryConfig setProjectionPushdownEnabled(boolean projectionPushDownEnabled) + { + this.projectionPushDownEnabled = projectionPushDownEnabled; + return this; + } + @Min(1) @Max(32) public int getMetadataParallelism() diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index b1fee28fc5ea..bb264e69a528 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -32,15 +32,18 @@ import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Ordering; import com.google.common.io.Closer; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import io.airlift.log.Logger; import io.airlift.slice.Slice; +import io.trino.plugin.base.projection.ApplyProjectionUtil; import io.trino.plugin.bigquery.BigQueryClient.RemoteDatabaseObject; import io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType; import io.trino.plugin.bigquery.ptf.Query.QueryHandle; @@ -78,18 +81,22 @@ import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.expression.Variable; import io.trino.spi.function.table.ConnectorTableFunctionHandle; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.type.BigintType; +import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import io.trino.spi.type.VarcharType; import org.json.JSONArray; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -108,16 +115,22 @@ import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.Futures.allAsList; import static io.trino.plugin.base.TemporaryTables.generateTemporaryTableName; +import static io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation; +import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns; +import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_BAD_WRITE; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_TABLE_ERROR; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_UNSUPPORTED_OPERATION; import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE; import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_TIME; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.isProjectionPushdownEnabled; import static io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType.INGESTION; import static io.trino.plugin.bigquery.BigQueryTableHandle.getPartitionType; import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable; @@ -138,6 +151,8 @@ public class BigQueryMetadata { private static final Logger log = Logger.get(BigQueryMetadata.class); private static final Type TRINO_PAGE_SINK_ID_COLUMN_TYPE = BigintType.BIGINT; + private static final Ordering COLUMN_HANDLE_ORDERING = Ordering + .from(Comparator.comparingInt(columnHandle -> columnHandle.dereferenceNames().size())); static final int DEFAULT_NUMERIC_TYPE_PRECISION = 38; static final int DEFAULT_NUMERIC_TYPE_SCALE = 9; @@ -771,7 +786,7 @@ public Optional finishInsert( @Override public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) { - return new BigQueryColumnHandle("$merge_row_id", BIGINT, INT64, true, Field.Mode.REQUIRED, ImmutableList.of(), null, true); + return new BigQueryColumnHandle("$merge_row_id", ImmutableList.of(), BIGINT, INT64, true, Field.Mode.REQUIRED, ImmutableList.of(), null, true); } @Override @@ -882,24 +897,150 @@ public Optional> applyProjecti log.debug("applyProjection(session=%s, handle=%s, projections=%s, assignments=%s)", session, handle, projections, assignments); BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) handle; + if (!isProjectionPushdownEnabled(session)) { + List newColumns = ImmutableList.copyOf(assignments.values()); + if (bigQueryTableHandle.projectedColumns().isPresent() && containSameElements(newColumns, bigQueryTableHandle.projectedColumns().get())) { + return Optional.empty(); + } - List newColumns = ImmutableList.copyOf(assignments.values()); + ImmutableList.Builder projectedColumns = ImmutableList.builder(); + ImmutableList.Builder assignmentList = ImmutableList.builder(); + assignments.forEach((name, column) -> { + BigQueryColumnHandle columnHandle = (BigQueryColumnHandle) column; + projectedColumns.add(columnHandle); + assignmentList.add(new Assignment(name, column, columnHandle.trinoType())); + }); - if (bigQueryTableHandle.projectedColumns().isPresent() && containSameElements(newColumns, bigQueryTableHandle.projectedColumns().get())) { - return Optional.empty(); + bigQueryTableHandle = bigQueryTableHandle.withProjectedColumns(projectedColumns.build()); + + return Optional.of(new ProjectionApplicationResult<>(bigQueryTableHandle, projections, assignmentList.build(), false)); } - ImmutableList.Builder projectedColumns = ImmutableList.builder(); - ImmutableList.Builder assignmentList = ImmutableList.builder(); - assignments.forEach((name, column) -> { - BigQueryColumnHandle columnHandle = (BigQueryColumnHandle) column; - projectedColumns.add(columnHandle); - assignmentList.add(new Assignment(name, column, columnHandle.trinoType())); - }); + // Create projected column representations for supported sub expressions. Simple column references and chain of + // dereferences on a variable are supported right now. + Set projectedExpressions = projections.stream() + .flatMap(expression -> extractSupportedProjectedColumns(expression).stream()) + .collect(toImmutableSet()); + + Map columnProjections = projectedExpressions.stream() + .collect(toImmutableMap(identity(), ApplyProjectionUtil::createProjectedColumnRepresentation)); + + // all references are simple variables + if (columnProjections.values().stream().allMatch(ProjectedColumnRepresentation::isVariable)) { + Set projectedColumns = ImmutableSet.copyOf(projectParentColumns(assignments.values().stream() + .map(BigQueryColumnHandle.class::cast) + .collect(toImmutableList()))); + if (bigQueryTableHandle.projectedColumns().isPresent() && containSameElements(projectedColumns, bigQueryTableHandle.projectedColumns().get())) { + return Optional.empty(); + } + List assignmentsList = assignments.entrySet().stream() + .map(assignment -> new Assignment( + assignment.getKey(), + assignment.getValue(), + ((BigQueryColumnHandle) assignment.getValue()).trinoType())) + .collect(toImmutableList()); + + return Optional.of(new ProjectionApplicationResult<>( + bigQueryTableHandle.withProjectedColumns(ImmutableList.copyOf(projectedColumns)), + projections, + assignmentsList, + false)); + } + + Map newAssignments = new HashMap<>(); + ImmutableMap.Builder newVariablesBuilder = ImmutableMap.builder(); + ImmutableSet.Builder projectedColumnsBuilder = ImmutableSet.builder(); + + for (Map.Entry entry : columnProjections.entrySet()) { + ConnectorExpression expression = entry.getKey(); + ProjectedColumnRepresentation projectedColumn = entry.getValue(); - bigQueryTableHandle = bigQueryTableHandle.withProjectedColumns(projectedColumns.build()); + BigQueryColumnHandle baseColumnHandle = (BigQueryColumnHandle) assignments.get(projectedColumn.getVariable().getName()); + BigQueryColumnHandle projectedColumnHandle = createProjectedColumnHandle(baseColumnHandle, projectedColumn.getDereferenceIndices(), expression.getType()); + String projectedColumnName = projectedColumnHandle.getQualifiedName(); + + Variable projectedColumnVariable = new Variable(projectedColumnName, expression.getType()); + Assignment newAssignment = new Assignment(projectedColumnName, projectedColumnHandle, expression.getType()); + newAssignments.putIfAbsent(projectedColumnName, newAssignment); + + newVariablesBuilder.put(expression, projectedColumnVariable); + projectedColumnsBuilder.add(projectedColumnHandle); + } + + // Modify projections to refer to new variables + Map newVariables = newVariablesBuilder.buildOrThrow(); + List newProjections = projections.stream() + .map(expression -> replaceWithNewVariables(expression, newVariables)) + .collect(toImmutableList()); + + List outputAssignments = newAssignments.values().stream().collect(toImmutableList()); + return Optional.of(new ProjectionApplicationResult<>( + bigQueryTableHandle.withProjectedColumns(projectParentColumns(ImmutableList.copyOf(projectedColumnsBuilder.build()))), + newProjections, + outputAssignments, + false)); + } - return Optional.of(new ProjectionApplicationResult<>(bigQueryTableHandle, projections, assignmentList.build(), false)); + /** + * Creates a set of parent columns for the input projected columns. For example, + * if input {@param columns} include columns "a.b" and "a.b.c", then they will be projected from a single column "a.b". + */ + @VisibleForTesting + static List projectParentColumns(List columnHandles) + { + List sortedColumnHandles = COLUMN_HANDLE_ORDERING.sortedCopy(columnHandles); + List parentColumns = new ArrayList<>(); + for (BigQueryColumnHandle column : sortedColumnHandles) { + if (!parentColumnExists(parentColumns, column)) { + parentColumns.add(column); + } + } + return parentColumns; + } + + private static boolean parentColumnExists(List existingColumns, BigQueryColumnHandle column) + { + for (BigQueryColumnHandle existingColumn : existingColumns) { + List existingColumnDereferenceNames = existingColumn.dereferenceNames(); + verify( + column.dereferenceNames().size() >= existingColumnDereferenceNames.size(), + "Selected column's dereference size must be greater than or equal to the existing column's dereference size"); + if (existingColumn.name().equals(column.name()) + && column.dereferenceNames().subList(0, existingColumnDereferenceNames.size()).equals(existingColumnDereferenceNames)) { + return true; + } + } + return false; + } + + private BigQueryColumnHandle createProjectedColumnHandle(BigQueryColumnHandle baseColumn, List indices, Type projectedColumnType) + { + if (indices.isEmpty()) { + return baseColumn; + } + + ImmutableList.Builder dereferenceNamesBuilder = ImmutableList.builder(); + dereferenceNamesBuilder.addAll(baseColumn.dereferenceNames()); + + Type type = baseColumn.trinoType(); + for (int index : indices) { + checkArgument(type instanceof RowType, "type should be Row type"); + RowType rowType = (RowType) type; + RowType.Field field = rowType.getFields().get(index); + dereferenceNamesBuilder.add(field.getName() + .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "ROW type does not have field names declared: " + rowType))); + type = field.getType(); + } + return new BigQueryColumnHandle( + baseColumn.name(), + dereferenceNamesBuilder.build(), + projectedColumnType, + typeManager.toStandardSqlTypeName(projectedColumnType), + baseColumn.isPushdownSupported(), + baseColumn.mode(), + baseColumn.subColumns(), + baseColumn.description(), + baseColumn.hidden()); } @Override diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index 027f3bd529be..b4af27b1038f 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -68,10 +70,12 @@ public ConnectorPageSource createPageSource( log.debug("createPageSource(transaction=%s, session=%s, split=%s, table=%s, columns=%s)", transaction, session, split, table, columns); BigQuerySplit bigQuerySplit = (BigQuerySplit) split; - // We expect columns list requested here to match list passed to ConnectorMetadata.applyProjection. - checkArgument(bigQuerySplit.getColumns().isEmpty() || bigQuerySplit.getColumns().equals(columns), - "Requested columns %s do not match list in split %s", columns, bigQuerySplit.getColumns()); - + Set projectedColumnNames = bigQuerySplit.getColumns().stream().map(BigQueryColumnHandle::name).collect(Collectors.toSet()); + // because we apply logic (download only parent columns - BigQueryMetadata.projectParentColumns) + // columns and split columns could differ + columns.stream() + .map(BigQueryColumnHandle.class::cast) + .forEach(column -> checkArgument(projectedColumnNames.contains(column.name()), "projected columns should contain all reader columns")); if (bigQuerySplit.representsEmptyProjection()) { return new BigQueryEmptyProjectionPageSource(bigQuerySplit.getEmptyRowsToGenerate()); } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPseudoColumn.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPseudoColumn.java index db7078f59d31..e0d2bd2fc800 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPseudoColumn.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPseudoColumn.java @@ -54,6 +54,7 @@ public BigQueryColumnHandle getColumnHandle() { return new BigQueryColumnHandle( trinoColumnName, + ImmutableList.of(), trinoType, bigqueryType, true, diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java index 71387dd83713..75bfbb50ef8b 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -77,6 +77,7 @@ public class BigQueryQueryPageSource private final BigQueryTypeManager typeManager; private final List columnHandles; private final PageBuilder pageBuilder; + private final boolean isQueryFunction; private final TableResult tableResult; private boolean finished; @@ -95,13 +96,19 @@ public BigQueryQueryPageSource( this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.columnHandles = requireNonNull(columnHandles, "columnHandles is null"); this.pageBuilder = new PageBuilder(columnHandles.stream().map(BigQueryColumnHandle::trinoType).collect(toImmutableList())); - String sql = buildSql(table, client.getProjectId(), columnHandles, filter); + this.isQueryFunction = table.relationHandle() instanceof BigQueryQueryRelationHandle; + String sql = buildSql( + table, + client.getProjectId(), + ImmutableList.copyOf(columnHandles), + filter); this.tableResult = client.executeQuery(session, sql); } - private static String buildSql(BigQueryTableHandle table, String projectId, List columns, Optional filter) + private String buildSql(BigQueryTableHandle table, String projectId, List columns, Optional filter) { - if (table.relationHandle() instanceof BigQueryQueryRelationHandle queryRelationHandle) { + if (isQueryFunction) { + BigQueryQueryRelationHandle queryRelationHandle = (BigQueryQueryRelationHandle) table.relationHandle(); if (filter.isEmpty()) { return queryRelationHandle.getQuery(); } @@ -144,7 +151,8 @@ public Page getNextPage() for (int column = 0; column < columnHandles.size(); column++) { BigQueryColumnHandle columnHandle = columnHandles.get(column); BlockBuilder output = pageBuilder.getBlockBuilder(column); - appendTo(columnHandle.trinoType(), record.get(columnHandle.name()), output); + FieldValue fieldValue = isQueryFunction ? record.get(columnHandle.name()) : record.get(column); + appendTo(columnHandle.trinoType(), fieldValue, output); } } finished = true; diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java index 009ea5ea9237..894b371739a8 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java @@ -32,6 +32,7 @@ public final class BigQuerySessionProperties private static final String VIEW_MATERIALIZATION_WITH_FILTER = "view_materialization_with_filter"; private static final String QUERY_RESULTS_CACHE_ENABLED = "query_results_cache_enabled"; private static final String CREATE_DISPOSITION_TYPE = "create_disposition_type"; + private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled"; private final List> sessionProperties; @@ -60,6 +61,11 @@ public BigQuerySessionProperties(BigQueryConfig config) CreateDisposition.class, CreateDisposition.CREATE_IF_NEEDED, // https://cloud.google.com/bigquery/docs/cached-results true)) + .add(booleanProperty( + PROJECTION_PUSHDOWN_ENABLED, + "Dereference push down for STRUCT type", + config.isProjectionPushdownEnabled(), + false)) .build(); } @@ -88,4 +94,9 @@ public static CreateDisposition createDisposition(ConnectorSession session) { return session.getProperty(CREATE_DISPOSITION_TYPE, CreateDisposition.class); } + + public static boolean isProjectionPushdownEnabled(ConnectorSession session) + { + return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class); + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java index 608467e78ff3..712f437d51be 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java @@ -162,8 +162,8 @@ private List readFromBigQuery( log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, filter=[%s])", remoteTableId, projectedColumns, filter); List columns = projectedColumns.get(); List projectedColumnsNames = getProjectedColumnNames(columns); - ImmutableList.Builder additionalDomainColumns = ImmutableList.builder(); - additionalDomainColumns.addAll(columns); + ImmutableList.Builder projectedColumnHandles = ImmutableList.builder(); + projectedColumnHandles.addAll(columns); if (isWildcardTable(type, remoteTableId.getTable())) { // Storage API doesn't support reading wildcard tables @@ -180,9 +180,9 @@ private List readFromBigQuery( tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream() .map(BigQueryColumnHandle.class::cast) .filter(column -> !projectedColumnsNames.contains(column.name())) - .forEach(additionalDomainColumns::add)); + .forEach(projectedColumnHandles::add)); } - ReadSession readSession = createReadSession(session, remoteTableId, ImmutableList.copyOf(additionalDomainColumns.build()), filter); + ReadSession readSession = createReadSession(session, remoteTableId, ImmutableList.copyOf(projectedColumnHandles.build()), filter); String schemaString = getSchemaAsString(readSession); return readSession.getStreamsList().stream() diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java index d2365b9f171d..48c6147cd247 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java @@ -144,7 +144,7 @@ public Page getNextPage() for (int column = 0; column < columns.size(); column++) { BlockBuilder output = pageBuilder.getBlockBuilder(column); BigQueryColumnHandle columnHandle = columns.get(column); - appendTo(columnHandle.trinoType(), record.get(toBigQueryColumnName(columnHandle.name())), output); + appendTo(columnHandle.trinoType(), getValueRecord(record, columnHandle), output); } } @@ -153,6 +153,23 @@ public Page getNextPage() return page; } + private static Object getValueRecord(GenericRecord record, BigQueryColumnHandle columnHandle) + { + Object valueRecord = record.get(toBigQueryColumnName(columnHandle.name())); + for (String dereferenceName : columnHandle.dereferenceNames()) { + if (valueRecord == null) { + break; + } + if (valueRecord instanceof GenericRecord genericRecord) { + valueRecord = genericRecord.get(dereferenceName); + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to extract dereference value from record"); + } + } + return valueRecord; + } + private void appendTo(Type type, Object value, BlockBuilder output) { if (value == null) { diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java index 28d2a3f9eb3a..5785e0ec7b20 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryTypeManager.java @@ -248,7 +248,7 @@ private FieldList toFieldList(RowType rowType) return FieldList.of(fields.build()); } - private StandardSQLTypeName toStandardSqlTypeName(Type type) + StandardSQLTypeName toStandardSqlTypeName(Type type) { if (type == BooleanType.BOOLEAN) { return StandardSQLTypeName.BOOL; @@ -392,6 +392,7 @@ public BigQueryColumnHandle toColumnHandle(Field field) ColumnMapping columnMapping = toTrinoType(field).orElseThrow(() -> new IllegalArgumentException("Unsupported type: " + field)); return new BigQueryColumnHandle( field.getName(), + ImmutableList.of(), columnMapping.type(), field.getType().getStandardType(), columnMapping.isPushdownSupported(), diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java index 9fcb8085c14c..8a28d3a99a20 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java @@ -84,7 +84,7 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List filteredSelectedFields = selectedFields.stream() - .map(BigQueryColumnHandle::name) + .map(BigQueryColumnHandle::getQualifiedName) .map(BigQueryUtil::toBigQueryColumnName) .collect(toList()); diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java index fc4bf46b9eea..58545bdc4d32 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java @@ -82,7 +82,6 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) case SUPPORTS_ADD_COLUMN, SUPPORTS_CREATE_MATERIALIZED_VIEW, SUPPORTS_CREATE_VIEW, - SUPPORTS_DEREFERENCE_PUSHDOWN, SUPPORTS_MERGE, SUPPORTS_NEGATIVE_DATE, SUPPORTS_NOT_NULL_CONSTRAINT, diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java index ec9875b77779..5df1f7c6a436 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java @@ -13,14 +13,21 @@ */ package io.trino.plugin.bigquery; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.units.DataSize; +import io.trino.Session; +import io.trino.sql.planner.plan.ProjectNode; +import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; +import io.trino.testing.sql.TestTable; import org.junit.jupiter.api.Test; import java.util.Optional; import java.util.Set; +import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; import static io.trino.testing.TestingNames.randomNameSuffix; import static org.assertj.core.api.Assertions.assertThat; @@ -83,4 +90,183 @@ public void testSelectFailsForColumnName() } } } + + @Override + @Test + public void testProjectionPushdown() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_pushdown_", + "(id BIGINT, root ROW(f1 BIGINT, f2 BIGINT))", + ImmutableList.of("(1, ROW(1, 2))", "(2, NULl)", "(3, ROW(NULL, 4))"))) { + String selectQuery = "SELECT id, root.f1 FROM " + testTable.getName(); + String expectedResult = "VALUES (BIGINT '1', BIGINT '1'), (BIGINT '2', NULL), (BIGINT '3', NULL)"; + + // With Projection Pushdown enabled + assertThat(query(selectQuery)) + .matches(expectedResult) + .isFullyPushedDown(); + + // With Projection Pushdown disabled + Session sessionWithoutPushdown = sessionWithProjectionPushdownDisabled(getSession()); + assertThat(query(sessionWithoutPushdown, selectQuery)) + .matches(expectedResult) + .isNotFullyPushedDown(ProjectNode.class); + } + } + + @Override + @Test + public void testProjectionWithCaseSensitiveField() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_with_case_sensitive_field_", + "(id INT, a ROW(\"UPPER_CASE\" INT, \"lower_case\" INT, \"MiXeD_cAsE\" INT))", + ImmutableList.of("(1, ROW(2, 3, 4))", "(5, ROW(6, 7, 8))"))) { + // shippriority column is bigint (not integer) in BigQuery connector + String expected = "VALUES (BIGINT '2', BIGINT '3', BIGINT '4'), (BIGINT '6', BIGINT '7', BIGINT '8')"; + assertThat(query("SELECT \"a\".\"UPPER_CASE\", \"a\".\"lower_case\", \"a\".\"MiXeD_cAsE\" FROM " + testTable.getName())) + .matches(expected) + .isFullyPushedDown(); + assertThat(query("SELECT \"a\".\"upper_case\", \"a\".\"lower_case\", \"a\".\"mixed_case\" FROM " + testTable.getName())) + .matches(expected) + .isFullyPushedDown(); + assertThat(query("SELECT \"a\".\"UPPER_CASE\", \"a\".\"LOWER_CASE\", \"a\".\"MIXED_CASE\" FROM " + testTable.getName())) + .matches(expected) + .isFullyPushedDown(); + } + } + + @Override + @Test + public void testProjectionPushdownMultipleRows() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_pushdown_multiple_rows_", + "(id INT, nested1 ROW(child1 INT, child2 VARCHAR, child3 INT), nested2 ROW(child1 DOUBLE, child2 BOOLEAN, child3 DATE))", + ImmutableList.of( + "(1, ROW(10, 'a', 100), ROW(10.10, true, DATE '2023-04-19'))", + "(2, ROW(20, 'b', 200), ROW(20.20, false, DATE '1990-04-20'))", + "(4, ROW(40, NULL, 400), NULL)", + "(5, NULL, ROW(NULL, true, NULL))"))) { + // Select one field from one row field + assertThat(query("SELECT id, nested1.child1 FROM " + testTable.getName())) + .matches("VALUES (BIGINT '1', BIGINT '10'), (BIGINT '2', BIGINT '20'), (BIGINT '4', BIGINT '40'), (BIGINT '5', NULL)") + .isFullyPushedDown(); + assertThat(query("SELECT nested2.child3, id FROM " + testTable.getName())) + .matches("VALUES (DATE '2023-04-19', BIGINT '1'), (DATE '1990-04-20', BIGINT '2'), (NULL, BIGINT '4'), (NULL, BIGINT '5')") + .isFullyPushedDown(); + + // Select one field each from multiple row fields + assertThat(query("SELECT nested2.child1, id, nested1.child2 FROM " + testTable.getName())) + .skippingTypesCheck() + .matches("VALUES (DOUBLE '10.10', BIGINT '1', 'a'), (DOUBLE '20.20', BIGINT '2', 'b'), (NULL, BIGINT '4', NULL), (NULL, BIGINT '5', NULL)") + .isFullyPushedDown(); + + // Select multiple fields from one row field + assertThat(query("SELECT nested1.child3, id, nested1.child2 FROM " + testTable.getName())) + .skippingTypesCheck() + .matches("VALUES (BIGINT '100', BIGINT '1', 'a'), (BIGINT '200', BIGINT '2', 'b'), (BIGINT '400', BIGINT '4', NULL), (NULL, BIGINT '5', NULL)") + .isFullyPushedDown(); + assertThat(query("SELECT nested2.child2, nested2.child3, id FROM " + testTable.getName())) + .matches("VALUES (true, DATE '2023-04-19' , BIGINT '1'), (false, DATE '1990-04-20', BIGINT '2'), (NULL, NULL, BIGINT '4'), (true, NULL, BIGINT '5')") + .isFullyPushedDown(); + + // Select multiple fields from multiple row fields + assertThat(query("SELECT id, nested2.child1, nested1.child3, nested2.child2, nested1.child1 FROM " + testTable.getName())) + .matches("VALUES (BIGINT '1', DOUBLE '10.10', BIGINT '100', true, BIGINT '10'), (BIGINT '2', DOUBLE '20.20', BIGINT '200', false, BIGINT '20'), (BIGINT '4', NULL, BIGINT '400', NULL, BIGINT '40'), (BIGINT '5', NULL, NULL, true, NULL)") + .isFullyPushedDown(); + + // Select only nested fields + assertThat(query("SELECT nested2.child2, nested1.child3 FROM " + testTable.getName())) + .matches("VALUES (true, BIGINT '100'), (false, BIGINT '200'), (NULL, BIGINT '400'), (true, NULL)") + .isFullyPushedDown(); + } + } + + @Override + @Test + public void testProjectionPushdownReadsLessData() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_pushdown_reads_less_data_", + "AS SELECT val AS id, CAST(ROW(val + 1, val + 2) AS ROW(leaf1 BIGINT, leaf2 BIGINT)) AS root FROM UNNEST(SEQUENCE(1, 10)) AS t(val)")) { + MaterializedResult expectedResult = computeActual("SELECT val + 2 FROM UNNEST(SEQUENCE(1, 10)) AS t(val)"); + String selectQuery = "SELECT root.leaf2 FROM " + testTable.getName(); + Session sessionWithoutPushdown = sessionWithProjectionPushdownDisabled(getSession()); + + assertQueryStats( + getSession(), + selectQuery, + statsWithPushdown -> { + DataSize physicalInputDataSizeWithPushdown = statsWithPushdown.getPhysicalInputDataSize(); + DataSize processedDataSizeWithPushdown = statsWithPushdown.getProcessedInputDataSize(); + assertQueryStats( + sessionWithoutPushdown, + selectQuery, + statsWithoutPushdown -> { + if (supportsPhysicalPushdown()) { + assertThat(statsWithoutPushdown.getPhysicalInputDataSize()).isGreaterThan(physicalInputDataSizeWithPushdown); + } + else { + // TODO https://github.com/trinodb/trino/issues/17201 + assertThat(statsWithoutPushdown.getPhysicalInputDataSize()).isEqualTo(physicalInputDataSizeWithPushdown); + } + assertThat(statsWithoutPushdown.getProcessedInputDataSize()).isGreaterThan(processedDataSizeWithPushdown); + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(expectedResult.getOnlyColumnAsSet())); + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(expectedResult.getOnlyColumnAsSet())); + } + } + + @Override + @Test + public void testProjectionPushdownPhysicalInputSize() + { + try (TestTable testTable = new TestTable( + getQueryRunner()::execute, + "test_projection_pushdown_physical_input_size_", + "AS SELECT val AS id, CAST(ROW(val + 1, val + 2) AS ROW(leaf1 BIGINT, leaf2 BIGINT)) AS root FROM UNNEST(SEQUENCE(1, 10)) AS t(val)")) { + // Verify that the physical input size is smaller when reading the root.leaf1 field compared to reading the root field + assertQueryStats( + getSession(), + "SELECT root FROM " + testTable.getName(), + statsWithSelectRootField -> { + assertQueryStats( + getSession(), + "SELECT root.leaf1 FROM " + testTable.getName(), + statsWithSelectLeafField -> { + if (supportsPhysicalPushdown()) { + assertThat(statsWithSelectLeafField.getPhysicalInputDataSize()).isLessThan(statsWithSelectRootField.getPhysicalInputDataSize()); + } + else { + // TODO https://github.com/trinodb/trino/issues/17201 + assertThat(statsWithSelectLeafField.getPhysicalInputDataSize()).isEqualTo(statsWithSelectRootField.getPhysicalInputDataSize()); + } + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(computeActual("SELECT val + 1 FROM UNNEST(SEQUENCE(1, 10)) AS t(val)").getOnlyColumnAsSet())); + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(computeActual("SELECT ROW(val + 1, val + 2) FROM UNNEST(SEQUENCE(1, 10)) AS t(val)").getOnlyColumnAsSet())); + + // Verify that the physical input size is the same when reading the root field compared to reading both the root and root.leaf1 fields + assertQueryStats( + getSession(), + "SELECT root FROM " + testTable.getName(), + statsWithSelectRootField -> { + assertQueryStats( + getSession(), + "SELECT root, root.leaf1 FROM " + testTable.getName(), + statsWithSelectRootAndLeafField -> { + assertThat(statsWithSelectRootAndLeafField.getPhysicalInputDataSize()).isEqualTo(statsWithSelectRootField.getPhysicalInputDataSize()); + }, + results -> assertEqualsIgnoreOrder(results.getMaterializedRows(), computeActual("SELECT ROW(val + 1, val + 2), val + 1 FROM UNNEST(SEQUENCE(1, 10)) AS t(val)").getMaterializedRows())); + }, + results -> assertThat(results.getOnlyColumnAsSet()).isEqualTo(computeActual("SELECT ROW(val + 1, val + 2) FROM UNNEST(SEQUENCE(1, 10)) AS t(val)").getOnlyColumnAsSet())); + } + } } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java index 51b8a6dc1a7f..8ba85db57dea 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java @@ -54,6 +54,7 @@ public void testDefaults() .setQueryLabelName(null) .setQueryLabelFormat(null) .setProxyEnabled(false) + .setProjectionPushdownEnabled(true) .setMetadataParallelism(2)); } @@ -82,6 +83,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .put("bigquery.job.label-format", "$TRACE_TOKEN") .put("bigquery.rpc-proxy.enabled", "true") .put("bigquery.metadata.parallelism", "31") + .put("bigquery.projection-pushdown-enabled", "false") .buildOrThrow(); BigQueryConfig expected = new BigQueryConfig() @@ -105,6 +107,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .setQueryLabelName("trino_job_name") .setQueryLabelFormat("$TRACE_TOKEN") .setProxyEnabled(true) + .setProjectionPushdownEnabled(false) .setMetadataParallelism(31); assertFullMapping(properties, expected); diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryMetadata.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryMetadata.java index 770b665508ab..2af3bd99fbf9 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryMetadata.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryMetadata.java @@ -15,9 +15,17 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; +import com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; +import java.util.List; + +import static com.google.cloud.bigquery.Field.Mode.NULLABLE; +import static com.google.cloud.bigquery.StandardSQLTypeName.BIGNUMERIC; +import static io.trino.plugin.bigquery.BigQueryMetadata.projectParentColumns; import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor; +import static io.trino.spi.type.BigintType.BIGINT; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; public class TestBigQueryMetadata @@ -31,4 +39,89 @@ public void testDatasetNotFoundMessage() .isThrownBy(() -> bigQuery.listTables("test_dataset_not_found")) .matches(e -> e.getCode() == 404 && e.getMessage().contains("Not found: Dataset")); } + + @Test + public void testProjectParentColumnsSingleParent() + { + BigQueryColumnHandle parentColumn = testingColumn("a", ImmutableList.of()); + List columns = ImmutableList.of( + parentColumn, + testingColumn("a", ImmutableList.of("b")), + testingColumn("a", ImmutableList.of("b", "c", "d")), + testingColumn("a", ImmutableList.of("b", "c"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(1); + assertThat(parentColumns.getFirst()).isEqualTo(parentColumn); + } + + @Test + public void testProjectParentColumnsSingleParentDifferentOrder() + { + BigQueryColumnHandle parentColumn = testingColumn("a", ImmutableList.of()); + List columns = ImmutableList.of( + parentColumn, + testingColumn("a", ImmutableList.of("b")), + testingColumn("a", ImmutableList.of("d", "c", "b")), + testingColumn("a", ImmutableList.of("b", "c", "d")), + testingColumn("a", ImmutableList.of("b", "c"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(1); + assertThat(parentColumns.getFirst()).isEqualTo(parentColumn); + } + + @Test + public void testProjectParentColumnsNoParentDifferentOrder() + { + List columns = ImmutableList.of( + testingColumn("a", ImmutableList.of("b", "c", "d")), + testingColumn("a", ImmutableList.of("d", "c", "b"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(2); + } + + @Test + public void testProjectParentColumnsSingleParentSuddenJump() + { + BigQueryColumnHandle parentColumn = testingColumn("a", ImmutableList.of()); + List columns = ImmutableList.of( + parentColumn, + testingColumn("a", ImmutableList.of("d", "c", "b"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(1); + assertThat(parentColumns.getFirst()).isEqualTo(parentColumn); + } + + @Test + public void testProjectParentColumnsMultipleParent() + { + BigQueryColumnHandle parentColumn = testingColumn("a", ImmutableList.of()); + BigQueryColumnHandle anotherParentColumn = testingColumn("a1", ImmutableList.of()); + List columns = ImmutableList.of( + parentColumn, + anotherParentColumn, + testingColumn("a", ImmutableList.of("b", "c", "d")), + testingColumn("a", ImmutableList.of("b", "c"))); + + List parentColumns = projectParentColumns(columns); + assertThat(parentColumns).size().isEqualTo(2); + assertThat(parentColumns).containsExactlyInAnyOrder(parentColumn, anotherParentColumn); + } + + private static BigQueryColumnHandle testingColumn(String name, List dereferenceNames) + { + return new BigQueryColumnHandle( + name, + dereferenceNames, + BIGINT, + BIGNUMERIC, + false, + NULLABLE, + ImmutableList.of(), + "description", + false); + } } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java index 4c192bedc61c..2e0175d79aef 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java @@ -80,7 +80,7 @@ void testBigQueryMaterializedView() assertThat(readSession.getTable()).contains(TEMP_TABLE_PREFIX); // Ignore constraints when creating temporary tables by default (view_materialization_with_filter is false) - BigQueryColumnHandle column = new BigQueryColumnHandle("cnt", BIGINT, INT64, true, REQUIRED, ImmutableList.of(), null, false); + BigQueryColumnHandle column = new BigQueryColumnHandle("cnt", ImmutableList.of(), BIGINT, INT64, true, REQUIRED, ImmutableList.of(), null, false); BigQueryTableHandle tableDifferentFilter = new BigQueryTableHandle(table.relationHandle(), TupleDomain.fromFixedValues(ImmutableMap.of(column, new NullableValue(BIGINT, 0L))), table.projectedColumns()); assertThat(createReadSession(session, tableDifferentFilter).getTable()) .isEqualTo(readSession.getTable());