-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update state retry state machine for CPU alloc support #1543
Conversation
Signed-off-by: Robert (Bobby) Evans <[email protected]>
build |
Signed-off-by: Robert (Bobby) Evans <[email protected]>
Signed-off-by: Robert (Bobby) Evans <[email protected]>
build |
build |
nit: Fix typo in PR title |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am half way through SparkResourceAdaptorJni
. It looks good so far, and I have mostly minor stuff. I'll do another pass.
synchronized (Rmm.class) { | ||
if (sra != null && sra.isOpen()) { | ||
sra.associateThreadWithShuffle(threadId); | ||
ThreadStateRegistry.addThread(threadId, thread); | ||
sra.poolThreadWorkingOnTasks(true, threadId, taskIds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sra.poolThreadWorkingOnTasks(true, threadId, taskIds); | |
sra.poolThreadWorkingOnTasks(/*isForShuffle*/ true, threadId, taskIds); |
synchronized (Rmm.class) { | ||
if (sra != null && sra.isOpen()) { | ||
ThreadStateRegistry.addThread(threadId, thread); | ||
sra.poolThreadWorkingOnTasks(false, threadId, taskIds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sra.poolThreadWorkingOnTasks(false, threadId, taskIds); | |
sra.poolThreadWorkingOnTasks(/*isForShuffle*/ false, threadId, taskIds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
java does not support named parameters and I really dislike /* comments */ embedded in the code. If this is a deal breaker for you, then I will change it to an enum. Just let me know.
other.time_lost_nanos = 0; | ||
} | ||
|
||
void add(task_metrics& other) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void add(task_metrics& other) { | |
void add(task_metrics const& other) { |
@@ -306,27 +387,27 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { | |||
bool supports_streams() const noexcept override { return resource->supports_streams(); } | |||
|
|||
/** | |||
* Update the internal state so that a specific thread is associated with a task. | |||
* Update the internal state so that a specific thread is dediocated to a task. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Update the internal state so that a specific thread is dediocated to a task. | |
* Update the internal state so that a specific thread is dedicated to a task. |
if (is_for_shuffle) { | ||
throw std::invalid_argument("the thread is marked as a non-shuffle thread, and we cannot change it while there are active tasks"); | ||
} else { | ||
throw std::invalid_argument("the thread is marked as a shuffle thread,a nd we cannot change it while there are active tasks"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw std::invalid_argument("the thread is marked as a shuffle thread,a nd we cannot change it while there are active tasks"); | |
throw std::invalid_argument("the thread is marked as a shuffle thread, and we cannot change it while there are active tasks"); |
} | ||
} | ||
} | ||
|
||
auto metrics_at = task_to_metrics.find(task_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is more of a knowledge question. We have two levels of metrics, task and threads. How do these work at a high level? Do thread metrics become task metrics at some point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been a bit confused tracking these in the code as well. Might be helpful to separate out the metrics updating code into separate functions, or something.
} | ||
|
||
void cpu_dealloc(void* addr, size_t amount) { | ||
// addr is not used yet, but is here in case we want it in the future. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we file a follow on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not. It is there so we can do memory leak detection/tracking. But we don't have a need for it now. Probably later. Also it matches the RMM APIs so I wanted to keep it the same.
|
||
void cpu_postalloc_success(void* addr, size_t amount, bool blocking, bool was_recursive) { | ||
// addr is not used yet, but is here in case we want it in the future. | ||
// amount is not used yet, but is here in case we want it for debugginig/metrics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
follow on to file (perhaps the same as the one below?)
jclass, | ||
jlong ptr, | ||
jlong thread_id) | ||
JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_submittingToPool(JNIEnv* env, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, indentation is a little off here and a couple of the following methods.
|
||
return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe a comment here saying that true == is_recursive? I know we have a comment on line 1198, but it's not clear how this boolean is used from the interface (to me) so a comment on the return value would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still reading through the code. Only minor comments/questions so far.
} | ||
|
||
/** | ||
* A dedicated task thread is about to wait on work done work on a pool that could transitively |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: work done work on
* @param threadId the ID of the thread to throw the exception (not java thread id). | ||
* @param numOOMs the number of times the RetryOOM should be thrown | ||
* @param numOOMs the number of times the GpuRetryOOM should be thrown | ||
*/ | ||
public static void forceRetryOOM(long threadId, int numOOMs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) should we rename these forceGpuRetryOOM, etc..
|
||
/** | ||
* The allocation failed, and spilling didn't save it. | ||
* @param wasOom wat the failure caused by an OOM or something else. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: wat
// Ignored | ||
} | ||
// Force an exception | ||
RmmSpark.forceRetryOOM(threadId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this force a GpuRetryOOM?
assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId)); | ||
|
||
// Force another exception | ||
RmmSpark.forceSplitAndRetryOOM(threadId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above - isn't this a GpuSplitAndRetryOOM?
Thread.sleep(100); | ||
} | ||
} catch (InterruptedException e) { | ||
// Ignored we are going to exit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The contract to ignore is that if we do not rethrow it, we should set the interrupt status.
// Ignored we are going to exit. | |
// we are going to exit. | |
Thread.currentThread().interrupt(); |
try { | ||
while (handle > 0) { | ||
checkAndBreakDeadlocks(); | ||
Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make it a System property with a default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me. Only minor comments.
|
||
/** | ||
* The allocation failed, and spilling didn't save it. | ||
* @param wasOom wat the failure caused by an OOM or something else. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: wat
int num_times_retry_throw = 0; | ||
int num_times_split_retry_throw = 0; | ||
long time_blocked_nanos = 0; | ||
// The amount of time that this thread has lost due to retries (not inclduing blocked time) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: inclduing
// When did the retry time for this thread start, or when did the block time end. | ||
std::chrono::time_point<std::chrono::steady_clock> retry_start_or_block_end; | ||
// Is this thread currently in a marked retry block. This is only used for metrics. | ||
bool is_in_retry = false; | ||
|
||
// The amount of time that this thread has spent in the current retry block (not inclucing block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: inclucing
if (thread->second.pool_task_ids.erase(task_id) != 0) { | ||
std::stringstream ss; | ||
ss << "CURRENT IDs "; | ||
for (const auto& task_id: thread->second.pool_task_ids) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename task_id to avoid conflict with task_id arugment.
} | ||
} | ||
} | ||
|
||
auto metrics_at = task_to_metrics.find(task_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been a bit confused tracking these in the code as well. Might be helpful to separate out the metrics updating code into separate functions, or something.
Signed-off-by: Robert (Bobby) Evans <[email protected]>
Signed-off-by: Robert (Bobby) Evans <[email protected]>
@jbrennan333 @gerashegalov @abellina I think I have addressed all of the review comments. |
build |
1 similar comment
build |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, my comment does not appear addressed but it does not rise to the level of blocking this PR
Sorry should be fixed now |
build |
class task_metrics { | ||
public: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If no private then just use struct
:
class task_metrics { | |
public: | |
struct task_metrics { |
build |
1 similar comment
build |
@ttnghia and @gerashegalov I think all of the review comments have been addressed. Please take another look. |
@@ -14,11 +14,13 @@ | |||
* limitations under the License. | |||
*/ | |||
|
|||
#include <algorithm> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is known to be extremely heavy and we should avoid it if possible. What APIs do we need from it here?
log_status("FIXUP", thread_id, found->second.task_id, | ||
found->second.state, ss.str().c_str()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pattern ss.str().c_str()
is used a lot here and can cause undefined behavior/crash when logging is enabled. That is because .str().c_str()
is a pointer to a temporary string produced by .str()
which can be destroyed immediately.
log_status("FIXUP", thread_id, found->second.task_id, | |
found->second.state, ss.str().c_str()); | |
auto const log_str = ss.str(); | |
log_status("FIXUP", thread_id, found->second.task_id, | |
found->second.state, log_str.c_str()); |
Please also take care of the remaining instances of this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option is, for short string message like this, just use std::to_string
directly:
auto const log_str = std::string("desired task_id ") + std::string(task_id);
log_status("FIXUP", thread_id, found->second.task_id,
found->second.state, log_str.c_str());
We only need stringstream
if we have to do a lot of string concatenations, or we have a for
loop.
auto this_id = static_cast<long>(pthread_self()); | ||
auto thread = threads.find(thread_id_to_wake); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Try to use auto const
. All variables should be const
if possible.
@@ -45,25 +90,16 @@ constexpr char const* SPLIT_AND_RETRY_OOM_CLASS = "com/nvidia/spark/rapids/jni/S | |||
enum thread_state { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use enum class
. In C++, pure enum
can be implicitly converted into int
and vice versa. That is error-prone.
The down size of using enum class
is that, you have to add prefix thread_state::
to all enum values in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course we still can explicitly convert from enum class
to int
, but that is always "explicit", i.e., intentional.
@@ -890,15 +1080,11 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { | |||
bool are_any_tasks_just_blocked = false; | |||
for (auto thread = threads.begin(); thread != threads.end(); thread++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern also repeats a lot. We can do better with this:
for (auto thread = threads.begin(); thread != threads.end(); thread++) { | |
for (auto const& [thread_id, thread_state]: threads) { |
So we won't use some vague names like thread->first
, thread->second
.
@@ -926,35 +1112,57 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource { | |||
* returns true if the thread that ended was a normally running task thread. | |||
* This should be used to decide if wake_up_threads_after_task_finishes is called or not. | |||
*/ | |||
bool remove_thread_association(long thread_id, const std::unique_lock<std::mutex>& lock) | |||
bool remove_thread_association(long thread_id, long remove_task_id, const std::unique_lock<std::mutex>& lock) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Please put const
after type (east const) to be consistent with the current repository style.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There seems to be a serious problem with pointer to temporary object (#1543 (comment)) so I block this until it is fixed.
Signed-off-by: Robert (Bobby) Evans <[email protected]>
12d8011
@ttnghia please take another look I think I have fixed all of your comments. |
build |
This is good now. You need to run |
The pre-commit scripts do not work out of the box with the docker environment so trying to do formatting locally is way too difficult. I will continue to let CI do my pre-commit until it is fixed. |
build |
This updates the state machine to reduce the number of states.
Migrate the shuffle states to be more generic around thread pools instead of just shuffle
And to add in support for reusing the state machine/retry framework for CPU memory allocations too.
This is a breaking change and NVIDIA/spark-rapids#9656 is needed in the plugin to avoid breaking the plugin.