diff --git a/tests/morpheus/stages/test_multi_processing_stage.py b/tests/morpheus/stages/test_multi_processing_stage.py index f88ec3d7d8..da9230dacd 100644 --- a/tests/morpheus/stages/test_multi_processing_stage.py +++ b/tests/morpheus/stages/test_multi_processing_stage.py @@ -172,7 +172,7 @@ def test_created_stage_pipe(config: Config, dataset_pandas: DatasetManager): expected_df = input_df.copy() expected_df["new_column"] = "Hello" - df_count = 100 + df_count = 10 df_generator = partial(pandas_dataframe_generator, dataset_pandas, df_count) partial_fn = partial(_process_df, column="new_column", value="Hello") @@ -225,7 +225,7 @@ def test_multiple_stages_pipe(config: Config, dataset_pandas: DatasetManager): expected_df["new_column_1"] = "new_value" expected_df["new_column_2"] = "Hello" - df_count = 100 + df_count = 10 df_generator = partial(pandas_dataframe_generator, dataset_pandas, df_count) partial_fn = partial(_process_df, column="new_column_1", value="new_value") diff --git a/tests/morpheus/utils/test_shared_process_pool.py b/tests/morpheus/utils/test_shared_process_pool.py index 0bca371bb0..7baa4ce1ae 100644 --- a/tests/morpheus/utils/test_shared_process_pool.py +++ b/tests/morpheus/utils/test_shared_process_pool.py @@ -149,43 +149,28 @@ def test_submit_single_task(shared_process_pool, a, b, expected): @pytest.mark.slow -def test_submit_task_with_invalid_stage(shared_process_pool): +def test_submit_invalid_tasks(shared_process_pool): pool = shared_process_pool + # submit_task() should raise ValueError if the stage does not exist with pytest.raises(ValueError): pool.submit_task("stage_does_not_exist", _add_task, 10, 20) - -@pytest.mark.slow -def test_submit_task_raises_exception(shared_process_pool): - - pool = shared_process_pool pool.set_usage("test_stage", 0.5) + # if the function raises exception, the task can be submitted and the exception will be raised when calling result() task = pool.submit_task("test_stage", _function_raises_exception) with pytest.raises(RuntimeError): task.result() - -@pytest.mark.slow -def test_submit_task_with_unserializable_result(shared_process_pool): - - pool = shared_process_pool - pool.set_usage("test_stage", 0.5) - + # if the function returns unserializable result, the task can be submitted and the exception will be raised + # when calling result() task = pool.submit_task("test_stage", _function_returns_unserializable_result) with pytest.raises(TypeError): task.result() - -@pytest.mark.slow -def test_submit_task_with_unserializable_arg(shared_process_pool): - - pool = shared_process_pool - pool.set_usage("test_stage", 0.5) - - # Unserializable arguments cannot be submitted to the pool + # Function with unserializable arguments cannot be submitted to the pool with pytest.raises(TypeError): pool.submit_task("test_stage", _arbitrary_function, threading.Lock()) @@ -207,7 +192,7 @@ def test_submit_multiple_tasks(shared_process_pool, a, b, expected): pool = shared_process_pool pool.set_usage("test_stage", 0.5) - num_tasks = 100 + num_tasks = 10 tasks = [] for _ in range(num_tasks): tasks.append(pool.submit_task("test_stage", _add_task, a, b))