Skip to content

Commit

Permalink
(feat) Support partition pushdown with complex predicate (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Dec 16, 2024
1 parent d4c275c commit 80d45e8
Show file tree
Hide file tree
Showing 12 changed files with 823 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.PlanNode;
Expand Down Expand Up @@ -54,14 +55,21 @@ public class PrestoComputePushdown implements ConnectorPlanOptimizer {

private final StandardFunctionResolution functionResolution;
private final RowExpressionService rowExpressionService;
private final FunctionMetadataManager functionMetadataManager;
private final PrestoTransactionManager transactionManager;

public PrestoComputePushdown(
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService) {
RowExpressionService rowExpressionService,
FunctionMetadataManager functionMetadataManager,
PrestoTransactionManager transactionManager) {

this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService =
requireNonNull(rowExpressionService, "rowExpressionService is null");
this.functionMetadataManager =
requireNonNull(functionMetadataManager, "functionMetadataManager is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
}

@Override
Expand Down Expand Up @@ -180,7 +188,8 @@ public PlanNode visitFilter(FilterNode filter, Void context) {
oldPrestoTableHandle.getTableName(),
oldPrestoTableHandle.getSerializedTable(),
entireColumnDomain,
projectedColumns);
projectedColumns,
Optional.empty());

PrestoTableLayoutHandle newLayoutHandle =
new PrestoTableLayoutHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,22 @@

package org.apache.paimon.presto;

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.InstantiationUtil;

import com.facebook.presto.block.BlockEncodingManager;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.metadata.FunctionManager;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
Expand All @@ -40,6 +53,7 @@
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.TestingRowExpressionTranslator;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.gen.RowExpressionPredicateCompiler;
import com.facebook.presto.sql.planner.PlanVariableAllocator;
import com.facebook.presto.sql.planner.TypeProvider;
Expand All @@ -52,8 +66,12 @@
import com.facebook.presto.type.TypeRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -102,6 +120,33 @@ public String formatRowExpression(
}
};

public byte[] table;

@BeforeTest
public void setUp() throws Exception {
String warehouse =
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
Path tablePath3 = new Path(warehouse, "default.db/t3");
RowType rowType =
new RowType(
Arrays.asList(
new DataField(0, "pt", new VarCharType()),
new DataField(1, "a", new IntType()),
new DataField(2, "b", new BigIntType()),
new DataField(3, "c", new BigIntType()),
new DataField(4, "d", new IntType())));
new SchemaManager(LocalFileIO.create(), tablePath3)
.createTable(
new Schema(
rowType.getFields(),
Collections.singletonList("pt"),
Collections.emptyList(),
new HashMap<>(),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
this.table = InstantiationUtil.serializeObject(table);
}

private TableScanNode createTableScan() {
PlanVariableAllocator variableAllocator = new PlanVariableAllocator();
VariableReferenceExpression variableA = variableAllocator.newVariable("a", BIGINT);
Expand All @@ -114,7 +159,7 @@ private TableScanNode createTableScan() {
"id", new BigIntType(), new TypeRegistry()))
.build();

PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test", "table".getBytes());
PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test", this.table);

return new TableScanNode(
new PlanNodeId(UUID.randomUUID().toString()),
Expand All @@ -127,8 +172,9 @@ private TableScanNode createTableScan() {
new PrestoTableHandle(
"test",
"test",
"table".getBytes(),
this.table,
TupleDomain.all(),
Optional.empty(),
Optional.empty()),
TupleDomain.all()))),
ImmutableList.copyOf(assignments.keySet()),
Expand Down Expand Up @@ -171,7 +217,14 @@ public void testOptimizeFilter() {
PrestoSessionProperties prestoSessionProperties = new PrestoSessionProperties(config);

PrestoComputePushdown prestoComputePushdown =
new PrestoComputePushdown(FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE);
new PrestoComputePushdown(
FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE,
new FunctionManager(
new TypeRegistry(),
new BlockEncodingManager(new TypeRegistry()),
new FeaturesConfig()),
new PrestoTransactionManager());

PlanNode mockInputPlan = createFilterNode();
ConnectorSession session =
Expand Down Expand Up @@ -219,7 +272,14 @@ public void testNotOptimizeFilter() {
PrestoSessionProperties prestoSessionProperties = new PrestoSessionProperties(config);

PrestoComputePushdown prestoComputePushdown =
new PrestoComputePushdown(FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE);
new PrestoComputePushdown(
FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE,
new FunctionManager(
new TypeRegistry(),
new BlockEncodingManager(new TypeRegistry()),
new FeaturesConfig()),
new PrestoTransactionManager());

PlanNode mockInputPlan = createFilterNode();
ConnectorSession session =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class PaimonConfig {
private String metastore;
private String uri;
private boolean paimonPushdownEnabled = true;
private boolean paimonPartitionPruningEnabled = true;

public String getWarehouse() {
return warehouse;
Expand Down Expand Up @@ -69,4 +70,15 @@ public PaimonConfig setPaimonPushdownEnabled(boolean paimonPushdownEnabled) {
this.paimonPushdownEnabled = paimonPushdownEnabled;
return this;
}

public boolean isPaimonPartitionPruningEnabled() {
return paimonPartitionPruningEnabled;
}

@Config("paimon.partition-prune-enabled")
@ConfigDescription("Enable paimon query partition prune")
public PaimonConfig setPaimonPartitionPruningEnabled(boolean paimonPartitionPruningEnabled) {
this.paimonPartitionPruningEnabled = paimonPartitionPruningEnabled;
return this;
}
}
Loading

0 comments on commit 80d45e8

Please sign in to comment.