Skip to content

Commit

Permalink
Expose metrics for paimon presto reader (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Oct 30, 2024
1 parent a01736f commit d4c275c
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class SimpleTableTestHelper {

public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
Map<String, String> options = new HashMap<>();
options.put("bucket", "2");
options.put("bucket", "1");
new SchemaManager(LocalFileIO.create(), path)
.createTable(
new Schema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand All @@ -44,7 +45,7 @@ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("a"),
Collections.emptyMap(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
String user = "user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand Down Expand Up @@ -130,7 +131,7 @@ protected QueryRunner createQueryRunner() throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("i"),
new HashMap<>(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
InnerTableWrite writer = table.newWrite("user");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ public abstract class PrestoPageSourceBase implements ConnectorPageSource {
private final PageBuilder pageBuilder;
private final List<Type> prestoColumnTypes;
private final List<DataType> paimonColumnTypes;
private long completedBytes;
private long readTimeNanos;
private long memoryUsageBytes;
private long numReturn = 0;

private boolean isFinished = false;
private long numReturn = 0;

public PrestoPageSourceBase(
RecordReader<InternalRow> reader, List<ColumnHandle> projectedColumns) {
Expand All @@ -102,17 +105,17 @@ public PrestoPageSourceBase(

@Override
public long getCompletedBytes() {
return 0;
return completedBytes;
}

@Override
public long getCompletedPositions() {
return 0;
return numReturn;
}

@Override
public long getReadTimeNanos() {
return 0;
return readTimeNanos;
}

@Override
Expand All @@ -135,32 +138,37 @@ public Page getNextPage() {

@Override
public long getSystemMemoryUsage() {
return 0;
return memoryUsageBytes;
}

@Nullable
private Page nextPage() throws IOException {
int count = 0;
while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) {
if (!iterator.hasNext()) {
isFinished = true;
return returnPage(count);
}
long start = System.nanoTime();
try {
while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) {
if (!iterator.hasNext()) {
isFinished = true;
return returnPage(count);
}

InternalRow row = iterator.next();
pageBuilder.declarePosition();
count++;
for (int i = 0; i < prestoColumnTypes.size(); i++) {
BlockBuilder output = pageBuilder.getBlockBuilder(i);
appendTo(
prestoColumnTypes.get(i),
paimonColumnTypes.get(i),
InternalRowUtils.get(row, i, paimonColumnTypes.get(i)),
output);
InternalRow row = iterator.next();
pageBuilder.declarePosition();
count++;
for (int i = 0; i < prestoColumnTypes.size(); i++) {
BlockBuilder output = pageBuilder.getBlockBuilder(i);
appendTo(
prestoColumnTypes.get(i),
paimonColumnTypes.get(i),
InternalRowUtils.get(row, i, paimonColumnTypes.get(i)),
output);
}
}
}

return returnPage(count);
return returnPage(count);
} finally {
readTimeNanos += System.nanoTime() - start;
}
}

private Page returnPage(int count) {
Expand All @@ -169,6 +177,9 @@ private Page returnPage(int count) {
}
numReturn += count;
Page page = pageBuilder.build();
long pageSizeInBytes = page.getSizeInBytes();
completedBytes += pageSizeInBytes;
memoryUsageBytes = Math.max(memoryUsageBytes, pageSizeInBytes);
pageBuilder.reset();
return page;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand All @@ -44,7 +45,7 @@ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("a"),
Collections.emptyMap(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
String user = "user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand Down Expand Up @@ -143,7 +144,7 @@ protected QueryRunner createQueryRunner() throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("i"),
new HashMap<>(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
InnerTableWrite writer = table.newWrite("user");
Expand Down Expand Up @@ -174,7 +175,7 @@ protected QueryRunner createQueryRunner() throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("ts"),
new HashMap<>(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath5);
InnerTableWrite writer = table.newWrite("user");
Expand Down Expand Up @@ -204,7 +205,7 @@ protected QueryRunner createQueryRunner() throws Exception {
rowType.getFields(),
Collections.emptyList(),
Arrays.asList("c1", "c2"),
new HashMap<>(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath5);
InnerTableWrite writer = table.newWrite("user");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand All @@ -44,7 +45,7 @@ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("a"),
Collections.emptyMap(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
String user = "user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand Down Expand Up @@ -104,7 +105,7 @@ protected QueryRunner createQueryRunner() throws Exception {
rowType.getFields(),
Collections.singletonList("pt"),
Collections.emptyList(),
new HashMap<>(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
InnerTableWrite writer = table.newWrite("user");
Expand Down Expand Up @@ -133,7 +134,7 @@ protected QueryRunner createQueryRunner() throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("i"),
new HashMap<>(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
InnerTableWrite writer = table.newWrite("user");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand All @@ -44,7 +45,7 @@ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("a"),
Collections.emptyMap(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
String user = "user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected QueryRunner createQueryRunner() throws Exception {
rowType.getFields(),
Collections.singletonList("pt"),
Collections.emptyList(),
new HashMap<>(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
InnerTableWrite writer = table.newWrite("user");
Expand Down Expand Up @@ -134,7 +135,7 @@ protected QueryRunner createQueryRunner() throws Exception {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("i"),
new HashMap<>(),
ImmutableMap.of("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
InnerTableWrite writer = table.newWrite("user");
Expand Down

0 comments on commit d4c275c

Please sign in to comment.