Skip to content

Commit

Permalink
Remove cache source descriptor (#3163)
Browse files Browse the repository at this point in the history
This PR removes the cache source descriptor. We explicitly create a
physical operator to read cache during scheduling (cut off physical
links for materialization).
  • Loading branch information
Yicong-Huang authored Dec 17, 2024
1 parent 752306f commit 270f6d7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.core.workflow.{
PhysicalOp,
PhysicalPlan,
SchemaPropagationFunc,
WorkflowContext
}
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
DefaultResourceAllocator,
ExecutionClusterInfo
}
import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpDesc
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, PhysicalOpIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.PhysicalLink
import edu.uci.ics.amber.workflow.{OutputPort, PhysicalLink}
import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.traverse.TopologicalOrderIterator

Expand Down Expand Up @@ -155,7 +161,7 @@ abstract class ScheduleGenerator(
// create cache writer and link
val matWriterInputSchema = fromOp.outputPorts(fromPortId)._3.toOption.get
val matWriterPhysicalOp: PhysicalOp =
createMatWriter(physicalLink, Array(matWriterInputSchema), workflowContext.workflowId)
createMatWriter(physicalLink, Array(matWriterInputSchema))
val sourceToWriterLink =
PhysicalLink(
fromOp.id,
Expand All @@ -169,7 +175,7 @@ abstract class ScheduleGenerator(

// create cache reader and link
val matReaderPhysicalOp: PhysicalOp =
createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink, workflowContext.workflowId)
createMatReader(matWriterPhysicalOp.id.logicalOpId, physicalLink)
val readerToDestLink =
PhysicalLink(
matReaderPhysicalOp.id,
Expand All @@ -186,37 +192,44 @@ abstract class ScheduleGenerator(

private def createMatReader(
matWriterLogicalOpId: OperatorIdentity,
physicalLink: PhysicalLink,
workflowIdentity: WorkflowIdentity
physicalLink: PhysicalLink
): PhysicalOp = {
val matReader = new CacheSourceOpDesc(
matWriterLogicalOpId,
ResultStorage.getOpResultStorage(workflowIdentity)
)
matReader.setContext(workflowContext)
matReader.setOperatorId(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}")

matReader
.getPhysicalOp(
val opResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId)
PhysicalOp
.sourcePhysicalOp(
workflowContext.workflowId,
workflowContext.executionId
workflowContext.executionId,
OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"),
OpExecInitInfo((_, _) =>
new CacheSourceOpExec(
opResultStorage.get(matWriterLogicalOpId)
)
)
)
.withInputPorts(List.empty)
.withOutputPorts(List(OutputPort()))
.withPropagateSchema(
SchemaPropagationFunc(_ =>
Map(
OutputPort().id -> opResultStorage.getSchema(matWriterLogicalOpId).get
)
)
)
.propagateSchema()

}

private def createMatWriter(
physicalLink: PhysicalLink,
inputSchema: Array[Schema],
workflowIdentity: WorkflowIdentity
inputSchema: Array[Schema]
): PhysicalOp = {
val matWriter = new ProgressiveSinkOpDesc()
matWriter.setContext(workflowContext)
matWriter.setOperatorId(s"materialized_${getMatIdFromPhysicalLink(physicalLink)}")
// expect exactly one input port and one output port
val schema = matWriter.getOutputSchema(inputSchema)
ResultStorage
.getOpResultStorage(workflowIdentity)
.getOpResultStorage(workflowContext.workflowId)
.create(
key = matWriter.operatorIdentifier,
mode = OpResultStorage.defaultStorageMode,
Expand Down

This file was deleted.

0 comments on commit 270f6d7

Please sign in to comment.