Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Yicong-Huang committed Dec 16, 2024
1 parent f866205 commit 713bebb
Showing 1 changed file with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ abstract class ScheduleGenerator(
// create cache writer and link
val matWriterInputSchema = fromOp.outputPorts(fromPortId)._3.toOption.get
val matWriterPhysicalOp: PhysicalOp =
createMatWriter(physicalLink, Array(matWriterInputSchema), workflowContext.workflowId)
createMatWriter(physicalLink, Array(matWriterInputSchema))
val sourceToWriterLink =
PhysicalLink(
fromOp.id,
Expand Down Expand Up @@ -186,15 +186,15 @@ abstract class ScheduleGenerator(
matWriterLogicalOpId: OperatorIdentity,
physicalLink: PhysicalLink
): PhysicalOp = {
println("Creating mat reader!!!")
val opResultStorage = ResultStorage.getOpResultStorage(workflowContext.workflowId)
PhysicalOp
.sourcePhysicalOp(
workflowContext.workflowId,
workflowContext.executionId,
OperatorIdentity(s"cacheSource_${getMatIdFromPhysicalLink(physicalLink)}"),
OpExecInitInfo((_, _) =>
new CacheSourceOpExec(
ResultStorage.getOpResultStorage(workflowContext.workflowId).get(matWriterLogicalOpId)
opResultStorage.get(matWriterLogicalOpId)
)
)
)
Expand All @@ -203,10 +203,7 @@ abstract class ScheduleGenerator(
.withPropagateSchema(
SchemaPropagationFunc(_ =>
Map(
OutputPort().id -> ResultStorage
.getOpResultStorage(workflowContext.workflowId)
.getSchema(matWriterLogicalOpId)
.get
OutputPort().id -> opResultStorage.getSchema(matWriterLogicalOpId).get
)
)
)
Expand All @@ -216,16 +213,15 @@ abstract class ScheduleGenerator(

private def createMatWriter(
physicalLink: PhysicalLink,
inputSchema: Array[Schema],
workflowIdentity: WorkflowIdentity
inputSchema: Array[Schema]
): PhysicalOp = {
val matWriter = new ProgressiveSinkOpDesc()
matWriter.setContext(workflowContext)
matWriter.setOperatorId(s"materialized_${getMatIdFromPhysicalLink(physicalLink)}")
// expect exactly one input port and one output port
val schema = matWriter.getOutputSchema(inputSchema)
ResultStorage
.getOpResultStorage(workflowIdentity)
.getOpResultStorage(workflowContext.workflowId)
.create(
key = matWriter.operatorIdentifier,
mode = OpResultStorage.defaultStorageMode,
Expand Down

0 comments on commit 713bebb

Please sign in to comment.