diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala index dea62ba6c00..af6fa86fc68 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala @@ -73,8 +73,15 @@ case class Tuple @JsonCreator() ( override def equals(obj: Any): Boolean = obj match { - case that: Tuple => (this.getFields sameElements that.getFields) && this.schema == that.schema - case _ => false + case that: Tuple => + this.schema == that.schema && + this.getFields.zip(that.getFields).forall { + case (field1: Array[Byte], field2: Array[Byte]) => + field1.sameElements(field2) // for Binary, use sameElements instead of == to compare + case (field1, field2) => + field1 == field2 + } + case _ => false } def getPartialTuple(attributeNames: List[String]): Tuple = { diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala index 0c9b27be99c..9b816f31ebd 100644 --- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala @@ -147,6 +147,59 @@ trait VirtualDocumentSpec[T] extends AnyFlatSpec with BeforeAndAfterEach { ) } + it should "allow a reader to read data while a writer is writing items incrementally" in { + val allItems = generateSampleItems() + val batchSize = allItems.length / 5 // Divide items into 5 incremental batches + + // Split items into 5 batches + val itemBatches = allItems.grouped(batchSize).toList + + // Flag to indicate when writing is done + @volatile var writingComplete = false + + // Start the writer in a Future to write batches with delays + val writerFuture = Future { + val writer = document.writer() + writer.open() + try { + itemBatches.foreach { batch => + batch.foreach(writer.putOne) + Thread.sleep(500) // Simulate delay between batches + } + } finally { + writer.close() + writingComplete = true + } + } + + // Start the reader in another Future + val readerFuture = Future { + val reader = document.get() + val retrievedItems = scala.collection.mutable.ListBuffer[T]() + + // Keep checking for new data until writing is complete and no more items are available + while (!writingComplete || reader.hasNext) { + if (reader.hasNext) { + retrievedItems += reader.next() + } else { + Thread.sleep(200) // Wait before retrying to avoid busy-waiting + } + } + + retrievedItems.toList + } + + // Wait for both writer and reader to complete + val retrievedItems = Await.result(readerFuture, 30.seconds) + Await.result(writerFuture, 30.seconds) + + // Verify that the retrieved items match the original items + assert( + retrievedItems.toSet == allItems.toSet, + "All items should be read correctly while writing is happening concurrently." + ) + } + /** * Generates a sample list of items for testing. * Subclasses should override this to provide their specific sample items. diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 8db2d87cc5d..88b4ee67755 100644 --- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -23,8 +23,8 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] { new Attribute("col-bool", AttributeType.BOOLEAN), new Attribute("col-long", AttributeType.LONG), new Attribute("col-double", AttributeType.DOUBLE), - new Attribute("col-timestamp", AttributeType.TIMESTAMP) -// new Attribute("col-binary", AttributeType.BINARY) + new Attribute("col-timestamp", AttributeType.TIMESTAMP), + new Attribute("col-binary", AttributeType.BINARY) ) ) @@ -84,6 +84,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] { .add("col-long", AttributeType.LONG, 12345678901234L) .add("col-double", AttributeType.DOUBLE, 3.14159) .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis())) + .add("col-binary", AttributeType.BINARY, Array[Byte](0, 1, 2, 3, 4, 5, 6, 7)) .build(), Tuple .builder(amberSchema) @@ -93,6 +94,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] { .add("col-long", AttributeType.LONG, -98765432109876L) .add("col-double", AttributeType.DOUBLE, -0.001) .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(0L)) + .add("col-binary", AttributeType.BINARY, Array[Byte](127, -128, 0, 64)) .build(), Tuple .builder(amberSchema) @@ -102,22 +104,57 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] { .add("col-long", AttributeType.LONG, Long.MaxValue) .add("col-double", AttributeType.DOUBLE, Double.MaxValue) .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(1234567890L)) + .add("col-binary", AttributeType.BINARY, Array[Byte](1, 2, 3, 4, 5)) .build() ) - // Generate additional tuples with random data + // Function to generate random binary data + def generateRandomBinary(size: Int): Array[Byte] = { + val array = new Array[Byte](size) + scala.util.Random.nextBytes(array) + array + } + + // Generate additional tuples with random data and occasional nulls val additionalTuples = (1 to 20000).map { i => Tuple .builder(amberSchema) - .add("col-string", AttributeType.STRING, s"Generated String $i") - .add("col-int", AttributeType.INTEGER, i) - .add("col-bool", AttributeType.BOOLEAN, i % 2 == 0) - .add("col-long", AttributeType.LONG, i.toLong * 1000000L) - .add("col-double", AttributeType.DOUBLE, i * 0.12345) + .add( + "col-string", + AttributeType.STRING, + if (i % 7 == 0) null else s"Generated String $i" + ) + .add( + "col-int", + AttributeType.INTEGER, + if (i % 5 == 0) null else i + ) + .add( + "col-bool", + AttributeType.BOOLEAN, + if (i % 6 == 0) null else i % 2 == 0 + ) + .add( + "col-long", + AttributeType.LONG, + if (i % 4 == 0) null else i.toLong * 1000000L + ) + .add( + "col-double", + AttributeType.DOUBLE, + if (i % 3 == 0) null else i * 0.12345 + ) .add( "col-timestamp", AttributeType.TIMESTAMP, - new Timestamp(System.currentTimeMillis() + i * 1000L) + if (i % 8 == 0) null + else new Timestamp(System.currentTimeMillis() + i * 1000L) + ) + .add( + "col-binary", + AttributeType.BINARY, + if (i % 9 == 0) null + else generateRandomBinary(scala.util.Random.nextInt(10) + 1) ) .build() } diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala new file mode 100644 index 00000000000..9549c22a0ac --- /dev/null +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala @@ -0,0 +1,184 @@ +package edu.uci.ics.amber.util + +import edu.uci.ics.amber.core.tuple.{AttributeType, Schema, Tuple} +import org.apache.iceberg.types.Types +import org.apache.iceberg.{Schema => IcebergSchema} +import org.apache.iceberg.data.GenericRecord +import org.scalatest.flatspec.AnyFlatSpec + +import java.nio.ByteBuffer +import java.sql.Timestamp +import java.time.{LocalDateTime, ZoneId} +import scala.jdk.CollectionConverters._ + +class IcebergUtilSpec extends AnyFlatSpec { + + val texeraSchema: Schema = Schema + .builder() + .add("test-1", AttributeType.INTEGER) + .add("test-2", AttributeType.LONG) + .add("test-3", AttributeType.BOOLEAN) + .add("test-4", AttributeType.DOUBLE) + .add("test-5", AttributeType.TIMESTAMP) + .add("test-6", AttributeType.STRING) + .add("test-7", AttributeType.BINARY) + .build() + + val icebergSchema: IcebergSchema = new IcebergSchema( + List( + Types.NestedField.optional(1, "test-1", Types.IntegerType.get()), + Types.NestedField.optional(2, "test-2", Types.LongType.get()), + Types.NestedField.optional(3, "test-3", Types.BooleanType.get()), + Types.NestedField.optional(4, "test-4", Types.DoubleType.get()), + Types.NestedField.optional(5, "test-5", Types.TimestampType.withoutZone()), + Types.NestedField.optional(6, "test-6", Types.StringType.get()), + Types.NestedField.optional(7, "test-7", Types.BinaryType.get()) + ).asJava + ) + + behavior of "IcebergUtil" + + it should "convert from AttributeType to Iceberg Type correctly" in { + assert(IcebergUtil.toIcebergType(AttributeType.INTEGER) == Types.IntegerType.get()) + assert(IcebergUtil.toIcebergType(AttributeType.LONG) == Types.LongType.get()) + assert(IcebergUtil.toIcebergType(AttributeType.BOOLEAN) == Types.BooleanType.get()) + assert(IcebergUtil.toIcebergType(AttributeType.DOUBLE) == Types.DoubleType.get()) + assert(IcebergUtil.toIcebergType(AttributeType.TIMESTAMP) == Types.TimestampType.withoutZone()) + assert(IcebergUtil.toIcebergType(AttributeType.STRING) == Types.StringType.get()) + assert(IcebergUtil.toIcebergType(AttributeType.BINARY) == Types.BinaryType.get()) + } + + it should "convert from Iceberg Type to AttributeType correctly" in { + assert(IcebergUtil.fromIcebergType(Types.IntegerType.get()) == AttributeType.INTEGER) + assert(IcebergUtil.fromIcebergType(Types.LongType.get()) == AttributeType.LONG) + assert(IcebergUtil.fromIcebergType(Types.BooleanType.get()) == AttributeType.BOOLEAN) + assert(IcebergUtil.fromIcebergType(Types.DoubleType.get()) == AttributeType.DOUBLE) + assert( + IcebergUtil.fromIcebergType(Types.TimestampType.withoutZone()) == AttributeType.TIMESTAMP + ) + assert(IcebergUtil.fromIcebergType(Types.StringType.get()) == AttributeType.STRING) + assert(IcebergUtil.fromIcebergType(Types.BinaryType.get()) == AttributeType.BINARY) + } + + it should "convert from Texera Schema to Iceberg Schema correctly" in { + assert(IcebergUtil.toIcebergSchema(texeraSchema).sameSchema(icebergSchema)) + } + + it should "convert from Iceberg Schema to Texera Schema correctly" in { + assert(IcebergUtil.fromIcebergSchema(icebergSchema) == texeraSchema) + } + + it should "convert Texera Tuple to Iceberg GenericRecord correctly" in { + val tuple = Tuple + .builder(texeraSchema) + .addSequentially( + Array( + Int.box(42), + Long.box(123456789L), + Boolean.box(true), + Double.box(3.14), + new Timestamp(10000L), + "hello world", + Array[Byte](1, 2, 3, 4) + ) + ) + .build() + + val record = IcebergUtil.toGenericRecord(tuple) + + assert(record.getField("test-1") == 42) + assert(record.getField("test-2") == 123456789L) + assert(record.getField("test-3") == true) + assert(record.getField("test-4") == 3.14) + assert(record.getField("test-5") == new Timestamp(10000L).toLocalDateTime) + assert(record.getField("test-6") == "hello world") + assert(record.getField("test-7") == ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))) + + val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema) + assert(tupleFromRecord == tuple) + } + + it should "convert Texera Tuple with null values to Iceberg GenericRecord correctly" in { + val tuple = Tuple + .builder(texeraSchema) + .addSequentially( + Array( + Int.box(42), // Non-null + null, // Null Long + Boolean.box(true), // Non-null + null, // Null Double + null, // Null Timestamp + "hello world", // Non-null String + null // Null Binary + ) + ) + .build() + + val record = IcebergUtil.toGenericRecord(tuple) + + assert(record.getField("test-1") == 42) + assert(record.getField("test-2") == null) + assert(record.getField("test-3") == true) + assert(record.getField("test-4") == null) + assert(record.getField("test-5") == null) + assert(record.getField("test-6") == "hello world") + assert(record.getField("test-7") == null) + + val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema) + assert(tupleFromRecord == tuple) + } + + it should "convert a fully null Texera Tuple to Iceberg GenericRecord correctly" in { + val tuple = Tuple + .builder(texeraSchema) + .addSequentially( + Array( + null, // Null Integer + null, // Null Long + null, // Null Boolean + null, // Null Double + null, // Null Timestamp + null, // Null String + null // Null Binary + ) + ) + .build() + + val record = IcebergUtil.toGenericRecord(tuple) + + assert(record.getField("test-1") == null) + assert(record.getField("test-2") == null) + assert(record.getField("test-3") == null) + assert(record.getField("test-4") == null) + assert(record.getField("test-5") == null) + assert(record.getField("test-6") == null) + assert(record.getField("test-7") == null) + + val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema) + assert(tupleFromRecord == tuple) + } + + it should "convert Iceberg GenericRecord to Texera Tuple correctly" in { + val record = GenericRecord.create(icebergSchema) + record.setField("test-1", 42) + record.setField("test-2", 123456789L) + record.setField("test-3", true) + record.setField("test-4", 3.14) + record.setField( + "test-5", + LocalDateTime.ofInstant(new Timestamp(10000L).toInstant, ZoneId.systemDefault()) + ) + record.setField("test-6", "hello world") + record.setField("test-7", ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))) + + val tuple = IcebergUtil.fromRecord(record, texeraSchema) + + assert(tuple.getField[Integer]("test-1") == 42) + assert(tuple.getField[Long]("test-2") == 123456789L) + assert(tuple.getField[Boolean]("test-3") == true) + assert(tuple.getField[Double]("test-4") == 3.14) + assert(tuple.getField[Timestamp]("test-5") == new Timestamp(10000L)) + assert(tuple.getField[String]("test-6") == "hello world") + assert(tuple.getField[Array[Byte]]("test-7") sameElements Array[Byte](1, 2, 3, 4)) + } +}