Skip to content

Commit

Permalink
Improve SharedProcessPool tests performance (#1950)
Browse files Browse the repository at this point in the history
Some minor improvements to make `SharedProcessPool`-related tests run faster. The changes mostly combine multiple tests into a single unit test function to reduce the number of times the process pool is initialized and reset.

The total execution time of `test_shared_process_pool.py` and `test_multi_processing_stage.py` should drop from ~60s to ~30s (when run locally).

Closes #1951 

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Yuchen Zhang (https://github.com/yczhang-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1950
  • Loading branch information
yczhang-nv authored Nov 22, 2024
1 parent 086dc21 commit e90bafe
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 24 deletions.
4 changes: 2 additions & 2 deletions tests/morpheus/stages/test_multi_processing_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def test_created_stage_pipe(config: Config, dataset_pandas: DatasetManager):
expected_df = input_df.copy()
expected_df["new_column"] = "Hello"

df_count = 100
df_count = 10
df_generator = partial(pandas_dataframe_generator, dataset_pandas, df_count)

partial_fn = partial(_process_df, column="new_column", value="Hello")
Expand Down Expand Up @@ -225,7 +225,7 @@ def test_multiple_stages_pipe(config: Config, dataset_pandas: DatasetManager):
expected_df["new_column_1"] = "new_value"
expected_df["new_column_2"] = "Hello"

df_count = 100
df_count = 10
df_generator = partial(pandas_dataframe_generator, dataset_pandas, df_count)

partial_fn = partial(_process_df, column="new_column_1", value="new_value")
Expand Down
29 changes: 7 additions & 22 deletions tests/morpheus/utils/test_shared_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,43 +149,28 @@ def test_submit_single_task(shared_process_pool, a, b, expected):


@pytest.mark.slow
def test_submit_task_with_invalid_stage(shared_process_pool):
def test_submit_invalid_tasks(shared_process_pool):

pool = shared_process_pool

# submit_task() should raise ValueError if the stage does not exist
with pytest.raises(ValueError):
pool.submit_task("stage_does_not_exist", _add_task, 10, 20)


@pytest.mark.slow
def test_submit_task_raises_exception(shared_process_pool):

pool = shared_process_pool
pool.set_usage("test_stage", 0.5)

# if the function raises exception, the task can be submitted and the exception will be raised when calling result()
task = pool.submit_task("test_stage", _function_raises_exception)
with pytest.raises(RuntimeError):
task.result()


@pytest.mark.slow
def test_submit_task_with_unserializable_result(shared_process_pool):

pool = shared_process_pool
pool.set_usage("test_stage", 0.5)

# if the function returns unserializable result, the task can be submitted and the exception will be raised
# when calling result()
task = pool.submit_task("test_stage", _function_returns_unserializable_result)
with pytest.raises(TypeError):
task.result()


@pytest.mark.slow
def test_submit_task_with_unserializable_arg(shared_process_pool):

pool = shared_process_pool
pool.set_usage("test_stage", 0.5)

# Unserializable arguments cannot be submitted to the pool
# Function with unserializable arguments cannot be submitted to the pool
with pytest.raises(TypeError):
pool.submit_task("test_stage", _arbitrary_function, threading.Lock())

Expand All @@ -207,7 +192,7 @@ def test_submit_multiple_tasks(shared_process_pool, a, b, expected):
pool = shared_process_pool
pool.set_usage("test_stage", 0.5)

num_tasks = 100
num_tasks = 10
tasks = []
for _ in range(num_tasks):
tasks.append(pool.submit_task("test_stage", _add_task, a, b))
Expand Down

0 comments on commit e90bafe

Please sign in to comment.