diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
index 96dbaa5180..571bd53c34 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -2,9 +2,10 @@ package edu.uci.ics.amber.core.storage.result.iceberg
 
 import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument}
 import edu.uci.ics.amber.util.IcebergUtil
-import org.apache.iceberg.Table
+import org.apache.iceberg.{Snapshot, Table}
 import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
 import org.apache.iceberg.data.{IcebergGenerics, Record}
+import org.apache.iceberg.exceptions.NoSuchTableException
 import org.apache.iceberg.io.CloseableIterable
 
 import java.net.URI
@@ -26,7 +27,11 @@ class IcebergDocument[T >: Null <: AnyRef](
    * Returns the URI of the table location.
    */
   override def getURI: URI = {
-    val table = IcebergUtil.loadOrCreateTable(catalog, tableNamespace, tableName, tableSchema)
+    val table = IcebergUtil
+      .loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false)
+      .getOrElse(
+        throw new NoSuchTableException(f"table ${tableNamespace}.${tableName} doesn't exist")
+      )
     URI.create(table.location())
   }
 
@@ -41,36 +46,45 @@ class IcebergDocument[T >: Null <: AnyRef](
     }
   }
 
-  /**
-   * Returns an iterator that iterates over all records in the table, including new records
-   * from concurrent writers as they commit.
-   */
-  override def get(): Iterator[T] =
+  override def get(): Iterator[T] = {
     new Iterator[T] {
-      private val table =
-        IcebergUtil.loadOrCreateTable(catalog, tableNamespace, tableName, tableSchema)
-      private var currentSnapshot = table.currentSnapshot()
-      private var recordIterator = loadRecords()
+      private var table: Option[Table] = loadTable()
+      private var currentSnapshot: Option[Snapshot] =
+        table.flatMap(t => Option(t.currentSnapshot()))
+      private var recordIterator: Iterator[T] = loadRecords()
+
+      /**
+        * Loads the table, handling cases where it may not exist.
+        */
+      private def loadTable(): Option[Table] = {
+        IcebergUtil.loadTable(
+          catalog,
+          tableNamespace,
+          tableName,
+          tableSchema,
+          createIfNotExist = false
+        )
+      }
 
       /**
        * Loads all records from the current snapshot.
        */
       private def loadRecords(): Iterator[T] = {
-        if (currentSnapshot != null) {
-          try {
-            val records: CloseableIterable[Record] = IcebergGenerics.read(table).build()
-            records.iterator().asScala.map(record => deserde(tableSchema, record))
-          } catch {
-            case _: java.io.FileNotFoundException =>
-              println("Metadata file not found. Returning an empty iterator.")
-              Iterator.empty
-            case e: Exception =>
-              println(s"Error during record loading: ${e.getMessage}")
-              e.printStackTrace()
-              Iterator.empty
-          }
-        } else {
-          Iterator.empty
+        table match {
+          case Some(t) if currentSnapshot.isDefined =>
+            try {
+              val records: CloseableIterable[Record] = IcebergGenerics.read(t).build()
+              records.iterator().asScala.map(record => deserde(tableSchema, record))
+            } catch {
+              case _: java.io.FileNotFoundException =>
+                println("Metadata file not found. Returning an empty iterator.")
+                Iterator.empty
+              case e: Exception =>
+                println(s"Error during record loading: ${e.getMessage}")
+                e.printStackTrace()
+                Iterator.empty
+            }
+          case _ => Iterator.empty
         }
       }
 
@@ -79,8 +93,10 @@ class IcebergDocument[T >: Null <: AnyRef](
           true
         } else {
           // Refresh the table and check for new commits
-          table.refresh()
-          val newSnapshot = table.currentSnapshot()
+          table = loadTable()
+          table.foreach(_.refresh())
+          val newSnapshot = table.flatMap(t => Option(t.currentSnapshot()))
+
           if (newSnapshot != currentSnapshot) {
             currentSnapshot = newSnapshot
             recordIterator = loadRecords()
@@ -91,8 +107,12 @@ class IcebergDocument[T >: Null <: AnyRef](
         }
       }
 
-      override def next(): T = recordIterator.next()
+      override def next(): T = {
+        if (!hasNext) throw new NoSuchElementException("No more records available")
+        recordIterator.next()
+      }
     }
+  }
 
   /**
    * Returns a BufferedItemWriter for writing data to the table.
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
index 5c8f27ecc0..462fe87d7a 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
@@ -27,7 +27,9 @@ class IcebergTableWriter[T](
 
   // Load the Iceberg table
   private val table: Table =
-    IcebergUtil.loadOrCreateTable(catalog, tableNamespace, tableName, tableSchema)
+    IcebergUtil
+      .loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = true)
+      .get
 
   override def open(): Unit =
     withLock {
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala
index 4927c641a0..3887917118 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala
@@ -11,6 +11,7 @@ import org.apache.iceberg.types.Type.PrimitiveType
 import org.apache.iceberg.{CatalogProperties, Table, Schema => IcebergSchema}
 
 import java.net.URI
+import java.nio.ByteBuffer
 import java.sql.Timestamp
 import java.time.LocalDateTime
 import java.time.ZoneId
@@ -22,8 +23,8 @@ object IcebergUtil {
    * Creates and initializes a JdbcCatalog with the given parameters.
    *
    * @param catalogName The name of the catalog.
-   * @param warehouseUri The warehouse directory path.
-   * @param jdbcUri The JDBC URI for the catalog.
+   * @param warehouseUri The warehouse directory path.
+   * @param jdbcUri      The JDBC URI for the catalog.
    * @param jdbcUser The JDBC username.
    * @param jdbcPassword The JDBC password.
    * @return The initialized JdbcCatalog.
@@ -49,17 +50,21 @@ object IcebergUtil {
     catalog
   }
 
-  def loadOrCreateTable(
+  def loadTable(
       catalog: Catalog,
       tableNamespace: String,
       tableName: String,
-      tableSchema: IcebergSchema
-  ): Table = {
+      tableSchema: IcebergSchema,
+      createIfNotExist: Boolean
+  ): Option[Table] = {
     val identifier = TableIdentifier.of(tableNamespace, tableName)
     if (!catalog.tableExists(identifier)) {
-      catalog.createTable(identifier, tableSchema)
+      if (!createIfNotExist) {
+        return None
+      }
+      Some(catalog.createTable(identifier, tableSchema))
     } else {
-      catalog.loadTable(identifier)
+      Some(catalog.loadTable(identifier))
     }
   }
 
@@ -72,7 +77,7 @@ object IcebergUtil {
   def toIcebergSchema(amberSchema: Schema): IcebergSchema = {
     val icebergFields = amberSchema.getAttributes.zipWithIndex.map {
       case (attribute, index) =>
-        Types.NestedField.required(index + 1, attribute.getName, toIcebergType(attribute.getType))
+        Types.NestedField.optional(index + 1, attribute.getName, toIcebergType(attribute.getType))
     }
     new IcebergSchema(icebergFields.asJava)
   }
@@ -98,21 +103,22 @@ object IcebergUtil {
   }
 
   /**
-   * Converts a custom Amber `Tuple` to an Iceberg `GenericRecord`.
+   * Converts a custom Amber `Tuple` to an Iceberg `GenericRecord`, handling `null` values.
    *
    * @param tuple The custom Amber Tuple.
    * @return An Iceberg GenericRecord.
    */
   def toGenericRecord(tuple: Tuple): Record = {
-    // Convert the Amber schema to an Iceberg schema
     val icebergSchema = toIcebergSchema(tuple.schema)
     val record = GenericRecord.create(icebergSchema)
 
     tuple.schema.getAttributes.zipWithIndex.foreach {
       case (attribute, index) =>
         val value = tuple.getField[AnyRef](index) match {
-          case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
-          case other => other
+          case null => null
+          case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
+          case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
+          case other => other
         }
         record.setField(attribute.getName, value)
     }
@@ -121,17 +127,22 @@ object IcebergUtil {
   }
 
   /**
-   * Converts an Iceberg `Record` to an Amber `Tuple`.
+   * Converts an Iceberg `Record` to an Amber `Tuple`, handling `null` values.
    *
-   * @param record The Iceberg Record.
+   * @param record      The Iceberg Record.
    * @param amberSchema The corresponding Amber Schema.
    * @return An Amber Tuple.
   */
   def fromRecord(record: Record, amberSchema: Schema): Tuple = {
     val fieldValues = amberSchema.getAttributes.map { attribute =>
       val value = record.getField(attribute.getName) match {
+        case null => null
         case ldt: LocalDateTime => Timestamp.valueOf(ldt)
-        case other => other
+        case buffer: ByteBuffer =>
+          val bytes = new Array[Byte](buffer.remaining())
+          buffer.get(bytes)
+          bytes
+        case other => other
       }
       value
     }
diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala
index 845aa16441..9dd13e4994 100644
--- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala
+++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala
@@ -42,9 +42,43 @@ trait VirtualDocumentSpec[T] extends AnyFlatSpec with BeforeAndAfterEach {
 
     // Read items back
     val retrievedItems = document.get().toList
-    assert(retrievedItems == items, "The retrieved items should match the written items.")
+    assert(retrievedItems == items)
   }
 
+  "VirtualDocument" should "read items while writer is writing new data" in {
+    val allItems = generateSampleItems()
+
+    // Split the items into two batches
+    val (batch1, batch2) = allItems.splitAt(allItems.length / 2)
+
+    // Create a reader before any data is written
+    val reader = document.get()
+    assert(!reader.hasNext, "Reader should initially have no data.")
+
+    // Write the first batch
+    val writer = document.writer()
+    writer.open()
+    batch1.foreach(writer.putOne)
+    writer.close()
+
+    // The reader should detect and read the first batch
+    val retrievedBatch1 = reader.take(batch1.length).toList
+    assert(retrievedBatch1.toSet == batch1.toSet, "Reader should read the first batch correctly.")
+
+    // Write the second batch
+    val writer2 = document.writer()
+    writer2.open()
+    batch2.foreach(writer2.putOne)
+    writer2.close()
+
+    // The reader should detect and read the second batch
+    val retrievedBatch2 = reader.toList
+    assert(retrievedBatch2.toSet == batch2.toSet, "Reader should read the second batch correctly.")
+
+    // Verify that the combined retrieved items match the original items
+    val retrievedItems = retrievedBatch1 ++ retrievedBatch2
+    assert(retrievedItems.toSet == allItems.toSet, "Reader should read all items correctly.")
+  }
   it should "clear the document" in {
     val items = generateSampleItems()
 
diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index c84f339a23..8195c798ab 100644
--- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -15,13 +15,16 @@
 import java.util.UUID
 
 class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
 
-  // Define Amber Schema
+  // Define Amber Schema with all possible attribute types
   val amberSchema: Schema = Schema(
     List(
-      new Attribute("id", AttributeType.LONG),
-      new Attribute("name", AttributeType.STRING),
-      new Attribute("score", AttributeType.DOUBLE),
-      new Attribute("timestamp", AttributeType.TIMESTAMP)
+      new Attribute("col-string", AttributeType.STRING),
+      new Attribute("col-int", AttributeType.INTEGER),
+      new Attribute("col-bool", AttributeType.BOOLEAN),
+      new Attribute("col-long", AttributeType.LONG),
+      new Attribute("col-double", AttributeType.DOUBLE),
+      new Attribute("col-timestamp", AttributeType.TIMESTAMP)
+//      new Attribute("col-binary", AttributeType.BINARY)
     )
   )
@@ -65,7 +68,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
     )
   }
   // Implementation of isDocumentCleared
   override def isDocumentCleared: Boolean = {
     val identifier = TableIdentifier.of(tableNamespace, tableName)
     !catalog.tableExists(identifier)
   }
@@ -76,18 +79,45 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
     List(
       Tuple
         .builder(amberSchema)
-        .add("id", AttributeType.LONG, 1L)
-        .add("name", AttributeType.STRING, "Alice")
-        .add("score", AttributeType.DOUBLE, 95.5)
-        .add("timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis()))
+        .add("col-string", AttributeType.STRING, "Hello World")
+        .add("col-int", AttributeType.INTEGER, 42)
+        .add("col-bool", AttributeType.BOOLEAN, true)
+        .add("col-long", AttributeType.LONG, 12345678901234L)
+        .add("col-double", AttributeType.DOUBLE, 3.14159)
+        .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis()))
+//        .add("col-binary", AttributeType.BINARY, Array[Byte](1, 2, 3, 4, 5))
         .build(),
       Tuple
         .builder(amberSchema)
-        .add("id", AttributeType.LONG, 2L)
-        .add("name", AttributeType.STRING, "Bob")
-        .add("score", AttributeType.DOUBLE, 88.0)
-        .add("timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis()))
+        .add("col-string", AttributeType.STRING, "")
+        .add("col-int", AttributeType.INTEGER, -1)
+        .add("col-bool", AttributeType.BOOLEAN, false)
+        .add("col-long", AttributeType.LONG, -98765432109876L)
+        .add("col-double", AttributeType.DOUBLE, -0.001)
+        .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(0L))
+//        .add("col-binary", AttributeType.BINARY, Array[Byte]())
        .build()
+//
+//      Tuple
+//        .builder(amberSchema)
+//        .add("col-string", AttributeType.STRING, "Special Characters: \n\t\r")
+//        .add("col-int", AttributeType.INTEGER, Int.MaxValue)
+//        .add("col-bool", AttributeType.BOOLEAN, true)
+//        .add("col-long", AttributeType.LONG, Long.MaxValue)
+//        .add("col-double", AttributeType.DOUBLE, Double.MaxValue)
+//        .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(1234567890L))
+////        .add("col-binary", AttributeType.BINARY, Array.fill[Byte](1000)('a'))
+//        .build(),
+//      Tuple
+//        .builder(amberSchema)
+//        .add("col-string", AttributeType.STRING, null)
+//        .add("col-int", AttributeType.INTEGER, null)
+//        .add("col-bool", AttributeType.BOOLEAN, null)
+//        .add("col-long", AttributeType.LONG, null)
+//        .add("col-double", AttributeType.DOUBLE, null)
+//        .add("col-timestamp", AttributeType.TIMESTAMP, null)
+////        .add("col-binary", AttributeType.BINARY, null)
+//        .build()
     )
   }
 }
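
Usage sketch (illustrative, not part of the commit): the new IcebergUtil.loadTable returns an Option[Table] and only creates the table when createIfNotExist = true, so writer and reader call sites now encode different expectations. A minimal sketch in the spirit of the call sites above, assuming an already-initialized Iceberg Catalog and hypothetical namespace, table, and schema values:

    // Writer side (mirrors IcebergTableWriter): the table is created on
    // first use, so the result is always Some and .get cannot fail.
    val tableForWrite: Table =
      IcebergUtil.loadTable(catalog, "ns", "results", icebergSchema, createIfNotExist = true).get

    // Reader side (mirrors IcebergDocument): never create the table implicitly.
    // A missing table surfaces as None, and the caller decides whether to throw
    // NoSuchTableException (as getURI does) or behave as an empty document (as get() does).
    val tableForRead: Option[Table] =
      IcebergUtil.loadTable(catalog, "ns", "results", icebergSchema, createIfNotExist = false)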
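
Serde sketch (illustrative, hypothetical schema and values): with fields now declared optional in toIcebergSchema, the toGenericRecord/fromRecord pair passes null through, wraps Array[Byte] as a ByteBuffer, and converts java.sql.Timestamp to LocalDateTime and back. A minimal round-trip under those assumptions (note the spec above still keeps the BINARY column and the all-null tuple commented out):

    // Hypothetical one-column schema, reusing the types from the spec above.
    val schema = Schema(List(new Attribute("col-timestamp", AttributeType.TIMESTAMP)))
    val tuple = Tuple
      .builder(schema)
      .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(0L))
      .build()
    val record = IcebergUtil.toGenericRecord(tuple)       // stored as LocalDateTime
    val restored = IcebergUtil.fromRecord(record, schema) // back to java.sql.Timestamp
    assert(restored == tuple, "timestamp round-trip should be lossless")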