diff --git a/core/workflow-core/src/main/resources/storage-config.yaml b/core/workflow-core/src/main/resources/storage-config.yaml
index 2b6ed20ccaa..66db873a663 100644
--- a/core/workflow-core/src/main/resources/storage-config.yaml
+++ b/core/workflow-core/src/main/resources/storage-config.yaml
@@ -4,7 +4,23 @@ storage:
     url: "mongodb://localhost:27017"
     database: "texera_storage"
     commit-batch-size: 1000
+  iceberg:
+    catalog:
+      jdbc: # currently we only support storing catalog info via jdbc, i.e. https://iceberg.apache.org/docs/1.7.1/jdbc/
+        url: "jdbc:mysql://0.0.0.0:3306/texera_iceberg?serverTimezone=UTC"
+        username: "root"
+        password: "123456"
+    table:
+      commit:
+        batch-size: 4096 # decide the buffer size of our IcebergTableWriter
+        retry:
+          # retry configures the OCC parameter for concurrent write operations in Iceberg
+          # Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/
+          # Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties
+          num-retries: 10
+          min-wait-ms: 100 # 0.1s
+          max-wait-ms: 10000 # 10s
   jdbc:
-    url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC"
-    username: ""
-    password: ""
\ No newline at end of file
+    url: "jdbc:mysql://0.0.0.0:3306/texera_db?serverTimezone=UTC"
+    username: "root"
+    password: "123456"
\ No newline at end of file
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala
index 92a676032e3..968db00f61f 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala
@@ -3,6 +3,7 @@ package edu.uci.ics.amber.core.storage
 import edu.uci.ics.amber.util.PathUtils.corePath
 import org.yaml.snakeyaml.Yaml
 
+import java.net.URI
 import java.util.{Map => JMap}
 import scala.jdk.CollectionConverters._
 
@@ -14,34 +15,117 @@ object StorageConfig {
 
     val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap
     val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap
+    val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap
+    val icebergCatalogMap = icebergMap("catalog").asInstanceOf[JMap[String, Any]].asScala.toMap
+    val icebergCatalogJdbcMap =
+      icebergCatalogMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
+    val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap
+    val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap
+    val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap
     val jdbcMap = storageMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
-    javaConf
-      .updated("storage", storageMap.updated("mongodb", mongodbMap).updated("jdbc", jdbcMap))
+
+    javaConf.updated(
+      "storage",
+      storageMap
+        .updated("mongodb", mongodbMap)
+        .updated(
+          "iceberg",
+          icebergMap
+            .updated(
+              "catalog",
+              icebergCatalogMap.updated("jdbc", icebergCatalogJdbcMap)
+            )
+            .updated(
+              "table",
+              icebergTableMap.updated(
+                "commit",
+                icebergCommitMap.updated("retry", icebergRetryMap)
+              )
+            )
+        )
+        .updated("jdbc", jdbcMap)
+    )
   }
 
+  // Result storage mode
   val resultStorageMode: String =
     conf("storage").asInstanceOf[Map[String, Any]]("result-storage-mode").asInstanceOf[String]
 
-  // For MongoDB specifics
+  // MongoDB configurations
   val mongodbUrl: String = conf("storage")
     .asInstanceOf[Map[String, Any]]("mongodb")
     .asInstanceOf[Map[String, Any]]("url")
     .asInstanceOf[String]
+
   val mongodbDatabaseName: String = conf("storage")
     .asInstanceOf[Map[String, Any]]("mongodb")
     .asInstanceOf[Map[String, Any]]("database")
     .asInstanceOf[String]
+
   val mongodbBatchSize: Int = conf("storage")
     .asInstanceOf[Map[String, Any]]("mongodb")
     .asInstanceOf[Map[String, Any]]("commit-batch-size")
     .asInstanceOf[Int]
 
+  // Iceberg catalog configurations
+  val icebergCatalogUrl: String = conf("storage")
+    .asInstanceOf[Map[String, Any]]("iceberg")
+    .asInstanceOf[Map[String, Any]]("catalog")
+    .asInstanceOf[Map[String, Any]]("jdbc")
+    .asInstanceOf[Map[String, Any]]("url")
+    .asInstanceOf[String]
+
+  val icebergCatalogUsername: String = conf("storage")
+    .asInstanceOf[Map[String, Any]]("iceberg")
+    .asInstanceOf[Map[String, Any]]("catalog")
+    .asInstanceOf[Map[String, Any]]("jdbc")
+    .asInstanceOf[Map[String, Any]]("username")
+    .asInstanceOf[String]
+
+  val icebergCatalogPassword: String = conf("storage")
+    .asInstanceOf[Map[String, Any]]("iceberg")
+    .asInstanceOf[Map[String, Any]]("catalog")
+    .asInstanceOf[Map[String, Any]]("jdbc")
+    .asInstanceOf[Map[String, Any]]("password")
+    .asInstanceOf[String]
+
+  val icebergTableCommitBatchSize: Int = conf("storage")
+    .asInstanceOf[Map[String, Any]]("iceberg")
+    .asInstanceOf[Map[String, Any]]("table")
+    .asInstanceOf[Map[String, Any]]("commit")
+    .asInstanceOf[Map[String, Any]]("batch-size")
+    .asInstanceOf[Int]
+
+  val icebergTableCommitNumRetries: Int = conf("storage")
+    .asInstanceOf[Map[String, Any]]("iceberg")
+    .asInstanceOf[Map[String, Any]]("table")
+    .asInstanceOf[Map[String, Any]]("commit")
+    .asInstanceOf[Map[String, Any]]("retry")
+    .asInstanceOf[Map[String, Any]]("num-retries")
+    .asInstanceOf[Int]
+
+  val icebergTableCommitMinRetryWaitMs: Int = conf("storage")
+    .asInstanceOf[Map[String, Any]]("iceberg")
+    .asInstanceOf[Map[String, Any]]("table")
+    .asInstanceOf[Map[String, Any]]("commit")
+    .asInstanceOf[Map[String, Any]]("retry")
+    .asInstanceOf[Map[String, Any]]("min-wait-ms")
+    .asInstanceOf[Int]
+
+  val icebergTableCommitMaxRetryWaitMs: Int = conf("storage")
+    .asInstanceOf[Map[String, Any]]("iceberg")
+    .asInstanceOf[Map[String, Any]]("table")
+    .asInstanceOf[Map[String, Any]]("commit")
+    .asInstanceOf[Map[String, Any]]("retry")
+    .asInstanceOf[Map[String, Any]]("max-wait-ms")
+    .asInstanceOf[Int]
+
+  // JDBC configurations
   val jdbcUrl: String = conf("storage")
     .asInstanceOf[Map[String, Any]]("jdbc")
     .asInstanceOf[Map[String, Any]]("url")
     .asInstanceOf[String]
 
-  // For jdbc specifics
   val jdbcUsername: String = conf("storage")
     .asInstanceOf[Map[String, Any]]("jdbc")
     .asInstanceOf[Map[String, Any]]("username")
@@ -52,7 +136,7 @@ object StorageConfig {
     .asInstanceOf[Map[String, Any]]("password")
     .asInstanceOf[String]
 
-  // For file storage specifics
-  val fileStorageDirectoryUri =
+  // File storage configurations
+  val fileStorageDirectoryUri: URI =
     corePath.resolve("amber").resolve("user-resources").resolve("workflow-results").toUri
 }
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
index 51705e77a62..91b946e3bdb 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -23,10 +23,13 @@ class IcebergDocument[T >: Null <: AnyRef](
 ) extends VirtualDocument[T] {
 
   private val lock = new ReentrantReadWriteLock()
+
   synchronized {
-    IcebergUtil
-      .loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = true)
-      .get
+    // Create the table only when it is missing, so that opening a document over an
+    // already existing table keeps working instead of failing inside createTable.
+    IcebergUtil
+      .loadTable(catalog, tableNamespace, tableName)
+      .getOrElse(IcebergUtil.createTable(catalog, tableNamespace, tableName, tableSchema))
   }
 
   /**
@@ -34,7 +37,7 @@ class IcebergDocument[T >: Null <: AnyRef](
     */
   override def getURI: URI = {
     val table = IcebergUtil
-      .loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false)
+      .loadTable(catalog, tableNamespace, tableName)
       .getOrElse(
         throw new NoSuchTableException(f"table ${tableNamespace}.${tableName} doesn't exist")
       )
@@ -64,9 +67,7 @@ class IcebergDocument[T >: Null <: AnyRef](
           IcebergUtil.loadTable(
             catalog,
             tableNamespace,
-            tableName,
-            tableSchema,
-            createIfNotExist = false
+            tableName
           )
         }
 
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
index 4802e0001cd..2d5c328fb9f 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/iceberg/IcebergTableWriter.scala
@@ -1,7 +1,8 @@
 package edu.uci.ics.amber.core.storage.result.iceberg
 
+import edu.uci.ics.amber.core.storage.StorageConfig
 import edu.uci.ics.amber.core.storage.model.BufferedItemWriter
-import edu.uci.ics.amber.core.storage.util.StorageUtil.withLock
+import edu.uci.ics.amber.core.storage.util.StorageUtil.{withLock, withReadLock}
 import edu.uci.ics.amber.util.IcebergUtil
 import org.apache.iceberg.{Schema, Table}
 import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
@@ -24,14 +25,17 @@ class IcebergTableWriter[T](
 
   private val lock = new ReentrantLock()
   private val buffer = new ArrayBuffer[T]()
-  override val bufferSize: Int = 3000
+  override val bufferSize: Int = StorageConfig.icebergTableCommitBatchSize
 
   // Load the Iceberg table
-  private val table: Table = synchronized {
+  private val table: Table =
     IcebergUtil
-      .loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false)
-      .get
-  }
+      .loadTable(catalog, tableNamespace, tableName)
+      .getOrElse(
+        throw new org.apache.iceberg.exceptions.NoSuchTableException(
+          f"table ${tableNamespace}.${tableName} doesn't exist"
+        )
+      )
 
   override def open(): Unit =
     withLock(lock) {
diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala
index 273a883416a..2dca393f833 100644
--- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala
+++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala
@@ -8,7 +8,13 @@ import org.apache.iceberg.types.Types
 import org.apache.iceberg.data.{GenericRecord, Record}
 import org.apache.iceberg.jdbc.JdbcCatalog
 import org.apache.iceberg.types.Type.PrimitiveType
-import org.apache.iceberg.{CatalogProperties, PartitionSpec, Table, Schema => IcebergSchema}
+import org.apache.iceberg.{
+  CatalogProperties,
+  PartitionSpec,
+  Table,
+  TableProperties,
+  Schema => IcebergSchema
+}
 
 import java.net.URI
 import java.nio.ByteBuffer
@@ -50,32 +56,60 @@ object IcebergUtil {
     catalog
   }
 
-  def loadTable(
+  /**
+    * Creates a new unpartitioned Iceberg table in the given catalog.
+    *
+    * The commit retry settings (number of retries, min/max retry wait) are read from
+    * StorageConfig and stored as table properties at creation time.
+    *
+    * @param catalog        the catalog in which the table is created
+    * @param tableNamespace the namespace of the table
+    * @param tableName      the name of the table
+    * @param tableSchema    the schema of the table
+    * @return the table returned by `catalog.createTable`
+    */
+  def createTable(
       catalog: Catalog,
       tableNamespace: String,
       tableName: String,
-      tableSchema: IcebergSchema,
-      createIfNotExist: Boolean
-  ): Option[Table] = {
+      tableSchema: IcebergSchema
+  ): Table = {
     val tableProperties = Map(
-      "commit.retry.num-retries" -> "10",
-      "commit.retry.min-wait-ms" -> "10"
+      TableProperties.COMMIT_NUM_RETRIES -> StorageConfig.icebergTableCommitNumRetries.toString,
+      TableProperties.COMMIT_MAX_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMaxRetryWaitMs.toString,
+      TableProperties.COMMIT_MIN_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMinRetryWaitMs.toString
     )
     val identifier = TableIdentifier.of(tableNamespace, tableName)
-    if (!catalog.tableExists(identifier)) {
-      if (!createIfNotExist) {
-        return None
-      }
-      Some(
-        catalog.createTable(
-          identifier,
-          tableSchema,
-          PartitionSpec.unpartitioned,
-          tableProperties.asJava
-        )
-      )
-    } else {
+    catalog.createTable(
+      identifier,
+      tableSchema,
+      PartitionSpec.unpartitioned,
+      tableProperties.asJava
+    )
+  }
+
+  /**
+    * Loads an existing Iceberg table from the given catalog.
+    *
+    * Only a missing table is mapped to `None`; any other failure (for example an
+    * unreachable catalog database) is propagated to the caller instead of being
+    * misreported as a missing table.
+    *
+    * @param catalog        the catalog to load the table from
+    * @param tableNamespace the namespace of the table
+    * @param tableName      the name of the table
+    * @return `Some(table)` if the table could be loaded, `None` if it does not exist
+    */
+  def loadTable(
+      catalog: Catalog,
+      tableNamespace: String,
+      tableName: String
+  ): Option[Table] = {
+    val identifier = TableIdentifier.of(tableNamespace, tableName)
+    try {
       Some(catalog.loadTable(identifier))
+    } catch {
+      case _: org.apache.iceberg.exceptions.NoSuchTableException => None
     }
   }
 
diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 88b4ee67755..ba594a1df51 100644
--- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -42,9 +42,9 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
   val catalog: Catalog = IcebergUtil.createJdbcCatalog(
     "iceberg_document_test",
     StorageConfig.fileStorageDirectoryUri,
-    StorageConfig.jdbcUrl,
-    StorageConfig.jdbcUsername,
-    StorageConfig.jdbcPassword
+    StorageConfig.icebergCatalogUrl,
+    StorageConfig.icebergCatalogUsername,
+    StorageConfig.icebergCatalogPassword
   )
 
   val tableNamespace = "test_namespace"