From 24ada36d67479f6fbe17181655f669a73c122d84 Mon Sep 17 00:00:00 2001 From: Jiadong Bai Date: Thu, 19 Dec 2024 13:49:16 -0800 Subject: [PATCH] fix the append read --- .../result/iceberg/IcebergDocument.scala | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala index 571bd53c34..b8d6af6f6f 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala @@ -49,32 +49,44 @@ class IcebergDocument[T >: Null <: AnyRef]( override def get(): Iterator[T] = { new Iterator[T] { private var table: Option[Table] = loadTable() - private var currentSnapshot: Option[Snapshot] = - table.flatMap(t => Option(t.currentSnapshot())) + private var lastSnapshotId: Option[Long] = None private var recordIterator: Iterator[T] = loadRecords() /** - * Loads the table, handling cases where it may not exist. - */ + * Loads the table, handling cases where it may not exist. + */ private def loadTable(): Option[Table] = { - IcebergUtil.loadTable( - catalog, - tableNamespace, - tableName, - tableSchema, - createIfNotExist = false - ) + IcebergUtil.loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false) } /** - * Loads all records from the current snapshot. - */ + * Loads records incrementally using `newIncrementalAppendScan` from the last snapshot ID. + */ private def loadRecords(): Iterator[T] = { table match { - case Some(t) if currentSnapshot.isDefined => + case Some(t) => try { - val records: CloseableIterable[Record] = IcebergGenerics.read(t).build() + val currentSnapshot = Option(t.currentSnapshot()) + val currentSnapshotId = currentSnapshot.map(_.snapshotId()) + + val records: CloseableIterable[Record] = (lastSnapshotId, currentSnapshotId) match { + case (Some(lastId), Some(currId)) if lastId != currId => + // Perform incremental append scan if snapshot IDs are different + IcebergGenerics.read(t).appendsAfter(lastId).build() + + case (None, Some(_)) => + // First read, perform a full scan + IcebergGenerics.read(t).build() + + case _ => + // No new data; return an empty iterator + CloseableIterable.empty() + } + + // Update the last snapshot ID to the current one + lastSnapshotId = currentSnapshotId records.iterator().asScala.map(record => deserde(tableSchema, record)) + } catch { case _: java.io.FileNotFoundException => println("Metadata file not found. Returning an empty iterator.") @@ -84,6 +96,7 @@ class IcebergDocument[T >: Null <: AnyRef]( e.printStackTrace() Iterator.empty } + case _ => Iterator.empty } } @@ -95,15 +108,8 @@ class IcebergDocument[T >: Null <: AnyRef]( // Refresh the table and check for new commits table = loadTable() table.foreach(_.refresh()) - val newSnapshot = table.flatMap(t => Option(t.currentSnapshot())) - - if (newSnapshot != currentSnapshot) { - currentSnapshot = newSnapshot - recordIterator = loadRecords() - recordIterator.hasNext - } else { - false - } + recordIterator = loadRecords() + recordIterator.hasNext } }