remove sink desc
Yicong-Huang committed Dec 19, 2024
1 parent 9d670d8 commit 69674b9
Showing 6 changed files with 6 additions and 255 deletions.
@@ -2,14 +2,12 @@ package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{DefaultResourceAllocator, ExecutionClusterInfo}
import edu.uci.ics.amber.operator.SpecialPhysicalOpFactory
-import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
-import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity}
+import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink}
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.traverse.TopologicalOrderIterator
@@ -63,7 +63,7 @@ object WorkflowCompiler {
errorList: ArrayBuffer[(OperatorIdentity, Throwable)] // Mandatory error list
): Map[OperatorIdentity, List[Option[Schema]]] = {
val physicalInputSchemas =
-physicalPlan.operators.filter(op => !op.isSinkOperator).map { physicalOp =>
+physicalPlan.operators.map { physicalOp =>
// Process inputPorts and capture Throwable values in the errorList
physicalOp.id -> physicalOp.inputPorts.values
.filterNot(_._1.id.internal)
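
For context, the hunk above drops the special case that skipped sink operators when collecting input schemas. A minimal sketch of the resulting pattern, using simplified placeholder types rather than the actual Amber PhysicalOp/Schema classes:

// Placeholder types for illustration only; not the real edu.uci.ics.amber classes.
final case class Schema(attributeNames: List[String])
final case class PhysicalOp(id: String, inputSchemas: List[Option[Schema]])

// Before this commit: operators.filter(op => !op.isSinkOperator).map(...)
// After this commit: every operator, sinks included, contributes its input schemas.
def collectInputSchemas(operators: List[PhysicalOp]): Map[String, List[Option[Schema]]] =
  operators.map(op => op.id -> op.inputSchemas).toMap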
@@ -164,19 +164,14 @@ class WorkflowCompiler(
val errorList = new ArrayBuffer[(OperatorIdentity, Throwable)]()
var opIdToInputSchema: Map[OperatorIdentity, List[Option[Schema]]] = Map()
// 1. convert the pojo to logical plan
-var logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo)
+val logicalPlan: LogicalPlan = LogicalPlan(logicalPlanPojo)

// 2. Manipulate logical plan by:
-//  - inject sink
-logicalPlan = SinkInjectionTransformer.transform(
-  logicalPlanPojo.opsToViewResult,
-  logicalPlan
-)
// - resolve the file name in each scan source operator
logicalPlan.resolveScanSourceOpFileName(Some(errorList))

-// 3. expand the logical plan to the physical plan,
+// 3. expand the logical plan to the physical plan
val physicalPlan = expandLogicalPlan(logicalPlan, Some(errorList))

if (errorList.isEmpty) {
// no error during the expansion, then do:
// - collect the input schema for each op
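Sketching the compile flow after this hunk, with placeholder types standing in for the real LogicalPlan/PhysicalPlan classes (the step numbers mirror the comments above; the SinkInjectionTransformer pass is gone):

// Placeholder types for illustration; the actual WorkflowCompiler API differs.
final case class LogicalPlanPojo(operators: List[String])
final case class LogicalPlan(operators: List[String]) {
  def resolveScanSourceOpFileName(): LogicalPlan = this // stub for file-name resolution
}
final case class PhysicalPlan(operators: List[String])

def compile(pojo: LogicalPlanPojo): PhysicalPlan = {
  // 1. convert the pojo to a logical plan; a val now, since the plan is no longer rewritten
  val logicalPlan = LogicalPlan(pojo.operators)
  // 2. resolve scan-source file names (sink injection no longer happens here)
  val resolvedPlan = logicalPlan.resolveScanSourceOpFileName()
  // 3. expand the logical plan to the physical plan
  PhysicalPlan(resolvedPlan.operators)
}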

This file was deleted.

@@ -37,7 +37,6 @@ import edu.uci.ics.amber.operator.randomksampling.RandomKSamplingOpDesc
import edu.uci.ics.amber.operator.regex.RegexOpDesc
import edu.uci.ics.amber.operator.reservoirsampling.ReservoirSamplingOpDesc
import edu.uci.ics.amber.operator.sentiment.SentimentAnalysisOpDesc
-import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.sklearn.{
SklearnAdaptiveBoostingOpDesc,
SklearnBaggingOpDesc,
@@ -158,7 +157,6 @@ trait StateTransferFunc
value = classOf[TwitterSearchSourceOpDesc],
name = "TwitterSearch"
),
-new Type(value = classOf[ProgressiveSinkOpDesc], name = "SimpleSink"),
new Type(value = classOf[CandlestickChartOpDesc], name = "CandlestickChart"),
new Type(value = classOf[SplitOpDesc], name = "Split"),
new Type(value = classOf[ContourPlotOpDesc], name = "ContourPlot"),
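For background, the Type(...) entries above register operator descriptors as Jackson polymorphic subtypes, so deleting the "SimpleSink" entry removes that operator type from JSON deserialization. A self-contained sketch of the mechanism, with made-up descriptor classes and an assumed property name (not the project's actual registry):

import com.fasterxml.jackson.annotation.JsonSubTypes.Type
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}

// Illustrative base class and subtypes; the real registry lists Texera's operator descriptors.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "operatorType")
@JsonSubTypes(
  Array(
    new Type(value = classOf[SplitOpDesc], name = "Split"),
    new Type(value = classOf[CandlestickChartOpDesc], name = "CandlestickChart")
    // A "SimpleSink" entry used to live in such a list; with it removed, JSON
    // carrying that operator type can no longer be deserialized.
  )
)
abstract class OperatorDescriptor
class SplitOpDesc extends OperatorDescriptor
class CandlestickChartOpDesc extends OperatorDescriptor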
@@ -1,14 +1,9 @@
package edu.uci.ics.amber.operator

import edu.uci.ics.amber.core.storage.FileResolver
-import edu.uci.ics.amber.operator.aggregate.{
-  AggregateOpDesc,
-  AggregationFunction,
-  AggregationOperation
-}
+import edu.uci.ics.amber.operator.aggregate.{AggregateOpDesc, AggregationFunction, AggregationOperation}
import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc
import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc
-import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.source.scan.csv.CSVScanSourceOpDesc
import edu.uci.ics.amber.operator.source.scan.json.JSONLScanSourceOpDesc
import edu.uci.ics.amber.operator.source.sql.asterixdb.AsterixDBSourceOpDesc
@@ -144,10 +139,6 @@ object TestOperators {
asterixDBOp
}

-def sinkOpDesc(): ProgressiveSinkOpDesc = {
-  new ProgressiveSinkOpDesc()
-}

def pythonOpDesc(): PythonUDFOpDescV2 = {
val udf = new PythonUDFOpDescV2()
udf.workers = 1

This file was deleted.
