-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AccumManager Usage + Refactoring #14
base: rapids-tools-815-1202
Are you sure you want to change the base?
Conversation
Signed-off-by: cindyyuanjiang <[email protected]>
Signed-off-by: Partho Sarthi <[email protected]>
…1237) * Fix status report for wildcards Signed-off-by: Partho Sarthi <[email protected]> * Add unit tests for wildcard status report Signed-off-by: Partho Sarthi <[email protected]> * Rename 'numExecutors' to 'numExecutorNodes' Signed-off-by: Partho Sarthi <[email protected]> * Revert "Rename 'numExecutors' to 'numExecutorNodes'" This reverts commit 45fd8bd186ac3962fb1bc34be025492554e8cbac. --------- Signed-off-by: Partho Sarthi <[email protected]>
* Add exception handling and NA cols Signed-off-by: Partho Sarthi <[email protected]> bla Signed-off-by: Partho Sarthi <[email protected]> * Rename 'numExecutors' to 'numExecutorNodes' Signed-off-by: Partho Sarthi <[email protected]> * Add typing in for-loops Signed-off-by: Partho Sarthi <[email protected]> * Fix linter Signed-off-by: Partho Sarthi <[email protected]> * Revert "Rename 'numExecutors' to 'numExecutorNodes'" This reverts commit e78e871. --------- Signed-off-by: Partho Sarthi <[email protected]>
Signed-off-by: cindyyuanjiang <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
…1204) Qualification tool should print kryo related recommendations --------- Signed-off-by: Thomas Graves <[email protected]>
…#1243) Signed-off-by: Partho Sarthi <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
Signed-off-by: Lee Yang <[email protected]>
value match { | ||
case Some(v) => | ||
// This assert prevents out of order events to be processed | ||
assert( v >= existingValue, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't keep the assert so that it does not crash on the user side.
we use the max of the two values
stageValuesMap.put(stageId, Max(v, existingValue))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to remove assert
val incomingUpdate = update.getOrElse(0L) | ||
assert( incomingUpdate >= existingValue, | ||
s"Stage $stageId: Out of order events detected.") | ||
// this case is for metrics that are not parsed as long | ||
// We track the accumId to stageId and taskId mapping | ||
stageValuesMap.put(stageId, incomingUpdate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. We do not keep the assert. it was used for debugging purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to remove assert
new mutable.HashMap[Int, Long]() | ||
|
||
def addAccToStage(stageId: Int, | ||
accumulableInfo: AccumulableInfo, | ||
update: Option[Long] = None): Unit = { | ||
val value = accumulableInfo.value.flatMap(parseAccumFieldToLong) | ||
val existingValue = stageValuesMap.getOrElse(stageId, 0L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplified code..
val existingValue = stageValuesMap.getOrElse(stageId, 0L) | |
val existingValue = stageValuesMap.getOrElse(stageId, 0L) | |
val incomingValue = value match { | |
case Some(v) => v | |
case _ => | |
// ...coments | |
update.getOrElse(0L) | |
} | |
stageValuesMap.put(stageId, max(existingValue, incomingValue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated with simplified code
// TODO: Task can update an accum multiple times. Should account for that case. | ||
// This is for cases where same task updates the same accum multiple times |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the TODO because it has been resolved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
} | ||
// update the stage value map if necessary | ||
if (updateStageFlag) { | ||
addAccToStage(stageId, accumulableInfo, update) | ||
addAccToStage(stageId, accumulableInfo, update.map(_ + existingUpdate)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is tricky..I doubt we need that change line change.
If a task accumulates to the accumulable multiple time, then the "Value" coming with information should be fine.
We don't need to add it before calling the stageMethod because we did that the first time.
If it is None, then we handle that already in the addAccToStage
method.
This saves us an extra allocation everytime we call the method by getting rid of the update.map(_ + existingUpdate)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
min = accumStats.map(_.min).getOrElse(0L), | ||
median = accumStats.map(_.med).getOrElse(0L), | ||
max = accumStats.map(_.max).getOrElse(0L), | ||
total = accumStats.map(_.total).getOrElse(0L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
too many accumStats.map
here
we can do that stats are defined after val accumStats
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
) | ||
} | ||
}.toSeq | ||
app.accumManager.accumInfoMap.map( entry => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not look correct.
calculateAccStats(accumId)
gets the stats by accumulating the taskMap.
In order to get stageLevel accumulable values, we need to filter stageIds that belong to the specific stage. This is because an accumulable can be updated by more than one Stage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes have been corrected in the latest code to do stage level metric aggregation
Signed-off-by: Sayed Bilal Bari <[email protected]>
* Fix stage level metrics report Signed-off-by: Niranjan Artal <[email protected]> * report based on stageID Signed-off-by: Niranjan Artal <[email protected]> --------- Signed-off-by: Niranjan Artal <[email protected]>
Signed-off-by: Lee Yang <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
No description provided.