Skip to content

Commit

Permalink
fix storage key issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Yicong-Huang committed Dec 19, 2024
1 parent 3dbfc33 commit dffc702
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ class ExecutionResultService(
ResultStorage
.getOpResultStorage(workflowIdentity)
.getAllKeys
.filter(!_.id.startsWith("materialized_"))
.map(storageKey => {
val count = ResultStorage
.getOpResultStorage(workflowIdentity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class WorkflowCompiler(
physicalOp.id,
outputPort.id,
sinkPhysicalOp.id,
PortIdentity(internal = true)
PortIdentity()
)
physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ object SpecialPhysicalOpFactory {
)
)
)
.withInputPorts(List(InputPort(PortIdentity(internal = true))))
.withOutputPorts(List(OutputPort(PortIdentity(internal = true))))
.withInputPorts(List(InputPort(PortIdentity())))
.withOutputPorts(List(OutputPort(PortIdentity())))
.withPropagateSchema(
SchemaPropagationFunc((inputSchemas: Map[PortIdentity, Schema]) => {
// Get the first schema from inputSchemas
Expand Down

0 comments on commit dffc702

Please sign in to comment.