diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala index eba2c81df2..69527bf371 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala @@ -183,6 +183,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with // rely on the server-side result cleanup logic. case OpResultStorage.MONGODB => MongoDatabaseManager.dropCollection(collectionName) + case OpResultStorage.ICEBERG => } }) } catch { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala index 9144fa63d4..e2673cacb9 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala @@ -93,7 +93,9 @@ class WorkflowCompiler( if (!storage.contains(storageKey)) { // get the schema for result storage in certain mode val sinkStorageSchema: Option[Schema] = - if (storageType == OpResultStorage.MONGODB) { + if ( + storageType == OpResultStorage.MONGODB || storageType == OpResultStorage.ICEBERG + ) { // use the output schema on the first output port as the schema for storage Some(schema.right.get) } else { diff --git a/core/build.sbt b/core/build.sbt index bd24d6e42b..ff37461914 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -15,7 +15,8 @@ lazy val WorkflowCompilingService = (project in file("workflow-compiling-service .settings( dependencyOverrides ++= Seq( // override it as io.dropwizard 4 require 2.16.1 or higher - "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1" + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.16.1", ) ) diff --git a/core/workflow-core/src/main/resources/storage-config.yaml b/core/workflow-core/src/main/resources/storage-config.yaml index 66db873a66..da7a99d277 100644 --- a/core/workflow-core/src/main/resources/storage-config.yaml +++ b/core/workflow-core/src/main/resources/storage-config.yaml @@ -1,5 +1,5 @@ storage: - result-storage-mode: memory + result-storage-mode: iceberg mongodb: url: "mongodb://localhost:27017" database: "texera_storage" @@ -11,6 +11,7 @@ storage: username: "root" password: "123456" table: + namespace: "operator-result" commit: batch-size: 4096 # decide the buffer size of our IcebergTableWriter retry: diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala index 968db00f61..c71b94a475 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala @@ -89,6 +89,12 @@ object StorageConfig { .asInstanceOf[Map[String, Any]]("password") .asInstanceOf[String] + val icebergTableNamespace: String = conf("storage") + .asInstanceOf[Map[String, Any]]("iceberg") + .asInstanceOf[Map[String, Any]]("table") + .asInstanceOf[Map[String, Any]]("namespace") + .asInstanceOf[String] + val icebergTableCommitBatchSize: Int = conf("storage") .asInstanceOf[Map[String, Any]]("iceberg") .asInstanceOf[Map[String, Any]]("table") diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala index ab69d6f94d..e2ddbd5a79 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala @@ -3,8 +3,12 @@ package edu.uci.ics.amber.core.storage.result import com.typesafe.scalalogging.LazyLogging import edu.uci.ics.amber.core.storage.StorageConfig import edu.uci.ics.amber.core.storage.model.VirtualDocument +import edu.uci.ics.amber.core.storage.result.iceberg.IcebergDocument import edu.uci.ics.amber.core.tuple.{Schema, Tuple} +import edu.uci.ics.amber.util.IcebergUtil import edu.uci.ics.amber.virtualidentity.OperatorIdentity +import org.apache.iceberg.data.Record +import org.apache.iceberg.{Schema => IcebergSchema} import java.util.concurrent.ConcurrentHashMap import scala.collection.convert.ImplicitConversions.`iterator asScala` @@ -13,6 +17,7 @@ object OpResultStorage { val defaultStorageMode: String = StorageConfig.resultStorageMode.toLowerCase val MEMORY = "memory" val MONGODB = "mongodb" + val ICEBERG = "iceberg" } /** @@ -60,7 +65,7 @@ class OpResultStorage extends Serializable with LazyLogging { val storage: VirtualDocument[Tuple] = if (mode == "memory") { new MemoryDocument[Tuple](key.id) - } else { + } else if (mode == OpResultStorage.MONGODB) { try { val fromDocument = schema.map(Tuple.fromDocument) new MongoDocument[Tuple](executionId + key, Tuple.toDocument, fromDocument) @@ -71,6 +76,27 @@ class OpResultStorage extends Serializable with LazyLogging { // fall back to memory new MemoryDocument[Tuple](key.id) } + } else { + val icebergCatalog = IcebergUtil.createJdbcCatalog( + "operator-result", + StorageConfig.fileStorageDirectoryUri, + StorageConfig.icebergCatalogUrl, + StorageConfig.icebergCatalogUsername, + StorageConfig.icebergCatalogPassword + ) + val icebergSchema = IcebergUtil.toIcebergSchema(schema.get) + val serde: Tuple => Record = tuple => IcebergUtil.toGenericRecord(tuple) + val deserde: (IcebergSchema, Record) => Tuple = (_, record) => + IcebergUtil.fromRecord(record, schema.get) + + new IcebergDocument[Tuple]( + icebergCatalog, + StorageConfig.icebergTableNamespace, + executionId + key, + icebergSchema, + serde, + deserde + ) } cache.put(key, (storage, schema)) storage