Skip to content

Commit

Permalink
refactor the util and config
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Dec 20, 2024
1 parent 385499b commit 1edb551
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 44 deletions.
22 changes: 19 additions & 3 deletions core/workflow-core/src/main/resources/storage-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,23 @@ storage:
url: "mongodb://localhost:27017"
database: "texera_storage"
commit-batch-size: 1000
iceberg:
catalog:
jdbc: # currently we only support storing catalog info via jdbc, i.e. https://iceberg.apache.org/docs/1.7.1/jdbc/
url: "jdbc:mysql://0.0.0.0:3306/texera_iceberg?serverTimezone=UTC"
username: "root"
password: "123456"
table:
commit:
batch-size: 4096 # decide the buffer size of our IcebergTableWriter
retry:
# retry configures the OCC parameter for concurrent write operations in Iceberg
# Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/
# Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties
num-retries: 10
min-wait-ms: 100 # 0.1s
max-wait-ms: 10000 # 10s
jdbc:
url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC"
username: ""
password: ""
url: "jdbc:mysql://0.0.0.0:3306/texera_db?serverTimezone=UTC"
username: "root"
password: "123456"
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package edu.uci.ics.amber.core.storage
import edu.uci.ics.amber.util.PathUtils.corePath
import org.yaml.snakeyaml.Yaml

import java.net.URI
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

Expand All @@ -14,34 +15,117 @@ object StorageConfig {

val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap
val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCatalogMap = icebergMap("catalog").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCatalogJdbcMap =
icebergCatalogMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap
val jdbcMap = storageMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
javaConf
.updated("storage", storageMap.updated("mongodb", mongodbMap).updated("jdbc", jdbcMap))

javaConf.updated(
"storage",
storageMap
.updated("mongodb", mongodbMap)
.updated(
"iceberg",
icebergMap
.updated(
"catalog",
icebergCatalogMap.updated("jdbc", icebergCatalogJdbcMap)
)
.updated(
"table",
icebergTableMap.updated(
"commit",
icebergCommitMap.updated("retry", icebergRetryMap)
)
)
)
.updated("jdbc", jdbcMap)
)
}

// Result storage mode
val resultStorageMode: String =
conf("storage").asInstanceOf[Map[String, Any]]("result-storage-mode").asInstanceOf[String]

// For MongoDB specifics
// MongoDB configurations
val mongodbUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

val mongodbDatabaseName: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("database")
.asInstanceOf[String]

val mongodbBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("commit-batch-size")
.asInstanceOf[Int]

// Iceberg catalog configurations
val icebergCatalogUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("catalog")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

val icebergCatalogUsername: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("catalog")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("username")
.asInstanceOf[String]

val icebergCatalogPassword: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("catalog")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("password")
.asInstanceOf[String]

val icebergTableCommitBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("batch-size")
.asInstanceOf[Int]

val icebergTableCommitNumRetries: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("num-retries")
.asInstanceOf[Int]

val icebergTableCommitMinRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("min-wait-ms")
.asInstanceOf[Int]

val icebergTableCommitMaxRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("max-wait-ms")
.asInstanceOf[Int]

// JDBC configurations
val jdbcUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

// For jdbc specifics
val jdbcUsername: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("username")
Expand All @@ -52,7 +136,7 @@ object StorageConfig {
.asInstanceOf[Map[String, Any]]("password")
.asInstanceOf[String]

// For file storage specifics
val fileStorageDirectoryUri =
// File storage configurations
val fileStorageDirectoryUri: URI =
corePath.resolve("amber").resolve("user-resources").resolve("workflow-results").toUri
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ class IcebergDocument[T >: Null <: AnyRef](
) extends VirtualDocument[T] {

private val lock = new ReentrantReadWriteLock()

synchronized {
IcebergUtil
.loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = true)
.get
IcebergUtil.createTable(catalog, tableNamespace, tableName, tableSchema)
}

/**
* Returns the URI of the table location.
*/
override def getURI: URI = {
val table = IcebergUtil
.loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false)
.loadTable(catalog, tableNamespace, tableName)
.getOrElse(
throw new NoSuchTableException(f"table ${tableNamespace}.${tableName} doesn't exist")
)
Expand Down Expand Up @@ -64,9 +63,7 @@ class IcebergDocument[T >: Null <: AnyRef](
IcebergUtil.loadTable(
catalog,
tableNamespace,
tableName,
tableSchema,
createIfNotExist = false
tableName
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package edu.uci.ics.amber.core.storage.result.iceberg

import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.storage.model.BufferedItemWriter
import edu.uci.ics.amber.core.storage.util.StorageUtil.withLock
import edu.uci.ics.amber.core.storage.util.StorageUtil.{withLock, withReadLock}
import edu.uci.ics.amber.util.IcebergUtil
import org.apache.iceberg.{Schema, Table}
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
Expand All @@ -24,14 +25,13 @@ class IcebergTableWriter[T](

private val lock = new ReentrantLock()
private val buffer = new ArrayBuffer[T]()
override val bufferSize: Int = 3000
override val bufferSize: Int = StorageConfig.icebergTableCommitBatchSize

// Load the Iceberg table
private val table: Table = synchronized {
private val table: Table =
IcebergUtil
.loadTable(catalog, tableNamespace, tableName, tableSchema, createIfNotExist = false)
.loadTable(catalog, tableNamespace, tableName)
.get
}

override def open(): Unit =
withLock(lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ import org.apache.iceberg.types.Types
import org.apache.iceberg.data.{GenericRecord, Record}
import org.apache.iceberg.jdbc.JdbcCatalog
import org.apache.iceberg.types.Type.PrimitiveType
import org.apache.iceberg.{CatalogProperties, PartitionSpec, Table, Schema => IcebergSchema}
import org.apache.iceberg.{
CatalogProperties,
PartitionSpec,
Table,
TableProperties,
Schema => IcebergSchema
}

import java.net.URI
import java.nio.ByteBuffer
Expand Down Expand Up @@ -50,32 +56,36 @@ object IcebergUtil {
catalog
}

def loadTable(
def createTable(
catalog: Catalog,
tableNamespace: String,
tableName: String,
tableSchema: IcebergSchema,
createIfNotExist: Boolean
): Option[Table] = {
tableSchema: IcebergSchema
): Table = {
val tableProperties = Map(
"commit.retry.num-retries" -> "10",
"commit.retry.min-wait-ms" -> "10"
TableProperties.COMMIT_NUM_RETRIES -> StorageConfig.icebergTableCommitNumRetries.toString,
TableProperties.COMMIT_MAX_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMaxRetryWaitMs.toString,
TableProperties.COMMIT_MIN_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMinRetryWaitMs.toString
)
val identifier = TableIdentifier.of(tableNamespace, tableName)
if (!catalog.tableExists(identifier)) {
if (!createIfNotExist) {
return None
}
Some(
catalog.createTable(
identifier,
tableSchema,
PartitionSpec.unpartitioned,
tableProperties.asJava
)
)
} else {
catalog.createTable(
identifier,
tableSchema,
PartitionSpec.unpartitioned,
tableProperties.asJava
)
}

def loadTable(
catalog: Catalog,
tableNamespace: String,
tableName: String
): Option[Table] = {
val identifier = TableIdentifier.of(tableNamespace, tableName)
try {
Some(catalog.loadTable(identifier))
} catch {
case exception: Exception => None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] {
val catalog: Catalog = IcebergUtil.createJdbcCatalog(
"iceberg_document_test",
StorageConfig.fileStorageDirectoryUri,
StorageConfig.jdbcUrl,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
StorageConfig.icebergCatalogUrl,
StorageConfig.icebergCatalogUsername,
StorageConfig.icebergCatalogPassword
)

val tableNamespace = "test_namespace"
Expand Down

0 comments on commit 1edb551

Please sign in to comment.