From ba32e6e0f434cee05ec905bea547310cc53ddc54 Mon Sep 17 00:00:00 2001 From: Xiangyu Feng Date: Tue, 31 Dec 2024 17:03:08 +0800 Subject: [PATCH] [Presto] Support time travel scan for paimon-presto connector --- .../paimon/presto/TestPrestoITCase.java | 26 ++++++++++++++++++- .../apache/paimon/presto/PrestoMetadata.java | 17 +++++++++--- .../presto/PrestoSessionProperties.java | 16 +++++++++++- .../paimon/presto/TestPrestoITCase.java | 13 +++++++++- 4 files changed, 66 insertions(+), 6 deletions(-) diff --git a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java index 344ed55..d57d9db 100644 --- a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java +++ b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java @@ -37,6 +37,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; +import com.facebook.presto.Session; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.DistributedQueryRunner; @@ -49,6 +50,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import static com.facebook.airlift.testing.Closeables.closeAllSuppress; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; @@ -212,7 +214,18 @@ public void testProjection() throws Exception { @Test public void testFilter() throws Exception { - assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4")) + assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 7")) + .isEqualTo("[[1, 1], [3, 2], [5, 3]]"); + } + + @Test + public void testFilterWithTimeTravel() throws Exception { + // Time travel table t2 to first commit. + assertThat( + sql( + "SELECT a, aCa FROM paimon.default.t2 WHERE a < 7", + PrestoSessionProperties.SCAN_VERSION, + "1")) .isEqualTo("[[1, 1], [3, 2]]"); } @@ -224,6 +237,17 @@ public void testGroupByWithCast() throws Exception { .isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]"); } + private String sql(String sql, String key, String value) throws Exception { + Session session = + testSessionBuilder().setCatalogSessionProperty("paimon", key, value).build(); + MaterializedResult result = queryRunner.execute(session, sql); + return result.getMaterializedRows().stream() + .map(Object::toString) + .sorted() + .collect(Collectors.toList()) + .toString(); + } + private String sql(String sql) throws Exception { MaterializedResult result = queryRunner.execute(sql); return result.getMaterializedRows().toString(); diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java index 80463b7..b1a3aeb 100644 --- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java +++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java @@ -27,6 +27,7 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.security.SecurityContext; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.InstantiationUtil; import org.apache.paimon.utils.StringUtils; @@ -130,14 +131,24 @@ public void dropSchema(ConnectorSession session, String schemaName) { @Override public PrestoTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { - return getTableHandle(tableName); + return getTableHandle(tableName, PrestoSessionProperties.getScanVersion(session)); } - public PrestoTableHandle getTableHandle(SchemaTableName tableName) { + public PrestoTableHandle getTableHandle(SchemaTableName tableName, String scanVersion) { Identifier tablePath = new Identifier(tableName.getSchemaName(), tableName.getTableName()); byte[] serializedTable; try { - serializedTable = InstantiationUtil.serializeObject(catalog.getTable(tablePath)); + Table table = catalog.getTable(tablePath); + if (!StringUtils.isBlank(scanVersion)) { + table = + table.copy( + new HashMap() { + { + put(CoreOptions.SCAN_VERSION.key(), scanVersion); + } + }); + } + serializedTable = InstantiationUtil.serializeObject(table); } catch (Catalog.TableNotExistException e) { return null; } catch (IOException e) { diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java index 3888cb9..2d4704f 100644 --- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java +++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java @@ -27,12 +27,16 @@ import java.util.List; import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; +import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; /** Presto {@link PrestoSessionProperties}. */ public class PrestoSessionProperties { public static final String QUERY_PUSHDOWN_ENABLED = "query_pushdown_enabled"; public static final String PARTITION_PRUNE_ENABLED = "partition_prune_enabled"; + public static final String RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED = + "range_filters_on_subscripts_enabled"; + public static final String SCAN_VERSION = "scan_version"; private final List> sessionProperties; @@ -49,7 +53,13 @@ public PrestoSessionProperties(PaimonConfig config) { PARTITION_PRUNE_ENABLED, "Enable paimon query partition prune", config.isPaimonPartitionPruningEnabled(), - false)); + false), + booleanProperty( + RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED, + "Whether to enable pushdown of range filters on subscripts like (a[2] = 5)", + false, + false), + stringProperty(SCAN_VERSION, "Paimon table scan version", null, false)); } public List> getSessionProperties() { @@ -63,4 +73,8 @@ public static boolean isPaimonPushdownEnabled(ConnectorSession session) { public static boolean isPartitionPruneEnabled(ConnectorSession session) { return session.getProperty(PARTITION_PRUNE_ENABLED, Boolean.class); } + + public static String getScanVersion(ConnectorSession session) { + return session.getProperty(SCAN_VERSION, String.class); + } } diff --git a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java index 03678f2..8643fc3 100644 --- a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java +++ b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java @@ -355,7 +355,18 @@ public void testProjection() throws Exception { @Test public void testFilter() throws Exception { - assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4")) + assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 7")) + .isEqualTo("[[1, 1], [3, 2], [5, 3]]"); + } + + @Test + public void testFilterWithTimeTravel() throws Exception { + // Time travel table t2 to first commit. + assertThat( + sql( + "SELECT a, aCa FROM paimon.default.t2 WHERE a < 7", + PrestoSessionProperties.SCAN_VERSION, + "1")) .isEqualTo("[[1, 1], [3, 2]]"); }