Skip to content

Commit

Permalink
resolve the binary type issue
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Dec 20, 2024
1 parent c69a019 commit 385499b
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,15 @@ case class Tuple @JsonCreator() (

override def equals(obj: Any): Boolean =
obj match {
case that: Tuple => (this.getFields sameElements that.getFields) && this.schema == that.schema
case _ => false
case that: Tuple =>
this.schema == that.schema &&
this.getFields.zip(that.getFields).forall {
case (field1: Array[Byte], field2: Array[Byte]) =>
field1.sameElements(field2) // for Binary, use sameElements instead of == to compare
case (field1, field2) =>
field1 == field2
}
case _ => false
}

def getPartialTuple(attributeNames: List[String]): Tuple = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,59 @@ trait VirtualDocumentSpec[T] extends AnyFlatSpec with BeforeAndAfterEach {
)
}

it should "allow a reader to read data while a writer is writing items incrementally" in {
val allItems = generateSampleItems()
val batchSize = allItems.length / 5 // Divide items into 5 incremental batches

// Split items into 5 batches
val itemBatches = allItems.grouped(batchSize).toList

// Flag to indicate when writing is done
@volatile var writingComplete = false

// Start the writer in a Future to write batches with delays
val writerFuture = Future {
val writer = document.writer()
writer.open()
try {
itemBatches.foreach { batch =>
batch.foreach(writer.putOne)
Thread.sleep(500) // Simulate delay between batches
}
} finally {
writer.close()
writingComplete = true
}
}

// Start the reader in another Future
val readerFuture = Future {
val reader = document.get()
val retrievedItems = scala.collection.mutable.ListBuffer[T]()

// Keep checking for new data until writing is complete and no more items are available
while (!writingComplete || reader.hasNext) {
if (reader.hasNext) {
retrievedItems += reader.next()
} else {
Thread.sleep(200) // Wait before retrying to avoid busy-waiting
}
}

retrievedItems.toList
}

// Wait for both writer and reader to complete
val retrievedItems = Await.result(readerFuture, 30.seconds)
Await.result(writerFuture, 30.seconds)

// Verify that the retrieved items match the original items
assert(
retrievedItems.toSet == allItems.toSet,
"All items should be read correctly while writing is happening concurrently."
)
}

/**
* Generates a sample list of items for testing.
* Subclasses should override this to provide their specific sample items.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
new Attribute("col-bool", AttributeType.BOOLEAN),
new Attribute("col-long", AttributeType.LONG),
new Attribute("col-double", AttributeType.DOUBLE),
new Attribute("col-timestamp", AttributeType.TIMESTAMP)
// new Attribute("col-binary", AttributeType.BINARY)
new Attribute("col-timestamp", AttributeType.TIMESTAMP),
new Attribute("col-binary", AttributeType.BINARY)
)
)

Expand Down Expand Up @@ -84,6 +84,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
.add("col-long", AttributeType.LONG, 12345678901234L)
.add("col-double", AttributeType.DOUBLE, 3.14159)
.add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis()))
.add("col-binary", AttributeType.BINARY, Array[Byte](0, 1, 2, 3, 4, 5, 6, 7))
.build(),
Tuple
.builder(amberSchema)
Expand All @@ -93,6 +94,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
.add("col-long", AttributeType.LONG, -98765432109876L)
.add("col-double", AttributeType.DOUBLE, -0.001)
.add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(0L))
.add("col-binary", AttributeType.BINARY, Array[Byte](127, -128, 0, 64))
.build(),
Tuple
.builder(amberSchema)
Expand All @@ -102,22 +104,57 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
.add("col-long", AttributeType.LONG, Long.MaxValue)
.add("col-double", AttributeType.DOUBLE, Double.MaxValue)
.add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(1234567890L))
.add("col-binary", AttributeType.BINARY, Array[Byte](1, 2, 3, 4, 5))
.build()
)

// Generate additional tuples with random data
// Function to generate random binary data
def generateRandomBinary(size: Int): Array[Byte] = {
val array = new Array[Byte](size)
scala.util.Random.nextBytes(array)
array
}

// Generate additional tuples with random data and occasional nulls
val additionalTuples = (1 to 20000).map { i =>
Tuple
.builder(amberSchema)
.add("col-string", AttributeType.STRING, s"Generated String $i")
.add("col-int", AttributeType.INTEGER, i)
.add("col-bool", AttributeType.BOOLEAN, i % 2 == 0)
.add("col-long", AttributeType.LONG, i.toLong * 1000000L)
.add("col-double", AttributeType.DOUBLE, i * 0.12345)
.add(
"col-string",
AttributeType.STRING,
if (i % 7 == 0) null else s"Generated String $i"
)
.add(
"col-int",
AttributeType.INTEGER,
if (i % 5 == 0) null else i
)
.add(
"col-bool",
AttributeType.BOOLEAN,
if (i % 6 == 0) null else i % 2 == 0
)
.add(
"col-long",
AttributeType.LONG,
if (i % 4 == 0) null else i.toLong * 1000000L
)
.add(
"col-double",
AttributeType.DOUBLE,
if (i % 3 == 0) null else i * 0.12345
)
.add(
"col-timestamp",
AttributeType.TIMESTAMP,
new Timestamp(System.currentTimeMillis() + i * 1000L)
if (i % 8 == 0) null
else new Timestamp(System.currentTimeMillis() + i * 1000L)
)
.add(
"col-binary",
AttributeType.BINARY,
if (i % 9 == 0) null
else generateRandomBinary(scala.util.Random.nextInt(10) + 1)
)
.build()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package edu.uci.ics.amber.util

import edu.uci.ics.amber.core.tuple.{AttributeType, Schema, Tuple}
import org.apache.iceberg.types.Types
import org.apache.iceberg.{Schema => IcebergSchema}
import org.apache.iceberg.data.GenericRecord
import org.scalatest.flatspec.AnyFlatSpec

import java.nio.ByteBuffer
import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneId}
import scala.jdk.CollectionConverters._

class IcebergUtilSpec extends AnyFlatSpec {

val texeraSchema: Schema = Schema
.builder()
.add("test-1", AttributeType.INTEGER)
.add("test-2", AttributeType.LONG)
.add("test-3", AttributeType.BOOLEAN)
.add("test-4", AttributeType.DOUBLE)
.add("test-5", AttributeType.TIMESTAMP)
.add("test-6", AttributeType.STRING)
.add("test-7", AttributeType.BINARY)
.build()

val icebergSchema: IcebergSchema = new IcebergSchema(
List(
Types.NestedField.optional(1, "test-1", Types.IntegerType.get()),
Types.NestedField.optional(2, "test-2", Types.LongType.get()),
Types.NestedField.optional(3, "test-3", Types.BooleanType.get()),
Types.NestedField.optional(4, "test-4", Types.DoubleType.get()),
Types.NestedField.optional(5, "test-5", Types.TimestampType.withoutZone()),
Types.NestedField.optional(6, "test-6", Types.StringType.get()),
Types.NestedField.optional(7, "test-7", Types.BinaryType.get())
).asJava
)

behavior of "IcebergUtil"

it should "convert from AttributeType to Iceberg Type correctly" in {
assert(IcebergUtil.toIcebergType(AttributeType.INTEGER) == Types.IntegerType.get())
assert(IcebergUtil.toIcebergType(AttributeType.LONG) == Types.LongType.get())
assert(IcebergUtil.toIcebergType(AttributeType.BOOLEAN) == Types.BooleanType.get())
assert(IcebergUtil.toIcebergType(AttributeType.DOUBLE) == Types.DoubleType.get())
assert(IcebergUtil.toIcebergType(AttributeType.TIMESTAMP) == Types.TimestampType.withoutZone())
assert(IcebergUtil.toIcebergType(AttributeType.STRING) == Types.StringType.get())
assert(IcebergUtil.toIcebergType(AttributeType.BINARY) == Types.BinaryType.get())
}

it should "convert from Iceberg Type to AttributeType correctly" in {
assert(IcebergUtil.fromIcebergType(Types.IntegerType.get()) == AttributeType.INTEGER)
assert(IcebergUtil.fromIcebergType(Types.LongType.get()) == AttributeType.LONG)
assert(IcebergUtil.fromIcebergType(Types.BooleanType.get()) == AttributeType.BOOLEAN)
assert(IcebergUtil.fromIcebergType(Types.DoubleType.get()) == AttributeType.DOUBLE)
assert(
IcebergUtil.fromIcebergType(Types.TimestampType.withoutZone()) == AttributeType.TIMESTAMP
)
assert(IcebergUtil.fromIcebergType(Types.StringType.get()) == AttributeType.STRING)
assert(IcebergUtil.fromIcebergType(Types.BinaryType.get()) == AttributeType.BINARY)
}

it should "convert from Texera Schema to Iceberg Schema correctly" in {
assert(IcebergUtil.toIcebergSchema(texeraSchema).sameSchema(icebergSchema))
}

it should "convert from Iceberg Schema to Texera Schema correctly" in {
assert(IcebergUtil.fromIcebergSchema(icebergSchema) == texeraSchema)
}

it should "convert Texera Tuple to Iceberg GenericRecord correctly" in {
val tuple = Tuple
.builder(texeraSchema)
.addSequentially(
Array(
Int.box(42),
Long.box(123456789L),
Boolean.box(true),
Double.box(3.14),
new Timestamp(10000L),
"hello world",
Array[Byte](1, 2, 3, 4)
)
)
.build()

val record = IcebergUtil.toGenericRecord(tuple)

assert(record.getField("test-1") == 42)
assert(record.getField("test-2") == 123456789L)
assert(record.getField("test-3") == true)
assert(record.getField("test-4") == 3.14)
assert(record.getField("test-5") == new Timestamp(10000L).toLocalDateTime)
assert(record.getField("test-6") == "hello world")
assert(record.getField("test-7") == ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)))

val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema)
assert(tupleFromRecord == tuple)
}

it should "convert Texera Tuple with null values to Iceberg GenericRecord correctly" in {
val tuple = Tuple
.builder(texeraSchema)
.addSequentially(
Array(
Int.box(42), // Non-null
null, // Null Long
Boolean.box(true), // Non-null
null, // Null Double
null, // Null Timestamp
"hello world", // Non-null String
null // Null Binary
)
)
.build()

val record = IcebergUtil.toGenericRecord(tuple)

assert(record.getField("test-1") == 42)
assert(record.getField("test-2") == null)
assert(record.getField("test-3") == true)
assert(record.getField("test-4") == null)
assert(record.getField("test-5") == null)
assert(record.getField("test-6") == "hello world")
assert(record.getField("test-7") == null)

val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema)
assert(tupleFromRecord == tuple)
}

it should "convert a fully null Texera Tuple to Iceberg GenericRecord correctly" in {
val tuple = Tuple
.builder(texeraSchema)
.addSequentially(
Array(
null, // Null Integer
null, // Null Long
null, // Null Boolean
null, // Null Double
null, // Null Timestamp
null, // Null String
null // Null Binary
)
)
.build()

val record = IcebergUtil.toGenericRecord(tuple)

assert(record.getField("test-1") == null)
assert(record.getField("test-2") == null)
assert(record.getField("test-3") == null)
assert(record.getField("test-4") == null)
assert(record.getField("test-5") == null)
assert(record.getField("test-6") == null)
assert(record.getField("test-7") == null)

val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema)
assert(tupleFromRecord == tuple)
}

it should "convert Iceberg GenericRecord to Texera Tuple correctly" in {
val record = GenericRecord.create(icebergSchema)
record.setField("test-1", 42)
record.setField("test-2", 123456789L)
record.setField("test-3", true)
record.setField("test-4", 3.14)
record.setField(
"test-5",
LocalDateTime.ofInstant(new Timestamp(10000L).toInstant, ZoneId.systemDefault())
)
record.setField("test-6", "hello world")
record.setField("test-7", ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)))

val tuple = IcebergUtil.fromRecord(record, texeraSchema)

assert(tuple.getField[Integer]("test-1") == 42)
assert(tuple.getField[Long]("test-2") == 123456789L)
assert(tuple.getField[Boolean]("test-3") == true)
assert(tuple.getField[Double]("test-4") == 3.14)
assert(tuple.getField[Timestamp]("test-5") == new Timestamp(10000L))
assert(tuple.getField[String]("test-6") == "hello world")
assert(tuple.getField[Array[Byte]]("test-7") sameElements Array[Byte](1, 2, 3, 4))
}
}

0 comments on commit 385499b

Please sign in to comment.