Add IcebergDocument as one implementation of VirtualDocument #3147

Open
wants to merge 59 commits into
base: master
Changes from 27 commits
Commits
59 commits
1fe9f17
add itemized file document and partition document
bobbai00 Dec 10, 2024
219b82d
add unit test for PartitionDocument
bobbai00 Dec 10, 2024
e446e9c
add more to unit tests
bobbai00 Dec 10, 2024
9627b25
make PartitionDocument return T
bobbai00 Dec 11, 2024
b85fd45
fix partition document test
bobbai00 Dec 11, 2024
8e6fec3
refining the documents
bobbai00 Dec 12, 2024
288aea4
add type R to PartitionedItemizedFileDocument
bobbai00 Dec 12, 2024
c3a1d00
do a rename
bobbai00 Dec 12, 2024
97c601e
adding the arrow file document, TODO: fix the test
bobbai00 Dec 12, 2024
e2c5515
pass the compilation
bobbai00 Dec 12, 2024
c17a54e
finish arrow document
bobbai00 Dec 14, 2024
bc38cc4
start to add some iceberg related
bobbai00 Dec 16, 2024
51dd7cf
finish initial iceberg writer
bobbai00 Dec 17, 2024
481c437
finish initial version of iceberg
bobbai00 Dec 19, 2024
0274f66
refactor test parts
bobbai00 Dec 19, 2024
4663fef
finish 1st viable version
bobbai00 Dec 19, 2024
9607f98
fix the append read
bobbai00 Dec 19, 2024
d2d0ed7
finish append read
bobbai00 Dec 20, 2024
f4ea0e3
finish concurrent write test
bobbai00 Dec 20, 2024
e46311c
resolve the binary type issue
bobbai00 Dec 20, 2024
5467b6f
refactor the util and config
bobbai00 Dec 20, 2024
d92a5ad
add a simple implementation for getRange and getAfter
bobbai00 Dec 21, 2024
15508b9
try to add iceberg as new type of result storage
bobbai00 Dec 21, 2024
c234dfd
closing to fix the dependency
bobbai00 Dec 21, 2024
4213a5b
fix the websocket connection
bobbai00 Dec 21, 2024
11ce821
add create override
bobbai00 Dec 22, 2024
129453e
add more comments and adjust the dependency
bobbai00 Dec 22, 2024
d553e32
add worker id when creating the writer
bobbai00 Dec 30, 2024
78df063
drop the write lock for iceberg table writer
bobbai00 Dec 30, 2024
92e2caf
clean up the build sbt
bobbai00 Dec 30, 2024
bb6961a
fix py result storage issue
bobbai00 Dec 30, 2024
1be10bf
clean up the iceberg document
bobbai00 Dec 30, 2024
7adfda4
clean up the iceberg writer
bobbai00 Dec 30, 2024
4617564
add more comments on the iceberg util
bobbai00 Dec 30, 2024
13731cb
add more comments
bobbai00 Dec 30, 2024
2baa661
refactor local file IO
bobbai00 Dec 30, 2024
8639579
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Dec 30, 2024
e105913
merge master
bobbai00 Dec 30, 2024
4cf144b
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Dec 30, 2024
9b69f59
cleanup the config
bobbai00 Dec 30, 2024
60445e6
Merge remote-tracking branch 'origin/jiadong-add-file-result-storage'…
bobbai00 Dec 30, 2024
9a482b1
cleanup the clear logic
bobbai00 Dec 30, 2024
decab8d
fmt
bobbai00 Dec 30, 2024
9cb2674
refactor the test to use the test db
bobbai00 Dec 30, 2024
51d8a1e
make the test harder
bobbai00 Dec 30, 2024
39b0448
make the test more clean
bobbai00 Dec 30, 2024
2655dae
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Jan 1, 2025
73106dd
incorporate worker idx to sink
bobbai00 Jan 1, 2025
a2e53b5
add format version and row lineage to the iceberg table
bobbai00 Jan 1, 2025
cffafe0
Merge branch 'master' into jiadong-add-file-result-storage
bobbai00 Jan 1, 2025
f54e38c
Revert "add format version and row lineage to the iceberg table"
bobbai00 Jan 1, 2025
7176864
fix iceberg util spec
bobbai00 Jan 1, 2025
76dd31c
try to add the record id
bobbai00 Jan 2, 2025
31070be
try debugging the test
bobbai00 Jan 2, 2025
1156db4
half way to have a consistent order
bobbai00 Jan 2, 2025
c712c1d
fix the get range
bobbai00 Jan 3, 2025
a16ff80
fix the get's refresh
bobbai00 Jan 3, 2025
d2e710f
add getAfter test
bobbai00 Jan 3, 2025
a8bb3db
remove redundant dependency
bobbai00 Jan 3, 2025
@@ -183,6 +183,7 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
// rely on the server-side result cleanup logic.
case OpResultStorage.MONGODB =>
MongoDatabaseManager.dropCollection(collectionName)
case OpResultStorage.ICEBERG =>

Collaborator: Add a comment to explain why this case is blank.

Collaborator Author: I merged it with the MEMORY case. It's blank for the same reason as the MEMORY case.

}
})
} catch {
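
The reply in the review thread above says the ICEBERG case was merged with the MEMORY case, and the context comment in the hunk suggests the reason is that cleanup is left to server-side logic. A minimal sketch of what such a merged no-op branch could look like follows. The string constants and the `cleanup` function are hypothetical stand-ins, not the real `OpResultStorage` or `MongoDatabaseManager` code.

```scala
// Hypothetical stand-ins: the real OpResultStorage constants and
// MongoDatabaseManager are not reproduced here.
object ResultCleanupSketch {
  val MEMORY = "memory"
  val MONGODB = "mongodb"
  val ICEBERG = "iceberg"

  /** Returns a short description of the cleanup action chosen for a storage type. */
  def cleanup(storageType: String, collectionName: String): String =
    storageType match {
      case MONGODB =>
        // stands in for MongoDatabaseManager.dropCollection(collectionName)
        s"drop:$collectionName"
      case MEMORY | ICEBERG =>
        // intentionally a no-op: (assumed) cleanup is left to the
        // server-side result cleanup logic, as for MEMORY
        "noop"
      case other =>
        throw new IllegalArgumentException(s"unknown result storage: $other")
    }
}
```

Grouping the two cases with `|` keeps the explanation in one place, which may be what the reviewer's request was after.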
@@ -93,7 +93,9 @@ class WorkflowCompiler(
if (!storage.contains(storageKey)) {
// get the schema for result storage in certain mode
val sinkStorageSchema: Option[Schema] =
if (storageType == OpResultStorage.MONGODB) {
if (
storageType == OpResultStorage.MONGODB || storageType == OpResultStorage.ICEBERG
) {
// use the output schema on the first output port as the schema for storage
Some(schema.right.get)
} else {
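
As more storage modes need a sink schema, the `||` chain in the condition above will keep growing. One possible alternative, sketched under assumptions, is a set-membership check. `Schema`, `schemaBackedModes`, and `sinkStorageSchema` are hypothetical names, and returning `None` in the other branch is an assumption because the real `else` body is not shown in this hunk. Note that `toOption` yields `None` for a `Left`, whereas `schema.right.get` throws.

```scala
object SinkSchemaSketch {
  // Hypothetical stand-in for the real Schema class.
  final case class Schema(attributes: List[String])

  // Storage modes that, per the condition in the hunk, store results with an explicit schema.
  val schemaBackedModes: Set[String] = Set("mongodb", "iceberg")

  /** Picks the schema for result storage, or None when the mode does not need one (assumed). */
  def sinkStorageSchema(
      storageType: String,
      schema: Either[Throwable, Schema]
  ): Option[Schema] =
    if (schemaBackedModes.contains(storageType)) schema.toOption
    else None
}
```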
29 changes: 27 additions & 2 deletions core/build.sbt
@@ -3,18 +3,43 @@ lazy val WorkflowCore = (project in file("workflow-core"))
.dependsOn(DAO)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency
lazy val WorkflowOperator = (project in file("workflow-operator")).dependsOn(WorkflowCore)
lazy val WorkflowOperator = (project in file("workflow-operator"))
.dependsOn(WorkflowCore)
.settings(
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-compress" % "1.23.0", // because of the dependency introduced by iceberg
)
)
lazy val WorkflowCompilingService = (project in file("workflow-compiling-service"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
// override it as io.dropwizard 4 require 2.16.1 or higher
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1"
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.16.1",
"org.glassfish.jersey.core" % "jersey-common" % "3.0.12"
)
)

lazy val WorkflowExecutionService = (project in file("amber"))
.dependsOn(WorkflowOperator)
.settings(
dependencyOverrides ++= Seq(
// override it as io.dropwizard 4 require 2.16.1 or higher
"com.fasterxml.jackson.core" % "jackson-core" % "2.15.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.15.1",
"org.slf4j" % "slf4j-api" % "1.7.26",
"org.glassfish.jersey.media" % "jersey-media-multipart" % "2.25.1",
"org.glassfish.jersey.core" % "jersey-common" % "2.25.1",
"org.glassfish.jersey.core" % "jersey-server" % "2.25.1",
"javax.xml.bind" % "jaxb-api" % "2.3.1",
"com.sun.xml.bind" % "jaxb-impl" % "2.3.1",
"org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813",
)
)
.configs(Test)
.dependsOn(DAO % "test->test") // test scope dependency

21 changes: 20 additions & 1 deletion core/workflow-core/build.sbt
@@ -111,6 +111,25 @@ val arrowDependencies = Seq(

libraryDependencies ++= arrowDependencies

val excludeHadoopJersey = ExclusionRule(organization = "com.sun.jersey")
val excludeHadoopSlf4j = ExclusionRule(organization = "org.slf4j")
val excludeHadoopJetty = ExclusionRule(organization = "org.eclipse.jetty")
val excludeHadoopJsp = ExclusionRule(organization = "javax.servlet.jsp")
libraryDependencies ++= Seq(
"org.apache.iceberg" % "iceberg-api" % "1.7.1",
"org.apache.iceberg" % "iceberg-core" % "1.7.1",
"org.apache.iceberg" % "iceberg-parquet" % "1.7.1",
"org.apache.iceberg" % "iceberg-data" % "1.7.1",
"org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll(
ExclusionRule("javax.xml.bind"),
ExclusionRule("org.glassfish.jersey"),
excludeHadoopJersey,
excludeHadoopSlf4j,
excludeHadoopJetty,
excludeHadoopJsp,
)
)

/////////////////////////////////////////////////////////////////////////////
// Additional Dependencies
/////////////////////////////////////////////////////////////////////////////
@@ -123,5 +142,5 @@ libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", // Scala Logging
"org.eclipse.jgit" % "org.eclipse.jgit" % "5.13.0.202109080827-r", // jgit
"org.yaml" % "snakeyaml" % "1.30", // yaml reader (downgrade to 1.30 due to dropwizard 1.3.23 required by amber)
"org.apache.commons" % "commons-vfs2" % "2.9.0" // for FileResolver throw VFS-related exceptions
"org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions // for Kyro serde/deserde
)
25 changes: 21 additions & 4 deletions core/workflow-core/src/main/resources/storage-config.yaml
@@ -1,10 +1,27 @@
storage:
result-storage-mode: memory
result-storage-mode: iceberg
mongodb:
url: "mongodb://localhost:27017"
database: "texera_storage"
commit-batch-size: 1000
iceberg:
catalog:
jdbc: # currently we only support storing catalog info via jdbc, i.e. https://iceberg.apache.org/docs/1.7.1/jdbc/
url: "jdbc:mysql://0.0.0.0:3306/texera_iceberg?serverTimezone=UTC"
username: "root"

Collaborator: Make sure to clean up those usernames and passwords.

Collaborator Author: Cleaned.


password: "123456"
table:
namespace: "operator-result"
commit:
batch-size: 4096 # decide the buffer size of our IcebergTableWriter
retry:
# retry configures the OCC parameter for concurrent write operations in Iceberg
# Docs about Reliability in Iceberg: https://iceberg.apache.org/docs/1.7.1/reliability/
# Docs about full parameter list and their meaning: https://iceberg.apache.org/docs/1.7.1/configuration/#write-properties
num-retries: 10
min-wait-ms: 100 # 0.1s
max-wait-ms: 10000 # 10s
jdbc:
url: "jdbc:mysql://localhost:3306/texera_db?serverTimezone=UTC"
username: ""
password: ""
url: "jdbc:mysql://0.0.0.0:3306/texera_db?serverTimezone=UTC"
username: "root"
password: "123456"
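
The `retry` keys in the configuration above line up with Iceberg's commit retry table properties (`commit.retry.num-retries`, `commit.retry.min-wait-ms`, `commit.retry.max-wait-ms`). The sketch below assumes the values are forwarded as table properties; this diff does not show where they are actually applied. `RetryConfig`, `toTableProperties`, and `backoffScheduleMs` are hypothetical names, and the doubling schedule only illustrates capped exponential backoff. It is not Iceberg's exact retry algorithm.

```scala
object IcebergCommitRetrySketch {
  // Mirrors storage.iceberg.table.commit.retry in storage-config.yaml.
  final case class RetryConfig(numRetries: Int, minWaitMs: Int, maxWaitMs: Int)

  /** Maps the YAML values onto Iceberg's commit retry table property names. */
  def toTableProperties(c: RetryConfig): Map[String, String] =
    Map(
      "commit.retry.num-retries" -> c.numRetries.toString,
      "commit.retry.min-wait-ms" -> c.minWaitMs.toString,
      "commit.retry.max-wait-ms" -> c.maxWaitMs.toString
    )

  /**
    * Illustrative wait times per retry, ASSUMING the wait doubles after each
    * attempt and is capped at maxWaitMs. Real implementations may add jitter
    * or an overall timeout.
    */
  def backoffScheduleMs(c: RetryConfig): List[Long] = {
    val cap = c.maxWaitMs.toLong
    Iterator
      .iterate(math.min(c.minWaitMs.toLong, cap))(w => math.min(w * 2, cap))
      .take(c.numRetries)
      .toList
  }
}
```

Under these assumptions, the configured values (10 retries, 100 ms minimum, 10 s maximum) reach the cap after the seventh wait, so the last few retries all wait the full 10 seconds.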
@@ -1,7 +1,9 @@
package edu.uci.ics.amber.core.storage

import edu.uci.ics.amber.util.PathUtils.corePath
import org.yaml.snakeyaml.Yaml

import java.net.URI
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

@@ -13,34 +15,123 @@ object StorageConfig {

val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap
val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCatalogMap = icebergMap("catalog").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCatalogJdbcMap =
icebergCatalogMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap
val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap
val jdbcMap = storageMap("jdbc").asInstanceOf[JMap[String, Any]].asScala.toMap
javaConf
.updated("storage", storageMap.updated("mongodb", mongodbMap).updated("jdbc", jdbcMap))

javaConf.updated(
"storage",
storageMap
.updated("mongodb", mongodbMap)
.updated(
"iceberg",
icebergMap
.updated(
"catalog",
icebergCatalogMap.updated("jdbc", icebergCatalogJdbcMap)
)
.updated(
"table",
icebergTableMap.updated(
"commit",
icebergCommitMap.updated("retry", icebergRetryMap)
)
)
)
.updated("jdbc", jdbcMap)
)
}

// Result storage mode
val resultStorageMode: String =
conf("storage").asInstanceOf[Map[String, Any]]("result-storage-mode").asInstanceOf[String]

// For MongoDB specifics
// MongoDB configurations
val mongodbUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

val mongodbDatabaseName: String = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("database")
.asInstanceOf[String]

val mongodbBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("mongodb")
.asInstanceOf[Map[String, Any]]("commit-batch-size")
.asInstanceOf[Int]

// Iceberg catalog configurations
val icebergCatalogUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("catalog")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

val icebergCatalogUsername: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("catalog")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("username")
.asInstanceOf[String]

val icebergCatalogPassword: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("catalog")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("password")
.asInstanceOf[String]

val icebergTableNamespace: String = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("namespace")
.asInstanceOf[String]

val icebergTableCommitBatchSize: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("batch-size")
.asInstanceOf[Int]

val icebergTableCommitNumRetries: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("num-retries")
.asInstanceOf[Int]

val icebergTableCommitMinRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("min-wait-ms")
.asInstanceOf[Int]

val icebergTableCommitMaxRetryWaitMs: Int = conf("storage")
.asInstanceOf[Map[String, Any]]("iceberg")
.asInstanceOf[Map[String, Any]]("table")
.asInstanceOf[Map[String, Any]]("commit")
.asInstanceOf[Map[String, Any]]("retry")
.asInstanceOf[Map[String, Any]]("max-wait-ms")
.asInstanceOf[Int]

// JDBC configurations
val jdbcUrl: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("url")
.asInstanceOf[String]

// For jdbc specifics
val jdbcUsername: String = conf("storage")
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("username")
@@ -50,4 +141,8 @@ object StorageConfig {
.asInstanceOf[Map[String, Any]]("jdbc")
.asInstanceOf[Map[String, Any]]("password")
.asInstanceOf[String]

// File storage configurations
val fileStorageDirectoryUri: URI =
corePath.resolve("amber").resolve("user-resources").resolve("workflow-results").toUri
}
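
The `StorageConfig` changes above repeat long `asInstanceOf` chains for every nested key and convert each nested Java map by hand. One way this could be condensed is sketched below: a recursive Java-to-Scala conversion plus a path-based lookup. `deepToScala`, `lookup`, `getString`, and `getInt` are hypothetical helpers, not part of this PR, and the sketch assumes all YAML keys are strings, as the original casts do.

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

object NestedConfigSketch {

  /** Recursively converts nested java.util.Map values into immutable Scala Maps. */
  def deepToScala(value: Any): Any =
    value match {
      case m: JMap[_, _] =>
        // assumes string keys, as the casts in StorageConfig do
        m.asInstanceOf[JMap[String, Any]].asScala.iterator
          .map { case (k, v) => k -> deepToScala(v) }
          .toMap
      case other => other
    }

  /** Walks nested maps along `path`; fails with the full path on a missing key. */
  def lookup(conf: Map[String, Any], path: String*): Any =
    path.foldLeft[Any](conf) {
      case (m: Map[_, _], key) =>
        m.asInstanceOf[Map[String, Any]].getOrElse(
          key,
          throw new NoSuchElementException(
            s"missing key '$key' in ${path.mkString(".")}"
          )
        )
      case (_, key) =>
        throw new NoSuchElementException(
          s"'$key' is not nested under a map in ${path.mkString(".")}"
        )
    }

  def getString(conf: Map[String, Any], path: String*): String =
    lookup(conf, path: _*).asInstanceOf[String]

  def getInt(conf: Map[String, Any], path: String*): Int =
    lookup(conf, path: _*).asInstanceOf[Int]
}
```

With helpers like these, a definition such as `icebergTableCommitBatchSize` could shrink to a single call along the lines of `getInt(conf, "storage", "iceberg", "table", "commit", "batch-size")`, and a missing key would report its full path instead of a bare `NoSuchElementException`.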