From edaf799ca05be56d5550cb5340aa31076e1c2664 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 16 Dec 2024 12:49:19 -0800 Subject: [PATCH] Remove BulkDownloader operator (#3161) This operator's design requires some more discussion. The current issues are: 1. it is accessing the workflow context (i.e., workflowId and executionId), which may not be permitted in the future. 2. It is directly interacting with the local file system. For the short term, we will drop the support for such as downloader. We can discuss how to support it in the future when needed. --- .../assets/operator_images/BulkDownloader.png | Bin 7262 -> 0 bytes .../uci/ics/amber/operator/LogicalOp.scala | 2 - .../download/BulkDownloaderOpDesc.scala | 87 ------------------ .../download/BulkDownloaderOpExec.scala | 80 ---------------- .../download/BulkDownloaderOpExecSpec.scala | 44 --------- 5 files changed, 213 deletions(-) delete mode 100644 core/gui/src/assets/operator_images/BulkDownloader.png delete mode 100644 core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala delete mode 100644 core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala delete mode 100644 core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala diff --git a/core/gui/src/assets/operator_images/BulkDownloader.png b/core/gui/src/assets/operator_images/BulkDownloader.png deleted file mode 100644 index 0b363c6ac813aa2f312fc496a87b1d2364c3c817..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 7262 zcmd6MdmvQX`~DiEsFdy!xwVm$OUfme(J5)WpKeNFx}Xp>1~HgjKIQl(YLEN4%sJ(9 zj$BHVA*NKC3T2co24$ilL*v#kX1-gUI-gGG{Qmm=@%>|FmbKpftoK>Z`@U=KCEm@& ze(vn$vjG5e;cZ*C10aJnWq|xl>}TX;>jd^Q>-e_4App#)pZ?$)f(s90oyugpJ>(t1 zzU1%&Bp(nS9&T_nC?Mp}f#W^~!6ZMrz-l=FD*f=5O*>C!4REQ+$9$qYKPuwin9cra zvEHdYKWkK-{_Wt(t=b16n3H+^`OeMzcmMqKvc68yQU5EB^tTjOsl)o|mJPs&K)$ zs$WmUptav%xi?}Lz6~wVv9M$(Lb|#UThMxU$z(PhF_kk*de5S5lh!yPHS2NBlG z0P(sBZGW1?OfuSE1m~O7M(&rIEfmmp`Pnr$ISr{ z+wihDzb-E<=V#faFQ+0i1EqHKg7ATA@-NkUP{Tr1Nt|x&y7Ae4u&%0Pn778l^TV}Ke~QXVgCD+ zzl|`0gJflw%>P0q^!O6+ME}3IeIq&Duh5U;<9@L3i{umBJKa$H&C-yQ!fQ)nT7o$cSy|36Cmo<0e_X)qmPKXCh= z{+}`-8~;g^Z+?A8f1+or?RSy)O`Xu=f1^Lr_zzL@Rg`ZkeQSJ`$#%v58DPZWA5!C6 zvS21N<)=;lHO@Y(QyBXQocgNUM32q)Df91+{~2fU@>raG3GS}dfZ_hP?E2>T7nAAu z|F0t9RRH&MQJe3R=tnZ6U{2gg5`Hso(n{N4I|Ms58yAEW;@ zN57K&j@y@f`+H1+xf>ZHV10KPq&r=|HDQ5+Z~!vXdAaYn0irVt_`drO)a)!wfc$hU zVrYbLSJY5jbP~;mZL)ZR2nG2wsu{clNH>3yX&(4)JlI+3UK>=kkzJz2!8sk!V-p;O zSE0!k?!vH^PzriO+ajW=T#*}K!X4gw(AsBf^QOj}3%u}_V;{Psju*Tg#3vduv8M+so#Ai3mFj zv_<_;L#o<;Anw!mzinL2wqKXPk*Gxcj5Z{zo!%%9?w{Di$K5~f&kQKbo<-b!Hj-EO zTmpM}M#}eTfMxHxAi}=LUyLmlv92ggJ)jmVU8X!b_5uaOWOe89@_Qava%O#!w8WMf z`I`&9`+tRVc0|gbbp=KbIgW}(XRu|{{-~8btB`*t^xVjDBr{RmCTu#D0Aa`pEqEt> zD>;1IzGM^JhGTzhxc6{(qKl`!B;eUH!Q%{U2Fz`uidxz|i$!jne2*Ph0(%O{1cyHc z9#EgL`5hVaO|`+Yys^fRr%kf0jQl+>w>8>Jism_y4)QgZnc`Tx#~j0@(pPI+GLYr1 zQ8O81E2R~Q2G*cn2fTQw6&+)~h_&k6ab8{Ojaj~q=SJq{NCuoq;e5@=k(;fQT;!E& zks?=Xg{YKyV+1JBF@J*7)EIsRmzywmb9bvVFVQ|ceyfU&U;1<0Finj()8A@QmjWyB zikgT}JECDSbiM^|6exrIY~vn@DE~g{1g7@3u9QMmYuUwlal{w0;8Il02r${%J?-UC z_y0Vz6t8`Ik&5oWxKR?$(7>@)rTJa;lE2xZ2OGqj;G&mLBrtvY`9ZHdNnH(OAwB-M@4}*)NGMq@zov4+wC__DC=t01YIxEmiHHOW5V=V?!$) z6@Lo$uo%dN%%BKwzGh)8AnsSv7M;T;XZLBJ#&jr6;YfIz z-fG`sXRF_NtC{ORZ=|By-9>%LPW6z=OcaNOkrv=jQQ)prjzGlx6c|RG4;6G@q zl=X{Ohz1>nEf95Y3(_2`9vyk&I#kiTavX7gn>6}A-hZ^-k zpakHxWPC5>gOhg>a$+l{|8Dh=jnXSm0=VM@I)Q~ukseaJvwAZ_v9ghffy!tx2r{f5 zMYkmjo+LfTPq#D|lttHBAPpM?eU{UZOBUf?bNs;zF>Wa$pYC-IM;w;hqfwV7Q1QfG zPKG_A?Q8=+_t|O@NgHOcx^IZ~9Fu@KPO?tISn{BB4P{6 zJSG5?CspUq0(63H>=xFMbE*KNb~YXW^?}p z+~g#D0qN>ktYQBO*V1@R5H(FeZ(jn3jSmf8eIYsWPAVCPCR0=oySTVqGhrubS6v-i z6TMgMV`q{R2}X4r52R}DaUpFINZZdAtEL9IT5Gcpp{hB80N-$QP^Zd)E?KBb9 ztJaK*n|dK(ur3t!=RmzK>()415RTcgHt@9=m7n37*oR96gOsT^H)Cs=U6|Pzxs7mV zt49JdN6x6>X20@PQT-n~)^&0u^kVPj8{X4)RBnG>BBk>fKjS%9-1Ca%3y*Tm5#^qW zF`bAL#6xnrRqNgnT+?Xn*}QoATqc$2d}HK7KwEIaM_EqE^CsCn^&W|0<#(fc(g!-hhDu(0mhK_EAJ40LK-ctV<8e z&K2wo=ryA@mYJPV9c^ilO%X^B~dgNErebqG84P!f8inbsdlBP}U{nG8F zMg27u^gxj6_sHYj4*Ld8Zpwr))s>j}I61i}KOk`3P zv&=hk1S(^-EwOI5-a$%`=n~dCUvQXv?!(N!qmFP);mG1c8}mlM&UY&o-~y8xbB|_u zL?Xu7%w(Q=YR*9?-4r#X?-!7d`Rx>UckB>C<(M*OL=zD z5uT@SeJ?!`dzF5+5SM^?6t=<(Lt1e)SzrzP-5zBr(Bz4W>4fM#t1+0dQXjYX^_FPO z8ZAVwT*Ohu*#>kdFJR$cijSgzVW)-cGNoo}gCi;OtR7d6EWmgUToZAuaJKzhY$<_~ z`8WV8;dsp+U69{nWlDX407PyP?p%G3 zB7Ht6GhX0^)v~}%fYr0b_Ew<8=rllz~)b>ea7#UFSb^7u9q!U zhE({!F?Lm>;z)D_9JVxPM&?SuNU(-G(s@_mjts%l8^t#b*{?a)W1Y?<*Ow<4&NATP zgfElwnxMn&$1fH6pNfWU(gnXrec+(gmh-MV1D4VLG{Gp&vgHiR)2D&|OaM{SdCBB7 zJLaZ^3U^7`{rK`o86uJh3p2@Pi&sxom<>#jm=q0F?r;cx^Vq3m=_{4+CF;a>EaINV zz)dc~9J1Lxy{S-he{QOPJCReh&hmPGCKw7ri~=9Q2Htz+@+dp9a$!;sYQHoqpjZXn zpT%>f?O$lv7rR;>6gK3L2Te~teOT)1D7*q;Un?`b=eMWxf?K#{y)BS?BfoP-Bm@{< z?j={GHNyTy9J||zDWioDA)SARXu(}F+uLzm=3de6=P7V)?fHyTr?9v+FY(^1R6IEq z%egf_ICMoS??NcGr-U5-rrO7cnpyODw2=WAo)|=EoPT|KmN3 zJcxpw;qvj0iU6!s-H8>F0Z#{#D_`$E`6M3)i2sR8RwFu5Qqd23Sk=MKfdDLfGl?D0 z^Q|4>ur#c4CB?U`0Y>{LbuE4$nSW?PupQm!=xUrF%drDM;VJv~-VNY}fp%5Z$l^sO zeWxK8`C*4$8EI)HFvpI^pe()@n?mNs8f{S|Ur%R<24g2}S#V{ztaS1+8X!kwHG734&zvFEzQ|;_38*N*P1*XglG0zshE(XQ1?_d z8L(Tt9qo2dH2&2PV#mN?c~R$`t_8V9yPAS?y`P})%i>ydHghdI#!aJKuEse8lQCCPW! z2Y91tV`IsHb+XA-WPSYAs7ZkO1Hv+vEvIA>tvSq*#sh|TZ^3Jd!yuJYVt<8d>%E-i z#+kk#o?ki(5W~h?g>=2T-eZ2A6;JV#Q#gf6ZYazwR|ZcCx@r4Ug_G@JZG*=~x^hgA z+Y@`Rr1o!LBaaMV%VI7BSJ=97j)Z>tfG1nXRHS5^91+Og~2EC(8!;7O?lf@L7nAPk! zotRgB*L2zLXp}xXTfc55!&)w{Iv%@&R?EI8efqHqHt?FdHv!BwxUca9QqCeAFc~ez z?hdNLN6nsln}&O%gq+?jfc5y6Nr0)8ciRu3yvq>v2Ahfpeb=&rf~9@0I@{mhxA1lm zrVXcKrK?KOhuf1I7}HxLHGjPqAQlnBTkEQx9hFFFyo8sJWx(Ux1vK%c^OE}AZFf79 zdA+{t<^dx`Ls!zkR_TG(Ia4;-yvH34IN&>A0_UItJ4A6^A9vWRZPFaDti+L~FU7Q{ zxZ@{K#qo{Ug8_rC*FuU2MokqGUG1uk`)W=BV6-g!GQ~SqrE+3(ca3pVF`T%Kv|DjA z4wR2(Levd?isvae>NcbMGX#N{v6H)Q#81z;W8o_=dEv|YO2}yo11Ap307hXnZ17+i zow}|g3GDVv;}r&yFbqavCoB}4kPclA{uJWIq~Nn`-gM%DZ|@E`C&xHRLgROQ9E{`G zu@3AIdtyOjRxnv5?Q|8U-H)Ej@^II+F!aK5HE*qM1gz68rm?X(pQp8WF8|c3C$#}( z(ZLSF=SL-Puj4L2X*w3#?8IYH3V^tgit?09-YJWYPTP-bSFK`)REkbuo9%#;kOq%A zP2wt506cyPc|^0i8kaN!1P+vA^_kQgX0uHl!)ky;TTmKh;v`_jMr(76MZvEGa+y7= zk@M|YymqKZe!VrZES_oH0UaqWn^s*<&!klP4*CG+ab#r8d+hhw5p=|#Z|1;Dyr zvP~!^8(zu%eeaY!@s=mtK^G+7=XBf4fU@T#RGP`_McX$fQOoXb6(>N!@Ob4N@Pyf{ z8?g&bY93b4Zc|1oN?e7N9t(VQIKhL2-+z?xKj zv_{jU_ImKb&2x}l1-hbVkm0tBHa)-!TdP~OhOG`&6%MUYT-OTrjNj%%(4CZR`j68Tx+m<#RWmAOtY)79kNedAqUjDDj{krZ;GEdF{p3ZbfiL-b!=xl-BI12BP&93fSB~?GC@5>36jSR*9Hwa)mmn|hf5l{awY84ez diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index 1c6339efcfc..ac88f098cd3 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -11,7 +11,6 @@ import edu.uci.ics.amber.operator.cartesianProduct.CartesianProductOpDesc import edu.uci.ics.amber.operator.dictionary.DictionaryMatcherOpDesc import edu.uci.ics.amber.operator.difference.DifferenceOpDesc import edu.uci.ics.amber.operator.distinct.DistinctOpDesc -import edu.uci.ics.amber.operator.download.BulkDownloaderOpDesc import edu.uci.ics.amber.operator.dummy.DummyOpDesc import edu.uci.ics.amber.operator.filter.SpecializedFilterOpDesc import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc @@ -202,7 +201,6 @@ trait StateTransferFunc new Type(value = classOf[RedditSearchSourceOpDesc], name = "RedditSearch"), new Type(value = classOf[PythonLambdaFunctionOpDesc], name = "PythonLambdaFunction"), new Type(value = classOf[PythonTableReducerOpDesc], name = "PythonTableReducer"), - new Type(value = classOf[BulkDownloaderOpDesc], name = "BulkDownloader"), new Type(value = classOf[URLFetcherOpDesc], name = "URLFetcher"), new Type(value = classOf[CartesianProductOpDesc], name = "CartesianProduct"), new Type(value = classOf[FilledAreaPlotOpDesc], name = "FilledAreaPlot"), diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala deleted file mode 100644 index 212f815feaf..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpDesc.scala +++ /dev/null @@ -1,87 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} -import com.google.common.base.Preconditions -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle -import edu.uci.ics.amber.core.executor.OpExecInitInfo -import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema} -import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} -import edu.uci.ics.amber.operator.LogicalOp -import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} -import edu.uci.ics.amber.operator.metadata.annotations.AutofillAttributeName -import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import edu.uci.ics.amber.workflow.{InputPort, OutputPort} - -class BulkDownloaderOpDesc extends LogicalOp { - - @JsonProperty(required = true) - @JsonSchemaTitle("URL Attribute") - @JsonPropertyDescription( - "Only accepts standard URL format" - ) - @AutofillAttributeName - var urlAttribute: String = _ - - @JsonProperty(required = true) - @JsonSchemaTitle("Result Attribute") - @JsonPropertyDescription( - "Attribute name for results(downloaded file paths)" - ) - var resultAttribute: String = _ - - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = { - PhysicalOp - .oneToOnePhysicalOp( - workflowId, - executionId, - operatorIdentifier, - OpExecInitInfo((_, _) => - new BulkDownloaderOpExec( - getContext, - urlAttribute - ) - ) - ) - .withInputPorts(operatorInfo.inputPorts) - .withOutputPorts(operatorInfo.outputPorts) - .withPropagateSchema( - SchemaPropagationFunc(inputSchemas => - Map( - operatorInfo.outputPorts.head.id -> getOutputSchema( - operatorInfo.inputPorts.map(_.id).map(inputSchemas(_)).toArray - ) - ) - ) - ) - } - - override def operatorInfo: OperatorInfo = - OperatorInfo( - userFriendlyName = "Bulk Downloader", - operatorDescription = "Download urls in a string column", - operatorGroupName = OperatorGroupConstants.UTILITY_GROUP, - inputPorts = List(InputPort()), - outputPorts = List(OutputPort()) - ) - - override def getOutputSchema(schemas: Array[Schema]): Schema = { - Preconditions.checkArgument(schemas.length == 1) - val inputSchema = schemas(0) - val outputSchemaBuilder = Schema.builder() - // keep the same schema from input - outputSchemaBuilder.add(inputSchema) - if (resultAttribute == null || resultAttribute.isEmpty) { - resultAttribute = urlAttribute + " result" - } - outputSchemaBuilder.add( - new Attribute( - resultAttribute, - AttributeType.STRING - ) - ) - outputSchemaBuilder.build() - } -} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala deleted file mode 100644 index b69405d0e82..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExec.scala +++ /dev/null @@ -1,80 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import edu.uci.ics.amber.core.executor.OperatorExecutor -import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} -import edu.uci.ics.amber.core.workflow.WorkflowContext -import edu.uci.ics.amber.operator.source.fetcher.URLFetchUtil.getInputStreamFromURL - -import java.net.URL -import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} - -class BulkDownloaderOpExec( - workflowContext: WorkflowContext, - urlAttribute: String -) extends OperatorExecutor { - - private val downloading = new mutable.Queue[Future[TupleLike]]() - - private class DownloadResultIterator(blocking: Boolean) extends Iterator[TupleLike] { - override def hasNext: Boolean = { - if (downloading.isEmpty) { - return false - } - if (blocking) { - Await.result(downloading.head, 5.seconds) - } - downloading.head.isCompleted - } - - override def next(): TupleLike = { - downloading.dequeue().value.get.get - } - } - - override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { - - downloading.enqueue(Future { - downloadTuple(tuple) - }) - new DownloadResultIterator(false) - } - - override def onFinish(port: Int): Iterator[TupleLike] = { - new DownloadResultIterator(true) - } - - private def downloadTuple(tuple: Tuple): TupleLike = { - TupleLike(tuple.getFields ++ Seq(downloadUrl(tuple.getField(urlAttribute)))) - } - - private def downloadUrl(url: String): String = { - try { - Await.result( - Future { - val urlObj = new URL(url) - val input = getInputStreamFromURL(urlObj) - input match { - case Some(contentStream) => - if (contentStream.available() > 0) { - val filename = - s"w${workflowContext.workflowId.id}-e${workflowContext.executionId.id}-${urlObj.getHost - .replace(".", "")}.download" - filename - } else { - throw new RuntimeException(s"content is not available for $url") - } - case None => - throw new RuntimeException(s"fetch content failed for $url") - } - }, - 5.seconds - ) - } catch { - case throwable: Throwable => s"Failed: ${throwable.getMessage}" - } - } - -} diff --git a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala b/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala deleted file mode 100644 index 6a9a1f2239b..00000000000 --- a/core/workflow-operator/src/test/scala/edu/uci/ics/amber/operator/download/BulkDownloaderOpExecSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -package edu.uci.ics.amber.operator.download - -import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} -import edu.uci.ics.amber.core.workflow.WorkflowContext -import edu.uci.ics.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} -import org.scalatest.BeforeAndAfter -import org.scalatest.flatspec.AnyFlatSpec -class BulkDownloaderOpExecSpec extends AnyFlatSpec with BeforeAndAfter { - val tupleSchema: Schema = Schema - .builder() - .add(new Attribute("url", AttributeType.STRING)) - .build() - - val resultSchema: Schema = Schema - .builder() - .add(new Attribute("url", AttributeType.STRING)) - .add(new Attribute("url result", AttributeType.STRING)) - .build() - - val tuple: () => Tuple = () => - Tuple - .builder(tupleSchema) - .add(new Attribute("url", AttributeType.STRING), "http://www.google.com") - .build() - - val tuple2: () => Tuple = () => - Tuple - .builder(tupleSchema) - .add(new Attribute("url", AttributeType.STRING), "https://www.google.com") - .build() - - var opExec: BulkDownloaderOpExec = _ - before { - opExec = new BulkDownloaderOpExec( - new WorkflowContext(DEFAULT_WORKFLOW_ID, DEFAULT_EXECUTION_ID), - urlAttribute = "url" - ) - } - - it should "open" in { - opExec.open() - } - -}