diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 217d77aa14e..8da5d420142 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -1,17 +1,15 @@ package edu.uci.ics.amber.engine.architecture.scheduling +import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex -import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{ - DefaultResourceAllocator, - ExecutionClusterInfo -} +import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{DefaultResourceAllocator, ExecutionClusterInfo} import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc -import edu.uci.ics.amber.operator.source.cache.CacheSourceOpDesc +import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity} -import edu.uci.ics.amber.workflow.PhysicalLink +import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink} import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.traverse.TopologicalOrderIterator @@ -169,7 +167,7 @@ abstract class ScheduleGenerator( // create cache reader and link val matReaderPhysicalOp: PhysicalOp = - createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink, workflowContext.workflowId) + createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink) val readerToDestLink = PhysicalLink( matReaderPhysicalOp.id, @@ -186,20 +184,31 @@ abstract class ScheduleGenerator( private def createMatReader( matWriterLogicalOpId: OperatorIdentity, - physicalLink: PhysicalLink, - workflowIdentity: WorkflowIdentity + physicalLink: PhysicalLink ): PhysicalOp = { - val matReader = new CacheSourceOpDesc( - matWriterLogicalOpId, - ResultStorage.getOpResultStorage(workflowIdentity) - ) - matReader.setContext(workflowContext) - matReader.setOperatorId(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}") - - matReader - .getPhysicalOp( + println("Creating mat reader!!!") + PhysicalOp + .sourcePhysicalOp( workflowContext.workflowId, - workflowContext.executionId + workflowContext.executionId, + OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"), + OpExecInitInfo((_, _) => + new CacheSourceOpExec( + ResultStorage.getOpResultStorage(workflowContext.workflowId).get(matWriterLogicalOpId) + ) + ) + ) + .withInputPorts(List.empty) + .withOutputPorts(List(OutputPort())) + .withPropagateSchema( + SchemaPropagationFunc(_ => + Map( + OutputPort().id -> ResultStorage + .getOpResultStorage(workflowContext.workflowId) + .getSchema(matWriterLogicalOpId) + .get + ) + ) ) .propagateSchema()