Update state retry state machine for CPU alloc support #1543

Merged: 13 commits into NVIDIA:branch-24.02 on Nov 29, 2023

Collaborator

@revans2 revans2 commented Nov 7, 2023

This updates the state machine to reduce the number of states.
It migrates the shuffle states to be more generic, covering thread pools in general instead of just shuffle,
and adds support for reusing the state machine/retry framework for CPU memory allocations too.

This is a breaking change and NVIDIA/spark-rapids#9656 is needed in the plugin to avoid breaking the plugin.

@revans2
Collaborator Author

revans2 commented Nov 7, 2023

build

@revans2
Collaborator Author

revans2 commented Nov 16, 2023

build

@abellina abellina self-requested a review November 16, 2023 22:53
@revans2 revans2 changed the base branch from branch-23.12 to branch-24.02 November 17, 2023 14:15
@revans2
Collaborator Author

revans2 commented Nov 17, 2023

build

@parthosa
Contributor

nit: Fix typo in PR title state maching

@revans2 revans2 changed the title Update state retry state maching for CPU alloc support Update state retry state machine for CPU alloc support Nov 17, 2023
Collaborator

@abellina abellina left a comment

I am half way through SparkResourceAdaptorJni. It looks good so far, and I have mostly minor stuff. I'll do another pass.

synchronized (Rmm.class) {
if (sra != null && sra.isOpen()) {
sra.associateThreadWithShuffle(threadId);
ThreadStateRegistry.addThread(threadId, thread);
sra.poolThreadWorkingOnTasks(true, threadId, taskIds);
Collaborator

Suggested change
- sra.poolThreadWorkingOnTasks(true, threadId, taskIds);
+ sra.poolThreadWorkingOnTasks(/*isForShuffle*/ true, threadId, taskIds);

synchronized (Rmm.class) {
if (sra != null && sra.isOpen()) {
ThreadStateRegistry.addThread(threadId, thread);
sra.poolThreadWorkingOnTasks(false, threadId, taskIds);
Collaborator

Suggested change
- sra.poolThreadWorkingOnTasks(false, threadId, taskIds);
+ sra.poolThreadWorkingOnTasks(/*isForShuffle*/ false, threadId, taskIds);

Collaborator Author

Java does not support named parameters, and I really dislike /* comments */ embedded in the code. If this is a deal breaker for you, I will change it to an enum. Just let me know.

other.time_lost_nanos = 0;
}

void add(task_metrics& other) {
Collaborator

Suggested change
- void add(task_metrics& other) {
+ void add(task_metrics const& other) {
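
The `const&` suggestion can be illustrated with a minimal sketch. The struct below is a hypothetical, trimmed-down stand-in for `task_metrics` (the name `task_metrics_sketch` and the choice of fields are assumptions for illustration, borrowed from field names quoted elsewhere in this review). It only shows why an accumulate-style `add` can take its argument by const reference; it is not the PR's actual class.

```cpp
// Hypothetical, simplified stand-in for task_metrics; not the PR's actual class.
struct task_metrics_sketch {
  int num_times_retry_throw = 0;
  long time_blocked_nanos   = 0;

  // add() only reads from `other`, so a const reference is sufficient.
  // It also allows callers to pass const objects and temporaries.
  void add(task_metrics_sketch const& other)
  {
    num_times_retry_throw += other.num_times_retry_throw;
    time_blocked_nanos += other.time_blocked_nanos;
  }
};
```

With this signature a caller holding only a `task_metrics_sketch const&` can still accumulate it into a running total, which a non-const reference parameter would reject at compile time.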

@@ -306,27 +387,27 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
bool supports_streams() const noexcept override { return resource->supports_streams(); }

/**
* Update the internal state so that a specific thread is associated with a task.
* Update the internal state so that a specific thread is dediocated to a task.
Collaborator

Suggested change
- * Update the internal state so that a specific thread is dediocated to a task.
+ * Update the internal state so that a specific thread is dedicated to a task.

if (is_for_shuffle) {
throw std::invalid_argument("the thread is marked as a non-shuffle thread, and we cannot change it while there are active tasks");
} else {
throw std::invalid_argument("the thread is marked as a shuffle thread,a nd we cannot change it while there are active tasks");
Collaborator

Suggested change
- throw std::invalid_argument("the thread is marked as a shuffle thread,a nd we cannot change it while there are active tasks");
+ throw std::invalid_argument("the thread is marked as a shuffle thread, and we cannot change it while there are active tasks");

}
}
}

auto metrics_at = task_to_metrics.find(task_id);
Collaborator

This is more of a knowledge question. We have two levels of metrics: task and thread. How do these work at a high level? Do thread metrics become task metrics at some point?

Contributor

I've been a bit confused tracking these in the code as well. Might be helpful to separate out the metrics updating code into separate functions, or something.

}

void cpu_dealloc(void* addr, size_t amount) {
// addr is not used yet, but is here in case we want it in the future.
Collaborator

should we file a follow on?

Collaborator Author

Probably not. It is there so we can do memory leak detection/tracking. But we don't have a need for it now. Probably later. Also it matches the RMM APIs so I wanted to keep it the same.


void cpu_postalloc_success(void* addr, size_t amount, bool blocking, bool was_recursive) {
// addr is not used yet, but is here in case we want it in the future.
// amount is not used yet, but is here in case we want it for debugginig/metrics.
Collaborator

follow on to file (perhaps the same as the one below?)

jclass,
jlong ptr,
jlong thread_id)
JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_SparkResourceAdaptor_submittingToPool(JNIEnv* env,
Collaborator

nit, indentation is a little off here and a couple of the following methods.


return true;
Collaborator

maybe a comment here saying that true == is_recursive? I know we have a comment on line 1198, but it's not clear how this boolean is used from the interface (to me) so a comment on the return value would be nice.

Contributor

@jbrennan333 jbrennan333 left a comment

Still reading through the code. Only minor comments/questions so far.

}

/**
* A dedicated task thread is about to wait on work done work on a pool that could transitively
Contributor

typo: work done work on

* @param threadId the ID of the thread to throw the exception (not java thread id).
* @param numOOMs the number of times the RetryOOM should be thrown
* @param numOOMs the number of times the GpuRetryOOM should be thrown
*/
public static void forceRetryOOM(long threadId, int numOOMs) {
Contributor

(nit) Should we rename these to forceGpuRetryOOM, etc.?


/**
* The allocation failed, and spilling didn't save it.
* @param wasOom wat the failure caused by an OOM or something else.
Contributor

typo: wat

// Ignored
}
// Force an exception
RmmSpark.forceRetryOOM(threadId);
Contributor

Doesn't this force a GpuRetryOOM?

assertEquals(RmmSparkThreadState.THREAD_RUNNING, RmmSpark.getStateOf(threadId));

// Force another exception
RmmSpark.forceSplitAndRetryOOM(threadId);
Contributor

Same as above - isn't this a GpuSplitAndRetryOOM?

Thread.sleep(100);
}
} catch (InterruptedException e) {
// Ignored we are going to exit.
Collaborator

The contract for ignoring an InterruptedException is that if we do not rethrow it, we should restore the thread's interrupt status.

Suggested change
- // Ignored we are going to exit.
+ // we are going to exit.
+ Thread.currentThread().interrupt();

try {
while (handle > 0) {
checkAndBreakDeadlocks();
Thread.sleep(100);
Collaborator

I would make the sleep interval a System property with a default.

Contributor

@jbrennan333 jbrennan333 left a comment

Overall looks good to me. Only minor comments.


int num_times_retry_throw = 0;
int num_times_split_retry_throw = 0;
long time_blocked_nanos = 0;
// The amount of time that this thread has lost due to retries (not inclduing blocked time)
Contributor

typo: inclduing

// When did the retry time for this thread start, or when did the block time end.
std::chrono::time_point<std::chrono::steady_clock> retry_start_or_block_end;
// Is this thread currently in a marked retry block. This is only used for metrics.
bool is_in_retry = false;

// The amount of time that this thread has spent in the current retry block (not inclucing block
Contributor

typo: inclucing

if (thread->second.pool_task_ids.erase(task_id) != 0) {
std::stringstream ss;
ss << "CURRENT IDs ";
for (const auto& task_id: thread->second.pool_task_ids) {
Contributor

Rename the loop variable task_id to avoid shadowing the task_id argument.

@revans2
Collaborator Author

revans2 commented Nov 21, 2023

@jbrennan333 @gerashegalov @abellina I think I have addressed all of the review comments.

@revans2
Collaborator Author

revans2 commented Nov 21, 2023

build

gerashegalov previously approved these changes Nov 21, 2023
Collaborator

@gerashegalov gerashegalov left a comment

LGTM, my comment does not appear addressed but it does not rise to the level of blocking this PR

@revans2
Collaborator Author

revans2 commented Nov 21, 2023

> LGTM, my comment does not appear addressed but it does not rise to the level of blocking this PR

Sorry, it should be fixed now.

@revans2
Collaborator Author

revans2 commented Nov 21, 2023

build

gerashegalov previously approved these changes Nov 21, 2023
Comment on lines 205 to 206
class task_metrics {
public:
Collaborator

If there are no private members, then just use struct:

Suggested change
- class task_metrics {
- public:
+ struct task_metrics {

jbrennan333 previously approved these changes Nov 27, 2023
@revans2
Collaborator Author

revans2 commented Nov 27, 2023

build

@revans2
Collaborator Author

revans2 commented Nov 27, 2023

@ttnghia and @gerashegalov I think all of the review comments have been addressed. Please take another look.

gerashegalov previously approved these changes Nov 27, 2023
@@ -14,11 +14,13 @@
* limitations under the License.
*/

#include <algorithm>
Collaborator

This header is known to be extremely heavy, and we should avoid it if possible. What APIs do we need from it here?

Comment on lines 397 to 398
log_status("FIXUP", thread_id, found->second.task_id,
found->second.state, ss.str().c_str());
Collaborator

@ttnghia ttnghia Nov 27, 2023

The pattern ss.str().c_str() is used a lot here and can cause undefined behavior/crash when logging is enabled. That is because .str().c_str() is a pointer to a temporary string produced by .str() which can be destroyed immediately.

Suggested change
- log_status("FIXUP", thread_id, found->second.task_id,
- found->second.state, ss.str().c_str());
+ auto const log_str = ss.str();
+ log_status("FIXUP", thread_id, found->second.task_id,
+ found->second.state, log_str.c_str());

Please also take care of the remaining instances of this.

Collaborator

@ttnghia ttnghia Nov 27, 2023

Another option, for a short string message like this, is to just use std::to_string directly:

auto const log_str = std::string("desired task_id ") + std::to_string(task_id);
log_status("FIXUP", thread_id, found->second.task_id,
                  found->second.state, log_str.c_str());

We only need stringstream if we have to do a lot of string concatenations, or we have a for loop.
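
The two suggestions above can be sketched as a small self-contained example. Everything here is an assumption made for illustration: `format_log` is a hypothetical stand-in for `log_status` (whose real signature is not shown in this thread), and `log_fixup` is an invented helper. One point worth keeping in mind when reading it: standard C++ keeps the temporary returned by `.str()` alive until the end of the full expression that created it, so the dangling-pointer risk is most acute when the `c_str()` pointer is kept beyond the call statement. Binding the string to a named local removes the need to reason about that at each call site.

```cpp
#include <sstream>
#include <string>

// Hypothetical stand-in for log_status: builds and returns the message so the
// result can be inspected. The real function's signature is an assumption.
std::string format_log(char const* op, long thread_id, char const* notes)
{
  std::ostringstream out;
  out << op << " thread=" << thread_id << " " << notes;
  return out.str();
}

// Invented helper showing the named-local pattern with std::to_string.
std::string log_fixup(long thread_id, long task_id)
{
  // The string is owned by a named local, so the buffer behind c_str()
  // stays valid until the end of this scope.
  auto const log_str = "desired task_id " + std::to_string(task_id);
  return format_log("FIXUP", thread_id, log_str.c_str());
}
```

For a single number appended to a fixed prefix, `std::to_string` avoids constructing a stream at all; a stream remains the more convenient choice when a message is assembled in a loop.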

Comment on lines 1401 to 1402
auto this_id = static_cast<long>(pthread_self());
auto thread = threads.find(thread_id_to_wake);
Collaborator

Nit: Try to use auto const. All variables should be const if possible.

@@ -45,25 +90,16 @@ constexpr char const* SPLIT_AND_RETRY_OOM_CLASS = "com/nvidia/spark/rapids/jni/S
enum thread_state {
Collaborator

@ttnghia ttnghia Nov 27, 2023

Please use enum class. In C++, a plain (unscoped) enum converts implicitly to int, which is error-prone.

The downside of using enum class is that you have to add the prefix thread_state:: to all enum values in the code.

Collaborator

@ttnghia ttnghia Nov 27, 2023

Of course we still can explicitly convert from enum class to int, but that is always "explicit", i.e., intentional.
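
A minimal sketch of the scoped-enum point follows. The type name `thread_state_sketch`, the enumerator `THREAD_BLOCKED`, and the numeric values are assumptions for illustration (only `THREAD_RUNNING` is quoted in this thread, and from the Java side); the helper `to_int` is likewise invented.

```cpp
// Hypothetical scoped enum; names and values are illustrative only.
enum class thread_state_sketch : int {
  THREAD_RUNNING = 0,
  THREAD_BLOCKED = 1,
};

// Converting to int is still possible, but only through an explicit cast,
// so every conversion is visible at the call site.
constexpr int to_int(thread_state_sketch s) { return static_cast<int>(s); }

// With a scoped enum the following line would not compile, which is the
// safety benefit described above:
//   int bad = thread_state_sketch::THREAD_RUNNING;
```

The explicit cast is the only place where the enum crosses into integer territory, for example when a value has to be handed across a JNI boundary.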

@@ -890,15 +1080,11 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
bool are_any_tasks_just_blocked = false;
for (auto thread = threads.begin(); thread != threads.end(); thread++) {
Collaborator

@ttnghia ttnghia Nov 27, 2023

This pattern also repeats a lot. We can do better with this:

Suggested change
- for (auto thread = threads.begin(); thread != threads.end(); thread++) {
+ for (auto const& [thread_id, thread_state]: threads) {

So we won't use some vague names like thread->first, thread->second.
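
As a sketch of the structured-binding loop suggested above (C++17 or later), the example below iterates a map of per-thread records by name instead of through `->first` and `->second`. The struct `thread_info_sketch`, its fields, and the function `blocked_thread_ids` are hypothetical and exist only for this illustration; the actual container and value type used in the PR are not shown in this thread.

```cpp
#include <map>
#include <vector>

// Hypothetical per-thread record; the real structure in the PR differs.
struct thread_info_sketch {
  long task_id    = -1;
  bool is_blocked = false;
};

// Collect the ids of all blocked threads, in key order.
std::vector<long> blocked_thread_ids(std::map<long, thread_info_sketch> const& threads)
{
  std::vector<long> ids;
  // Structured bindings give the key and value readable names.
  for (auto const& [thread_id, info] : threads) {
    if (info.is_blocked) { ids.push_back(thread_id); }
  }
  return ids;
}
```

Binding by `auto const&` avoids copying each map entry and makes it clear that the loop does not modify the map.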

@@ -926,35 +1112,57 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
* returns true if the thread that ended was a normally running task thread.
* This should be used to decide if wake_up_threads_after_task_finishes is called or not.
*/
bool remove_thread_association(long thread_id, const std::unique_lock<std::mutex>& lock)
bool remove_thread_association(long thread_id, long remove_task_id, const std::unique_lock<std::mutex>& lock)
Collaborator

@ttnghia ttnghia Nov 27, 2023

Nit: Please put const after type (east const) to be consistent with the current repository style.

Collaborator

@ttnghia ttnghia left a comment

There seems to be a serious problem with a pointer to a temporary object (#1543 (comment)), so I am blocking this until it is fixed.

Signed-off-by: Robert (Bobby) Evans <[email protected]>
@revans2 revans2 dismissed stale reviews from gerashegalov and jbrennan333 via 12d8011 November 28, 2023 19:40
@revans2
Collaborator Author

revans2 commented Nov 28, 2023

@ttnghia please take another look. I think I have addressed all of your comments.

@revans2
Collaborator Author

revans2 commented Nov 28, 2023

build

@ttnghia
Collaborator

ttnghia commented Nov 28, 2023

This is good now. You need to run pre-commit run --all-files (https://github.com/NVIDIA/spark-rapids-jni/blob/branch-24.02/CONTRIBUTING.md#c).

@revans2
Collaborator Author

revans2 commented Nov 28, 2023

> This is good now. You need to run pre-commit run --all-files (https://github.com/NVIDIA/spark-rapids-jni/blob/branch-24.02/CONTRIBUTING.md#c).

The pre-commit scripts do not work out of the box with the docker environment so trying to do formatting locally is way too difficult. I will continue to let CI do my pre-commit until it is fixed.

@revans2
Collaborator Author

revans2 commented Nov 28, 2023

build

@revans2 revans2 merged commit e5cf1af into NVIDIA:branch-24.02 Nov 29, 2023
2 checks passed
@revans2 revans2 deleted the simplify_retry branch November 29, 2023 14:13