From d4c275c9bbbc9f03f9a56668c1f0fe41ab53c3f7 Mon Sep 17 00:00:00 2001 From: WenjunMin Date: Wed, 30 Oct 2024 13:41:58 +0800 Subject: [PATCH] Expose metrics for paimon presto reader (#42) --- .../paimon/presto/SimpleTableTestHelper.java | 2 +- .../paimon/presto/SimpleTableTestHelper.java | 3 +- .../paimon/presto/TestPrestoITCase.java | 3 +- .../paimon/presto/PrestoPageSourceBase.java | 55 +++++++++++-------- .../paimon/presto/SimpleTableTestHelper.java | 3 +- .../paimon/presto/TestPrestoITCase.java | 7 ++- .../prestosql/SimpleTableTestHelper.java | 3 +- .../paimon/prestosql/TestPrestoSqlITCase.java | 5 +- .../prestosql/SimpleTableTestHelper.java | 3 +- .../paimon/prestosql/TestPrestoSqlTCase.java | 5 +- 10 files changed, 54 insertions(+), 35 deletions(-) diff --git a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java index 9ce28fe..6a17eee 100644 --- a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java +++ b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java @@ -41,7 +41,7 @@ public class SimpleTableTestHelper { public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { Map options = new HashMap<>(); - options.put("bucket", "2"); + options.put("bucket", "1"); new SchemaManager(LocalFileIO.create(), path) .createTable( new Schema( diff --git a/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java b/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java index a1057d7..b2fcf80 100644 --- a/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java +++ b/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.InnerTableCommit; @@ -44,7 +45,7 @@ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("a"), - Collections.emptyMap(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path); String user = "user"; diff --git a/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java index 46dc6f2..3e475ed 100644 --- a/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java +++ b/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.InnerTableCommit; @@ -130,7 +131,7 @@ protected QueryRunner createQueryRunner() throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("i"), - new HashMap<>(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4); InnerTableWrite writer = table.newWrite("user"); diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java index a56de27..8eaf987 100644 --- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java +++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java @@ -82,9 +82,12 @@ public abstract class PrestoPageSourceBase implements ConnectorPageSource { private final PageBuilder pageBuilder; private final List prestoColumnTypes; private final List paimonColumnTypes; + private long completedBytes; + private long readTimeNanos; + private long memoryUsageBytes; + private long numReturn = 0; private boolean isFinished = false; - private long numReturn = 0; public PrestoPageSourceBase( RecordReader reader, List projectedColumns) { @@ -102,17 +105,17 @@ public PrestoPageSourceBase( @Override public long getCompletedBytes() { - return 0; + return completedBytes; } @Override public long getCompletedPositions() { - return 0; + return numReturn; } @Override public long getReadTimeNanos() { - return 0; + return readTimeNanos; } @Override @@ -135,32 +138,37 @@ public Page getNextPage() { @Override public long getSystemMemoryUsage() { - return 0; + return memoryUsageBytes; } @Nullable private Page nextPage() throws IOException { int count = 0; - while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) { - if (!iterator.hasNext()) { - isFinished = true; - return returnPage(count); - } + long start = System.nanoTime(); + try { + while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) { + if (!iterator.hasNext()) { + isFinished = true; + return returnPage(count); + } - InternalRow row = iterator.next(); - pageBuilder.declarePosition(); - count++; - for (int i = 0; i < prestoColumnTypes.size(); i++) { - BlockBuilder output = pageBuilder.getBlockBuilder(i); - appendTo( - prestoColumnTypes.get(i), - paimonColumnTypes.get(i), - InternalRowUtils.get(row, i, paimonColumnTypes.get(i)), - output); + InternalRow row = iterator.next(); + pageBuilder.declarePosition(); + count++; + for (int i = 0; i < prestoColumnTypes.size(); i++) { + BlockBuilder output = pageBuilder.getBlockBuilder(i); + appendTo( + prestoColumnTypes.get(i), + paimonColumnTypes.get(i), + InternalRowUtils.get(row, i, paimonColumnTypes.get(i)), + output); + } } - } - return returnPage(count); + return returnPage(count); + } finally { + readTimeNanos += System.nanoTime() - start; + } } private Page returnPage(int count) { @@ -169,6 +177,9 @@ private Page returnPage(int count) { } numReturn += count; Page page = pageBuilder.build(); + long pageSizeInBytes = page.getSizeInBytes(); + completedBytes += pageSizeInBytes; + memoryUsageBytes = Math.max(memoryUsageBytes, pageSizeInBytes); pageBuilder.reset(); return page; } diff --git a/paimon-presto-common/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java b/paimon-presto-common/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java index a1057d7..b2fcf80 100644 --- a/paimon-presto-common/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java +++ b/paimon-presto-common/src/test/java/org/apache/paimon/presto/SimpleTableTestHelper.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.InnerTableCommit; @@ -44,7 +45,7 @@ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("a"), - Collections.emptyMap(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path); String user = "user"; diff --git a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java index 0a0f0cb..04d63bf 100644 --- a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java +++ b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java @@ -26,6 +26,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.InnerTableCommit; @@ -143,7 +144,7 @@ protected QueryRunner createQueryRunner() throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("i"), - new HashMap<>(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4); InnerTableWrite writer = table.newWrite("user"); @@ -174,7 +175,7 @@ protected QueryRunner createQueryRunner() throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("ts"), - new HashMap<>(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath5); InnerTableWrite writer = table.newWrite("user"); @@ -204,7 +205,7 @@ protected QueryRunner createQueryRunner() throws Exception { rowType.getFields(), Collections.emptyList(), Arrays.asList("c1", "c2"), - new HashMap<>(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath5); InnerTableWrite writer = table.newWrite("user"); diff --git a/paimon-prestosql-332/src/test/java/org/apache/paimon/prestosql/SimpleTableTestHelper.java b/paimon-prestosql-332/src/test/java/org/apache/paimon/prestosql/SimpleTableTestHelper.java index 503aab1..b97476b 100644 --- a/paimon-prestosql-332/src/test/java/org/apache/paimon/prestosql/SimpleTableTestHelper.java +++ b/paimon-prestosql-332/src/test/java/org/apache/paimon/prestosql/SimpleTableTestHelper.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.InnerTableCommit; @@ -44,7 +45,7 @@ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("a"), - Collections.emptyMap(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path); String user = "user"; diff --git a/paimon-prestosql-332/src/test/java/org/apache/paimon/prestosql/TestPrestoSqlITCase.java b/paimon-prestosql-332/src/test/java/org/apache/paimon/prestosql/TestPrestoSqlITCase.java index f841072..8b2394c 100644 --- a/paimon-prestosql-332/src/test/java/org/apache/paimon/prestosql/TestPrestoSqlITCase.java +++ b/paimon-prestosql-332/src/test/java/org/apache/paimon/prestosql/TestPrestoSqlITCase.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.InnerTableCommit; @@ -104,7 +105,7 @@ protected QueryRunner createQueryRunner() throws Exception { rowType.getFields(), Collections.singletonList("pt"), Collections.emptyList(), - new HashMap<>(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath3); InnerTableWrite writer = table.newWrite("user"); @@ -133,7 +134,7 @@ protected QueryRunner createQueryRunner() throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("i"), - new HashMap<>(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4); InnerTableWrite writer = table.newWrite("user"); diff --git a/paimon-prestosql-common/src/test/java/org/apache/paimon/prestosql/SimpleTableTestHelper.java b/paimon-prestosql-common/src/test/java/org/apache/paimon/prestosql/SimpleTableTestHelper.java index 503aab1..b97476b 100644 --- a/paimon-prestosql-common/src/test/java/org/apache/paimon/prestosql/SimpleTableTestHelper.java +++ b/paimon-prestosql-common/src/test/java/org/apache/paimon/prestosql/SimpleTableTestHelper.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.InnerTableCommit; @@ -44,7 +45,7 @@ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("a"), - Collections.emptyMap(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path); String user = "user"; diff --git a/paimon-prestosql-common/src/test/java/org/apache/paimon/prestosql/TestPrestoSqlTCase.java b/paimon-prestosql-common/src/test/java/org/apache/paimon/prestosql/TestPrestoSqlTCase.java index 16be38f..39af5b9 100644 --- a/paimon-prestosql-common/src/test/java/org/apache/paimon/prestosql/TestPrestoSqlTCase.java +++ b/paimon-prestosql-common/src/test/java/org/apache/paimon/prestosql/TestPrestoSqlTCase.java @@ -25,6 +25,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.sink.InnerTableCommit; @@ -105,7 +106,7 @@ protected QueryRunner createQueryRunner() throws Exception { rowType.getFields(), Collections.singletonList("pt"), Collections.emptyList(), - new HashMap<>(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath3); InnerTableWrite writer = table.newWrite("user"); @@ -134,7 +135,7 @@ protected QueryRunner createQueryRunner() throws Exception { rowType.getFields(), Collections.emptyList(), Collections.singletonList("i"), - new HashMap<>(), + ImmutableMap.of("bucket", "1"), "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4); InnerTableWrite writer = table.newWrite("user");