diff --git a/core/dao/build.sbt b/core/dao/build.sbt index 23ec6c6dcb3..526c37be92e 100644 --- a/core/dao/build.sbt +++ b/core/dao/build.sbt @@ -88,4 +88,5 @@ libraryDependencies ++= Seq( libraryDependencies ++= Seq( "mysql" % "mysql-connector-java" % "8.0.33", // MySQL connector + "org.yaml" % "snakeyaml" % "1.30", // for reading storage config yaml file ) \ No newline at end of file diff --git a/core/dao/src/main/resources/jooq-conf.xml b/core/dao/src/main/resources/jooq-conf.xml new file mode 100644 index 00000000000..2935bce5073 --- /dev/null +++ b/core/dao/src/main/resources/jooq-conf.xml @@ -0,0 +1,46 @@ + + + + + false + + true + true + + + org.jooq.codegen.JavaGenerator + + + + org.jooq.meta.mysql.MySQLDatabase + + + texera_db + + + .* + + + (test_.*)|(ignore_.*) + + + + + + edu.uci.ics.texera.dao.jooq.generated + + + dao/src/main/scala + + + diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/JooqCodeGenerator.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/JooqCodeGenerator.scala new file mode 100644 index 00000000000..f0297781434 --- /dev/null +++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/JooqCodeGenerator.scala @@ -0,0 +1,54 @@ +package edu.uci.ics.texera.dao + +import org.jooq.codegen.GenerationTool +import org.jooq.meta.jaxb.{Configuration, Jdbc} +import org.yaml.snakeyaml.Yaml + +import java.io.InputStream +import java.nio.file.{Files, Path} +import java.util.{Map => JMap} +import scala.jdk.CollectionConverters._ + +object JooqCodeGenerator { + @throws[Exception] + def main(args: Array[String]): Unit = { + // Load jOOQ configuration XML + val jooqXmlPath: Path = + Path.of("dao").resolve("src").resolve("main").resolve("resources").resolve("jooq-conf.xml") + val jooqConfig: Configuration = GenerationTool.load(Files.newInputStream(jooqXmlPath)) + + // Load YAML configuration + val yamlConfPath: Path = Path + .of("workflow-core") + .resolve("src") + .resolve("main") + .resolve("resources") + .resolve("storage-config.yaml") + val yaml = new Yaml + val inputStream: InputStream = Files.newInputStream(yamlConfPath) + + val conf: Map[String, Any] = + yaml.load(inputStream).asInstanceOf[JMap[String, Any]].asScala.toMap + + val jdbcConfig = conf("storage") + .asInstanceOf[JMap[String, Any]] + .asScala("jdbc") + .asInstanceOf[JMap[String, Any]] + .asScala + + // Set JDBC configuration for jOOQ + val jooqJdbcConfig = new Jdbc + jooqJdbcConfig.setDriver("com.mysql.cj.jdbc.Driver") + jooqJdbcConfig.setUrl(jdbcConfig("url").toString) + jooqJdbcConfig.setUsername(jdbcConfig("username").toString) + jooqJdbcConfig.setPassword(jdbcConfig("password").toString) + + jooqConfig.setJdbc(jooqJdbcConfig) + + // Generate the code + GenerationTool.generate(jooqConfig) + + // Close input stream + inputStream.close() + } +} diff --git a/core/gui/src/assets/operator_images/BulkDownloader.png b/core/gui/src/assets/operator_images/BulkDownloader.png deleted file mode 100644 index 0b363c6ac81..00000000000 Binary files a/core/gui/src/assets/operator_images/BulkDownloader.png and /dev/null differ diff --git a/core/util/build.sbt b/core/util/build.sbt deleted file mode 100644 index 58d56c4f89f..00000000000 --- a/core/util/build.sbt +++ /dev/null @@ -1,18 +0,0 @@ -name := "util" -organization := "edu.uci.ics" -version := "0.1-SNAPSHOT" - -scalaVersion := "2.13.12" - -lazy val util = project - .in(file(".")) - .settings( - // https://mvnrepository.com/artifact/mysql/mysql-connector-java - libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.23", - // https://mvnrepository.com/artifact/com.typesafe/config - libraryDependencies += "com.typesafe" % "config" % "1.4.1", - // https://mvnrepository.com/artifact/org.jooq/jooq - libraryDependencies += "org.jooq" % "jooq" % "3.14.4", - // https://mvnrepository.com/artifact/org.jooq/jooq-codegen - libraryDependencies += "org.jooq" % "jooq-codegen" % "3.12.4" - ) diff --git a/core/util/conf/jooq-conf.xml b/core/util/conf/jooq-conf.xml deleted file mode 100644 index b30e2abcce3..00000000000 --- a/core/util/conf/jooq-conf.xml +++ /dev/null @@ -1,89 +0,0 @@ - - - - false - - true - true - - - org.jooq.codegen.JavaGenerator - - - - org.jooq.meta.mysql.MySQLDatabase - - - texera_db - - - .* - - - (test_.*)|(ignore_.*) - - - - - - edu.uci.ics.texera.web.model.jooq.generated - - - core/amber/src/main/scala - - - - - - false - - true - true - - - org.jooq.codegen.JavaGenerator - - - - org.jooq.meta.mysql.MySQLDatabase - - - texera_db - - - .* - - - (test_.*)|(ignore_.*) - - - - - - edu.uci.ics.texera.dao.jooq.generated - - - core/dao/src/main/scala - - - diff --git a/core/util/project/build.properties b/core/util/project/build.properties deleted file mode 100644 index bb5389da211..00000000000 --- a/core/util/project/build.properties +++ /dev/null @@ -1 +0,0 @@ -sbt.version=1.5.5 \ No newline at end of file diff --git a/core/util/src/main/java/edu/uci/ics/util/RunCodegen.java b/core/util/src/main/java/edu/uci/ics/util/RunCodegen.java deleted file mode 100644 index 3db167a5ba9..00000000000 --- a/core/util/src/main/java/edu/uci/ics/util/RunCodegen.java +++ /dev/null @@ -1,47 +0,0 @@ -package edu.uci.ics.util; - - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.jooq.codegen.GenerationTool; -import org.jooq.meta.jaxb.Configuration; -import org.jooq.meta.jaxb.Jdbc; - -import java.nio.file.Files; -import java.nio.file.Path; - -/** - * This class is used to generate java classes representing the sql table in Texera database - * These auto generated classes are essential for the connection between backend and database when using JOOQ library. - *

- * Every time the table in the Texera database changes, including creating, dropping and modifying the tables, - * this class must be run to update the corresponding java classes. - *

- * Remember to change the username and password to your owns before you run this class. - *

- * The username, password and connection url is located in texera\core\conf\jdbc.conf - * The configuration file is located in texera\core\conf\jooq-conf.xml - */ -public class RunCodegen { - - public static void main(String[] args) throws Exception { - Path jooqXmlPath = Path.of("core").resolve("util").resolve("conf").resolve("jooq-conf.xml"); - Configuration jooqConfig = GenerationTool.load(Files.newInputStream(jooqXmlPath)); - - Path jdbcConfPath = Path.of("core").resolve("amber").resolve("src").resolve("main").resolve("resources").resolve("application.conf"); - Config jdbcConfig = ConfigFactory.parseFile(jdbcConfPath.toFile()); - - Jdbc jooqJdbcConfig = new Jdbc(); - jooqJdbcConfig.setDriver("com.mysql.cj.jdbc.Driver"); - jooqJdbcConfig.setUrl(jdbcConfig.getString("jdbc.url")); - jooqJdbcConfig.setUsername(jdbcConfig.getString("jdbc.username")); - jooqJdbcConfig.setPassword(jdbcConfig.getString("jdbc.password")); - jooqConfig.setJdbc(jooqJdbcConfig); - - GenerationTool.generate(jooqConfig); - } - -} - - - diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index 1c6339efcfc..ac88f098cd3 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -11,7 +11,6 @@ import edu.uci.ics.amber.operator.cartesianProduct.CartesianProductOpDesc import edu.uci.ics.amber.operator.dictionary.DictionaryMatcherOpDesc import edu.uci.ics.amber.operator.difference.DifferenceOpDesc import edu.uci.ics.amber.operator.distinct.DistinctOpDesc -import edu.uci.ics.amber.operator.download.BulkDownloaderOpDesc import edu.uci.ics.amber.operator.dummy.DummyOpDesc import edu.uci.ics.amber.operator.filter.SpecializedFilterOpDesc import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc @@ -202,7 +201,6 @@ trait StateTransferFunc new Type(value = classOf[RedditSearchSourceOpDesc], name = "RedditSearch"), new Type(value = classOf[PythonLambdaFunctionOpDesc], name = "PythonLambdaFunction"), new Type(value = classOf[PythonTableReducerOpDesc], name = "PythonTableReducer"), - new Type(value = classOf[BulkDownloaderOpDesc], name = "BulkDownloader"), new Type(value = classOf[URLFetcherOpDesc], name = "URLFetcher"), new Type(value = classOf[CartesianProductOpDesc], name = "CartesianProduct"), new Type(value = classOf[FilledAreaPlotOpDesc], name = "FilledAreaPlot"), diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala deleted file mode 100644 index 212f815feaf..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala +++ /dev/null @@ -1,87 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} -import com.google.common.base.Preconditions -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle -import edu.uci.ics.amber.core.executor.OpExecInitInfo -import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema} -import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} -import edu.uci.ics.amber.operator.LogicalOp -import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} -import edu.uci.ics.amber.operator.metadata.annotations.AutofillAttributeName -import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import edu.uci.ics.amber.workflow.{InputPort, OutputPort} - -class BulkDownloaderOpDesc extends LogicalOp { - - @JsonProperty(required = true) - @JsonSchemaTitle("URL Attribute") - @JsonPropertyDescription( - "Only accepts standard URL format" - ) - @AutofillAttributeName - var urlAttribute: String = _ - - @JsonProperty(required = true) - @JsonSchemaTitle("Result Attribute") - @JsonPropertyDescription( - "Attribute name for results(downloaded file paths)" - ) - var resultAttribute: String = _ - - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = { - PhysicalOp - .oneToOnePhysicalOp( - workflowId, - executionId, - operatorIdentifier, - OpExecInitInfo((_, _) => - new BulkDownloaderOpExec( - getContext, - urlAttribute - ) - ) - ) - .withInputPorts(operatorInfo.inputPorts) - .withOutputPorts(operatorInfo.outputPorts) - .withPropagateSchema( - SchemaPropagationFunc(inputSchemas => - Map( - operatorInfo.outputPorts.head.id -> getOutputSchema( - operatorInfo.inputPorts.map(_.id).map(inputSchemas(_)).toArray - ) - ) - ) - ) - } - - override def operatorInfo: OperatorInfo = - OperatorInfo( - userFriendlyName = "Bulk Downloader", - operatorDescription = "Download urls in a string column", - operatorGroupName = OperatorGroupConstants.UTILITY_GROUP, - inputPorts = List(InputPort()), - outputPorts = List(OutputPort()) - ) - - override def getOutputSchema(schemas: Array[Schema]): Schema = { - Preconditions.checkArgument(schemas.length == 1) - val inputSchema = schemas(0) - val outputSchemaBuilder = Schema.builder() - // keep the same schema from input - outputSchemaBuilder.add(inputSchema) - if (resultAttribute == null || resultAttribute.isEmpty) { - resultAttribute = urlAttribute + " result" - } - outputSchemaBuilder.add( - new Attribute( - resultAttribute, - AttributeType.STRING - ) - ) - outputSchemaBuilder.build() - } -} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala deleted file mode 100644 index b69405d0e82..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala +++ /dev/null @@ -1,80 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import edu.uci.ics.amber.core.executor.OperatorExecutor -import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} -import edu.uci.ics.amber.core.workflow.WorkflowContext -import edu.uci.ics.amber.operator.source.fetcher.URLFetchUtil.getInputStreamFromURL - -import java.net.URL -import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} - -class BulkDownloaderOpExec( - workflowContext: WorkflowContext, - urlAttribute: String -) extends OperatorExecutor { - - private val downloading = new mutable.Queue[Future[TupleLike]]() - - private class DownloadResultIterator(blocking: Boolean) extends Iterator[TupleLike] { - override def hasNext: Boolean = { - if (downloading.isEmpty) { - return false - } - if (blocking) { - Await.result(downloading.head, 5.seconds) - } - downloading.head.isCompleted - } - - override def next(): TupleLike = { - downloading.dequeue().value.get.get - } - } - - override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { - - downloading.enqueue(Future { - downloadTuple(tuple) - }) - new DownloadResultIterator(false) - } - - override def onFinish(port: Int): Iterator[TupleLike] = { - new DownloadResultIterator(true) - } - - private def downloadTuple(tuple: Tuple): TupleLike = { - TupleLike(tuple.getFields ++ Seq(downloadUrl(tuple.getField(urlAttribute)))) - } - - private def downloadUrl(url: String): String = { - try { - Await.result( - Future { - val urlObj = new URL(url) - val input = getInputStreamFromURL(urlObj) - input match { - case Some(contentStream) => - if (contentStream.available() > 0) { - val filename = - s"w${workflowContext.workflowId.id}-e${workflowContext.executionId.id}-${urlObj.getHost - .replace(".", "")}.download" - filename - } else { - throw new RuntimeException(s"content is not available for $url") - } - case None => - throw new RuntimeException(s"fetch content failed for $url") - } - }, - 5.seconds - ) - } catch { - case throwable: Throwable => s"Failed: ${throwable.getMessage}" - } - } - -} diff --git a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala b/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala deleted file mode 100644 index 6a9a1f2239b..00000000000 --- a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} -import edu.uci.ics.amber.core.workflow.WorkflowContext -import edu.uci.ics.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} -import org.scalatest.BeforeAndAfter -import org.scalatest.flatspec.AnyFlatSpec -class BulkDownloaderOpExecSpec extends AnyFlatSpec with BeforeAndAfter { - val tupleSchema: Schema = Schema - .builder() - .add(new Attribute("url", AttributeType.STRING)) - .build() - - val resultSchema: Schema = Schema - .builder() - .add(new Attribute("url", AttributeType.STRING)) - .add(new Attribute("url result", AttributeType.STRING)) - .build() - - val tuple: () => Tuple = () => - Tuple - .builder(tupleSchema) - .add(new Attribute("url", AttributeType.STRING), "http://www.google.com") - .build() - - val tuple2: () => Tuple = () => - Tuple - .builder(tupleSchema) - .add(new Attribute("url", AttributeType.STRING), "https://www.google.com") - .build() - - var opExec: BulkDownloaderOpExec = _ - before { - opExec = new BulkDownloaderOpExec( - new WorkflowContext(DEFAULT_WORKFLOW_ID, DEFAULT_EXECUTION_ID), - urlAttribute = "url" - ) - } - - it should "open" in { - opExec.open() - } - -}