Skip to content

Commit

Permalink
Parallelize validation (#86)
Browse files Browse the repository at this point in the history
* Add CPU parallelization option to arguments

* Parallelize validation function over CPUs

* Add CPUs to ValidateArgs

* Update tests for validation

* Fix encoding test

* Bump version for official v4.0.0 release

* Remove unused import

* Update tests with Pool as context manager

* Update to use with

* Add CPUs option in the README and update installation instructions

* Fix link text

* Fix positive integer check

* Add tests for positive integer type

* Add basic libmagic installation steps

* Prepend PID to success and error printers

* Update README with updated parameter name

* Update argument in parser

* Update argument in validation

* Update tests
  • Loading branch information
yashpatel6 authored Nov 21, 2023
1 parent af334fe commit 2ed7792
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 34 deletions.
35 changes: 32 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,40 @@ The tool can be installed as a standalone command line tool. The following depen
|Python|3.10|
|VCFtools|0.1.16|

### Install directly from GitHub
Additionally, the `libmagic` C library must be installed on the system.

### Installing `libmagic`

On Debian/Ubuntu, install through:
```Bash
sudo apt-get install libmagic-dev
```

On Mac, install through homebrew (https://brew.sh/):
```Bash
brew install libmagic
```

`libmagic` can also be installed through the `conda` package manager:
```Bash
conda install -c conda-forge libmagic
```

With the dependencies (and the proper versions) installed, install `pipeval` through one of the options below:

### Install directly from GitHub through SSH
```Bash
pip install git+ssh://git@github.com/uclahs-cds/package-PipeVal.git
```

### Install directly from GitHub through HTTPS
```Bash
pip install git+https://git@github.com/uclahs-cds/package-PipeVal.git
```

### Install from cloned repository
```Bash
<clone the PipeVal GitHub repository>
cd </path/to/cloned/repository>
pip install .
```
Expand All @@ -55,6 +82,8 @@ options:
-v, --version show program's version number and exit
-r CRAM_REFERENCE, --cram-reference CRAM_REFERENCE
Path to reference file for CRAM
-p PROCESSES, --processes PROCESSES
Number of processes to run in parallel when validating multiple files
```

The tool will attempt to automatically detect the file type based on extension and perform the appropriate validations. The tool will also perform an existence check along with a checksum check if an MD5 or SHA512 checksum exists regardless of file type.
Expand Down Expand Up @@ -130,8 +159,8 @@ pytest
## Discussions

- [Issue tracker](https://github.com/uclahs-cds/package-PipeVal/issues) to report errors and enhancement ideas.
- Discussions can take place in [tool-NF-test Discussions](https://github.com/uclahs-cds/package-PipeVal/discussions)
- [tool-NF-test pull requests](https://github.com/uclahs-cds/package-PipeVal/pulls) are also open for discussion
- Discussions can take place in [package-PipeVal Discussions](https://github.com/uclahs-cds/package-PipeVal/discussions)
- [package-PipeVal pull requests](https://github.com/uclahs-cds/package-PipeVal/pulls) are also open for discussion

## Contributors

Expand Down
2 changes: 1 addition & 1 deletion test/unit/test_generate_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def test__write_checksum_file__writes_proper_checksum(mock_path, mock_write_open

_write_checksum_file(mock_path, hash_type, computed_hash)

mock_write_open.assert_called_once_with(f'{file_path}.{hash_type}', 'w')
mock_write_open.assert_called_once_with(f'{file_path}.{hash_type}', 'w', encoding='utf-8')

handle = mock_write_open()
handle.write.assert_called_once_with(f'{computed_hash} {file_path}\n')
Expand Down
76 changes: 64 additions & 12 deletions test/unit/test_validate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# pylint: disable=C0116
# pylint: disable=C0114
from pathlib import Path
from argparse import Namespace, ArgumentTypeError
from unittest.mock import Mock
import warnings
import mock
Expand Down Expand Up @@ -28,10 +29,31 @@
_detect_file_type_and_extension,
_check_extension,
run_validate,
_validate_file
_validate_file,
_validation_worker
)
from validate.__main__ import positive_integer
from validate.validate_types import ValidateArgs

def test__positive_integer__returns_correct_integer():
    # A numeric string must be converted to the matching int value.
    assert positive_integer('2') == 2

@pytest.mark.parametrize(
    'number_str',
    ['-2', '0', '1.2', 'number', '']
)
def test__positive_integer__fails_non_positive_integers(number_str):
    # Negative, zero, fractional, non-numeric and empty inputs must all be rejected.
    with pytest.raises(ArgumentTypeError):
        positive_integer(number_str)

@pytest.mark.parametrize(
'expected_extension, expected_file_type',
[
Expand Down Expand Up @@ -206,10 +228,8 @@ def test__validate_vcf_file__passes_vcf_validation(mock_call):

_validate_vcf_file('some/file')

@mock.patch('validate.validate._print_success')
def test__run_validate__passes_validation_no_files(mock_print_success):
test_args = ValidateArgs(path=[], cram_reference=None)
mock_print_success.return_value = ''
def test__run_validate__passes_validation_no_files():
test_args = ValidateArgs(path=[], cram_reference=None, processes=1)
run_validate(test_args)

@pytest.mark.parametrize(
Expand All @@ -225,19 +245,47 @@ def test__run_validate__passes_validation_no_files(mock_print_success):
@mock.patch('validate.validate._validate_file')
@mock.patch('validate.validate._print_error')
@mock.patch('validate.validate.Path.resolve')
def test__run_validate__fails_with_failing_checks(
def test___validation_worker__fails_with_failing_checks(
mock_path_resolve,
mock_print_error,
mock_validate_file,
mock_detect_file_type_and_extension,
test_exception):
test_args = ValidateArgs(path=['some/path'], cram_reference=None)
mock_path_resolve.return_value = 'some/path'
test_path = 'some/path'
test_args = ValidateArgs(path=[test_path], cram_reference=None, processes=1)
mock_path_resolve.return_value = test_path
mock_validate_file.side_effect = test_exception
mock_detect_file_type_and_extension.return_value = ('', '')
mock_print_error.return_value = ''

assert not _validation_worker(test_path, test_args)

@mock.patch('validate.validate.Path.resolve', autospec=True)
@mock.patch('validate.validate.multiprocessing.Pool')
def test__run_validate__passes_on_all_valid_files(
mock_pool,
mock_path_resolve
):
test_path = 'some/path'
test_args = ValidateArgs(path=[test_path], cram_reference=None, processes=1)

mock_path_resolve.return_value = None
mock_pool.return_value.__enter__.return_value = Namespace(starmap=lambda y, z: [True])

run_validate(test_args)

@mock.patch('validate.validate.Path.resolve', autospec=True)
@mock.patch('validate.validate.multiprocessing.Pool')
def test__run_validate__fails_with_failing_file(
mock_pool,
mock_path_resolve):
test_path = 'some/path'
test_args = ValidateArgs(path=[test_path], cram_reference=None, processes=1)
expected_code = 1

mock_path_resolve.return_value = None
mock_pool.return_value.__enter__.return_value = Namespace(starmap=lambda y, z: [False])

with pytest.raises(SystemExit) as pytest_exit:
run_validate(test_args)
assert pytest_exit.value.code == expected_code
Expand Down Expand Up @@ -280,7 +328,9 @@ def test__run_validate__fails_on_unresolvable_symlink(mock_path_resolve):
expected_error = FileNotFoundError
mock_path_resolve.side_effect = expected_error

test_args = ValidateArgs(path=['some/path'], cram_reference=None)
test_path = 'some/path'

test_args = ValidateArgs(path=[test_path], cram_reference=None, processes=1)

with pytest.raises(expected_error):
run_validate(test_args)
Expand All @@ -289,7 +339,7 @@ def test__run_validate__fails_on_unresolvable_symlink(mock_path_resolve):
@mock.patch('validate.validate._detect_file_type_and_extension')
@mock.patch('validate.validate._validate_file')
@mock.patch('validate.validate._print_success')
def test__run_validate__passes_proper_validation(
def test___validation_worker__passes_proper_validation(
mock_print_success,
mock_validate_file,
mock_detect_file_type_and_extension,
Expand All @@ -299,6 +349,8 @@ def test__run_validate__passes_proper_validation(
mock_validate_file.return_value = None
mock_path_resolve.return_value = None

test_args = ValidateArgs(path=['some/path'], cram_reference=None)
test_path = 'some/path'

run_validate(test_args)
test_args = ValidateArgs(path=[test_path], cram_reference=None, processes=1)

_validation_worker(test_path, test_args)
2 changes: 1 addition & 1 deletion validate/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
'''Inits validate module'''

__version__ = '4.0.0-rc.2'
__version__ = '4.0.0'
14 changes: 14 additions & 0 deletions validate/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@
from validate import __version__
from validate.validate import run_validate

def positive_integer(arg):
    """ Convert `arg` to an int and require it to be at least 1

    Intended for use as an argparse `type=` callable.

    Returns the converted integer.
    Raises `argparse.ArgumentTypeError` when `int(arg)` raises `ValueError`
    or when the converted value is smaller than 1.
    """
    try:
        parsed = int(arg)
    except ValueError as not_an_integer:
        raise argparse.ArgumentTypeError("Must be an integer.") from not_an_integer

    if parsed >= 1:
        return parsed

    raise argparse.ArgumentTypeError("Must be an integer greater than 0.")

def _parse_args():
""" Parse arguments """
parser = argparse.ArgumentParser()
parser.add_argument('path', help='one or more paths of files to validate', type=str, nargs='+')
parser.add_argument('-v', '--version', action='version', version=f'%(prog)s {__version__}')
parser.add_argument('-r', '--cram-reference', default=None, \
help='Path to reference file for CRAM')
parser.add_argument('-p', '--processes', type=positive_integer, default=1, \
help='Number of processes to run in parallel when validating multiple files')

parser.set_defaults(func=run_validate)

Expand Down
40 changes: 24 additions & 16 deletions validate/validate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
''' File validation functions '''
from pathlib import Path
import sys
import os
from typing import Dict, Union
import multiprocessing
from itertools import repeat

from validate.validators.bam import _check_bam
from validate.validators.sam import _check_sam
Expand Down Expand Up @@ -57,11 +60,11 @@ def _validate_file(

def _print_error(path:Path, err:BaseException):
''' Prints error message '''
print(f'Error: {str(path)} {str(err)}')
print(f'PID:{os.getpid()} - Error: {str(path)} {str(err)}')

def _print_success(path:Path, file_type:str):
''' Prints success message '''
print(f'Input: {path} is valid {file_type}')
print(f'PID:{os.getpid()} - Input: {path} is valid {file_type}')

def _detect_file_type_and_extension(path:Path):
''' File type and extension detection '''
Expand All @@ -86,27 +89,32 @@ def _check_extension(extension:str):

return UNKNOWN_FILE_TYPE

def _validation_worker(path: Path, args:Union[ValidateArgs,Dict[str, Union[str,list]]]):
    ''' Validate a single file and report the outcome

    Detects the file type and extension of `path`, runs `_validate_file` on it
    and prints either a success or an error message.

    Returns `True` once the success message has been printed and `False` when a
    `TypeError`, `ValueError`, `IOError` or `OSError` was reported through
    `_print_error`. A `FileNotFoundError` only produces a warning and is not
    counted as a failure.
    '''
    try:
        detected = _detect_file_type_and_extension(path)
        file_type, file_extension = detected
        _validate_file(path, file_type, file_extension, args)
    except FileNotFoundError as missing_err:
        # Non-fatal: execution continues to the success message below.
        # NOTE(review): if the detection call itself raised, `file_type` is unbound
        # at that point - confirm whether detection can raise FileNotFoundError.
        print(f"Warning: {str(path)} {str(missing_err)}")
    except (TypeError, ValueError, IOError, OSError) as validation_err:
        _print_error(path, validation_err)
        return False

    _print_success(path, file_type)
    return True

def run_validate(args:Union[ValidateArgs,Dict[str, Union[str,list]]]):
''' Function to validate file(s)
`args` must contain the following:
`path` is a required argument with a value of list of files
`cram_reference` is a required argument with either a string value or None
'''

all_files_pass = True

for path in [Path(pathname).resolve(strict=True) for pathname in args.path]:
try:
file_type, file_extension = _detect_file_type_and_extension(path)
_validate_file(path, file_type, file_extension, args)
except FileNotFoundError as file_not_found_err:
print(f"Warning: {str(path)} {str(file_not_found_err)}")
except (TypeError, ValueError, IOError, OSError) as err:
all_files_pass = False
_print_error(path, err)
continue
num_parallel = min(args.processes, multiprocessing.cpu_count())

_print_success(path, file_type)
with multiprocessing.Pool(num_parallel) as parallel_pool:
validation_results = parallel_pool.starmap(_validation_worker, \
zip([Path(pathname).resolve(strict=True) for pathname in args.path], repeat(args)))

if not all_files_pass:
if not all(validation_results):
sys.exit(1)
2 changes: 1 addition & 1 deletion validate/validate_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

ValidateArgs = namedtuple(
'args',
'path, cram_reference'
'path, cram_reference, processes'
)

0 comments on commit 2ed7792

Please sign in to comment.