Fix the schema fetching during sink storage assignment when using MongoDB #3173

Closed
@@ -67,66 +67,6 @@ class WorkflowCompiler(
             .foldLeft(physicalPlan) { (plan, link) => plan.addLink(link) }
           }
         })
 
-        // assign the sinks to toAddSink operators' external output ports
-        subPlan
-          .topologicalIterator()
-          .map(subPlan.getOperator)
-          .flatMap { physicalOp =>
-            physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort))
-          }
-          .filter({
-            case (physicalOp, (_, (outputPort, _, _))) =>
-              toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal
-          })
-          .foreach({
-            case (physicalOp, (_, (outputPort, _, schema))) =>
-              val storage = ResultStorage.getOpResultStorage(context.workflowId)
-              val storageKey = physicalOp.id.logicalOpId
-
-              // due to the size limit of single document in mongoDB (16MB)
-              // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage.
-              val storageType = {
-                if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
-                else OpResultStorage.defaultStorageMode
-              }
-              if (!storage.contains(storageKey)) {
-                // get the schema for result storage in certain mode
-                val sinkStorageSchema: Option[Schema] =
-                  if (storageType == OpResultStorage.MONGODB) {
-                    // use the output schema on the first output port as the schema for storage
-                    Some(schema.right.get)
-                  } else {
-                    None
-                  }
-                storage.create(
-                  s"${context.executionId}_",
-                  storageKey,
-                  storageType,
-                  sinkStorageSchema
-                )
-                // add the sink collection name to the JSON array of sinks
-                val storageNode = objectMapper.createObjectNode()
-                storageNode.put("storageType", storageType)
-                storageNode.put("storageKey", s"${context.executionId}_$storageKey")
-                sinksPointers.add(storageNode)
-              }
-
-              val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
-                context.workflowId,
-                context.executionId,
-                storageKey.id,
-                outputPort.mode
-              )
-              val sinkLink =
-                PhysicalLink(
-                  physicalOp.id,
-                  outputPort.id,
-                  sinkPhysicalOp.id,
-                  PortIdentity(internal = true)
-                )
-              physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink)
-          })
-
       } match {
         case Success(_) =>
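The hunk above removes the sink-assignment block from inside the per-sub-plan compilation step; the hunk below re-adds the same block after the whole plan has been compiled, operating on physicalPlan instead of subPlan. A plausible reading of the move, consistent with the PR title: the schema carried on each output port is an Either that is only guaranteed to be Right(...) once schema propagation over the full physical plan has finished, so calling schema.right.get from inside the per-sub-plan loop can throw exactly when MongoDB storage needs a schema. A minimal Scala sketch of that ordering hazard, using a stand-in Either[Throwable, String] in place of Texera's Either[Throwable, Schema]:

object SchemaOrderingSketch {
  def main(args: Array[String]): Unit = {
    var portSchema: Either[Throwable, String] =
      Left(new IllegalStateException("schema not propagated yet"))

    // Old placement (inside the per-sub-plan loop): the Either may still be
    // Left here, and portSchema.right.get would throw NoSuchElementException.
    assert(portSchema.isLeft)

    // New placement (after the whole physical plan is compiled): schema
    // propagation has completed, so the value is Right and safe to read.
    portSchema = Right("name: string, count: int")
    assert(portSchema.right.get == "name: string, count: int")
  }
}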
@@ -138,6 +78,66 @@ class WorkflowCompiler(
         }
       )
 
+    // assign the sinks to toAddSink operators' external output ports
+    physicalPlan
+      .topologicalIterator()
+      .map(physicalPlan.getOperator)
+      .flatMap { physicalOp =>
+        physicalOp.outputPorts.map(outputPort => (physicalOp, outputPort))
+      }
+      .filter({
+        case (physicalOp, (_, (outputPort, _, _))) =>
+          toAddSink.contains(physicalOp.id.logicalOpId) && !outputPort.id.internal
+      })
+      .foreach({
+        case (physicalOp, (_, (outputPort, _, schema))) =>
+          val storage = ResultStorage.getOpResultStorage(context.workflowId)
+          val storageKey = physicalOp.id.logicalOpId
+
+          // due to the size limit of single document in mongoDB (16MB)
+          // for sinks visualizing HTMLs which could possibly be large in size, we always use the memory storage.
+          val storageType = {
+            if (outputPort.mode == SINGLE_SNAPSHOT) OpResultStorage.MEMORY
+            else OpResultStorage.defaultStorageMode
+          }
+          if (!storage.contains(storageKey)) {
+            // get the schema for result storage in certain mode
+            val sinkStorageSchema: Option[Schema] =
+              if (storageType == OpResultStorage.MONGODB) {
+                // use the output schema on the first output port as the schema for storage
+                Some(schema.right.get)
+              } else {
+                None
+              }
+            storage.create(
+              s"${context.executionId}_",
+              storageKey,
+              storageType,
+              sinkStorageSchema
+            )
+            // add the sink collection name to the JSON array of sinks
+            val storageNode = objectMapper.createObjectNode()
+            storageNode.put("storageType", storageType)
+            storageNode.put("storageKey", s"${context.executionId}_$storageKey")
+            sinksPointers.add(storageNode)
+          }
+
+          val sinkPhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
+            context.workflowId,
+            context.executionId,
+            storageKey.id,
+            outputPort.mode
+          )
+          val sinkLink =
+            PhysicalLink(
+              physicalOp.id,
+              outputPort.id,
+              sinkPhysicalOp.id,
+              PortIdentity(internal = true)
+            )
+          physicalPlan = physicalPlan.addOperator(sinkPhysicalOp).addLink(sinkLink)
+      })
+
     // update execution entry in MySQL to have pointers to the mongo collections
     resultsJSON.set("results", sinksPointers)
     ExecutionsMetadataPersistService.tryUpdateExistingExecution(context.executionId) {
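For context on the moved block itself: its storage-type rule sends SINGLE_SNAPSHOT output ports (visualization sinks whose HTML payloads can be large) to in-memory storage, since a single MongoDB document is capped at 16 MB. A stand-alone sketch of that decision with stand-in constants; the real OpResultStorage constants and the configured default may differ:

object StorageTypeSketch {
  sealed trait OutputMode
  case object SINGLE_SNAPSHOT extends OutputMode
  case object SET_SNAPSHOT extends OutputMode

  // Stand-ins for OpResultStorage.MEMORY / MONGODB / defaultStorageMode.
  val MEMORY = "memory"
  val MONGODB = "mongodb"
  val defaultStorageMode: String = MONGODB // assumed default, for illustration only

  def storageTypeFor(mode: OutputMode): String =
    if (mode == SINGLE_SNAPSHOT) MEMORY // avoid MongoDB's 16 MB per-document cap
    else defaultStorageMode

  def main(args: Array[String]): Unit = {
    assert(storageTypeFor(SINGLE_SNAPSHOT) == MEMORY)
    assert(storageTypeFor(SET_SNAPSHOT) == MONGODB)
  }
}

Only the MongoDB branch needs a schema up front, which is why the diff sets sinkStorageSchema to Some(...) for MONGODB and None otherwise.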
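Finally, the JSON bookkeeping near the end of the hunk records, for each created sink, which storage type backs it and under which key, so that the execution entry in MySQL can point to the Mongo collections. A self-contained Jackson sketch of that bookkeeping; the executionId and storageKey values are invented for illustration:

import com.fasterxml.jackson.databind.ObjectMapper

object SinkPointersSketch {
  def main(args: Array[String]): Unit = {
    val objectMapper = new ObjectMapper()
    val resultsJSON = objectMapper.createObjectNode()
    val sinksPointers = objectMapper.createArrayNode()

    val executionId = 42                      // hypothetical execution id
    val storageKey = "CSVFileScan-operator-1" // hypothetical logical operator id

    // One node per sink, mirroring the storageNode construction in the diff.
    val storageNode = objectMapper.createObjectNode()
    storageNode.put("storageType", "mongodb")
    storageNode.put("storageKey", s"${executionId}_$storageKey")
    sinksPointers.add(storageNode)

    resultsJSON.set("results", sinksPointers)
    // Prints: {"results":[{"storageType":"mongodb","storageKey":"42_CSVFileScan-operator-1"}]}
    println(objectMapper.writeValueAsString(resultsJSON))
  }
}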