Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IcebergDocument as one implementation of VirtualDocument #3147

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1fe9f17
add itemized file document and partition document
bobbai00 Dec 10, 2024
219b82d
add unit test for PartitionDocument
bobbai00 Dec 10, 2024
e446e9c
add more to unit tests
bobbai00 Dec 10, 2024
9627b25
make PartitionDocument return T
bobbai00 Dec 11, 2024
b85fd45
fix partition document test
bobbai00 Dec 11, 2024
8e6fec3
refining the documents
bobbai00 Dec 12, 2024
288aea4
add type R to PartitionedItemizedFileDocument
bobbai00 Dec 12, 2024
c3a1d00
do a rename
bobbai00 Dec 12, 2024
97c601e
adding the arrow file document, TODO: fix the test
bobbai00 Dec 12, 2024
e2c5515
pass the compilation
bobbai00 Dec 12, 2024
c17a54e
finish arrow document
bobbai00 Dec 14, 2024
bc38cc4
start to add some iceberg related
bobbai00 Dec 16, 2024
51dd7cf
finish initial iceberg writer
bobbai00 Dec 17, 2024
481c437
finish initial version of iceberg
bobbai00 Dec 19, 2024
0274f66
refactor test parts
bobbai00 Dec 19, 2024
4663fef
finish 1st viable version
bobbai00 Dec 19, 2024
9607f98
fix the append read
bobbai00 Dec 19, 2024
d2d0ed7
finish append read
bobbai00 Dec 20, 2024
f4ea0e3
finish concurrent write test
bobbai00 Dec 20, 2024
e46311c
resolve the binary type issue
bobbai00 Dec 20, 2024
5467b6f
refactor the util and config
bobbai00 Dec 20, 2024
d92a5ad
add a simple implementation for getRange and getAfter
bobbai00 Dec 21, 2024
15508b9
try to add iceberg as new type of result storage
bobbai00 Dec 21, 2024
c234dfd
closing to fix the dependency
bobbai00 Dec 21, 2024
4213a5b
fix the websocket connection
bobbai00 Dec 21, 2024
11ce821
add create override
bobbai00 Dec 22, 2024
129453e
add more comments and adjust the dependency
bobbai00 Dec 22, 2024
d553e32
add worker id when creating the writer
bobbai00 Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)
case OpResultStorage.ICEBERG =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments to explain why it's blank.

}
})
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ class WorkflowCompiler(
if (!storage.contains(storageKey)) {
// get the schema for result storage in certain mode
val sinkStorageSchema: Option[Schema] =
if (storageType == OpResultStorage.MONGODB) {
if (
storageType == OpResultStorage.MONGODB || storageType == OpResultStorage.ICEBERG
) {
// use the output schema on the first output port as the schema for storage
Some(schema.right.get)
} else {
Expand Down
29 changes: 27 additions & 2 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,43 @@ lazy val WorkflowCore = (project in file("workflow-core"))
.dependsOn(DAO)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency
lazy val WorkflowOperator = (project in file("workflow-operator")).dependsOn(WorkflowCore)
lazy val WorkflowOperator = (project in file("workflow-operator"))
.dependsOn(WorkflowCore)
.settings(
dependencyOverrides ++= Seq(
// Pin commons-compress: the iceberg dependency (added in WorkflowCore) drags in a
// conflicting transitive version, so force a single version here.
"org.apache.commons" % "commons-compress" % "1.23.0", // because of the dependency introduced by iceberg
)
)
lazy val WorkflowCompilingService = (project in file("workflow-compiling-service"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
// override it as io.dropwizard 4 require 2.16.1 or higher
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1",
// keep jackson-databind in lockstep with jackson-module-scala to avoid runtime mismatch
"com.fasterxml.jackson.core" % "jackson-databind" % "2.16.1",
// NOTE(review): presumably pins a jersey 3.x artifact pulled in transitively
// (e.g. via the new iceberg/hadoop deps) — confirm against the dependency tree
"org.glassfish.jersey.core" % "jersey-common" % "3.0.12"
)
)

// Amber execution service. The long override list below forces the older
// jackson/jersey/jetty/slf4j versions this module's runtime stack expects,
// because the iceberg/hadoop dependencies introduced in WorkflowCore pull in
// newer, incompatible transitive versions.
lazy val WorkflowExecutionService = (project in file("amber"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
// override it as io.dropwizard 4 require 2.16.1 or higher
"com.fasterxml.jackson.core" % "jackson-core" % "2.15.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.15.1",
"org.slf4j" % "slf4j-api" % "1.7.26",
// NOTE(review): jersey 2.25.1 + jetty 9.4.x pins below look tied to the
// dropwizard version used by amber — confirm before bumping any of them
"org.glassfish.jersey.media" % "jersey-media-multipart" % "2.25.1",
"org.glassfish.jersey.core" % "jersey-common" % "2.25.1",
"org.glassfish.jersey.core" % "jersey-server" % "2.25.1",
"javax.xml.bind" % "jaxb-api" % "2.3.1",
"com.sun.xml.bind" % "jaxb-impl" % "2.3.1",
"org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813",
)
)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency

Expand Down
21 changes: 20 additions & 1 deletion core/workflow-core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,25 @@ val arrowDependencies = Seq(

libraryDependencies ++= arrowDependencies

// Exclude Hadoop's transitive servlet/logging stacks: they clash with the
// jersey/slf4j/jetty/jsp versions used elsewhere in this build.
val excludeHadoopJersey = ExclusionRule(organization = "com.sun.jersey")
val excludeHadoopSlf4j = ExclusionRule(organization = "org.slf4j")
val excludeHadoopJetty = ExclusionRule(organization = "org.eclipse.jetty")
val excludeHadoopJsp = ExclusionRule(organization = "javax.servlet.jsp")

// Apache Iceberg (operator result storage) and the hadoop-common jar it needs.
// Single version constant so all four iceberg artifacts stay in lockstep.
val icebergVersion = "1.7.1"
libraryDependencies ++= Seq(
  "org.apache.iceberg" % "iceberg-api" % icebergVersion,
  "org.apache.iceberg" % "iceberg-core" % icebergVersion,
  "org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
  "org.apache.iceberg" % "iceberg-data" % icebergVersion,
  "org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll (
    ExclusionRule("javax.xml.bind"),
    ExclusionRule("org.glassfish.jersey"),
    excludeHadoopJersey,
    excludeHadoopSlf4j,
    excludeHadoopJetty,
    excludeHadoopJsp,
  )
)

/////////////////////////////////////////////////////////////////////////////
// Additional Dependencies
/////////////////////////////////////////////////////////////////////////////
Expand All @@ -123,5 +142,5 @@ libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging
"org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit
"org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber)
"org.apache.commons" % "commons-vfs2" % "2.9.0" // for FileResolver throw VFS-related exceptions
"org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions // for Kyro serde/deserde
)
25 changes: 21 additions & 4 deletions core/workflow-core/src/main/resources/storage-config.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
storage:
  result-storage-mode: iceberg
  mongodb:
    url: "mongodb://localhost:27017"
    database: "texera_storage"
    commit-batch-size: 1000
  iceberg:
    catalog:
      jdbc: # currently we only support storing catalog info via jdbc, i.e. https://iceberg.apache.org/docs/1.7.1/jdbc/
        url: "jdbc:mysql://localhost:3306/texera_iceberg?serverTimezone=UTC"
        # Do not commit real credentials; supply them via deployment configuration.
        username: ""
        password: ""
    table:
      namespace: "operator-result"
      commit:
        batch-size: 4096 # decide the buffer size of our IcebergTableWriter
        retry:
          # retry configures the OCC parameter for concurrent write operations in Iceberg
          # Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/
          # Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties
          num-retries: 10
          min-wait-ms: 100 # 0.1s
          max-wait-ms: 10000 # 10s
  jdbc:
    url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC"
    # Do not commit real credentials; supply them via deployment configuration.
    username: ""
    password: ""
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.util.PathUtils.corePath
import org.yaml.snakeyaml.Yaml

import java.net.URI
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

Expand All @@ -13,34 +15,123 @@ object StorageConfig {

val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap
val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCatalogMap = icebergMap("catalog").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCatalogJdbcMap =
icebergCatalogMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap
val jdbcMap = storageMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
javaConf
.updated("storage", storageMap.updated("mongodb", mongodbMap).updated("jdbc", jdbcMap))

javaConf.updated(
"storage",
storageMap
.updated("mongodb", mongodbMap)
.updated(
"iceberg",
icebergMap
.updated(
"catalog",
icebergCatalogMap.updated("jdbc", icebergCatalogJdbcMap)
)
.updated(
"table",
icebergTableMap.updated(
"commit",
icebergCommitMap.updated("retry", icebergRetryMap)
)
)
)
.updated("jdbc", jdbcMap)
)
}

// Result storage mode
val resultStorageMode: String =
conf("storage").asInstanceOf[Map[String, Any]]("result-storage-mode").asInstanceOf[String]

// For MongoDB specifics
// MongoDB configurations
val mongodbUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

val mongodbDatabaseName: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("database")
.asInstanceOf[String]

val mongodbBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("commit-batch-size")
.asInstanceOf[Int]

// Iceberg configurations.
// Walks a path of nested keys under storage.iceberg, replacing the
// repeated asInstanceOf[Map[String, Any]] chains each setting used before.
private def icebergSetting(path: String*): Any = {
  val root: Any = conf("storage").asInstanceOf[Map[String, Any]]("iceberg")
  path.foldLeft(root) { (node, key) =>
    node.asInstanceOf[Map[String, Any]](key)
  }
}

// JDBC-backed Iceberg catalog connection settings
val icebergCatalogUrl: String =
  icebergSetting("catalog", "jdbc", "url").asInstanceOf[String]

val icebergCatalogUsername: String =
  icebergSetting("catalog", "jdbc", "username").asInstanceOf[String]

val icebergCatalogPassword: String =
  icebergSetting("catalog", "jdbc", "password").asInstanceOf[String]

// Namespace that operator-result tables are created under
val icebergTableNamespace: String =
  icebergSetting("table", "namespace").asInstanceOf[String]

// Buffer size of the Iceberg table writer before each commit
val icebergTableCommitBatchSize: Int =
  icebergSetting("table", "commit", "batch-size").asInstanceOf[Int]

// OCC retry parameters for concurrent Iceberg commits
val icebergTableCommitNumRetries: Int =
  icebergSetting("table", "commit", "retry", "num-retries").asInstanceOf[Int]

val icebergTableCommitMinRetryWaitMs: Int =
  icebergSetting("table", "commit", "retry", "min-wait-ms").asInstanceOf[Int]

val icebergTableCommitMaxRetryWaitMs: Int =
  icebergSetting("table", "commit", "retry", "max-wait-ms").asInstanceOf[Int]

// JDBC configurations
val jdbcUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

// For jdbc specifics
val jdbcUsername: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("username")
Expand All @@ -50,4 +141,8 @@ object StorageConfig {
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("password")
.asInstanceOf[String]

// File storage configurations
val fileStorageDirectoryUri: URI =
corePath.resolve("amber").resolve("user-resources").resolve("workflow-results").toUri
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {

/**
  * return a writer that buffers the items and performs the flush operation at close time
  * @param writerIdentifier the id of the writer, may be required by some implementations
  * @return a buffered item writer
  */
def writer(writerIdentifier: String): BufferedItemWriter[T] =
  throw new NotImplementedError("writer method is not implemented")

/**
Expand Down
Loading
Loading