Skip to content

Commit

Permalink
fix the append read
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Dec 19, 2024
1 parent 0dded73 commit 24ada36
Showing 1 changed file with 30 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,44 @@ class IcebergDocument[T >: Null <: AnyRef](
override def get(): Iterator[T] = {
new Iterator[T] {
private var table: Option[Table] = loadTable()
private var currentSnapshot: Option[Snapshot] =
table.flatMap(t => Option(t.currentSnapshot()))
private var lastSnapshotId: Option[Long] = None
private var recordIterator: Iterator[T] = loadRecords()

/**
* Loads the table, handling cases where it may not exist.
*/
* Loads the table, handling cases where it may not exist.
*/
private def loadTable(): Option[Table] = {
IcebergUtil.loadTable(
catalog,
tableNamespace,
tableName,
tableSchema,
createIfNotExist = false
)
IcebergUtil.loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false)
}

/**
* Loads all records from the current snapshot.
*/
* Loads records incrementally using `newIncrementalAppendScan` from the last snapshot ID.
*/
private def loadRecords(): Iterator[T] = {
table match {
case Some(t) if currentSnapshot.isDefined =>
case Some(t) =>
try {
val records: CloseableIterable[Record] = IcebergGenerics.read(t).build()
val currentSnapshot = Option(t.currentSnapshot())
val currentSnapshotId = currentSnapshot.map(_.snapshotId())

val records: CloseableIterable[Record] = (lastSnapshotId, currentSnapshotId) match {
case (Some(lastId), Some(currId)) if lastId != currId =>
// Perform incremental append scan if snapshot IDs are different
IcebergGenerics.read(t).appendsAfter(lastId).build()

case (None, Some(_)) =>
// First read, perform a full scan
IcebergGenerics.read(t).build()

case _ =>
// No new data; return an empty iterator
CloseableIterable.empty()
}

// Update the last snapshot ID to the current one
lastSnapshotId = currentSnapshotId
records.iterator().asScala.map(record => deserde(tableSchema, record))

} catch {
case _: java.io.FileNotFoundException =>
println("Metadata file not found. Returning an empty iterator.")
Expand All @@ -84,6 +96,7 @@ class IcebergDocument[T >: Null <: AnyRef](
e.printStackTrace()
Iterator.empty
}

case _ => Iterator.empty
}
}
Expand All @@ -95,15 +108,8 @@ class IcebergDocument[T >: Null <: AnyRef](
// Refresh the table and check for new commits
table = loadTable()
table.foreach(_.refresh())
val newSnapshot = table.flatMap(t => Option(t.currentSnapshot()))

if (newSnapshot != currentSnapshot) {
currentSnapshot = newSnapshot
recordIterator = loadRecords()
recordIterator.hasNext
} else {
false
}
recordIterator = loadRecords()
recordIterator.hasNext
}
}

Expand Down

0 comments on commit 24ada36

Please sign in to comment.