From 6e348eb7df446b133a04bfde25b405fe19cebf35 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 16 Dec 2024 14:55:37 -0800 Subject: [PATCH 1/4] use physical op directly --- .../scheduling/ScheduleGenerator.scala | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 217d77aa14e..8da5d420142 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -1,17 +1,15 @@ package edu.uci.ics.amber.engine.architecture.scheduling +import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex -import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{ - DefaultResourceAllocator, - ExecutionClusterInfo -} +import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{DefaultResourceAllocator, ExecutionClusterInfo} import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc -import edu.uci.ics.amber.operator.source.cache.CacheSourceOpDesc +import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity} -import edu.uci.ics.amber.workflow.PhysicalLink +import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink} import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.traverse.TopologicalOrderIterator @@ -169,7 +167,7 @@ abstract class ScheduleGenerator( // create cache reader and link val matReaderPhysicalOp: PhysicalOp = - createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink, workflowContext.workflowId) + createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink) val readerToDestLink = PhysicalLink( matReaderPhysicalOp.id, @@ -186,20 +184,31 @@ abstract class ScheduleGenerator( private def createMatReader( matWriterLogicalOpId: OperatorIdentity, - physicalLink: PhysicalLink, - workflowIdentity: WorkflowIdentity + physicalLink: PhysicalLink ): PhysicalOp = { - val matReader = new CacheSourceOpDesc( - matWriterLogicalOpId, - ResultStorage.getOpResultStorage(workflowIdentity) - ) - matReader.setContext(workflowContext) - matReader.setOperatorId(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}") - - matReader - .getPhysicalOp( + println("Creating mat reader!!!") + PhysicalOp + .sourcePhysicalOp( workflowContext.workflowId, - workflowContext.executionId + workflowContext.executionId, + OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"), + OpExecInitInfo((_, _) => + new CacheSourceOpExec( + ResultStorage.getOpResultStorage(workflowContext.workflowId).get(matWriterLogicalOpId) + ) + ) + ) + .withInputPorts(List.empty) + .withOutputPorts(List(OutputPort())) + .withPropagateSchema( + SchemaPropagationFunc(_ => + Map( + OutputPort().id -> ResultStorage + .getOpResultStorage(workflowContext.workflowId) + .getSchema(matWriterLogicalOpId) + .get + ) + ) ) .propagateSchema() From f86620535499877c464340af4ebb61623de420ff Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 16 Dec 2024 14:55:47 -0800 Subject: [PATCH 2/4] remove cache source desc --- .../source/cache/CacheSourceOpDesc.scala | 45 ------------------- 1 file changed, 45 deletions(-) delete mode 100644 core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpDesc.scala diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpDesc.scala deleted file mode 100644 index 535e2d315c3..00000000000 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/cache/CacheSourceOpDesc.scala +++ /dev/null @@ -1,45 +0,0 @@ -package edu.uci.ics.amber.operator.source.cache - -import edu.uci.ics.amber.core.executor.OpExecInitInfo -import edu.uci.ics.amber.core.storage.result.OpResultStorage -import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} -import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} -import edu.uci.ics.amber.operator.source.SourceOperatorDescriptor -import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} -import edu.uci.ics.amber.workflow.OutputPort - -class CacheSourceOpDesc(val targetSinkStorageId: OperatorIdentity, opResultStorage: OpResultStorage) - extends SourceOperatorDescriptor { - assert(null != targetSinkStorageId) - assert(null != opResultStorage) - - override def sourceSchema(): Schema = opResultStorage.getSchema(targetSinkStorageId).get - - override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = { - PhysicalOp - .sourcePhysicalOp( - workflowId, - executionId, - operatorIdentifier, - OpExecInitInfo((_, _) => new CacheSourceOpExec(opResultStorage.get(targetSinkStorageId))) - ) - .withInputPorts(operatorInfo.inputPorts) - .withOutputPorts(operatorInfo.outputPorts) - .withPropagateSchema( - SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema())) - ) - } - - override def operatorInfo: OperatorInfo = - OperatorInfo( - "Cache Source Operator", - "Retrieve the cached output to src", - OperatorGroupConstants.UTILITY_GROUP, - inputPorts = List.empty, - outputPorts = List(OutputPort()) - ) -} From 713bebbe6fce7d1ea7aa55ab536d5a2ead037d2c Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 16 Dec 2024 15:02:13 -0800 Subject: [PATCH 3/4] update --- .../scheduling/ScheduleGenerator.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 8da5d420142..6c9cec81dec 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -153,7 +153,7 @@ abstract class ScheduleGenerator( // create cache writer and link val matWriterInputSchema = fromOp.outputPorts(fromPortId)._3.toOption.get val matWriterPhysicalOp: PhysicalOp = - createMatWriter(physicalLink, Array(matWriterInputSchema), workflowContext.workflowId) + createMatWriter(physicalLink, Array(matWriterInputSchema)) val sourceToWriterLink = PhysicalLink( fromOp.id, @@ -186,7 +186,7 @@ abstract class ScheduleGenerator( matWriterLogicalOpId: OperatorIdentity, physicalLink: PhysicalLink ): PhysicalOp = { - println("Creating mat reader!!!") + val opResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId) PhysicalOp .sourcePhysicalOp( workflowContext.workflowId, @@ -194,7 +194,7 @@ abstract class ScheduleGenerator( OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"), OpExecInitInfo((_, _) => new CacheSourceOpExec( - ResultStorage.getOpResultStorage(workflowContext.workflowId).get(matWriterLogicalOpId) + opResultStorage.get(matWriterLogicalOpId) ) ) ) @@ -203,10 +203,7 @@ abstract class ScheduleGenerator( .withPropagateSchema( SchemaPropagationFunc(_ => Map( - OutputPort().id -> ResultStorage - .getOpResultStorage(workflowContext.workflowId) - .getSchema(matWriterLogicalOpId) - .get + OutputPort().id -> opResultStorage.getSchema(matWriterLogicalOpId).get ) ) ) @@ -216,8 +213,7 @@ abstract class ScheduleGenerator( private def createMatWriter( physicalLink: PhysicalLink, - inputSchema: Array[Schema], - workflowIdentity: WorkflowIdentity + inputSchema: Array[Schema] ): PhysicalOp = { val matWriter = new ProgressiveSinkOpDesc() matWriter.setContext(workflowContext) @@ -225,7 +221,7 @@ abstract class ScheduleGenerator( // expect exactly one input port and one output port val schema = matWriter.getOutputSchema(inputSchema) ResultStorage - .getOpResultStorage(workflowIdentity) + .getOpResultStorage(workflowContext.workflowId) .create( key = matWriter.operatorIdentifier, mode = OpResultStorage.defaultStorageMode, From a5703613bcd7dedf8c55ca229030b88cee138975 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Mon, 16 Dec 2024 22:05:33 -0800 Subject: [PATCH 4/4] fix format --- .../scheduling/ScheduleGenerator.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index 6c9cec81dec..800d8344fd3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -3,9 +3,17 @@ package edu.uci.ics.amber.engine.architecture.scheduling import edu.uci.ics.amber.core.executor.OpExecInitInfo import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage} import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, SchemaPropagationFunc, WorkflowContext} +import edu.uci.ics.amber.core.workflow.{ + PhysicalOp, + PhysicalPlan, + SchemaPropagationFunc, + WorkflowContext +} import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex -import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{DefaultResourceAllocator, ExecutionClusterInfo} +import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{ + DefaultResourceAllocator, + ExecutionClusterInfo +} import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity} @@ -186,7 +194,7 @@ abstract class ScheduleGenerator( matWriterLogicalOpId: OperatorIdentity, physicalLink: PhysicalLink ): PhysicalOp = { - val opResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId) + val opResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId) PhysicalOp .sourcePhysicalOp( workflowContext.workflowId,