diff --git a/core/gui/src/assets/operator_images/BulkDownloader.png b/core/gui/src/assets/operator_images/BulkDownloader.png deleted file mode 100644 index 0b363c6ac81..00000000000 Binary files a/core/gui/src/assets/operator_images/BulkDownloader.png and /dev/null differ diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index 1c6339efcfc..ac88f098cd3 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -11,7 +11,6 @@ import edu.uci.ics.amber.operator.cartesianProduct.CartesianProductOpDesc import edu.uci.ics.amber.operator.dictionary.DictionaryMatcherOpDesc import edu.uci.ics.amber.operator.difference.DifferenceOpDesc import edu.uci.ics.amber.operator.distinct.DistinctOpDesc -import edu.uci.ics.amber.operator.download.BulkDownloaderOpDesc import edu.uci.ics.amber.operator.dummy.DummyOpDesc import edu.uci.ics.amber.operator.filter.SpecializedFilterOpDesc import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc @@ -202,7 +201,6 @@ trait StateTransferFunc new Type(value = classOf[RedditSearchSourceOpDesc], name = "RedditSearch"), new Type(value = classOf[PythonLambdaFunctionOpDesc], name = "PythonLambdaFunction"), new Type(value = classOf[PythonTableReducerOpDesc], name = "PythonTableReducer"), - new Type(value = classOf[BulkDownloaderOpDesc], name = "BulkDownloader"), new Type(value = classOf[URLFetcherOpDesc], name = "URLFetcher"), new Type(value = classOf[CartesianProductOpDesc], name = "CartesianProduct"), new Type(value = classOf[FilledAreaPlotOpDesc], name = "FilledAreaPlot"), diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala deleted file mode 100644 index 212f815feaf..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala +++ /dev/null @@ -1,87 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} -import com.google.common.base.Preconditions -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle -import edu.uci.ics.amber.core.executor.OpExecInitInfo -import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema} -import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} -import edu.uci.ics.amber.operator.LogicalOp -import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} -import edu.uci.ics.amber.operator.metadata.annotations.AutofillAttributeName -import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import edu.uci.ics.amber.workflow.{InputPort, OutputPort} - -class BulkDownloaderOpDesc extends LogicalOp { - - @JsonProperty(required = true) - @JsonSchemaTitle("URL Attribute") - @JsonPropertyDescription( - "Only accepts standard URL format" - ) - @AutofillAttributeName - var urlAttribute: String = _ - - @JsonProperty(required = true) - @JsonSchemaTitle("Result Attribute") - @JsonPropertyDescription( - "Attribute name for results(downloaded file paths)" - ) - var resultAttribute: String = _ - - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = { - PhysicalOp - .oneToOnePhysicalOp( - workflowId, - executionId, - operatorIdentifier, - OpExecInitInfo((_, _) => - new BulkDownloaderOpExec( - getContext, - urlAttribute - ) - ) - ) - .withInputPorts(operatorInfo.inputPorts) - .withOutputPorts(operatorInfo.outputPorts) - .withPropagateSchema( - SchemaPropagationFunc(inputSchemas => - Map( - operatorInfo.outputPorts.head.id -> getOutputSchema( - operatorInfo.inputPorts.map(_.id).map(inputSchemas(_)).toArray - ) - ) - ) - ) - } - - override def operatorInfo: OperatorInfo = - OperatorInfo( - userFriendlyName = "Bulk Downloader", - operatorDescription = "Download urls in a string column", - operatorGroupName = OperatorGroupConstants.UTILITY_GROUP, - inputPorts = List(InputPort()), - outputPorts = List(OutputPort()) - ) - - override def getOutputSchema(schemas: Array[Schema]): Schema = { - Preconditions.checkArgument(schemas.length == 1) - val inputSchema = schemas(0) - val outputSchemaBuilder = Schema.builder() - // keep the same schema from input - outputSchemaBuilder.add(inputSchema) - if (resultAttribute == null || resultAttribute.isEmpty) { - resultAttribute = urlAttribute + " result" - } - outputSchemaBuilder.add( - new Attribute( - resultAttribute, - AttributeType.STRING - ) - ) - outputSchemaBuilder.build() - } -} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala deleted file mode 100644 index b69405d0e82..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala +++ /dev/null @@ -1,80 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import edu.uci.ics.amber.core.executor.OperatorExecutor -import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} -import edu.uci.ics.amber.core.workflow.WorkflowContext -import edu.uci.ics.amber.operator.source.fetcher.URLFetchUtil.getInputStreamFromURL - -import java.net.URL -import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} - -class BulkDownloaderOpExec( - workflowContext: WorkflowContext, - urlAttribute: String -) extends OperatorExecutor { - - private val downloading = new mutable.Queue[Future[TupleLike]]() - - private class DownloadResultIterator(blocking: Boolean) extends Iterator[TupleLike] { - override def hasNext: Boolean = { - if (downloading.isEmpty) { - return false - } - if (blocking) { - Await.result(downloading.head, 5.seconds) - } - downloading.head.isCompleted - } - - override def next(): TupleLike = { - downloading.dequeue().value.get.get - } - } - - override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { - - downloading.enqueue(Future { - downloadTuple(tuple) - }) - new DownloadResultIterator(false) - } - - override def onFinish(port: Int): Iterator[TupleLike] = { - new DownloadResultIterator(true) - } - - private def downloadTuple(tuple: Tuple): TupleLike = { - TupleLike(tuple.getFields ++ Seq(downloadUrl(tuple.getField(urlAttribute)))) - } - - private def downloadUrl(url: String): String = { - try { - Await.result( - Future { - val urlObj = new URL(url) - val input = getInputStreamFromURL(urlObj) - input match { - case Some(contentStream) => - if (contentStream.available() > 0) { - val filename = - s"w${workflowContext.workflowId.id}-e${workflowContext.executionId.id}-${urlObj.getHost - .replace(".", "")}.download" - filename - } else { - throw new RuntimeException(s"content is not available for $url") - } - case None => - throw new RuntimeException(s"fetch content failed for $url") - } - }, - 5.seconds - ) - } catch { - case throwable: Throwable => s"Failed: ${throwable.getMessage}" - } - } - -} diff --git a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala b/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala deleted file mode 100644 index 6a9a1f2239b..00000000000 --- a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} -import edu.uci.ics.amber.core.workflow.WorkflowContext -import edu.uci.ics.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} -import org.scalatest.BeforeAndAfter -import org.scalatest.flatspec.AnyFlatSpec -class BulkDownloaderOpExecSpec extends AnyFlatSpec with BeforeAndAfter { - val tupleSchema: Schema = Schema - .builder() - .add(new Attribute("url", AttributeType.STRING)) - .build() - - val resultSchema: Schema = Schema - .builder() - .add(new Attribute("url", AttributeType.STRING)) - .add(new Attribute("url result", AttributeType.STRING)) - .build() - - val tuple: () => Tuple = () => - Tuple - .builder(tupleSchema) - .add(new Attribute("url", AttributeType.STRING), "http://www.google.com") - .build() - - val tuple2: () => Tuple = () => - Tuple - .builder(tupleSchema) - .add(new Attribute("url", AttributeType.STRING), "https://www.google.com") - .build() - - var opExec: BulkDownloaderOpExec = _ - before { - opExec = new BulkDownloaderOpExec( - new WorkflowContext(DEFAULT_WORKFLOW_ID, DEFAULT_EXECUTION_ID), - urlAttribute = "url" - ) - } - - it should "open" in { - opExec.open() - } - -}