diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index c727f422da9..d5f4bc1064b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -28,6 +28,7 @@ trait InitializeExecutorHandler { case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code) case OpExecSink(storageKey, workflowIdentity, outputMode) => new ProgressiveSinkOpExec( + workerIdx, outputMode, storageKey, workflowIdentity diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala index f641440c643..e0d82451c5a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala @@ -179,7 +179,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with val storageType = collection.get("storageType").asText() val collectionName = collection.get("storageKey").asText() storageType match { - case OpResultStorage.MEMORY => + case OpResultStorage.MEMORY | OpResultStorage.ICEBERG => // rely on the server-side result cleanup logic. case OpResultStorage.MONGODB => MongoDatabaseManager.dropCollection(collectionName) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index a01a1d3b38b..493015de005 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -80,9 +80,9 @@ class WorkflowCompiler( val storageKey = OpResultStorage.createStorageKey(physicalOp.id.logicalOpId, outputPortId) - // Determine the storage type, defaulting to memory for large HTML visualizations + // Determine the storage type, defaulting to iceberg for large HTML visualizations val storageType = - if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY + if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.ICEBERG else OpResultStorage.defaultStorageMode if (!storage.contains(storageKey)) { diff --git a/core/build.sbt b/core/build.sbt index 648a9a5d7f3..8fb469af416 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -3,18 +3,37 @@ lazy val WorkflowCore = (project in file("workflow-core")) .dependsOn(DAO) .configs(Test) .dependsOn(DAO % "test->test") // test scope dependency -lazy val WorkflowOperator = (project in file("workflow-operator")).dependsOn(WorkflowCore) +lazy val WorkflowOperator = (project in file("workflow-operator")) + .dependsOn(WorkflowCore) + .settings( + dependencyOverrides ++= Seq( + "org.apache.commons" % "commons-compress" % "1.23.0", // because of the dependency introduced by iceberg + ) + ) lazy val WorkflowCompilingService = (project in file("workflow-compiling-service")) .dependsOn(WorkflowOperator) .settings( dependencyOverrides ++= Seq( // override it as io.dropwizard 4 require 2.16.1 or higher - "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1" + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1", + "com.fasterxml.jackson.core" % 
"jackson-databind" % "2.16.1", + "org.glassfish.jersey.core" % "jersey-common" % "3.0.12" ) ) lazy val WorkflowExecutionService = (project in file("amber")) .dependsOn(WorkflowOperator) + .settings( + dependencyOverrides ++= Seq( + "com.fasterxml.jackson.core" % "jackson-core" % "2.15.1", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1", + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.15.1", + "org.slf4j" % "slf4j-api" % "1.7.26", + "org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813", + "org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813", + "org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813", + ) + ) .configs(Test) .dependsOn(DAO % "test->test") // test scope dependency diff --git a/core/workflow-core/build.sbt b/core/workflow-core/build.sbt index 299aab37bd3..46deac0557d 100644 --- a/core/workflow-core/build.sbt +++ b/core/workflow-core/build.sbt @@ -111,6 +111,54 @@ val arrowDependencies = Seq( libraryDependencies ++= arrowDependencies +///////////////////////////////////////////////////////////////////////////// +// Iceberg-related Dependencies +///////////////////////////////////////////////////////////////////////////// +val excludeJersey = ExclusionRule(organization = "com.sun.jersey") +val excludeGlassfishJersey = ExclusionRule(organization = "org.glassfish.jersey") +val excludeSlf4j = ExclusionRule(organization = "org.slf4j") +val excludeJetty = ExclusionRule(organization = "org.eclipse.jetty") +val excludeJsp = ExclusionRule(organization = "javax.servlet.jsp") +val excludeXmlBind = ExclusionRule(organization = "javax.xml.bind") +val excludeJackson = ExclusionRule(organization = "com.fasterxml.jackson.core") +val excludeJacksonModule = ExclusionRule(organization = "com.fasterxml.jackson.module") + +libraryDependencies ++= Seq( + "org.apache.iceberg" % "iceberg-api" % "1.7.1", + "org.apache.iceberg" % "iceberg-parquet" % "1.7.1" excludeAll( + excludeJackson, + excludeJacksonModule + ), + "org.apache.iceberg" % "iceberg-core" % "1.7.1" excludeAll( + excludeJackson, + excludeJacksonModule + ), + "org.apache.iceberg" % "iceberg-data" % "1.7.1" excludeAll( + excludeJackson, + excludeJacksonModule + ), + "org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll( + excludeXmlBind, + excludeGlassfishJersey, + excludeJersey, + excludeSlf4j, + excludeJetty, + excludeJsp, + excludeJackson, + excludeJacksonModule + ), + "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.3.1" excludeAll( + excludeXmlBind, + excludeGlassfishJersey, + excludeJersey, + excludeSlf4j, + excludeJetty, + excludeJsp, + excludeJackson, + excludeJacksonModule + ), +) + ///////////////////////////////////////////////////////////////////////////// // Additional Dependencies ///////////////////////////////////////////////////////////////////////////// @@ -123,5 +171,5 @@ libraryDependencies ++= Seq( "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging "org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit "org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber) - "org.apache.commons" % "commons-vfs2" % "2.9.0" // for FileResolver throw VFS-related exceptions + "org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions ) \ No newline at end of file diff --git a/core/workflow-core/src/main/resources/storage-config.yaml b/core/workflow-core/src/main/resources/storage-config.yaml index 2b6ed20ccaa..28a3aef4a3a 100644 --- 
a/core/workflow-core/src/main/resources/storage-config.yaml +++ b/core/workflow-core/src/main/resources/storage-config.yaml @@ -1,9 +1,21 @@ storage: - result-storage-mode: memory + result-storage-mode: iceberg mongodb: url: "mongodb://localhost:27017" database: "texera_storage" commit-batch-size: 1000 + iceberg: + table: + namespace: "operator-result" + commit: + batch-size: 4096 # decide the buffer size of our IcebergTableWriter + retry: + # retry configures the OCC parameter for concurrent write operations in Iceberg + # Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/ + # Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties + num-retries: 10 + min-wait-ms: 100 # 0.1s + max-wait-ms: 10000 # 10s jdbc: url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC" username: "" diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/IcebergCatalogInstance.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/IcebergCatalogInstance.scala new file mode 100644 index 00000000000..6e5354905b8 --- /dev/null +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/IcebergCatalogInstance.scala @@ -0,0 +1,43 @@ +package edu.uci.ics.amber.core.storage + +import edu.uci.ics.amber.util.IcebergUtil +import org.apache.iceberg.catalog.Catalog + +/** + * IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance. + * - Provides a single shared catalog for all Iceberg table-related operations in the Texera application. + * - Lazily initializes the catalog on first access. + * - Supports replacing the catalog instance primarily for testing or reconfiguration. + */ +object IcebergCatalogInstance { + + private var instance: Option[Catalog] = None + + /** + * Retrieves the singleton Iceberg catalog instance. + * - If the catalog is not initialized, it is lazily created using the configured properties. + * @return the Iceberg catalog instance. + */ + def getInstance(): Catalog = { + instance match { + case Some(catalog) => catalog + case None => + val hadoopCatalog = IcebergUtil.createHadoopCatalog( + "texera_iceberg", + StorageConfig.fileStorageDirectoryPath + ) + instance = Some(hadoopCatalog) + hadoopCatalog + } + } + + /** + * Replaces the existing Iceberg catalog instance. + * - This method is useful for testing or dynamically updating the catalog. + * + * @param catalog the new Iceberg catalog instance to replace the current one. 
+ */ + def replaceInstance(catalog: Catalog): Unit = { + instance = Some(catalog) + } +} diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala index 08e25051995..615270e5328 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala @@ -1,7 +1,9 @@ package edu.uci.ics.amber.core.storage +import edu.uci.ics.amber.util.PathUtils.corePath import org.yaml.snakeyaml.Yaml +import java.nio.file.Path import java.util.{Map => JMap} import scala.jdk.CollectionConverters._ @@ -13,34 +15,94 @@ object StorageConfig { val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap + val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap + val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap + val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap + val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap val jdbcMap = storageMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap - javaConf - .updated("storage", storageMap.updated("mongodb", mongodbMap).updated("jdbc", jdbcMap)) + + javaConf.updated( + "storage", + storageMap + .updated("mongodb", mongodbMap) + .updated( + "iceberg", + icebergMap + .updated( + "table", + icebergTableMap.updated( + "commit", + icebergCommitMap.updated("retry", icebergRetryMap) + ) + ) + ) + .updated("jdbc", jdbcMap) + ) } + // Result storage mode val resultStorageMode: String = conf("storage").asInstanceOf[Map[String, Any]]("result-storage-mode").asInstanceOf[String] - // For MongoDB specifics + // MongoDB configurations val mongodbUrl: String = conf("storage") .asInstanceOf[Map[String, Any]]("mongodb") .asInstanceOf[Map[String, Any]]("url") .asInstanceOf[String] + val mongodbDatabaseName: String = conf("storage") .asInstanceOf[Map[String, Any]]("mongodb") .asInstanceOf[Map[String, Any]]("database") .asInstanceOf[String] + val mongodbBatchSize: Int = conf("storage") .asInstanceOf[Map[String, Any]]("mongodb") .asInstanceOf[Map[String, Any]]("commit-batch-size") .asInstanceOf[Int] + val icebergTableNamespace: String = conf("storage") + .asInstanceOf[Map[String, Any]]("iceberg") + .asInstanceOf[Map[String, Any]]("table") + .asInstanceOf[Map[String, Any]]("namespace") + .asInstanceOf[String] + + val icebergTableCommitBatchSize: Int = conf("storage") + .asInstanceOf[Map[String, Any]]("iceberg") + .asInstanceOf[Map[String, Any]]("table") + .asInstanceOf[Map[String, Any]]("commit") + .asInstanceOf[Map[String, Any]]("batch-size") + .asInstanceOf[Int] + + val icebergTableCommitNumRetries: Int = conf("storage") + .asInstanceOf[Map[String, Any]]("iceberg") + .asInstanceOf[Map[String, Any]]("table") + .asInstanceOf[Map[String, Any]]("commit") + .asInstanceOf[Map[String, Any]]("retry") + .asInstanceOf[Map[String, Any]]("num-retries") + .asInstanceOf[Int] + + val icebergTableCommitMinRetryWaitMs: Int = conf("storage") + .asInstanceOf[Map[String, Any]]("iceberg") + .asInstanceOf[Map[String, Any]]("table") + .asInstanceOf[Map[String, Any]]("commit") + .asInstanceOf[Map[String, Any]]("retry") + .asInstanceOf[Map[String, Any]]("min-wait-ms") + .asInstanceOf[Int] + + val 
icebergTableCommitMaxRetryWaitMs: Int = conf("storage") + .asInstanceOf[Map[String, Any]]("iceberg") + .asInstanceOf[Map[String, Any]]("table") + .asInstanceOf[Map[String, Any]]("commit") + .asInstanceOf[Map[String, Any]]("retry") + .asInstanceOf[Map[String, Any]]("max-wait-ms") + .asInstanceOf[Int] + + // JDBC configurations val jdbcUrl: String = conf("storage") .asInstanceOf[Map[String, Any]]("jdbc") .asInstanceOf[Map[String, Any]]("url") .asInstanceOf[String] - // For jdbc specifics val jdbcUsername: String = conf("storage") .asInstanceOf[Map[String, Any]]("jdbc") .asInstanceOf[Map[String, Any]]("username") @@ -50,4 +112,8 @@ object StorageConfig { .asInstanceOf[Map[String, Any]]("jdbc") .asInstanceOf[Map[String, Any]]("password") .asInstanceOf[String] + + // File storage configurations + val fileStorageDirectoryPath: Path = + corePath.resolve("amber").resolve("user-resources").resolve("workflow-results") } diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/VirtualDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/VirtualDocument.scala index 19e46ce1c36..45ddf018fc9 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/VirtualDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/VirtualDocument.scala @@ -65,9 +65,10 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] { /** * return a writer that buffers the items and performs the flush operation at close time + * @param writerIdentifier the id of the writer, maybe required by some implementations * @return a buffered item writer */ - def writer(): BufferedItemWriter[T] = + def writer(writerIdentifier: String): BufferedItemWriter[T] = throw new NotImplementedError("write method is not implemented") /** diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MemoryDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MemoryDocument.scala index c4cc26ecbcf..b508bb461c1 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MemoryDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MemoryDocument.scala @@ -69,7 +69,7 @@ class MemoryDocument[T >: Null <: AnyRef](key: String) results += item } - override def writer(): BufferedItemWriter[T] = this + override def writer(writerIdentifier: String): BufferedItemWriter[T] = this /** * The size of the buffer for the buffered writer. This number is not used currently diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala index 92fa1cdce11..9cc2f5c2860 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala @@ -56,7 +56,7 @@ class MongoDocument[T >: Null <: AnyRef]( * Return a buffered item writer for the MongoDB collection. * @return a new instance of MongoDBBufferedItemWriter. 
*/ - override def writer(): BufferedItemWriter[T] = { + override def writer(writerIdentifier: String): BufferedItemWriter[T] = { new MongoDBBufferedItemWriter[T]( commitBatchSize, id, diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala index 83c25a2488b..eb295950164 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala @@ -3,9 +3,13 @@ package edu.uci.ics.amber.core.storage.result import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.StorageConfig import edu.uci.ics.amber.core.storage.model.VirtualDocument +import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument import edu.uci.ics.amber.core.tuple.{Schema, Tuple} import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity import edu.uci.ics.amber.core.workflow.PortIdentity +import edu.uci.ics.amber.util.IcebergUtil +import org.apache.iceberg.data.Record +import org.apache.iceberg.{Schema => IcebergSchema} import java.util.concurrent.ConcurrentHashMap import scala.jdk.CollectionConverters.IteratorHasAsScala @@ -18,6 +22,7 @@ object OpResultStorage { val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase val MEMORY: String = "memory" val MONGODB: String = "mongodb" + val ICEBERG = "iceberg" /** * Creates a unique storage key by combining operator and port identities. @@ -112,7 +117,7 @@ class OpResultStorage extends Serializable with LazyLogging { val storage: VirtualDocument[Tuple] = if (mode == OpResultStorage.MEMORY) { new MemoryDocument[Tuple](key) - } else { + } else if (mode == OpResultStorage.MONGODB) { try { new MongoDocument[Tuple]( executionId + key, @@ -125,6 +130,19 @@ class OpResultStorage extends Serializable with LazyLogging { logger.info(s"Falling back to memory storage for $key") new MemoryDocument[Tuple](key) } + } else { + val icebergSchema = IcebergUtil.toIcebergSchema(schema) + val serde: (IcebergSchema, Tuple) => Record = IcebergUtil.toGenericRecord + val deserde: (IcebergSchema, Record) => Tuple = (_, record) => + IcebergUtil.fromRecord(record, schema) + + new IcebergDocument[Tuple]( + StorageConfig.icebergTableNamespace, + executionId + key, + icebergSchema, + serde, + deserde + ) } cache.put(key, (storage, schema)) storage diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala new file mode 100644 index 00000000000..24ee6b6956c --- /dev/null +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala @@ -0,0 +1,261 @@ +package edu.uci.ics.amber.core.storage.result.iceberg + +import edu.uci.ics.amber.core.storage.IcebergCatalogInstance +import edu.uci.ics.amber.core.storage.model.{BufferedItemWriter, VirtualDocument} +import edu.uci.ics.amber.core.storage.util.StorageUtil.{withLock, withReadLock, withWriteLock} +import edu.uci.ics.amber.util.IcebergUtil +import org.apache.iceberg.{FileScanTask, Table} +import org.apache.iceberg.catalog.{Catalog, TableIdentifier} +import org.apache.iceberg.data.Record +import org.apache.iceberg.exceptions.NoSuchTableException + +import java.net.URI +import java.util.concurrent.locks.{ReentrantLock, 
ReentrantReadWriteLock} +import scala.jdk.CollectionConverters._ + +/** + * IcebergDocument is used to read and write a set of T as an Iceberg table. + * It provides iterator-based read methods and supports multiple writers to write to the same table. + * + * - On construction, the table will be created if it does not exist. + * - If the table exists, it will be overridden. + * + * @param tableNamespace namespace of the table. + * @param tableName name of the table. + * @param tableSchema schema of the table. + * @param serde function to serialize T into an Iceberg Record. + * @param deserde function to deserialize an Iceberg Record into T. + * @tparam T type of the data items stored in the Iceberg table. + */ +class IcebergDocument[T >: Null <: AnyRef]( + val tableNamespace: String, + val tableName: String, + val tableSchema: org.apache.iceberg.Schema, + val serde: (org.apache.iceberg.Schema, T) => Record, + val deserde: (org.apache.iceberg.Schema, Record) => T +) extends VirtualDocument[T] { + + private val lock = new ReentrantReadWriteLock() + + @transient lazy val catalog: Catalog = IcebergCatalogInstance.getInstance() + + // During construction, create or override the table + IcebergUtil.createTable( + catalog, + tableNamespace, + tableName, + tableSchema, + overrideIfExists = true + ) + + /** + * Returns the URI of the table location. + * @throws NoSuchTableException if the table does not exist. + */ + override def getURI: URI = { + val table = IcebergUtil + .loadTableMetadata(catalog, tableNamespace, tableName) + .getOrElse( + throw new NoSuchTableException(f"table ${tableNamespace}.${tableName} doesn't exist") + ) + URI.create(table.location()) + } + + /** + * Deletes the table and clears its contents. + */ + override def clear(): Unit = + withWriteLock(lock) { + val identifier = TableIdentifier.of(tableNamespace, tableName) + if (catalog.tableExists(identifier)) { + catalog.dropTable(identifier) + } + } + + /** + * Get an iterator for reading records from the table. + */ + override def get(): Iterator[T] = getUsingFileSequenceOrder(0, None) + + /** + * Get records within a specified range [from, until). + */ + override def getRange(from: Int, until: Int): Iterator[T] = { + getUsingFileSequenceOrder(from, Some(until)) + } + + /** + * Get records starting after a specified offset. + */ + override def getAfter(offset: Int): Iterator[T] = { + getUsingFileSequenceOrder(offset, None) + } + + /** + * Get the total count of records in the table. + */ + override def getCount: Long = { + val table = IcebergUtil + .loadTableMetadata(catalog, tableNamespace, tableName) + .getOrElse( + return 0 + ) + table.newScan().planFiles().iterator().asScala.map(f => f.file().recordCount()).sum + } + + /** + * Creates a BufferedItemWriter for writing data to the table. + * @param writerIdentifier The writer's ID. 
It should be unique within the same table, as each writer will use it as + * the prefix of the files they append + */ + override def writer(writerIdentifier: String): BufferedItemWriter[T] = { + new IcebergTableWriter[T]( + writerIdentifier, + catalog, + tableNamespace, + tableName, + tableSchema, + serde + ) + } + + /** + * Util iterator to get T in certain range + * @param from start from which record inclusively, if 0 means start from the first + * @param until end at which record exclusively, if None means read to the table's EOF + */ + private def getUsingFileSequenceOrder(from: Int, until: Option[Int]): Iterator[T] = + withReadLock(lock) { + new Iterator[T] { + private val iteLock = new ReentrantLock() + // Load the table instance, initially the table instance may not exist + private var table: Option[Table] = loadTableMetadata() + + // Last seen snapshot id(logically it's like a version number). While reading, new snapshots may be created + private var lastSnapshotId: Option[Long] = None + + // Counter for how many records have been skipped + private var numOfSkippedRecords = 0 + + // Counter for how many records have been returned + private var numOfReturnedRecords = 0 + + // Total number of records to return + private val totalRecordsToReturn = until.map(_ - from).getOrElse(Int.MaxValue) + + // Iterator for usable file scan tasks + private var usableFileIterator: Iterator[FileScanTask] = seekToUsableFile() + + // Current record iterator for the active file + private var currentRecordIterator: Iterator[Record] = Iterator.empty + + // Util function to load the table's metadata + private def loadTableMetadata(): Option[Table] = { + IcebergUtil.loadTableMetadata( + catalog, + tableNamespace, + tableName + ) + } + + private def seekToUsableFile(): Iterator[FileScanTask] = + withLock(iteLock) { + if (numOfSkippedRecords > from) { + throw new RuntimeException("seek operation should not be called") + } + + // refresh the table's snapshots + if (table.isEmpty) { + table = loadTableMetadata() + } + table.foreach(_.refresh()) + + // Retrieve and sort the file scan tasks by file sequence number + val fileScanTasksIterator: Iterator[FileScanTask] = table match { + case Some(t) => + val currentSnapshotId = Option(t.currentSnapshot()).map(_.snapshotId()) + val fileScanTasks = (lastSnapshotId, currentSnapshotId) match { + // Read from the start + case (None, Some(_)) => + val tasks = t.newScan().planFiles().iterator().asScala + lastSnapshotId = currentSnapshotId + tasks + + // Read incrementally from the last snapshot + case (Some(lastId), Some(currId)) if lastId != currId => + val tasks = t + .newIncrementalAppendScan() + .fromSnapshotExclusive(lastId) + .toSnapshot(currId) + .planFiles() + .iterator() + .asScala + lastSnapshotId = currentSnapshotId + tasks + + // No new data + case (Some(lastId), Some(currId)) if lastId == currId => + Iterator.empty + + // Default: No data yet + case _ => + Iterator.empty + } + fileScanTasks.toSeq.sortBy(_.file().fileSequenceNumber()).iterator + + case None => + Iterator.empty + } + + // Iterate through sorted FileScanTasks and update numOfSkippedRecords + val usableTasks = fileScanTasksIterator.dropWhile { task => + val recordCount = task.file().recordCount() + if (numOfSkippedRecords + recordCount <= from) { + numOfSkippedRecords += recordCount.toInt + true + } else { + false + } + } + + usableTasks + } + + override def hasNext: Boolean = { + if (numOfReturnedRecords >= totalRecordsToReturn) { + return false + } + + if (!usableFileIterator.hasNext) { + 
usableFileIterator = seekToUsableFile() + } + + while (!currentRecordIterator.hasNext && usableFileIterator.hasNext) { + val nextFile = usableFileIterator.next() + currentRecordIterator = IcebergUtil.readDataFileAsIterator( + nextFile.file(), + tableSchema, + table.get + ) + + // Skip records within the file if necessary + val recordsToSkipInFile = from - numOfSkippedRecords + if (recordsToSkipInFile > 0) { + currentRecordIterator = currentRecordIterator.drop(recordsToSkipInFile) + numOfSkippedRecords += recordsToSkipInFile + } + } + + currentRecordIterator.hasNext + } + + override def next(): T = { + if (!hasNext) throw new NoSuchElementException("No more records available") + + val record = currentRecordIterator.next() + numOfReturnedRecords += 1 + deserde(tableSchema, record) + } + } + } +} diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala new file mode 100644 index 00000000000..3bfcea98539 --- /dev/null +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala @@ -0,0 +1,126 @@ +package edu.uci.ics.amber.core.storage.result.iceberg + +import edu.uci.ics.amber.core.storage.StorageConfig +import edu.uci.ics.amber.core.storage.model.BufferedItemWriter +import edu.uci.ics.amber.util.IcebergUtil +import org.apache.iceberg.{Schema, Table} +import org.apache.iceberg.catalog.Catalog +import org.apache.iceberg.data.Record +import org.apache.iceberg.data.parquet.GenericParquetWriter +import org.apache.iceberg.io.{DataWriter, OutputFile} +import org.apache.iceberg.parquet.Parquet + +import java.nio.file.Paths +import scala.collection.mutable.ArrayBuffer + +/** + * IcebergTableWriter writes data to the given Iceberg table in an append-only way. + * - Each time the buffer is flushed, a new data file is created with a unique name. + * - The `writerIdentifier` is used to prefix the created files. + * - Iceberg data files are immutable once created. So each flush will create a distinct file. + * + * **Thread Safety**: This writer is **NOT thread-safe**, so only one thread should call this writer. + * + * @param writerIdentifier a unique identifier used to prefix the created files. + * @param catalog the Iceberg catalog to manage table metadata. + * @param tableNamespace the namespace of the Iceberg table. + * @param tableName the name of the Iceberg table. + * @param tableSchema the schema of the Iceberg table. + * @param serde a function to serialize `T` into an Iceberg `Record`. + * @tparam T the type of the data items written to the table. + */ +class IcebergTableWriter[T]( + val writerIdentifier: String, + val catalog: Catalog, + val tableNamespace: String, + val tableName: String, + val tableSchema: Schema, + val serde: (org.apache.iceberg.Schema, T) => Record +) extends BufferedItemWriter[T] { + + // Buffer to hold items before flushing to the table + private val buffer = new ArrayBuffer[T]() + // Incremental filename index, incremented each time a new buffer is flushed + private var filenameIdx = 0 + // Incremental record ID, incremented for each record + private var recordId = 0 + + override val bufferSize: Int = StorageConfig.icebergTableCommitBatchSize + + // Load the Iceberg table + private val table: Table = + IcebergUtil + .loadTableMetadata(catalog, tableNamespace, tableName) + .get + + /** + * Open the writer and clear the buffer. 
+ */ + override def open(): Unit = { + buffer.clear() + } + + /** + * Add a single item to the buffer. + * - If the buffer size exceeds the configured limit, the buffer is flushed. + * @param item the item to add to the buffer. + */ + override def putOne(item: T): Unit = { + buffer.append(item) + if (buffer.size >= bufferSize) { + flushBuffer() + } + } + + /** + * Remove a single item from the buffer. + * @param item the item to remove from the buffer. + */ + override def removeOne(item: T): Unit = { + buffer -= item + } + + /** + * Flush the current buffer to a new Iceberg data file. + * - Creates a new data file using the writer identifier and an incremental filename index. + * - Writes all buffered items to the new file and commits it to the Iceberg table. + */ + private def flushBuffer(): Unit = { + if (buffer.nonEmpty) { + // Create a unique file path using the writer's identifier and the filename index + val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}") + // Increment the filename index by 1 + filenameIdx += 1 + val outputFile: OutputFile = table.io().newOutputFile(filepath.toString) + // Create a Parquet data writer to write a new file + val dataWriter: DataWriter[Record] = Parquet + .writeData(outputFile) + .forTable(table) + .createWriterFunc(GenericParquetWriter.buildWriter) + .overwrite() + .build() + // Write each buffered item to the data file + try { + buffer.foreach { item => + dataWriter.write(serde(tableSchema, item)) + } + } finally { + dataWriter.close() + } + // Commit the new file to the table + val dataFile = dataWriter.toDataFile + table.newAppend().appendFile(dataFile).commit() + // Clear the item buffer + buffer.clear() + } + } + + /** + * Close the writer, ensuring any remaining buffered items are flushed. 
+ */ + override def close(): Unit = { + if (buffer.nonEmpty) { + flushBuffer() + } + } +} diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/StorageUtil.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/StorageUtil.scala new file mode 100644 index 00000000000..0e606015e8b --- /dev/null +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/StorageUtil.scala @@ -0,0 +1,23 @@ +package edu.uci.ics.amber.core.storage.util + +import java.util.concurrent.locks.{Lock, ReadWriteLock} + +object StorageUtil { + def withWriteLock[M](rwLock: ReadWriteLock)(block: => M): M = { + rwLock.writeLock().lock() + try block + finally rwLock.writeLock().unlock() + } + + def withReadLock[M](rwLock: ReadWriteLock)(block: => M): M = { + rwLock.readLock().lock() + try block + finally rwLock.readLock().unlock() + } + + def withLock[M](lock: Lock)(block: => M): M = { + lock.lock() + try block + finally lock.unlock() + } +} diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala index dea62ba6c00..af6fa86fc68 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/tuple/Tuple.scala @@ -73,8 +73,15 @@ case class Tuple @JsonCreator() ( override def equals(obj: Any): Boolean = obj match { - case that: Tuple => (this.getFields sameElements that.getFields) && this.schema == that.schema - case _ => false + case that: Tuple => + this.schema == that.schema && + this.getFields.zip(that.getFields).forall { + case (field1: Array[Byte], field2: Array[Byte]) => + field1.sameElements(field2) // for Binary, use sameElements instead of == to compare + case (field1, field2) => + field1 == field2 + } + case _ => false } def getPartialTuple(attributeNames: List[String]): Tuple = { diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala new file mode 100644 index 00000000000..9d3c7d2c5e5 --- /dev/null +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala @@ -0,0 +1,268 @@ +package edu.uci.ics.amber.util + +import edu.uci.ics.amber.core.storage.StorageConfig +import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.apache.hadoop.conf.Configuration +import org.apache.iceberg.catalog.{Catalog, TableIdentifier} +import org.apache.iceberg.data.parquet.GenericParquetReaders +import org.apache.iceberg.types.Types +import org.apache.iceberg.data.{GenericRecord, Record} +import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO} +import org.apache.iceberg.io.{CloseableIterable, InputFile} +import org.apache.iceberg.parquet.{Parquet, ParquetValueReader} +import org.apache.iceberg.types.Type.PrimitiveType +import org.apache.iceberg.{ + CatalogProperties, + DataFile, + PartitionSpec, + Table, + TableProperties, + Schema => IcebergSchema +} + +import java.nio.ByteBuffer +import java.nio.file.Path +import java.sql.Timestamp +import java.time.LocalDateTime +import java.time.ZoneId +import scala.jdk.CollectionConverters._ + +/** + * Util functions to interact with Iceberg Tables + */ +object IcebergUtil { + + /** + * Creates and initializes a HadoopCatalog with the given parameters. 
+ * - Uses an empty Hadoop `Configuration`, meaning the local file system (or `file:/`) will be used by default + * instead of HDFS. + * - The `warehouse` parameter specifies the root directory for storing table data. + * - Sets the file I/O implementation to `HadoopFileIO`. + * + * @param catalogName the name of the catalog. + * @param warehouse the root path for the warehouse where the tables are stored. + * @return the initialized HadoopCatalog instance. + */ + def createHadoopCatalog( + catalogName: String, + warehouse: Path + ): HadoopCatalog = { + val catalog = new HadoopCatalog() + catalog.setConf(new Configuration) // Empty configuration, defaults to `file:/` + catalog.initialize( + catalogName, + Map( + "warehouse" -> warehouse.toString, + CatalogProperties.FILE_IO_IMPL -> classOf[HadoopFileIO].getName + ).asJava + ) + + catalog + } + + /** + * Creates a new Iceberg table with the specified schema and properties. + * - Drops the existing table if `overrideIfExists` is true and the table already exists. + * - Creates an unpartitioned table with custom commit retry properties. + * + * @param catalog the Iceberg catalog to manage the table. + * @param tableNamespace the namespace of the table. + * @param tableName the name of the table. + * @param tableSchema the schema of the table. + * @param overrideIfExists whether to drop and recreate the table if it exists. + * @return the created Iceberg table. + */ + def createTable( + catalog: Catalog, + tableNamespace: String, + tableName: String, + tableSchema: IcebergSchema, + overrideIfExists: Boolean + ): Table = { + val tableProperties = Map( + TableProperties.COMMIT_NUM_RETRIES -> StorageConfig.icebergTableCommitNumRetries.toString, + TableProperties.COMMIT_MAX_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMaxRetryWaitMs.toString, + TableProperties.COMMIT_MIN_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMinRetryWaitMs.toString + ) + val identifier = TableIdentifier.of(tableNamespace, tableName) + if (catalog.tableExists(identifier) && overrideIfExists) { + catalog.dropTable(identifier) + } + catalog.createTable( + identifier, + tableSchema, + PartitionSpec.unpartitioned, + tableProperties.asJava + ) + } + + /** + * Loads metadata for an existing Iceberg table. + * - Returns `Some(Table)` if the table exists and is successfully loaded. + * - Returns `None` if the table does not exist or cannot be loaded. + * + * @param catalog the Iceberg catalog to load the table from. + * @param tableNamespace the namespace of the table. + * @param tableName the name of the table. + * @return an Option containing the table, or None if not found. + */ + def loadTableMetadata( + catalog: Catalog, + tableNamespace: String, + tableName: String + ): Option[Table] = { + val identifier = TableIdentifier.of(tableNamespace, tableName) + try { + Some(catalog.loadTable(identifier)) + } catch { + case _: Exception => None + } + } + + /** + * Converts a custom Amber `Schema` to an Iceberg `Schema`. + * + * @param amberSchema The custom Amber Schema. + * @return An Iceberg Schema. + */ + def toIcebergSchema(amberSchema: Schema): IcebergSchema = { + val icebergFields = amberSchema.getAttributes.zipWithIndex.map { + case (attribute, index) => + Types.NestedField.optional(index + 1, attribute.getName, toIcebergType(attribute.getType)) + } + new IcebergSchema(icebergFields.asJava) + } + + /** + * Converts a custom Amber `AttributeType` to an Iceberg `Type`. + * + * @param attributeType The custom Amber AttributeType. + * @return The corresponding Iceberg Type. 
+ */ + def toIcebergType(attributeType: AttributeType): PrimitiveType = { + attributeType match { + case AttributeType.STRING => Types.StringType.get() + case AttributeType.INTEGER => Types.IntegerType.get() + case AttributeType.LONG => Types.LongType.get() + case AttributeType.DOUBLE => Types.DoubleType.get() + case AttributeType.BOOLEAN => Types.BooleanType.get() + case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone() + case AttributeType.BINARY => Types.BinaryType.get() + case AttributeType.ANY => + throw new IllegalArgumentException("ANY type is not supported in Iceberg") + } + } + + /** + * Converts a custom Amber `Tuple` to an Iceberg `GenericRecord`, handling `null` values. + * + * @param tuple The custom Amber Tuple. + * @return An Iceberg GenericRecord. + */ + def toGenericRecord(icebergSchema: IcebergSchema, tuple: Tuple): Record = { + val record = GenericRecord.create(icebergSchema) + + tuple.schema.getAttributes.zipWithIndex.foreach { + case (attribute, index) => + val value = tuple.getField[AnyRef](index) match { + case null => null + case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime + case bytes: Array[Byte] => ByteBuffer.wrap(bytes) + case other => other + } + record.setField(attribute.getName, value) + } + + record + } + + /** + * Converts an Iceberg `Record` to an Amber `Tuple` + * + * @param record The Iceberg Record. + * @param amberSchema The corresponding Amber Schema. + * @return An Amber Tuple. + */ + def fromRecord(record: Record, amberSchema: Schema): Tuple = { + val fieldValues = amberSchema.getAttributes.map { attribute => + val value = record.getField(attribute.getName) match { + case null => null + case ldt: LocalDateTime => Timestamp.valueOf(ldt) + case buffer: ByteBuffer => + val bytes = new Array[Byte](buffer.remaining()) + buffer.get(bytes) + bytes + case other => other + } + value + } + + Tuple(amberSchema, fieldValues.toArray) + } + + /** + * Converts an Iceberg `Schema` to an Amber `Schema`. + * + * @param icebergSchema The Iceberg Schema. + * @return The corresponding Amber Schema. + */ + def fromIcebergSchema(icebergSchema: IcebergSchema): Schema = { + val attributes = icebergSchema + .columns() + .asScala + .map { field => + new Attribute(field.name(), fromIcebergType(field.`type`().asPrimitiveType())) + } + .toList + + Schema(attributes) + } + + /** + * Converts an Iceberg `Type` to an Amber `AttributeType`. + * + * @param icebergType The Iceberg Type. + * @return The corresponding Amber AttributeType. 
+ */ + def fromIcebergType(icebergType: PrimitiveType): AttributeType = { + icebergType match { + case _: Types.StringType => AttributeType.STRING + case _: Types.IntegerType => AttributeType.INTEGER + case _: Types.LongType => AttributeType.LONG + case _: Types.DoubleType => AttributeType.DOUBLE + case _: Types.BooleanType => AttributeType.BOOLEAN + case _: Types.TimestampType => AttributeType.TIMESTAMP + case _: Types.BinaryType => AttributeType.BINARY + case _ => throw new IllegalArgumentException(s"Unsupported Iceberg type: $icebergType") + } + } + + /** + * Util function to create a Record iterator over the given DataFile in Iceberg + * @param dataFile the data file + * @param schema the schema of the table + * @param table the iceberg table + * @return an iterator over the records in the data file + */ + def readDataFileAsIterator( + dataFile: DataFile, + schema: IcebergSchema, + table: Table + ): Iterator[Record] = { + val inputFile: InputFile = table.io().newInputFile(dataFile) + val readerFunc + : java.util.function.Function[org.apache.parquet.schema.MessageType, ParquetValueReader[ + _ + ]] = + (messageType: org.apache.parquet.schema.MessageType) => + GenericParquetReaders.buildReader(schema, messageType) + val closeableIterable: CloseableIterable[Record] = + Parquet + .read(inputFile) + .project(schema) + .createReaderFunc(readerFunc) + .build() + closeableIterable.iterator().asScala + } + +} diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala new file mode 100644 index 00000000000..14d3d82ca36 --- /dev/null +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/core/storage/model/VirtualDocumentSpec.scala @@ -0,0 +1,284 @@ +package edu.uci.ics.amber.core.storage.model + +import org.scalatest.BeforeAndAfterEach +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import java.util.UUID +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.{Await, Future} + +/** + * A trait for testing VirtualDocument implementations. + * Provides common read/write test cases and hooks for subclasses to customize. + * @tparam T the type of data that the VirtualDocument handles. + */ +trait VirtualDocumentSpec[T] extends AnyFlatSpec with BeforeAndAfterEach { + + /** + * Constructs the VirtualDocument instance to be tested. + * Subclasses should override this to provide their specific implementation. + */ + def getDocument: VirtualDocument[T] + + /** + * Checks if the document has been cleared. + * Subclasses should override this to provide their specific check. + * @return true if the document is cleared, false otherwise. 
+ */ + def isDocumentCleared: Boolean + + // VirtualDocument instance for each test + var document: VirtualDocument[T] = _ + + override def beforeEach(): Unit = { + document = getDocument + } + + "VirtualDocument" should "write and read items successfully" in { + val items = generateSampleItems() + + // Get writer and write items + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + items.foreach(writer.putOne) + writer.close() + + // Read items back + val retrievedItems = document.get().toList + + assert(retrievedItems.toSet == items.toSet) + } + + "VirtualDocument" should "read items while writer is writing new data" in { + val allItems = generateSampleItems() + + // Split the items into two batches + val (batch1, batch2) = allItems.splitAt(allItems.length / 2) + + // Create a reader before any data is written + val reader = document.get() + assert(!reader.hasNext, "Reader should initially have no data.") + + // Write the first batch + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + batch1.foreach(writer.putOne) + writer.close() + + // The reader should detect and read the first batch + val retrievedBatch1 = reader.take(batch1.length).toList + assert(retrievedBatch1.toSet == batch1.toSet, "Reader should read the first batch correctly.") + + // Write the second batch + val writer2 = document.writer(UUID.randomUUID().toString) + writer2.open() + batch2.foreach(writer2.putOne) + writer2.close() + + // The reader should detect and read the second batch + val retrievedBatch2 = reader.toList + assert(retrievedBatch2.toSet == batch2.toSet, "Reader should read the second batch correctly.") + } + it should "clear the document" in { + val items = generateSampleItems() + + // Write items + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + items.foreach(writer.putOne) + writer.close() + + // Ensure items are written + assert(document.get().nonEmpty, "The document should contain items before clearing.") + + // Clear the document + document.clear() + + // Check if the document is cleared + assert(isDocumentCleared, "The document should be cleared after calling clear.") + assert(document.get().isEmpty, "The document should have no items after clearing.") + } + + it should "handle empty reads gracefully" in { + val retrievedItems = document.get().toList + assert(retrievedItems.isEmpty, "Reading from an empty document should return an empty list.") + } + + it should "handle concurrent writes and read all items correctly" in { + val allItems = generateSampleItems() + val numWriters = 10 + + // Calculate the batch size and the remainder + val batchSize = allItems.length / numWriters + val remainder = allItems.length % numWriters + + // Create writer's batches + val itemBatches = (0 until numWriters).map { i => + val start = i * batchSize + Math.min(i, remainder) + val end = start + batchSize + (if (i < remainder) 1 else 0) + allItems.slice(start, end) + }.toList + + assert( + itemBatches.length == numWriters, + s"Expected $numWriters batches but got ${itemBatches.length}" + ) + + // Perform concurrent writes + val writeFutures = itemBatches.map { batch => + Future { + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + batch.foreach(writer.putOne) + writer.close() + } + } + + // Wait for all writers to complete + Await.result(Future.sequence(writeFutures), 30.seconds) + + // Read all items back + val retrievedItems = document.get().toList + + // Verify that the retrieved items match the original items + 
assert( + retrievedItems.toSet == allItems.toSet, + "All items should be read correctly after concurrent writes." + ) + } + + it should "allow a reader to read data while a writer is writing items incrementally" in { + val allItems = generateSampleItems() + val batchSize = allItems.length / 5 // Divide items into 5 incremental batches + + // Split items into 5 batches + val itemBatches = allItems.grouped(batchSize).toList + + // Flag to indicate when writing is done + @volatile var writingComplete = false + + // Start the writer in a Future to write batches with delays + val writerFuture = Future { + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + try { + itemBatches.foreach { batch => + batch.foreach(writer.putOne) + Thread.sleep(500) // Simulate delay between batches + } + } finally { + writer.close() + writingComplete = true + } + } + + // Start the reader in another Future + val readerFuture = Future { + val reader = document.get() + val retrievedItems = scala.collection.mutable.ListBuffer[T]() + + // Keep checking for new data until writing is complete and no more items are available + while (!writingComplete || reader.hasNext) { + if (reader.hasNext) { + retrievedItems += reader.next() + } else { + Thread.sleep(200) // Wait before retrying to avoid busy-waiting + } + } + + retrievedItems.toList + } + + // Wait for both writer and reader to complete + val retrievedItems = Await.result(readerFuture, 30.seconds) + Await.result(writerFuture, 30.seconds) + + // Verify that the retrieved items match the original items + assert( + retrievedItems.toSet == allItems.toSet, + "All items should be read correctly while writing is happening concurrently." + ) + } + + it should "read all items using ranges correctly" in { + val allItems = generateSampleItems() + + // Write items + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + allItems.foreach(writer.putOne) + writer.close() + + // Read all items using ranges + val batchSize = 1500 + val ranges = allItems.indices.grouped(batchSize).toList + val retrievedItems = ranges.flatMap { range => + document.getRange(range.head, range.lastOption.getOrElse(range.head) + 1).toList + } + + assert(retrievedItems.size == allItems.size) + + // Verify that the retrieved items match the original items + assert( + retrievedItems.toSet == allItems.toSet, + "All items should be retrieved correctly using ranges." + ) + } + + it should "retrieve items correctly using getAfter" in { + val allItems = generateSampleItems() + + // Write items + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + allItems.foreach(writer.putOne) + writer.close() + + // Test getAfter for various offsets + val offsets = List(0, allItems.length / 2, allItems.length - 1) + offsets.foreach { offset => + val expectedItems = if (offset < allItems.length) { + allItems.slice(offset, allItems.length) + } else { + List.empty[T] + } + + val retrievedItems = document.getAfter(offset).toList + assert( + retrievedItems == expectedItems, + s"getAfter($offset) did not return the expected items. 
Expected: $expectedItems, Got: $retrievedItems" + ) + } + + // Test getAfter for an offset beyond the range + val invalidOffset = allItems.length + val retrievedItems = document.getAfter(invalidOffset).toList + assert( + retrievedItems.isEmpty, + s"getAfter($invalidOffset) should return an empty list, but got: $retrievedItems" + ) + } + + it should "get the count of records correctly" in { + val allItems = generateSampleItems() + + // Write items + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + allItems.foreach(writer.putOne) + writer.close() + + assert( + allItems.length == document.getCount, + "getCount should return the same number with allItems" + ) + } + + /** + * Generates a sample list of items for testing. + * Subclasses should override this to provide their specific sample items. + * @return a list of sample items of type T. + */ + def generateSampleItems(): List[T] +} diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala new file mode 100644 index 00000000000..3c2a22c6894 --- /dev/null +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -0,0 +1,146 @@ +package edu.uci.ics.amber.storage.result.iceberg + +import edu.uci.ics.amber.core.storage.{IcebergCatalogInstance, StorageConfig} +import edu.uci.ics.amber.core.storage.model.VirtualDocumentSpec +import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument +import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import edu.uci.ics.amber.util.IcebergUtil +import edu.uci.ics.texera.dao.MockTexeraDB +import org.apache.iceberg.catalog.Catalog +import org.apache.iceberg.data.Record +import org.apache.iceberg.{Schema => IcebergSchema} +import org.apache.iceberg.catalog.TableIdentifier +import org.scalatest.BeforeAndAfterAll + +import java.sql.Timestamp +import java.util.UUID + +class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfterAll { + + var amberSchema: Schema = _ + var icebergSchema: IcebergSchema = _ + var serde: (IcebergSchema, Tuple) => Record = _ + var deserde: (IcebergSchema, Record) => Tuple = _ + var catalog: Catalog = _ + val tableNamespace = "test_namespace" + var tableName: String = _ + + override def beforeAll(): Unit = { + super.beforeAll() + + // Initialize Amber Schema with all possible attribute types + amberSchema = Schema( + List( + new Attribute("col-string", AttributeType.STRING), + new Attribute("col-int", AttributeType.INTEGER), + new Attribute("col-bool", AttributeType.BOOLEAN), + new Attribute("col-long", AttributeType.LONG), + new Attribute("col-double", AttributeType.DOUBLE), + new Attribute("col-timestamp", AttributeType.TIMESTAMP), + new Attribute("col-binary", AttributeType.BINARY) + ) + ) + + // Initialize Iceberg Schema + icebergSchema = IcebergUtil.toIcebergSchema(amberSchema) + + // Initialize serialization and deserialization functions + serde = IcebergUtil.toGenericRecord + deserde = (schema, record) => IcebergUtil.fromRecord(record, amberSchema) + + // Initialize the the Iceberg catalog + catalog = IcebergUtil.createHadoopCatalog( + "iceberg_document_test", + StorageConfig.fileStorageDirectoryPath + ) + IcebergCatalogInstance.replaceInstance(catalog) + } + + override def beforeEach(): Unit = { + // Generate a unique table name for each test + tableName = 
s"test_table_${UUID.randomUUID().toString.replace("-", "")}" + super.beforeEach() + } + + override def afterAll(): Unit = { + super.afterAll() + } + + // Implementation of getDocument + override def getDocument: IcebergDocument[Tuple] = { + new IcebergDocument[Tuple]( + tableNamespace, + tableName, + icebergSchema, + serde, + deserde + ) + } + + // Implementation of isDocumentCleared + override def isDocumentCleared: Boolean = { + val identifier = TableIdentifier.of(tableNamespace, tableName) + !catalog.tableExists(identifier) + } + + override def generateSampleItems(): List[Tuple] = { + val baseTuples = List( + Tuple + .builder(amberSchema) + .add("col-string", AttributeType.STRING, "Hello World") + .add("col-int", AttributeType.INTEGER, 42) + .add("col-bool", AttributeType.BOOLEAN, true) + .add("col-long", AttributeType.LONG, 12345678901234L) + .add("col-double", AttributeType.DOUBLE, 3.14159) + .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(System.currentTimeMillis())) + .add("col-binary", AttributeType.BINARY, Array[Byte](0, 1, 2, 3, 4, 5, 6, 7)) + .build(), + Tuple + .builder(amberSchema) + .add("col-string", AttributeType.STRING, "") + .add("col-int", AttributeType.INTEGER, -1) + .add("col-bool", AttributeType.BOOLEAN, false) + .add("col-long", AttributeType.LONG, -98765432109876L) + .add("col-double", AttributeType.DOUBLE, -0.001) + .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(0L)) + .add("col-binary", AttributeType.BINARY, Array[Byte](127, -128, 0, 64)) + .build(), + Tuple + .builder(amberSchema) + .add("col-string", AttributeType.STRING, "Special Characters: \n\t\r") + .add("col-int", AttributeType.INTEGER, Int.MaxValue) + .add("col-bool", AttributeType.BOOLEAN, true) + .add("col-long", AttributeType.LONG, Long.MaxValue) + .add("col-double", AttributeType.DOUBLE, Double.MaxValue) + .add("col-timestamp", AttributeType.TIMESTAMP, new Timestamp(1234567890L)) + .add("col-binary", AttributeType.BINARY, Array[Byte](1, 2, 3, 4, 5)) + .build() + ) + + def generateRandomBinary(size: Int): Array[Byte] = { + val array = new Array[Byte](size) + scala.util.Random.nextBytes(array) + array + } + + val additionalTuples = (1 to 20000).map { i => + Tuple + .builder(amberSchema) + .add("col-string", AttributeType.STRING, if (i % 7 == 0) null else s"Generated String $i") + .add("col-int", AttributeType.INTEGER, if (i % 5 == 0) null else i) + .add("col-bool", AttributeType.BOOLEAN, if (i % 6 == 0) null else i % 2 == 0) + .add("col-long", AttributeType.LONG, if (i % 4 == 0) null else i.toLong * 1000000L) + .add("col-double", AttributeType.DOUBLE, if (i % 3 == 0) null else i * 0.12345) + .add( + "col-timestamp", + AttributeType.TIMESTAMP, + if (i % 8 == 0) null + else new Timestamp(System.currentTimeMillis() + i * 1000L) + ) + .add("col-binary", AttributeType.BINARY, if (i % 9 == 0) null else generateRandomBinary(10)) + .build() + } + + baseTuples ++ additionalTuples + } +} diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala new file mode 100644 index 00000000000..6fc07535e0b --- /dev/null +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala @@ -0,0 +1,183 @@ +package edu.uci.ics.amber.util + +import edu.uci.ics.amber.core.tuple.{AttributeType, Schema, Tuple} +import edu.uci.ics.amber.util.IcebergUtil.toIcebergSchema +import org.apache.iceberg.types.Types +import org.apache.iceberg.{Schema => IcebergSchema} +import 
+
+    baseTuples ++ additionalTuples
+  }
+}
diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala
new file mode 100644
index 00000000000..6fc07535e0b
--- /dev/null
+++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/util/IcebergUtilSpec.scala
@@ -0,0 +1,183 @@
+package edu.uci.ics.amber.util
+
+import edu.uci.ics.amber.core.tuple.{AttributeType, Schema, Tuple}
+import edu.uci.ics.amber.util.IcebergUtil.toIcebergSchema
+import org.apache.iceberg.types.Types
+import org.apache.iceberg.{Schema => IcebergSchema}
+import org.apache.iceberg.data.GenericRecord
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.nio.ByteBuffer
+import java.sql.Timestamp
+import java.time.{LocalDateTime, ZoneId}
+import scala.jdk.CollectionConverters._
+
+class IcebergUtilSpec extends AnyFlatSpec {
+
+  val texeraSchema: Schema = Schema()
+    .add("test-1", AttributeType.INTEGER)
+    .add("test-2", AttributeType.LONG)
+    .add("test-3", AttributeType.BOOLEAN)
+    .add("test-4", AttributeType.DOUBLE)
+    .add("test-5", AttributeType.TIMESTAMP)
+    .add("test-6", AttributeType.STRING)
+    .add("test-7", AttributeType.BINARY)
+
+  val icebergSchema: IcebergSchema = new IcebergSchema(
+    List(
+      Types.NestedField.optional(1, "test-1", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "test-2", Types.LongType.get()),
+      Types.NestedField.optional(3, "test-3", Types.BooleanType.get()),
+      Types.NestedField.optional(4, "test-4", Types.DoubleType.get()),
+      Types.NestedField.optional(5, "test-5", Types.TimestampType.withoutZone()),
+      Types.NestedField.optional(6, "test-6", Types.StringType.get()),
+      Types.NestedField.optional(7, "test-7", Types.BinaryType.get())
+    ).asJava
+  )
+
+  behavior of "IcebergUtil"
+
+  it should "convert from AttributeType to Iceberg Type correctly" in {
+    assert(IcebergUtil.toIcebergType(AttributeType.INTEGER) == Types.IntegerType.get())
+    assert(IcebergUtil.toIcebergType(AttributeType.LONG) == Types.LongType.get())
+    assert(IcebergUtil.toIcebergType(AttributeType.BOOLEAN) == Types.BooleanType.get())
+    assert(IcebergUtil.toIcebergType(AttributeType.DOUBLE) == Types.DoubleType.get())
+    assert(IcebergUtil.toIcebergType(AttributeType.TIMESTAMP) == Types.TimestampType.withoutZone())
+    assert(IcebergUtil.toIcebergType(AttributeType.STRING) == Types.StringType.get())
+    assert(IcebergUtil.toIcebergType(AttributeType.BINARY) == Types.BinaryType.get())
+  }
+
+  it should "convert from Iceberg Type to AttributeType correctly" in {
+    assert(IcebergUtil.fromIcebergType(Types.IntegerType.get()) == AttributeType.INTEGER)
+    assert(IcebergUtil.fromIcebergType(Types.LongType.get()) == AttributeType.LONG)
+    assert(IcebergUtil.fromIcebergType(Types.BooleanType.get()) == AttributeType.BOOLEAN)
+    assert(IcebergUtil.fromIcebergType(Types.DoubleType.get()) == AttributeType.DOUBLE)
+    assert(
+      IcebergUtil.fromIcebergType(Types.TimestampType.withoutZone()) == AttributeType.TIMESTAMP
+    )
+    assert(IcebergUtil.fromIcebergType(Types.StringType.get()) == AttributeType.STRING)
+    assert(IcebergUtil.fromIcebergType(Types.BinaryType.get()) == AttributeType.BINARY)
+  }
+
+  it should "convert from Texera Schema to Iceberg Schema correctly" in {
+    assert(IcebergUtil.toIcebergSchema(texeraSchema).sameSchema(icebergSchema))
+  }
+
+  it should "convert from Iceberg Schema to Texera Schema correctly" in {
+    assert(IcebergUtil.fromIcebergSchema(icebergSchema) == texeraSchema)
+  }
+
+  it should "convert Texera Tuple to Iceberg GenericRecord correctly" in {
+    val tuple = Tuple
+      .builder(texeraSchema)
+      .addSequentially(
+        Array(
+          Int.box(42),
+          Long.box(123456789L),
+          Boolean.box(true),
+          Double.box(3.14),
+          new Timestamp(10000L),
+          "hello world",
+          Array[Byte](1, 2, 3, 4)
+        )
+      )
+      .build()
+
+    val record = IcebergUtil.toGenericRecord(toIcebergSchema(tuple.schema), tuple)
+
+    assert(record.getField("test-1") == 42)
+    assert(record.getField("test-2") == 123456789L)
+    assert(record.getField("test-3") == true)
+    assert(record.getField("test-4") == 3.14)
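+    // Iceberg's timestamp-without-zone type is carried as a java.time.LocalDateTime in a
+    // GenericRecord, so the expected value is the Timestamp converted via toLocalDateTime.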
+    assert(record.getField("test-5") == new Timestamp(10000L).toLocalDateTime)
+    assert(record.getField("test-6") == "hello world")
+    assert(record.getField("test-7") == ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)))
+
+    val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema)
+    assert(tupleFromRecord == tuple)
+  }
+
+  it should "convert Texera Tuple with null values to Iceberg GenericRecord correctly" in {
+    val tuple = Tuple
+      .builder(texeraSchema)
+      .addSequentially(
+        Array(
+          Int.box(42), // Non-null
+          null, // Null Long
+          Boolean.box(true), // Non-null
+          null, // Null Double
+          null, // Null Timestamp
+          "hello world", // Non-null String
+          null // Null Binary
+        )
+      )
+      .build()
+
+    val record = IcebergUtil.toGenericRecord(toIcebergSchema(tuple.schema), tuple)
+
+    assert(record.getField("test-1") == 42)
+    assert(record.getField("test-2") == null)
+    assert(record.getField("test-3") == true)
+    assert(record.getField("test-4") == null)
+    assert(record.getField("test-5") == null)
+    assert(record.getField("test-6") == "hello world")
+    assert(record.getField("test-7") == null)
+
+    val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema)
+    assert(tupleFromRecord == tuple)
+  }
+
+  it should "convert a fully null Texera Tuple to Iceberg GenericRecord correctly" in {
+    val tuple = Tuple
+      .builder(texeraSchema)
+      .addSequentially(
+        Array(
+          null, // Null Integer
+          null, // Null Long
+          null, // Null Boolean
+          null, // Null Double
+          null, // Null Timestamp
+          null, // Null String
+          null // Null Binary
+        )
+      )
+      .build()
+
+    val record = IcebergUtil.toGenericRecord(toIcebergSchema(tuple.schema), tuple)
+
+    assert(record.getField("test-1") == null)
+    assert(record.getField("test-2") == null)
+    assert(record.getField("test-3") == null)
+    assert(record.getField("test-4") == null)
+    assert(record.getField("test-5") == null)
+    assert(record.getField("test-6") == null)
+    assert(record.getField("test-7") == null)
+
+    val tupleFromRecord = IcebergUtil.fromRecord(record, texeraSchema)
+    assert(tupleFromRecord == tuple)
+  }
+
+  it should "convert Iceberg GenericRecord to Texera Tuple correctly" in {
+    val record = GenericRecord.create(icebergSchema)
+    record.setField("test-1", 42)
+    record.setField("test-2", 123456789L)
+    record.setField("test-3", true)
+    record.setField("test-4", 3.14)
+    record.setField(
+      "test-5",
+      LocalDateTime.ofInstant(new Timestamp(10000L).toInstant, ZoneId.systemDefault())
+    )
+    record.setField("test-6", "hello world")
+    record.setField("test-7", ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)))
+
+    val tuple = IcebergUtil.fromRecord(record, texeraSchema)
+
+    assert(tuple.getField[Integer]("test-1") == 42)
+    assert(tuple.getField[Long]("test-2") == 123456789L)
+    assert(tuple.getField[Boolean]("test-3") == true)
+    assert(tuple.getField[Double]("test-4") == 3.14)
+    assert(tuple.getField[Timestamp]("test-5") == new Timestamp(10000L))
+    assert(tuple.getField[String]("test-6") == "hello world")
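+    // Arrays use reference equality on the JVM, so the binary column is compared
+    // element-wise with sameElements rather than ==.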
+    assert(tuple.getField[Array[Byte]]("test-7") sameElements Array[Byte](1, 2, 3, 4))
+  }
+}
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala
index 9c9b2e7fad1..99ac9796820 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala
@@ -9,6 +9,7 @@ import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode
 import edu.uci.ics.amber.core.workflow.PortIdentity
 
 class ProgressiveSinkOpExec(
+    workerId: Int,
     outputMode: OutputMode,
     storageKey: String,
     workflowIdentity: WorkflowIdentity
@@ -17,7 +18,7 @@ class ProgressiveSinkOpExec(
   ResultStorage
     .getOpResultStorage(workflowIdentity)
     .get(storageKey)
-    .writer()
+    .writer(workerId.toString)
 
   override def open(): Unit = {
     writer.open()