From 6e6409daa3badf757eb5b3b7a7d468eec8c3d806 Mon Sep 17 00:00:00 2001
From: panxuchen
Date: Tue, 7 May 2024 20:19:38 +0800
Subject: [PATCH] add ray testcase

---
 data_juicer/core/ray_executor.py              | 127 +++++++++---------
 data_juicer/utils/unittest_utils.py           |  49 +++++--
 tests/ops/filter/test_alphanumeric_filter.py  |  12 +-
 .../ops/filter/test_audio_duration_filter.py  |  47 ++++---
 4 files changed, 142 insertions(+), 93 deletions(-)

diff --git a/data_juicer/core/ray_executor.py b/data_juicer/core/ray_executor.py
index d42d72f95..3cf7acbb3 100644
--- a/data_juicer/core/ray_executor.py
+++ b/data_juicer/core/ray_executor.py
@@ -102,6 +102,70 @@ def get_num_gpus(self, op, op_proc):
         proc_per_gpu = op_proc / cuda_device_count()
         return 1.0 / proc_per_gpu
 
+    def run_op(self, op, op_cfg, dataset):
+        op_name, op_args = list(op_cfg.items())[0]
+        op_cls = OPERATORS.modules[op_name]
+        op_proc = calculate_np(self.cfg.np, op, op_name)
+        num_gpus = self.get_num_gpus(op, op_proc)
+        use_actor = op.use_actor() or num_gpus
+        try:
+            if isinstance(op, Mapper):
+                if op.is_batched_op():
+                    if use_actor:
+                        dataset = dataset.map_batches(
+                            op_cls,
+                            compute=ActorPoolStrategy(),
+                            concurrency=op_proc,
+                            fn_constructor_kwargs=op_args,
+                            batch_format='pyarrow',
+                            num_gpus=num_gpus,
+                            batch_size=1)
+                        # The batch size here is the same as in data.py
+                    else:
+                        dataset = dataset.map_batches(
+                            partial(ray_batch_mapper_wrapper,
+                                    fn=op.process),
+                            batch_format='pyarrow',
+                            num_gpus=num_gpus,
+                            batch_size=1)
+                        # The batch size here is the same as in data.py
+                else:
+                    if use_actor:
+                        dataset = dataset.map(
+                            op_cls,
+                            compute=ActorPoolStrategy(),
+                            concurrency=op_proc,
+                            fn_constructor_kwargs=op_args,
+                            num_gpus=num_gpus)
+                    else:
+                        dataset = dataset.map(op.process,
+                                              num_gpus=num_gpus)
+
+            elif isinstance(op, Filter):
+                if use_actor:
+                    dataset = dataset.map(op_cls,
+                                          compute=ActorPoolStrategy(),
+                                          concurrency=op_proc,
+                                          fn_constructor_kwargs=op_args,
+                                          num_gpus=num_gpus)
+                else:
+                    dataset = dataset.map(op.compute_stats,
+                                          num_gpus=num_gpus)
+                if op.stats_export_path is not None:
+                    dataset.write_json(op.stats_export_path,
+                                       force_ascii=False)
+                dataset = dataset.filter(op.process)
+            else:
+                logger.error(
+                    'Ray executor only supports Filter and Mapper OPs for '
+                    'now')
+                raise NotImplementedError
+        except:  # noqa: E722
+            logger.error(f'An error occurred during Op [{op_name}].')
+            import traceback
+            traceback.print_exc()
+            exit(1)
+        return dataset
+
     def run(self, load_data_np=None):
         """
         Running the dataset process pipeline.
@@ -140,68 +204,7 @@ def process_batch_arrow(table: pa.Table) -> pa.Table:
         logger.info('Processing data...')
         tstart = time.time()
         for op_cfg, op in zip(self.process_list, self.ops):
-            op_name, op_args = list(op_cfg.items())[0]
-            op_cls = OPERATORS.modules[op_name]
-            op_proc = calculate_np(self.cfg.np, op, op_name)
-            num_gpus = self.get_num_gpus(op, op_proc)
-            use_actor = op.use_actor() or num_gpus
-            try:
-                if isinstance(op, Mapper):
-                    if op.is_batched_op():
-                        if use_actor:
-                            dataset = dataset.map_batches(
-                                op_cls,
-                                compute=ActorPoolStrategy(),
-                                concurrency=op_proc,
-                                fn_constructor_kwargs=op_args,
-                                batch_format='pyarrow',
-                                num_gpus=num_gpus,
-                                batch_size=1)
-                            # The batch size here is same as in data.py
-                        else:
-                            dataset = dataset.map_batches(
-                                partial(ray_batch_mapper_wrapper,
-                                        fn=op.process),
-                                batch_format='pyarrow',
-                                num_gpus=num_gpus,
-                                batch_size=1)
-                            # The batch size here is same as in data.py
-                    else:
-                        if use_actor:
-                            dataset = dataset.map(
-                                op_cls,
-                                compute=ActorPoolStrategy(),
-                                concurrency=op_proc,
-                                fn_constructor_kwargs=op_args,
-                                num_gpus=num_gpus)
-                        else:
-                            dataset = dataset.map(op.process,
-                                                  num_gpus=num_gpus)
-
-                elif isinstance(op, Filter):
-                    if use_actor:
-                        dataset = dataset.map(op_cls,
-                                              compute=ActorPoolStrategy(),
-                                              concurrency=op_proc,
-                                              fn_constructor_kwargs=op_args,
-                                              num_gpus=num_gpus)
-                    else:
-                        dataset = dataset.map(op.compute_stats,
-                                              num_gpus=num_gpus)
-                    if op.stats_export_path is not None:
-                        dataset.write_json(op.stats_export_path,
-                                           force_ascii=False)
-                    dataset = dataset.filter(op.process)
-                else:
-                    logger.error(
-                        'Ray executor only support Filter and Mapper OPs for '
-                        'now')
-                    raise NotImplementedError
-            except:  # noqa: E722
-                logger.error(f'An error occurred during Op [{op_name}].')
-                import traceback
-                traceback.print_exc()
-                exit(1)
+            dataset = self.run_op(op, op_cfg, dataset)
 
         # 4. data export
         logger.info('Exporting dataset to disk...')
diff --git a/data_juicer/utils/unittest_utils.py b/data_juicer/utils/unittest_utils.py
index 0b8acda58..546a28dc3 100644
--- a/data_juicer/utils/unittest_utils.py
+++ b/data_juicer/utils/unittest_utils.py
@@ -2,7 +2,9 @@
 import shutil
 import unittest
 
+import numpy
 import ray.data as rd
+import pyarrow as pa
 from datasets import Dataset
 
 from data_juicer.ops import Filter
@@ -50,22 +52,32 @@ def tearDownClass(cls, hf_model_name=None) -> None:
             print('CLEAN all TRANSFORMERS_CACHE')
             shutil.rmtree(transformers.TRANSFORMERS_CACHE)
 
-    def generate_dataset(cls, data, type='standalone'):
+    def generate_dataset(self, data):
         """Generate dataset for a specific executor.
 
         Args:
-            type (str, optional): `hf` or `ray`. Defaults to "hf".
+            data: list of samples; executor is selected by self.current_tag.
""" - if type.startswith('standalone'): + if self.current_tag.startswith('standalone'): return Dataset.from_list(data) - elif type.startswith('ray'): - return rd.from_items(data) + elif self.current_tag.startswith('ray'): + dataset = rd.from_items(data) + if Fields.stats not in dataset.columns(fetch_if_missing=False): + def process_batch_arrow(table: pa.Table) -> pa.Table: + new_column_data = [{} for _ in range(len(table))] + new_talbe = table.append_column(Fields.stats, + [new_column_data]) + return new_talbe + + dataset = dataset.map_batches(process_batch_arrow, + batch_format='pyarrow') + return dataset else: raise ValueError('Unsupported type') - def run_single_op(cls, dataset, op, column_names, type='standalone'): + def run_single_op(self, dataset, op, column_names): """Run operator in the specific executor.""" - if type.startswith('standalone'): + if self.current_tag.startswith('standalone'): if isinstance(op, Filter) and Fields.stats not in dataset.features: dataset = dataset.add_column(name=Fields.stats, column=[{}] * dataset.num_rows) @@ -73,7 +85,26 @@ def run_single_op(cls, dataset, op, column_names, type='standalone'): dataset = dataset.filter(op.process) dataset = dataset.select_columns(column_names=column_names) return dataset.to_list() - elif type.startswith('ray'): - raise ValueError('Unsupported type') + elif self.current_tag.startswith('ray'): + dataset = dataset.map(op.compute_stats) + dataset = dataset.filter(op.process) + dataset = dataset.to_pandas().get(column_names) + if dataset is None: + return [] + return dataset.to_dict(orient="records") else: raise ValueError('Unsupported type') + + def assertDatasetEqual(self, first, second): + def convert_record(rec): + for key in rec.keys(): + # Convert incomparable `list` to comparable `tuple` + if isinstance(rec[key], numpy.ndarray) or isinstance(rec[key], list): + rec[key] = tuple(rec[key]) + return rec + + first = [convert_record(d) for d in first] + second = [convert_record(d) for d in second] + first = sorted(first, key=lambda x: tuple(sorted(x.items()))) + second = sorted(second, key=lambda x: tuple(sorted(x.items()))) + return self.assertEqual(first, second) \ No newline at end of file diff --git a/tests/ops/filter/test_alphanumeric_filter.py b/tests/ops/filter/test_alphanumeric_filter.py index ec0115942..594432207 100644 --- a/tests/ops/filter/test_alphanumeric_filter.py +++ b/tests/ops/filter/test_alphanumeric_filter.py @@ -38,10 +38,10 @@ def test_case(self): }, { 'text': 'emoji表情测试下😊,😸31231\n' }] - dataset = DataJuicerTestCaseBase.generate_dataset(ds_list) + dataset = self.generate_dataset(ds_list) op = AlphanumericFilter(min_ratio=0.2, max_ratio=0.9) - result = DataJuicerTestCaseBase.run_single_op(dataset, op, ["text"], self.current_tag) - self.assertEqual(result, tgt_list) + result = self.run_single_op(dataset, op, ["text"]) + self.assertDatasetEqual(result, tgt_list) @TEST_TAG("standalone", "ray") def test_token_case(self): @@ -66,10 +66,10 @@ def test_token_case(self): }, { 'text': 'Do you need a cup of coffee?' 
         }]
-        dataset = DataJuicerTestCaseBase.generate_dataset(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AlphanumericFilter(tokenization=True, min_ratio=1.5)
-        result = DataJuicerTestCaseBase.run_single_op(dataset, op, ["text"], self.current_tag)
-        self.assertEqual(result, tgt_list)
+        result = self.run_single_op(dataset, op, ["text"])
+        self.assertDatasetEqual(result, tgt_list)
 
 
 if __name__ == '__main__':
diff --git a/tests/ops/filter/test_audio_duration_filter.py b/tests/ops/filter/test_audio_duration_filter.py
index 91a39bfd8..f7363969d 100644
--- a/tests/ops/filter/test_audio_duration_filter.py
+++ b/tests/ops/filter/test_audio_duration_filter.py
@@ -5,7 +5,7 @@
 
 from data_juicer.ops.filter.audio_duration_filter import AudioDurationFilter
 from data_juicer.utils.constant import Fields
-from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase
+from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG
 
 
 class AudioDurationFilterTest(DataJuicerTestCaseBase):
@@ -30,6 +30,7 @@ def _run_audio_duration_filter(self,
         res_list = dataset.to_list()
         self.assertEqual(res_list, target_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_default_filter(self):
 
         ds_list = [{
@@ -46,10 +47,13 @@ def test_default_filter(self):
         }, {
             'audios': [self.aud3_path]
         }]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter()
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
+
 
+    @TEST_TAG("standalone", "ray")
     def test_filter_long_audios(self):
 
         ds_list = [{
@@ -60,10 +64,12 @@ def test_filter_long_audios(self):
             'audios': [self.aud3_path]
         }]
         tgt_list = [{'audios': [self.aud1_path]}]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(max_duration=10)
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
    def test_filter_short_audios(self):

        ds_list = [{
@@ -74,10 +80,12 @@ def test_filter_short_audios(self):
             'audios': [self.aud3_path]
         }]
         tgt_list = [{'audios': [self.aud3_path]}]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=60)
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_filter_audios_within_range(self):
 
         ds_list = [{
@@ -88,12 +96,13 @@ def test_filter_audios_within_range(self):
             'audios': [self.aud3_path]
         }]
         tgt_list = [{'audios': [self.aud2_path]}]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=10, max_duration=20)
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_any(self):
-
         ds_list = [{
             'audios': [self.aud1_path, self.aud2_path]
         }, {
             'audios': [self.aud2_path, self.aud3_path]
         }, {
             'audios': [self.aud3_path]
         }]
         tgt_list = [{
             'audios': [self.aud1_path, self.aud2_path]
         }, {
             'audios': [self.aud2_path, self.aud3_path]
         }]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=10,
                                  max_duration=20,
                                  any_or_all='any')
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_all(self):
 
         ds_list = [{
@@ -122,12 +134,14 @@ def test_all(self):
             'audios': [self.aud1_path, self.aud3_path]
         }]
         tgt_list = []
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=10,
                                  max_duration=20,
                                  any_or_all='all')
-        self._run_audio_duration_filter(dataset, tgt_list, op)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
+    @TEST_TAG("standalone", "ray")
     def test_filter_in_parallel(self):
 
         ds_list = [{
@@ -138,9 +152,10 @@ def test_filter_in_parallel(self):
             'audios': [self.aud3_path]
         }]
         tgt_list = [{'audios': [self.aud2_path]}]
-        dataset = Dataset.from_list(ds_list)
+        dataset = self.generate_dataset(ds_list)
         op = AudioDurationFilter(min_duration=10, max_duration=20)
-        self._run_audio_duration_filter(dataset, tgt_list, op, np=2)
+        result = self.run_single_op(dataset, op, [op.audio_key])
+        self.assertDatasetEqual(result, tgt_list)
 
 
 if __name__ == '__main__':