Skip to content

Commit

Permalink
add linter
Browse files Browse the repository at this point in the history
  • Loading branch information
JohannesGawron committed Oct 31, 2024
1 parent 755c7b4 commit 7f02efa
Show file tree
Hide file tree
Showing 23 changed files with 1,348 additions and 1,050 deletions.
13 changes: 13 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Pre-commit hooks: snakefmt formats Snakemake files, ruff lints and formats Python.
repos:
  - repo: https://github.com/snakemake/snakefmt
    rev: v0.10.2 # Replace by any tag/version ≥0.2.4 : https://github.com/snakemake/snakefmt/releases
    hooks:
      - id: snakefmt
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.4
    hooks:
      # Run the linter.
      - id: ruff
        args: [ --fix ]
      # Run the formatter.
      - id: ruff-format
26 changes: 15 additions & 11 deletions benchmarking_pipeline/launch_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@
import argparse
from pathlib import Path
import subprocess
import tempfile


def parse_args():
    """Parse the command-line options of the pipeline launcher.

    Returns:
        argparse.Namespace: carries ``seed`` (int) and ``config_template``
        (str, path to the template config file).
    """
    parser = argparse.ArgumentParser(description='set a seed and run the pipeline')
    # Both options are mandatory: main() opens the template and substitutes the
    # seed, so a missing value would otherwise surface as a TypeError or as a
    # config rendered with the literal text "None".
    parser.add_argument('--seed', type=int, required=True, help='seed')
    parser.add_argument(
        '--config_template',
        type=str,
        required=True,
        help='Provide path to template config file',
    )
    return parser.parse_args()


def main(args):
    """Render the seed-specific config and submit the Snakemake run via sbatch.

    The template is read, every ``__seed__`` placeholder is replaced by the
    seed, and the result is written next to the template as
    ``config_<seed>.yaml``. The Snakemake invocation that is wrapped into an
    ``sbatch`` job points at exactly that written file.

    Args:
        args: Namespace with ``seed`` and ``config_template`` attributes, as
            returned by ``parse_args()``.

    Raises:
        subprocess.CalledProcessError: if the shell command exits non-zero.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import section.
    import shlex

    template_path = Path(args.config_template)
    config_path = template_path.parent / f'config_{args.seed}.yaml'

    rendered = template_path.read_text().replace('__seed__', str(args.seed))
    config_path.write_text(rendered)

    # Use the path that was actually written instead of a hard-coded
    # "config/" prefix, so the job never reads a stale or missing config when
    # the template lives in another directory.
    snakemake_command = (
        f'snakemake --cores 10 --configfile {shlex.quote(config_path.as_posix())} '
        '--software-deployment-method conda --rerun-incomplete -p --keep-going '
        '--profile profiles/slurm/'
    )
    # Chain with "&&" throughout: the job must not be submitted if the conda
    # environment could not be activated. The wrapped command is quoted as a
    # whole so the outer shell passes it to sbatch unchanged.
    command = (
        'source "$(conda info --base)/etc/profile.d/conda.sh" && '
        'conda activate snakemake && '
        f'sbatch --time=24:00:00 --wrap={shlex.quote(snakemake_command)}'
    )

    # "source" and "conda activate" are shell features, hence shell=True with bash.
    subprocess.run(command, shell=True, check=True, executable='/bin/bash')


if __name__ == '__main__':
    # Script entry point: parse the CLI options and hand them straight to main().
    main(parse_args())
29 changes: 22 additions & 7 deletions benchmarking_pipeline/tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

# Needed for SourceFileLoader below; imported here because the file-level
# import section is outside the visible part of this file.
import importlib.machinery  # noqa: E402

# Directory that contains this test module; the test fixtures live next to it.
script_directory = Path(__file__).resolve().parent

path = (script_directory.parent / 'workflow' / 'rules' / 'common.smk').as_posix()
# ".smk" is not a suffix the import system recognises, so without an explicit
# loader spec_from_file_location() returns None and module_from_spec() fails.
spec = importlib.util.spec_from_file_location(
    'common', path, loader=importlib.machinery.SourceFileLoader('common', path)
)
target = importlib.util.module_from_spec(spec)
spec.loader.exec_module(target)
get_split_files = target.get_split_files
Expand All @@ -15,14 +15,29 @@
class TestCommon(unittest.TestCase):
    """Tests for helper functions loaded from ``workflow/rules/common.smk``."""

    def setUp(self):
        # setUp is the unittest fixture hook; it replaces the former
        # __init__ override and rebuilds the fixture for every test.
        self.sample_list = script_directory / 'sample_list_test.txt'

    def test_get_split_files(self):
        """The sample list expands to two split files per loom, in order."""
        fixture_dir = self.sample_list.parent
        expected = [
            fixture_dir / f'loom{sample}_split{split}.loom'
            for sample in (1, 2)
            for split in (1, 2)
        ]
        self.assertEqual(get_split_files(self.sample_list), expected)

    def test_determine_number_of_different_donors(self):
        """Check the donor count reported for pool '(0.0)' of the fixture."""
        # The previous call was assertEqual(f(path), '(0.0)', 4): the 4 was
        # silently taken as the failure *message* and the result was compared
        # with the string '(0.0)'.
        # NOTE(review): this assumes the signature is (pools_file, pool_name)
        # and that the expected count is 4 -- confirm against common.smk.
        self.assertEqual(
            target.determine_number_of_different_donors(
                script_directory / 'pools_test.txt', '(0.0)'
            ),
            4,
        )


if __name__ == '__main__':
    # A single call is enough: unittest.main() exits the interpreter by
    # default, so a second call would never run.
    unittest.main()
79 changes: 66 additions & 13 deletions benchmarking_pipeline/tests/test_create_demultiplexing_scheme.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,85 @@

# Directory that contains this test module.
script_directory = Path(__file__).resolve().parent

# Load the script under test directly from its file, since it is not part of
# an importable package.
path = script_directory.parent.joinpath(
    'workflow', 'scripts', 'create_demultiplexing_scheme.py'
).as_posix()
spec = importlib.util.spec_from_file_location('create_demultiplexing_scheme', path)
target = importlib.util.module_from_spec(spec)
spec.loader.exec_module(target)

# Short module-level aliases for the functions exercised by the tests below.
multiplexing_scheme_format2pool_format = getattr(
    target, 'multiplexing_scheme_format2pool_format'
)
select_samples_for_pooling = getattr(target, 'select_samples_for_pooling')
define_demultiplexing_scheme_optimal_case = getattr(
    target, 'define_demultiplexing_scheme_optimal_case'
)


class TestCreateDemultiplexingScheme(unittest.TestCase):
    """Tests for ``workflow/scripts/create_demultiplexing_scheme.py``."""

    def setUp(self):
        # Fixtures are built in setUp (the unittest fixture hook) rather than
        # in an overridden __init__, so every test starts from fresh objects.

        # Input for multiplexing_scheme_format2pool_format ...
        self.multiplexing_scheme = {
            1: (0, 1, 0),
            2: (0, 2, 0),
            3: (1, 2, 0),
            4: (0, 0, 0),
            5: (1, 1, 0),
            6: (2, 2, 0),
            7: (0, 1, 1),
            8: (0, 2, 1),
            9: (1, 2, 1),
            10: (0, 0, 1),
            11: (1, 1, 1),
            12: (2, 2, 1),
        }
        # ... and the pool-format mapping it is expected to produce.
        self.multiplexing_scheme_pool_format = {
            (0, 0): [1, 2, 4, -4],
            (1, 0): [-1, 3, 5, -5],
            (2, 0): [-2, -3, 6, -6],
            (0, 1): [7, 8, 10, -10],
            (1, 1): [-7, 9, 11, -11],
            (2, 1): [-8, -9, 12, -12],
        }
        # Inputs for select_samples_for_pooling.
        self.mock_samples = [
            '/test/folder/loom1.loom',
            '/test/folder/loom2.loom',
            '/test/folder/loom3.loom',
        ]
        self.mock_path = '/another/test/folder'
        self.another_pool_scheme = {(0, 0): [1, -1, 3], (1, 0): [2, -2, -3]}

    def test_multiplexing_scheme_format2pool_format(self):
        """The scheme fixture converts to the expected pool-format fixture."""
        converted = multiplexing_scheme_format2pool_format(self.multiplexing_scheme)
        self.assertEqual(converted, self.multiplexing_scheme_pool_format)

    def test_select_samples_for_pooling(self):
        """Each pool key maps to the expected list of split loom files."""
        expected = {
            '(0.0)': [
                '/another/test/folder/loom1_split1.loom',
                '/another/test/folder/loom1_split2.loom',
                '/another/test/folder/loom3_split1.loom',
            ],
            '(1.0)': [
                '/another/test/folder/loom2_split1.loom',
                '/another/test/folder/loom2_split2.loom',
                '/another/test/folder/loom3_split2.loom',
            ],
        }
        selected = select_samples_for_pooling(
            self.another_pool_scheme, self.mock_path, self.mock_samples
        )
        self.assertEqual(selected, expected)

    def test_define_demultiplexing_scheme_optimal_case(self):
        """Three samples with pool size two yield the expected scheme."""
        scheme = define_demultiplexing_scheme_optimal_case(
            maximal_number_of_samples=3, maximal_pool_size=2, n_samples=3
        )
        self.assertEqual(scheme, {1: (0, 1, 0), 2: (0, 0, 0), 3: (1, 1, 0)})


if __name__ == '__main__':
    # A single call is enough: unittest.main() exits the interpreter by
    # default, so a second call would never run.
    unittest.main()
65 changes: 39 additions & 26 deletions benchmarking_pipeline/tests/test_split_loom_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@
import numpy as np
import loompy

# Verbose, timestamped logging while the tests run.
logging.basicConfig(
    level=logging.DEBUG,
    style='{',
    format='{asctime} - {levelname} - {message}',
    datefmt='%Y-%m-%d %H:%M',
)

# Directory that contains this test module; the loom fixture lives next to it.
script_directory = Path(__file__).resolve().parent

# Load the script under test directly from its file, since it is not part of
# an importable package.
split_loom_file_path = script_directory.parent.joinpath(
    'workflow', 'scripts', 'split_loom_files.py'
).as_posix()
spec = importlib.util.spec_from_file_location('split_loom_file', split_loom_file_path)
target = importlib.util.module_from_spec(spec)
spec.loader.exec_module(target)

# Module-level alias for the function exercised by the test below.
split_loom_file = getattr(target, 'split_loom_file')
Expand All @@ -20,36 +27,42 @@
class TestSplitLoomFiles(unittest.TestCase):
    """Test ``split_loom_file`` against a small two-by-two loom fixture."""

    def setUp(self):
        # setUp is the unittest fixture hook; it replaces the former
        # __init__ override.
        self.filename = 'split_loom_data_test.loom'
        # The fixture file was generated once with the following code, kept
        # here as a record of its contents:
        #   matrix = np.arange(4).reshape(2,2)
        #   row_attrs = { "SomeRowAttr": np.arange(2), "OtherRowAttr": ['A','B'] }
        #   col_attrs = { "SomeColAttr": np.arange(2), "OtherColAttr": ['C','D'] }
        #   other_layer = np.arange(4, 8).reshape(2,2)
        #   loompy.create((script_directory / self.filename).as_posix(), layers = {"":matrix, "other": other_layer}, row_attrs = row_attrs, col_attrs = col_attrs)

    def _assert_split(self, split_number, column):
        """Check that split file ``split_number`` holds exactly fixture column ``column``."""
        # Expected values are derived from the fixture definition recorded in setUp.
        main_layer = np.arange(4).reshape(2, 2)[:, [column]]
        other_layer = np.arange(4, 8).reshape(2, 2)[:, [column]]
        split_path = (
            script_directory
            / 'temp'
            / f'{Path(self.filename).stem}_split{split_number}.loom'
        )
        with loompy.connect(split_path) as ds:
            self.assertEqual(ds.shape[1], 1)
            self.assertEqual(ds.shape[0], 2)
            # np.array_equal also compares shapes, so an accidental broadcast
            # cannot make a wrong result pass.
            self.assertTrue(np.array_equal(ds[:, :], main_layer))
            self.assertTrue(np.array_equal(ds.layers['other'][:, :], other_layer))
            self.assertTrue(np.array_equal(ds.ca['SomeColAttr'], np.array([column])))
            self.assertTrue(
                np.array_equal(ds.ca['OtherColAttr'], np.array(['C', 'D'][column : column + 1]))
            )
            # Row attributes are untouched by a column split.
            self.assertTrue(np.array_equal(ds.ra['SomeRowAttr'], np.array([0, 1])))
            self.assertTrue(np.array_equal(ds.ra['OtherRowAttr'], np.array(['A', 'B'])))

    def test_split_loom_files(self):
        """Splitting the fixture 50/50 with seed 42 yields one column per split file."""
        loom_file = script_directory / self.filename
        # NOTE(review): the output directory passed here is the fixture's own
        # directory, while the results are read from its "temp" subdirectory --
        # presumably split_loom_file writes there; confirm.
        split_loom_file(0.5, loom_file, loom_file.parent, seed=42)
        self._assert_split(1, 0)
        self._assert_split(2, 1)


if __name__ == '__main__':
    # Allow running this test module directly as a script.
    unittest.main()

Loading

0 comments on commit 7f02efa

Please sign in to comment.