Skip to content

Commit

Permalink
use physical op directly
Browse files Browse the repository at this point in the history
  • Loading branch information
Yicong-Huang committed Dec 16, 2024
1 parent 41c47bc commit 6e348eb
Showing 1 changed file with 29 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
DefaultResourceAllocator,
ExecutionClusterInfo
}
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{DefaultResourceAllocator, ExecutionClusterInfo}
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpDesc
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.PhysicalLink
import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink}
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.traverse.TopologicalOrderIterator

Expand Down Expand Up @@ -169,7 +167,7 @@ abstract class ScheduleGenerator(

// create cache reader and link
val matReaderPhysicalOp: PhysicalOp =
createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink, workflowContext.workflowId)
createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink)
val readerToDestLink =
PhysicalLink(
matReaderPhysicalOp.id,
Expand All @@ -186,20 +184,31 @@ abstract class ScheduleGenerator(

private def createMatReader(
matWriterLogicalOpId: OperatorIdentity,
physicalLink: PhysicalLink,
workflowIdentity: WorkflowIdentity
physicalLink: PhysicalLink
): PhysicalOp = {
val matReader = new CacheSourceOpDesc(
matWriterLogicalOpId,
ResultStorage.getOpResultStorage(workflowIdentity)
)
matReader.setContext(workflowContext)
matReader.setOperatorId(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}")

matReader
.getPhysicalOp(
println("Creating mat reader!!!")
PhysicalOp
.sourcePhysicalOp(
workflowContext.workflowId,
workflowContext.executionId
workflowContext.executionId,
OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"),
OpExecInitInfo((_, _) =>
new CacheSourceOpExec(
ResultStorage.getOpResultStorage(workflowContext.workflowId).get(matWriterLogicalOpId)
)
)
)
.withInputPorts(List.empty)
.withOutputPorts(List(OutputPort()))
.withPropagateSchema(
SchemaPropagationFunc(_ =>
Map(
OutputPort().id -> ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.getSchema(matWriterLogicalOpId)
.get
)
)
)
.propagateSchema()

Expand Down

0 comments on commit 6e348eb

Please sign in to comment.