diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py
index 028f3cf79..0585ac8c4 100644
--- a/data_juicer/config/config.py
+++ b/data_juicer/config/config.py
@@ -464,6 +464,11 @@ def init_setup_from_cfg(cfg: Namespace):
 
     # check number of processes np
     sys_cpu_count = os.cpu_count()
+    if not cfg.np:
+        cfg.np = sys_cpu_count
+        logger.warning(
+            f'Number of processes `np` is not set, '
+            f'set it to cpu count [{sys_cpu_count}] as default value.')
     if cfg.np > sys_cpu_count:
         logger.warning(f'Number of processes `np` is set as [{cfg.np}], which '
                        f'is larger than the cpu count [{sys_cpu_count}]. Due '
diff --git a/data_juicer/core/ray_data.py b/data_juicer/core/ray_data.py
index 646d59a5d..568f88e41 100644
--- a/data_juicer/core/ray_data.py
+++ b/data_juicer/core/ray_data.py
@@ -122,15 +122,44 @@ def _run_single_op(self, op):
             batch_size = getattr(op, 'batch_size',
                                  1) if op.is_batched_op() else 1
             if isinstance(op, Mapper):
-                self.data = self.data.map_batches(op.process,
-                                                  batch_size=batch_size,
-                                                  batch_format='pyarrow',
-                                                  num_gpus=num_gpus)
+                if op.use_cuda():
+                    # Pass the op class (not a bound method) so that the op
+                    # is constructed from its config inside the Ray workers.
+                    op_kwargs = op._op_cfg[op._name]
+                    self.data = self.data.map_batches(
+                        op.__class__,
+                        fn_args=None,
+                        fn_kwargs=None,
+                        fn_constructor_args=None,
+                        fn_constructor_kwargs=op_kwargs,
+                        batch_size=batch_size,
+                        num_gpus=num_gpus,
+                        concurrency=op_proc,
+                        batch_format='pyarrow')
+                else:
+                    self.data = self.data.map_batches(op.process,
+                                                      batch_size=batch_size,
+                                                      batch_format='pyarrow',
+                                                      num_gpus=num_gpus)
             elif isinstance(op, Filter):
-                self.data = self.data.map_batches(op.compute_stats,
-                                                  batch_size=batch_size,
-                                                  batch_format='pyarrow',
-                                                  num_gpus=num_gpus)
+                if op.use_cuda():
+                    # Same class-based dispatch as for Mapper above.
+                    op_kwargs = op._op_cfg[op._name]
+                    self.data = self.data.map_batches(
+                        op.__class__,
+                        fn_args=None,
+                        fn_kwargs=None,
+                        fn_constructor_args=None,
+                        fn_constructor_kwargs=op_kwargs,
+                        batch_size=batch_size,
+                        num_gpus=num_gpus,
+                        concurrency=op_proc,
+                        batch_format='pyarrow')
+                else:
+                    self.data = self.data.map_batches(op.compute_stats,
+                                                      batch_size=batch_size,
+                                                      batch_format='pyarrow',
+                                                      num_gpus=num_gpus)
                 if op.stats_export_path is not None:
                     self.data.write_json(op.stats_export_path,
                                          force_ascii=False)
diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py
index 39e23d8e9..9e39c50ab 100644
--- a/data_juicer/ops/base_op.py
+++ b/data_juicer/ops/base_op.py
@@ -288,6 +288,10 @@ def __init_subclass__(cls, **kwargs):
                     f'{cls.__name__}. Please implement {method_name}_single '
                     f'or {method_name}_batched.')
 
+    def __call__(self, *args, **kwargs):
+        """Make the op callable by delegating to ``process``."""
+        return self.process(*args, **kwargs)
+
     def process_batched(self, samples, *args, **kwargs):
         keys = samples.keys()
         first_key = next(iter(keys))
@@ -378,6 +382,10 @@ def __init_subclass__(cls, **kwargs):
                     f'{cls.__name__}. Please implement {method_name}_single '
                     f'or {method_name}_batched.')
 
+    def __call__(self, *args, **kwargs):
+        """Make the op callable by delegating to ``compute_stats``."""
+        return self.compute_stats(*args, **kwargs)
+
     def compute_stats_batched(self, samples, *args, **kwargs):
         keys = samples.keys()
         num_samples = len(samples[Fields.stats])
diff --git a/data_juicer/utils/process_utils.py b/data_juicer/utils/process_utils.py
index 33d0a9f68..0ebb1c9fc 100644
--- a/data_juicer/utils/process_utils.py
+++ b/data_juicer/utils/process_utils.py
@@ -57,16 +57,10 @@ def calculate_np(name,
     """Calculate the optimum number of processes for the given OP"""
     eps = 1e-9  # about 1 byte
 
-    if num_proc is None:
-        num_proc = psutil.cpu_count()
-
     if use_cuda:
+        auto_num_proc = None
         cuda_mem_available = get_min_cuda_memory() / 1024
-        op_proc = min(
-            num_proc,
-            math.floor(cuda_mem_available / (mem_required + eps)) *
-            cuda_device_count())
-        if use_cuda and mem_required == 0:
+        if mem_required == 0:
             logger.warning(f'The required cuda memory of Op[{name}] '
                            f'has not been specified. '
                            f'Please specify the mem_required field in the '
@@ -74,15 +68,45 @@ def calculate_np(name,
                            f'out of memory error. You can reference '
                            f'the mem_required field in the '
                            f'config_all.yaml file.')
-        if op_proc < 1.0:
-            logger.warning(f'The required cuda memory:{mem_required}GB might '
-                           f'be more than the available cuda memory:'
-                           f'{cuda_mem_available}GB.'
-                           f'This Op[{name}] might '
-                           f'require more resource to run.')
+        else:
+            # Clamp to 1: when a single process needs more memory than is
+            # available the floor is 0, which the checks below would
+            # otherwise mistake for "mem_required not set".
+            auto_num_proc = max(
+                1,
+                math.floor(cuda_mem_available / mem_required) *
+                cuda_device_count())
+            if cuda_mem_available / mem_required < 1.0:
+                logger.warning(
+                    f'The required cuda memory:{mem_required}GB might '
+                    f'be more than the available cuda memory:'
+                    f'{cuda_mem_available}GB. '
+                    f'This Op[{name}] might '
+                    f'require more resource to run.')
+
+        if auto_num_proc is not None and num_proc:
+            op_proc = min(auto_num_proc, num_proc)
+            if num_proc > auto_num_proc:
+                logger.warning(
+                    f'The given num_proc: {num_proc} is greater than '
+                    f'the value {auto_num_proc} auto calculated based '
+                    f'on the mem_required of Op[{name}]. '
+                    f'Set the `num_proc` to {auto_num_proc}.')
+        elif auto_num_proc is None and not num_proc:
+            op_proc = cuda_device_count()
+            logger.warning(
+                f'Both mem_required and num_proc of Op[{name}] are not set. '
+                f'Set the `num_proc` to number of GPUs {op_proc}.')
+        else:
+            # Exactly one of the two limits is known; use that one.
+            op_proc = num_proc if auto_num_proc is None else auto_num_proc
+
         op_proc = max(op_proc, 1)
         return op_proc
     else:
+        if num_proc is None:
+            num_proc = psutil.cpu_count()
+
         op_proc = num_proc
         cpu_available = psutil.cpu_count()
         mem_available = psutil.virtual_memory().available
diff --git a/tests/tools/test_process_data.py b/tests/tools/test_process_data.py
index 1c923a87b..27b3b290b 100644
--- a/tests/tools/test_process_data.py
+++ b/tests/tools/test_process_data.py
@@ -4,19 +4,52 @@
 import subprocess
 import tempfile
 import unittest
+import uuid
 import yaml
 
 from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase
 
 
+def run_in_subprocess(cmd):
+    """Run ``cmd`` through the shell and echo its output.
+
+    Non-empty stdout lines, then non-empty stderr lines, are printed
+    after stripping surrounding whitespace.
+
+    :param cmd: shell command line to execute.
+    :raises RuntimeError: if the command exits with a non-zero status;
+        the message holds the captured stderr lines.
+    """
+    # run() drains both pipes concurrently, so a child that fills the
+    # stderr pipe cannot stall while stdout is still being read.
+    completed = subprocess.run(cmd,
+                               shell=True,
+                               stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE)
+
+    for raw_line in completed.stdout.split(b'\n'):
+        line = raw_line.decode('utf-8', 'ignore').strip()
+        if line:
+            print(line)
+
+    err_lines = ''
+    for raw_line in completed.stderr.split(b'\n'):
+        line = raw_line.decode('utf-8', 'ignore').strip()
+        if line:
+            print(line)
+            err_lines += line + '\n'
+
+    if completed.returncode:
+        raise RuntimeError(err_lines)
+
+
 class ProcessDataTest(DataJuicerTestCaseBase):
 
     def setUp(self):
         super().setUp()
 
         self.tmp_dir = tempfile.TemporaryDirectory().name
-        if not osp.exists(self.tmp_dir):
-            os.makedirs(self.tmp_dir)
+        os.makedirs(self.tmp_dir, exist_ok=True)
 
     def tearDown(self):
         super().tearDown()
@@ -66,5 +99,75 @@ def test_status_code_1(self):
         self.assertFalse(osp.exists(tmp_out_path))
 
 
+class ProcessDataRayTest(DataJuicerTestCaseBase):
+
+    def setUp(self):
+        super().setUp()
+
+        cur_dir = osp.dirname(osp.abspath(__file__))
+        self.tmp_dir = osp.join(cur_dir, f'tmp_{uuid.uuid4().hex}')
+        os.makedirs(self.tmp_dir, exist_ok=True)
+
+    def tearDown(self):
+        super().tearDown()
+
+        if osp.exists(self.tmp_dir):
+            shutil.rmtree(self.tmp_dir)
+
+        import ray
+        ray.shutdown()
+
+    def test_ray_image(self):
+        """Run an image pipeline with the Ray executor; check the export."""
+        tmp_yaml_file = osp.join(self.tmp_dir, 'config_0.yaml')
+        tmp_out_path = osp.join(self.tmp_dir, 'output_0.json')
+        text_keys = 'text'
+
+        data_path = osp.join(osp.dirname(osp.dirname(osp.dirname(osp.realpath(__file__)))),
+                             'demos', 'data', 'demo-dataset-images.jsonl')
+        yaml_config = {
+            'dataset_path': data_path,
+            'executor_type': 'ray',
+            'ray_address': 'auto',
+            'text_keys': text_keys,
+            'image_key': 'images',
+            'export_path': tmp_out_path,
+            'process': [
+                {
+                    'image_nsfw_filter': {
+                        'hf_nsfw_model': 'Falconsai/nsfw_image_detection',
+                        'trust_remote_code': True,
+                        'score_threshold': 0.5,
+                        'any_or_all': 'any',
+                        'mem_required': '8GB'
+                    },
+                    'image_aspect_ratio_filter':{
+                        'min_ratio': 0.5,
+                        'max_ratio': 2.0
+                    }
+                }
+            ]
+        }
+
+        with open(tmp_yaml_file, 'w') as file:
+            yaml.dump(yaml_config, file)
+
+        run_in_subprocess(f'python tools/process_data.py --config {tmp_yaml_file}')
+
+        self.assertTrue(osp.exists(tmp_out_path))
+
+        from datasets import load_dataset
+        jsonl_files = [os.path.join(tmp_out_path, f)
+                       for f in os.listdir(tmp_out_path)
+                       if f.endswith('.json')]
+        dataset = load_dataset(
+            'json',
+            data_files={'jsonl': jsonl_files})
+
+        self.assertEqual(len(dataset['jsonl']), 3)
+        for item in dataset['jsonl']:
+            self.assertIn('aspect_ratios', item['__dj__stats__'])
+
+
 if __name__ == '__main__':
     unittest.main()