diff --git a/tests/test_service.py b/tests/test_service.py index d4328e13b0..c128a480ed 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -137,7 +137,7 @@ def test_download_with_subdir(multishells, tmpdir, server_type_remote_process): subdir_filepath = dpf.core.upload_file(file, to_server_path, server=server_type_remote_process) folder = parent_path - out = dpf.core.download_files_in_folder(folder, str(tmpdir), server=server_type_remote_process) + _ = dpf.core.download_files_in_folder(folder, str(tmpdir), server=server_type_remote_process) p1 = tmpdir / filename p2 = tmpdir / "subdir" / filename # p1 = tmpdir + "/" + filename diff --git a/tests/test_session.py b/tests/test_session.py index b9b4e8074d..a2076badb7 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -20,7 +20,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -import os +from pathlib import Path import conftest import tempfile from ansys.dpf import core @@ -31,10 +31,10 @@ def get_log_file(log_path, server): if not isinstance(server, core.server_types.InProcessServer): core.core.download_file( log_path, - os.path.join(tempfile.gettempdir(), "log2.txt"), + str(Path(tempfile.gettempdir()) / "log2.txt"), server=server, ) - return os.path.join(tempfile.gettempdir(), "log2.txt") + return str(Path(tempfile.gettempdir()) / "log2.txt") else: return log_path @@ -47,14 +47,14 @@ def test_logging(tmpdir, server_type): examples.find_static_rst(return_local_path=True, server=server_type), server=server_type, ) - log_path = os.path.join(server_tmp, "log.txt") + log_path = Path(server_tmp) / "log.txt" else: - log_path = os.path.join(tmpdir, "log.txt") + log_path = Path(tmpdir) / "log.txt" result_file = examples.find_static_rst(server=server_type) # download it - new_tmpdir = os.path.join(tmpdir, "my_tmp_dir") - server_type.session.handle_events_with_file_logger(log_path, 2) + _ = Path(tmpdir) / "my_tmp_dir" + server_type.session.handle_events_with_file_logger(str(log_path), 2) wf = core.Workflow(server=server_type) wf.progress_bar = False @@ -65,13 +65,13 @@ def test_logging(tmpdir, server_type): wf.set_output_name("out", to_nodal.outputs.fields_container) wf.get_output("out", core.types.fields_container) - download_log_path = get_log_file(log_path, server_type) - assert os.path.exists(download_log_path) - file_size = os.path.getsize(download_log_path) + download_log_path = Path(get_log_file(str(log_path), server_type)) + assert download_log_path.exists() + file_size = download_log_path.stat().st_size assert file_size > 20 server_type._del_session() - download_log_path = get_log_file(log_path, server_type) - file_size = os.path.getsize(download_log_path) + download_log_path = Path(get_log_file(str(log_path), server_type)) + file_size = download_log_path.stat().st_size wf = core.Workflow(server=server_type) wf.progress_bar = False @@ -82,8 +82,8 @@ def test_logging(tmpdir, server_type): wf.set_output_name("out", to_nodal.outputs.fields_container) wf.get_output("out", core.types.fields_container) - download_log_path = get_log_file(log_path, server_type) - assert file_size == os.path.getsize(download_log_path) + download_log_path = Path(get_log_file(str(log_path), server_type)) + assert file_size == download_log_path.stat().st_size @conftest.raises_for_servers_version_under("6.1") @@ -93,8 +93,8 @@ def test_logging_remote(tmpdir, server_type_remote_process): examples.find_multishells_rst(return_local_path=True), server=server_type_remote_process, ) - log_path = os.path.join(server_tmp, "log.txt") - server_type_remote_process.session.handle_events_with_file_logger(log_path, 2) + log_path = Path(server_tmp) / "log.txt" + server_type_remote_process.session.handle_events_with_file_logger(str(log_path), 2) server_type_remote_process.session.start_emitting_rpc_log() wf = core.Workflow(server=server_type_remote_process) @@ -107,13 +107,13 @@ def test_logging_remote(tmpdir, server_type_remote_process): wf.set_output_name("out", to_nodal.outputs.fields_container) wf.get_output("out", core.types.fields_container) - download_log_path = get_log_file(log_path, server_type_remote_process) - assert os.path.exists(download_log_path) - file_size = os.path.getsize(download_log_path) + download_log_path = Path(get_log_file(str(log_path), server_type_remote_process)) + assert download_log_path.exists() + file_size = download_log_path.stat().st_size assert file_size > 3000 server_type_remote_process._del_session() - download_log_path = get_log_file(log_path, server_type_remote_process) - file_size = os.path.getsize(download_log_path) + download_log_path = Path(get_log_file(str(log_path), server_type_remote_process)) + file_size = download_log_path.stat().st_size wf = core.Workflow(server=server_type_remote_process) wf.progress_bar = False @@ -125,5 +125,5 @@ def test_logging_remote(tmpdir, server_type_remote_process): wf.set_output_name("out", to_nodal.outputs.fields_container) wf.get_output("out", core.types.fields_container) - download_log_path = get_log_file(log_path, server_type_remote_process) - assert file_size == os.path.getsize(download_log_path) + download_log_path = Path(get_log_file(str(log_path), server_type_remote_process)) + assert file_size == download_log_path.stat().st_size diff --git a/tests/test_streams_container.py b/tests/test_streams_container.py index 01e116345e..c2577aee54 100644 --- a/tests/test_streams_container.py +++ b/tests/test_streams_container.py @@ -20,7 +20,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -import os +from pathlib import Path import shutil from ansys import dpf @@ -34,27 +34,27 @@ def test_create_streams_container(server_in_process, simple_bar): def test_release_handles(server_in_process, simple_bar): - split = os.path.splitext(simple_bar) - copy_path = split[0] + "copy" + split[1] + simple_bar = Path(simple_bar) + copy_path = simple_bar.parent / (simple_bar.stem + "copy" + simple_bar.suffix) shutil.copyfile(simple_bar, copy_path) - model = dpf.core.Model(copy_path, server=server_in_process) + model = dpf.core.Model(str(copy_path), server=server_in_process) # Evaluate something from the rst _ = model.metadata.meshed_region streams_provider = model.metadata.streams_provider sc = streams_provider.outputs.streams_container() sc.release_handles() - os.remove(copy_path) + copy_path.unlink() def test_release_streams_model(server_in_process, simple_bar): - split = os.path.splitext(simple_bar) - copy_path = split[0] + "copy2" + split[1] + simple_bar = Path(simple_bar) + copy_path = simple_bar.parent / (simple_bar.stem + "copy2" + simple_bar.suffix) shutil.copyfile(simple_bar, copy_path) - model = dpf.core.Model(copy_path, server=server_in_process) + model = dpf.core.Model(str(copy_path), server=server_in_process) # Evaluate something from the rst _ = model.metadata.meshed_region model.metadata.release_streams() - os.remove(copy_path) + copy_path.unlink() def test_release_streams_model_empty(server_in_process): diff --git a/tests/test_workflow.py b/tests/test_workflow.py index 5576cbe555..0252645402 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -20,7 +20,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -import os +from pathlib import Path import numpy as np import pytest @@ -46,17 +46,17 @@ def test_create_workflow(server_type): def remove_dot_file(request): """Cleanup a testing directory once we are finished.""" - dot_path = os.path.join(os.getcwd(), "test.dot") - png_path = os.path.join(os.getcwd(), "test.png") - png_path1 = os.path.join(os.getcwd(), "test1.png") + dot_path = Path.cwd() / "test.dot" + png_path = Path.cwd() / "test.png" + png_path1 = Path.cwd() / "test1.png" def remove_files(): - if os.path.exists(dot_path): - os.remove(os.path.join(os.getcwd(), dot_path)) - if os.path.exists(png_path): - os.remove(os.path.join(os.getcwd(), png_path)) - if os.path.exists(png_path1): - os.remove(os.path.join(os.getcwd(), png_path1)) + if dot_path.exists(): + dot_path.unlink() + if png_path.exists(): + png_path.unlink() + if png_path1.exists(): + png_path1.unlink() request.addfinalizer(remove_files) @@ -77,11 +77,11 @@ def test_workflow_view(server_in_process, remove_dot_file): wf.connect_with(pre_wf, {"prewf_output": "wf_input"}) wf.view(off_screen=True, title="test1") - assert not os.path.exists("test1.dot") - assert os.path.exists("test1.png") + assert not Path("test1.dot").exists() + assert Path("test1.png").exists() wf.view(off_screen=True, save_as="test.png", keep_dot_file=True) - assert os.path.exists("test.dot") - assert os.path.exists("test.png") + assert Path("test.dot").exists() + assert Path("test.png").exists() def test_connect_field_workflow(server_type):