Skip to content

Commit

Permalink
finish 1st viable version
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Dec 19, 2024
1 parent 30ab678 commit 0dded73
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package edu.uci.ics.amber.core.storage.result.iceberg

import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument}
import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.Table
import org.apache.iceberg.{Snapshot, Table}
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.data.{IcebergGenerics, Record}
import org.apache.iceberg.exceptions.NoSuchTableException
import org.apache.iceberg.io.CloseableIterable

import java.net.URI
Expand All @@ -26,7 +27,11 @@ class IcebergDocument[T >: Null <: AnyRef](
* Returns the URI of the table location.
*/
override def getURI: URI = {
val table = IcebergUtil.loadOrCreateTable(catalog, tableNamespace, tableName, tableSchema)
val table = IcebergUtil
.loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false)
.getOrElse(
throw new NoSuchTableException(f"table ${tableNamespace}.${tableName} doesn't exist")
)
URI.create(table.location())
}

Expand All @@ -41,36 +46,45 @@ class IcebergDocument[T >: Null <: AnyRef](
}
}

/**
* Returns an iterator that iterates over all records in the table, including new records
* from concurrent writers as they commit.
*/
override def get(): Iterator[T] =
override def get(): Iterator[T] = {
new Iterator[T] {
private val table =
IcebergUtil.loadOrCreateTable(catalog, tableNamespace, tableName, tableSchema)
private var currentSnapshot = table.currentSnapshot()
private var recordIterator = loadRecords()
private var table: Option[Table] = loadTable()
private var currentSnapshot: Option[Snapshot] =
table.flatMap(t => Option(t.currentSnapshot()))
private var recordIterator: Iterator[T] = loadRecords()

/**
  * Looks up the Iceberg table backing this document without creating it.
  *
  * @return the result of `IcebergUtil.loadTable` for this document's catalog, namespace,
  *         table name and schema, called with `createIfNotExist = false`
  */
private def loadTable(): Option[Table] =
  IcebergUtil.loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false)

/**
* Loads all records from the current snapshot.
*/
private def loadRecords(): Iterator[T] = {
if (currentSnapshot != null) {
try {
val records: CloseableIterable[Record] = IcebergGenerics.read(table).build()
records.iterator().asScala.map(record => deserde(tableSchema, record))
} catch {
case _: java.io.FileNotFoundException =>
println("Metadata file not found. Returning an empty iterator.")
Iterator.empty
case e: Exception =>
println(s"Error during record loading: ${e.getMessage}")
e.printStackTrace()
Iterator.empty
}
} else {
Iterator.empty
table match {
case Some(t) if currentSnapshot.isDefined =>
try {
val records: CloseableIterable[Record] = IcebergGenerics.read(t).build()
records.iterator().asScala.map(record => deserde(tableSchema, record))
} catch {
case _: java.io.FileNotFoundException =>
println("Metadata file not found. Returning an empty iterator.")
Iterator.empty
case e: Exception =>
println(s"Error during record loading: ${e.getMessage}")
e.printStackTrace()
Iterator.empty
}
case _ => Iterator.empty
}
}

Expand All @@ -79,8 +93,10 @@ class IcebergDocument[T >: Null <: AnyRef](
true
} else {
// Refresh the table and check for new commits
table.refresh()
val newSnapshot = table.currentSnapshot()
table = loadTable()
table.foreach(_.refresh())
val newSnapshot = table.flatMap(t => Option(t.currentSnapshot()))

if (newSnapshot != currentSnapshot) {
currentSnapshot = newSnapshot
recordIterator = loadRecords()
Expand All @@ -91,8 +107,12 @@ class IcebergDocument[T >: Null <: AnyRef](
}
}

override def next(): T = recordIterator.next()
/**
  * Returns the next record from the current record iterator.
  *
  * @throws NoSuchElementException when `hasNext` reports that nothing is left to read
  */
override def next(): T =
  if (hasNext) recordIterator.next()
  else throw new NoSuchElementException("No more records available")
}
}

/**
* Returns a BufferedItemWriter for writing data to the table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class IcebergTableWriter[T](

// Load the Iceberg table
private val table: Table =
IcebergUtil.loadOrCreateTable(catalog, tableNamespace, tableName, tableSchema)
IcebergUtil
.loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = true)
.get

override def open(): Unit =
withLock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.apache.iceberg.types.Type.PrimitiveType
import org.apache.iceberg.{CatalogProperties, Table, Schema => IcebergSchema}

import java.net.URI
import java.nio.ByteBuffer
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.ZoneId
Expand All @@ -22,8 +23,8 @@ object IcebergUtil {
* Creates and initializes a JdbcCatalog with the given parameters.
*
* @param catalogName The name of the catalog.
* @param warehouseUri The warehouse directory path.
* @param jdbcUri The JDBC URI for the catalog.
* @param warehouseUri The warehouse directory path.
* @param jdbcUri The JDBC URI for the catalog.
* @param jdbcUser The JDBC username.
* @param jdbcPassword The JDBC password.
* @return The initialized JdbcCatalog.
Expand All @@ -49,17 +50,21 @@ object IcebergUtil {
catalog
}

def loadOrCreateTable(
def loadTable(
catalog: Catalog,
tableNamespace: String,
tableName: String,
tableSchema: IcebergSchema
): Table = {
tableSchema: IcebergSchema,
createIfNotExist: Boolean
): Option[Table] = {
val identifier = TableIdentifier.of(tableNamespace, tableName)
if (!catalog.tableExists(identifier)) {
catalog.createTable(identifier, tableSchema)
if (!createIfNotExist) {
return None
}
Some(catalog.createTable(identifier, tableSchema))
} else {
catalog.loadTable(identifier)
Some(catalog.loadTable(identifier))
}
}

Expand All @@ -72,7 +77,7 @@ object IcebergUtil {
def toIcebergSchema(amberSchema: Schema): IcebergSchema = {
val icebergFields = amberSchema.getAttributes.zipWithIndex.map {
case (attribute, index) =>
Types.NestedField.required(index + 1, attribute.getName, toIcebergType(attribute.getType))
Types.NestedField.optional(index + 1, attribute.getName, toIcebergType(attribute.getType))
}
new IcebergSchema(icebergFields.asJava)
}
Expand All @@ -98,21 +103,22 @@ object IcebergUtil {
}

/**
* Converts a custom Amber `Tuple` to an Iceberg `GenericRecord`.
* Converts a custom Amber `Tuple` to an Iceberg `GenericRecord`, handling `null` values.
*
* @param tuple The custom Amber Tuple.
* @return An Iceberg GenericRecord.
*/
def toGenericRecord(tuple: Tuple): Record = {
// Convert the Amber schema to an Iceberg schema
val icebergSchema = toIcebergSchema(tuple.schema)
val record = GenericRecord.create(icebergSchema)

tuple.schema.getAttributes.zipWithIndex.foreach {
case (attribute, index) =>
val value = tuple.getField[AnyRef](index) match {
case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
case other => other
case null => null
case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
case other => other
}
record.setField(attribute.getName, value)
}
Expand All @@ -121,17 +127,22 @@ object IcebergUtil {
}

/**
* Converts an Iceberg `Record` to an Amber `Tuple`.
* Converts an Iceberg `Record` to an Amber `Tuple`, handling `null` values.
*
* @param record The Iceberg Record.
* @param record The Iceberg Record.
* @param amberSchema The corresponding Amber Schema.
* @return An Amber Tuple.
*/
def fromRecord(record: Record, amberSchema: Schema): Tuple = {
val fieldValues = amberSchema.getAttributes.map { attribute =>
val value = record.getField(attribute.getName) match {
case null => null
case ldt: LocalDateTime => Timestamp.valueOf(ldt)
case other => other
case buffer: ByteBuffer =>
val bytes = new Array[Byte](buffer.remaining())
buffer.get(bytes)
bytes
case other => other
}
value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,43 @@ trait VirtualDocumentSpec[T] extends AnyFlatSpec with BeforeAndAfterEach {
// Read items back
val retrievedItems = document.get().toList

assert(retrievedItems == items, "The retrieved items should match the written items.")
assert(retrievedItems == items)
}

// A reader opened on an empty document must observe batches that writers commit afterwards.
"VirtualDocument" should "read items while writer is writing new data" in {
  val allItems = generateSampleItems()

  // Split the items into two batches
  val (batch1, batch2) = allItems.splitAt(allItems.length / 2)

  // Create a reader before any data is written
  val reader = document.get()
  assert(!reader.hasNext, "Reader should initially have no data.")

  // Write the first batch
  val writer = document.writer()
  writer.open()
  batch1.foreach(writer.putOne)
  writer.close()

  // The reader should detect and read the first batch.
  // Pull exactly one batch through next() only: Iterator.take(n) returns a derived iterator,
  // and the Scala collections contract does not allow `reader` to be reused after that call.
  val retrievedBatch1 = List.fill(batch1.length)(reader.next())
  assert(retrievedBatch1.toSet == batch1.toSet, "Reader should read the first batch correctly.")

  // Write the second batch
  val writer2 = document.writer()
  writer2.open()
  batch2.foreach(writer2.putOne)
  writer2.close()

  // The reader should detect and read the second batch.
  // Draining with toList is fine here because `reader` is not used again afterwards.
  val retrievedBatch2 = reader.toList
  assert(retrievedBatch2.toSet == batch2.toSet, "Reader should read the second batch correctly.")

  // Verify that the combined retrieved items match the original items
  val retrievedItems = retrievedBatch1 ++ retrievedBatch2
  assert(retrievedItems.toSet == allItems.toSet, "Reader should read all items correctly.")
}
it should "clear the document" in {
val items = generateSampleItems()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ import java.util.UUID

class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {

// Define Amber Schema
// Define Amber Schema with all possible attribute types
val amberSchema: Schema = Schema(
List(
new Attribute("id", AttributeType.LONG),
new Attribute("name", AttributeType.STRING),
new Attribute("score", AttributeType.DOUBLE),
new Attribute("timestamp", AttributeType.TIMESTAMP)
new Attribute("col-string", AttributeType.STRING),
new Attribute("col-int", AttributeType.INTEGER),
new Attribute("col-bool", AttributeType.BOOLEAN),
new Attribute("col-long", AttributeType.LONG),
new Attribute("col-double", AttributeType.DOUBLE),
new Attribute("col-timestamp", AttributeType.TIMESTAMP)
// new Attribute("col-binary", AttributeType.BINARY)
)
)

Expand Down Expand Up @@ -65,7 +68,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
)
}

// Implementation of isDocumentCleared
// Implementation of isDocumentCleared
override def isDocumentCleared: Boolean = {
val identifier = TableIdentifier.of(tableNamespace, tableName)
!catalog.tableExists(identifier)
Expand All @@ -76,18 +79,45 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
List(
Tuple
.builder(amberSchema)
.add("id", AttributeType.LONG, 1L)
.add("name", AttributeType.STRING, "Alice")
.add("score", AttributeType.DOUBLE, 95.5)
.add("timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis()))
.add("col-string", AttributeType.STRING, "Hello World")
.add("col-int", AttributeType.INTEGER, 42)
.add("col-bool", AttributeType.BOOLEAN, true)
.add("col-long", AttributeType.LONG, 12345678901234L)
.add("col-double", AttributeType.DOUBLE, 3.14159)
.add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis()))
// .add("col-binary", AttributeType.BINARY, Array[Byte](1, 2, 3, 4, 5))
.build(),
Tuple
.builder(amberSchema)
.add("id", AttributeType.LONG, 2L)
.add("name", AttributeType.STRING, "Bob")
.add("score", AttributeType.DOUBLE, 88.0)
.add("timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis()))
.add("col-string", AttributeType.STRING, "")
.add("col-int", AttributeType.INTEGER, -1)
.add("col-bool", AttributeType.BOOLEAN, false)
.add("col-long", AttributeType.LONG, -98765432109876L)
.add("col-double", AttributeType.DOUBLE, -0.001)
.add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(0L))
// .add("col-binary", AttributeType.BINARY, Array[Byte]())
.build()
//
// Tuple
// .builder(amberSchema)
// .add("col-string", AttributeType.STRING, "Special Characters: \n\t\r")
// .add("col-int", AttributeType.INTEGER, Int.MaxValue)
// .add("col-bool", AttributeType.BOOLEAN, true)
// .add("col-long", AttributeType.LONG, Long.MaxValue)
// .add("col-double", AttributeType.DOUBLE, Double.MaxValue)
// .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(1234567890L))
//// .add("col-binary", AttributeType.BINARY, Array.fill[Byte](1000)('a'))
// .build(),
// Tuple
// .builder(amberSchema)
// .add("col-string", AttributeType.STRING, null)
// .add("col-int", AttributeType.INTEGER, null)
// .add("col-bool", AttributeType.BOOLEAN, null)
// .add("col-long", AttributeType.LONG, null)
// .add("col-double", AttributeType.DOUBLE, null)
// .add("col-timestamp", AttributeType.TIMESTAMP, null)
//// .add("col-binary", AttributeType.BINARY, null)
// .build()
)
}
}

0 comments on commit 0dded73

Please sign in to comment.