diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 8da5d42014..6c9cec81de 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -153,7 +153,7 @@ abstract class ScheduleGenerator( // create cache writer and link val matWriterInputSchema = fromOp.outputPorts(fromPortId)._3.toOption.get val matWriterPhysicalOp: PhysicalOp = - createMatWriter(physicalLink, Array(matWriterInputSchema), workflowContext.workflowId) + createMatWriter(physicalLink, Array(matWriterInputSchema)) val sourceToWriterLink = PhysicalLink( fromOp.id, @@ -186,7 +186,7 @@ abstract class ScheduleGenerator( matWriterLogicalOpId: OperatorIdentity, physicalLink: PhysicalLink ): PhysicalOp = { - println("Creating mat reader!!!") + val opResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId) PhysicalOp .sourcePhysicalOp( workflowContext.workflowId, @@ -194,7 +194,7 @@ abstract class ScheduleGenerator( OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"), OpExecInitInfo((_, _) => new CacheSourceOpExec( - ResultStorage.getOpResultStorage(workflowContext.workflowId).get(matWriterLogicalOpId) + opResultStorage.get(matWriterLogicalOpId) ) ) ) @@ -203,10 +203,7 @@ abstract class ScheduleGenerator( .withPropagateSchema( SchemaPropagationFunc(_ => Map( - OutputPort().id -> ResultStorage - .getOpResultStorage(workflowContext.workflowId) - .getSchema(matWriterLogicalOpId) - .get + OutputPort().id -> opResultStorage.getSchema(matWriterLogicalOpId).get ) ) ) @@ -216,8 +213,7 @@ abstract class ScheduleGenerator( private def createMatWriter( physicalLink: PhysicalLink, - inputSchema: Array[Schema], - workflowIdentity: WorkflowIdentity + inputSchema: Array[Schema] ): PhysicalOp = { val matWriter = new ProgressiveSinkOpDesc() matWriter.setContext(workflowContext) @@ -225,7 +221,7 @@ abstract class ScheduleGenerator( // expect exactly one input port and one output port val schema = matWriter.getOutputSchema(inputSchema) ResultStorage - .getOpResultStorage(workflowIdentity) + .getOpResultStorage(workflowContext.workflowId) .create( key = matWriter.operatorIdentifier, mode = OpResultStorage.defaultStorageMode,