AccumManager Usage + Refactoring #14

Open
wants to merge 19 commits into
base: rapids-tools-815-1202

Conversation

bilalbari
Collaborator

No description provided.

cindyyuanjiang and others added 10 commits July 29, 2024 11:35
…1237)

* Fix status report for wildcards

Signed-off-by: Partho Sarthi <[email protected]>

* Add unit tests for wildcard status report

Signed-off-by: Partho Sarthi <[email protected]>

* Rename 'numExecutors' to 'numExecutorNodes'

Signed-off-by: Partho Sarthi <[email protected]>

* Revert "Rename 'numExecutors' to 'numExecutorNodes'"

This reverts commit 45fd8bd186ac3962fb1bc34be025492554e8cbac.

---------

Signed-off-by: Partho Sarthi <[email protected]>
* Add exception handling and NA cols

Signed-off-by: Partho Sarthi <[email protected]>

bla

Signed-off-by: Partho Sarthi <[email protected]>

* Rename 'numExecutors' to 'numExecutorNodes'

Signed-off-by: Partho Sarthi <[email protected]>

* Add typing in for-loops

Signed-off-by: Partho Sarthi <[email protected]>

* Fix linter

Signed-off-by: Partho Sarthi <[email protected]>

* Revert "Rename 'numExecutors' to 'numExecutorNodes'"

This reverts commit e78e871.

---------

Signed-off-by: Partho Sarthi <[email protected]>
…1204)

Qualification tool should print kryo related recommendations

---------

Signed-off-by: Thomas Graves <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
@bilalbari changed the title from "Tools 1202 bilal" to "AccumManager Usage + Refactoring" on Aug 1, 2024
bilalbari and others added 3 commits August 1, 2024 12:07
value match {
case Some(v) =>
// This assert prevents out of order events to be processed
assert( v >= existingValue,
Owner

We should not keep the assert, so that it does not crash on the user side.
Instead, we use the max of the two values:

stageValuesMap.put(stageId, Math.max(v, existingValue))

Collaborator Author

Updated to remove assert

Comment on lines 45 to 50
val incomingUpdate = update.getOrElse(0L)
assert( incomingUpdate >= existingValue,
s"Stage $stageId: Out of order events detected.")
// this case is for metrics that are not parsed as long
// We track the accumId to stageId and taskId mapping
stageValuesMap.put(stageId, incomingUpdate)
Owner

Same here. We do not keep the assert; it was only used for debugging purposes.

Collaborator Author

Updated to remove assert

new mutable.HashMap[Int, Long]()

def addAccToStage(stageId: Int,
accumulableInfo: AccumulableInfo,
update: Option[Long] = None): Unit = {
val value = accumulableInfo.value.flatMap(parseAccumFieldToLong)
val existingValue = stageValuesMap.getOrElse(stageId, 0L)
Owner

Simplified code..

Suggested change
val existingValue = stageValuesMap.getOrElse(stageId, 0L)
val existingValue = stageValuesMap.getOrElse(stageId, 0L)
val incomingValue = value match {
case Some(v) => v
case _ =>
// ...comments
update.getOrElse(0L)
}
stageValuesMap.put(stageId, math.max(existingValue, incomingValue))
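
The suggestion above can be sketched as a small self-contained example. This is only an illustration, not the code in the PR: `StageAccumSketch` is a hypothetical stand-in for the class under review, and `parsedValue` is an assumed parameter that plays the role of `accumulableInfo.value.flatMap(parseAccumFieldToLong)`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the class under review; only the stage map is modeled.
object StageAccumSketch {
  val stageValuesMap: mutable.HashMap[Int, Long] =
    new mutable.HashMap[Int, Long]()

  // `parsedValue` is an assumption: it represents the already-parsed
  // accumulable value, which is None when the field is not a long.
  def addAccToStage(
      stageId: Int,
      parsedValue: Option[Long],
      update: Option[Long] = None): Unit = {
    val existingValue = stageValuesMap.getOrElse(stageId, 0L)
    // Prefer the parsed value, fall back to the update, then to 0.
    val incomingValue = parsedValue.orElse(update).getOrElse(0L)
    // Keep the larger value so an out-of-order event cannot lower the
    // stored value, and no assert is needed.
    stageValuesMap.put(stageId, math.max(existingValue, incomingValue))
  }
}
```

With this shape, an event that arrives out of order is silently ignored instead of crashing the tool on the user side.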

Collaborator Author

Updated with simplified code

// TODO: Task can update an accum multiple times. Should account for that case.
// This is for cases where same task updates the same accum multiple times
Owner

Remove the TODO because it has been resolved.

Collaborator Author

Updated

}
// update the stage value map if necessary
if (updateStageFlag) {
addAccToStage(stageId, accumulableInfo, update)
addAccToStage(stageId, accumulableInfo, update.map(_ + existingUpdate))
Owner

This is tricky. I doubt we need that line change.

If a task accumulates to the accumulable multiple times, then the "Value" coming with the information should be fine.
We don't need to add it before calling the stage method because we did that the first time.
If it is None, then we already handle that in the addAccToStage method.
This saves us an extra allocation every time we call the method, by getting rid of the update.map(_ + existingUpdate).

Collaborator Author

Updated

min = accumStats.map(_.min).getOrElse(0L),
median = accumStats.map(_.med).getOrElse(0L),
max = accumStats.map(_.max).getOrElse(0L),
total = accumStats.map(_.total).getOrElse(0L)
Owner

There are too many accumStats.map calls here.
We can define the stats once, right after val accumStats.
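
One way to read this comment is to unwrap the `Option` a single time and build all four fields from it. The sketch below assumes hypothetical `Stats` and `AccumRow` case classes, since the real statistics and result types are not shown in this thread.

```scala
object AccumStatsSketch {
  // Hypothetical types standing in for the real statistics and result classes.
  final case class Stats(min: Long, med: Long, max: Long, total: Long)
  final case class AccumRow(min: Long, median: Long, max: Long, total: Long)

  // Unwraps the Option once instead of calling accumStats.map per field.
  def toRow(accumStats: Option[Stats]): AccumRow =
    accumStats match {
      case Some(s) => AccumRow(s.min, s.med, s.max, s.total)
      // Same defaults as the original getOrElse(0L) calls.
      case None => AccumRow(0L, 0L, 0L, 0L)
    }
}
```

This keeps the zero defaults of the original code while touching the `Option` only once.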

Collaborator Author

Updated

)
}
}.toSeq
app.accumManager.accumInfoMap.map( entry => {
Owner

This does not look correct.
calculateAccStats(accumId) gets the stats by accumulating over the taskMap.
To get stage-level accumulable values, we need to filter on the stageIds that belong to the specific stage, because an accumulable can be updated by more than one stage.
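
The concern above can be illustrated with a minimal sketch that restricts the statistics to one stage before aggregating. Everything here is an assumption made for illustration: `TaskUpdate`, `Stats`, and `statsForStage` are hypothetical names, and the PR's actual data structures (such as the taskMap) are not reproduced.

```scala
object StageLevelStatsSketch {
  // Hypothetical record: one task-level update of an accumulable,
  // tagged with the stage the task ran in.
  final case class TaskUpdate(stageId: Int, taskId: Long, value: Long)
  final case class Stats(min: Long, med: Long, max: Long, total: Long)

  // Computes stats from the updates of a single stage only, so that values
  // contributed by other stages to the same accumulable are excluded.
  def statsForStage(updates: Seq[TaskUpdate], stageId: Int): Option[Stats] = {
    val sorted = updates.filter(_.stageId == stageId).map(_.value).sorted
    if (sorted.isEmpty) {
      None
    } else {
      // Upper median for even-sized input; the project's own rule may differ.
      val med = sorted(sorted.size / 2)
      Some(Stats(sorted.head, med, sorted.last, sorted.sum))
    }
  }
}
```

Without the filter, a stage would report totals that include task updates from every other stage that touched the same accumulable.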

Collaborator Author

This has been corrected in the latest code, which now does stage-level metric aggregation.

bilalbari and others added 6 commits August 1, 2024 16:18
Signed-off-by: Sayed Bilal Bari <[email protected]>
* Fix stage level metrics report

Signed-off-by: Niranjan Artal <[email protected]>

* report based on stageID

Signed-off-by: Niranjan Artal <[email protected]>

---------

Signed-off-by: Niranjan Artal <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
Signed-off-by: Sayed Bilal Bari <[email protected]>
7 participants