Skip to content

Commit

Permalink
support ray actor (#511)
Browse files Browse the repository at this point in the history
* support ray actor
  • Loading branch information
Cathy0908 authored Dec 25, 2024
1 parent a26dcc7 commit 0125e1f
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 24 deletions.
5 changes: 5 additions & 0 deletions data_juicer/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,11 @@ def init_setup_from_cfg(cfg: Namespace):

# check number of processes np
sys_cpu_count = os.cpu_count()
if not cfg.np:
cfg.np = sys_cpu_count
logger.warning(
f'Number of processes `np` is not set, '
f'set it to cpu count [{sys_cpu_count}] as default value.')
if cfg.np > sys_cpu_count:
logger.warning(f'Number of processes `np` is set as [{cfg.np}], which '
f'is larger than the cpu count [{sys_cpu_count}]. Due '
Expand Down
42 changes: 34 additions & 8 deletions data_juicer/core/ray_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,41 @@ def _run_single_op(self, op):
batch_size = getattr(op, 'batch_size',
1) if op.is_batched_op() else 1
if isinstance(op, Mapper):
self.data = self.data.map_batches(op.process,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.use_cuda():
op_kwargs = op._op_cfg[op._name]
self.data = self.data.map_batches(
op.__class__,
fn_args=None,
fn_kwargs=None,
fn_constructor_args=None,
fn_constructor_kwargs=op_kwargs,
batch_size=batch_size,
num_gpus=num_gpus,
concurrency=op_proc,
batch_format='pyarrow')
else:
self.data = self.data.map_batches(op.process,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
elif isinstance(op, Filter):
self.data = self.data.map_batches(op.compute_stats,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.use_cuda():
op_kwargs = op._op_cfg[op._name]
self.data = self.data.map_batches(
op.__class__,
fn_args=None,
fn_kwargs=None,
fn_constructor_args=None,
fn_constructor_kwargs=op_kwargs,
batch_size=batch_size,
num_gpus=num_gpus,
concurrency=op_proc,
batch_format='pyarrow')
else:
self.data = self.data.map_batches(op.compute_stats,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.stats_export_path is not None:
self.data.write_json(op.stats_export_path,
force_ascii=False)
Expand Down
6 changes: 6 additions & 0 deletions data_juicer/ops/base_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ def __init_subclass__(cls, **kwargs):
f'{cls.__name__}. Please implement {method_name}_single '
f'or {method_name}_batched.')

    def __call__(self, *args, **kwargs):
        """Delegate direct calls on the op instance to :meth:`process`.

        NOTE(review): presumably added so the op class itself can be
        handed to ``ray.data.Dataset.map_batches`` as a callable actor —
        confirm against the ray executor code path.
        """
        return self.process(*args, **kwargs)

def process_batched(self, samples, *args, **kwargs):
keys = samples.keys()
first_key = next(iter(keys))
Expand Down Expand Up @@ -378,6 +381,9 @@ def __init_subclass__(cls, **kwargs):
f'{cls.__name__}. Please implement {method_name}_single '
f'or {method_name}_batched.')

    def __call__(self, *args, **kwargs):
        """Delegate direct calls on the op instance to :meth:`compute_stats`.

        NOTE(review): presumably added so the op class itself can be
        handed to ``ray.data.Dataset.map_batches`` as a callable actor —
        confirm against the ray executor code path.
        """
        return self.compute_stats(*args, **kwargs)

def compute_stats_batched(self, samples, *args, **kwargs):
keys = samples.keys()
num_samples = len(samples[Fields.stats])
Expand Down
46 changes: 32 additions & 14 deletions data_juicer/utils/process_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,32 +57,50 @@ def calculate_np(name,
"""Calculate the optimum number of processes for the given OP"""
eps = 1e-9 # about 1 byte

if num_proc is None:
num_proc = psutil.cpu_count()

if use_cuda:
auto_num_proc = None
cuda_mem_available = get_min_cuda_memory() / 1024
op_proc = min(
num_proc,
math.floor(cuda_mem_available / (mem_required + eps)) *
cuda_device_count())
if use_cuda and mem_required == 0:
if mem_required == 0:
logger.warning(f'The required cuda memory of Op[{name}] '
f'has not been specified. '
f'Please specify the mem_required field in the '
f'config file, or you might encounter CUDA '
f'out of memory error. You can reference '
f'the mem_required field in the '
f'config_all.yaml file.')
if op_proc < 1.0:
logger.warning(f'The required cuda memory:{mem_required}GB might '
f'be more than the available cuda memory:'
f'{cuda_mem_available}GB.'
f'This Op[{name}] might '
f'require more resource to run.')
else:
auto_num_proc = math.floor(
cuda_mem_available / mem_required) * cuda_device_count()
if cuda_mem_available / mem_required < 1.0:
logger.warning(
f'The required cuda memory:{mem_required}GB might '
f'be more than the available cuda memory:'
f'{cuda_mem_available}GB.'
f'This Op[{name}] might '
f'require more resource to run.')

if auto_num_proc and num_proc:
op_proc = min(auto_num_proc, num_proc)
if num_proc > auto_num_proc:
logger.warning(
f'The given num_proc: {num_proc} is greater than '
f'the value {auto_num_proc} auto calculated based '
f'on the mem_required of Op[{name}]. '
f'Set the `num_proc` to {auto_num_proc}.')
elif not auto_num_proc and not num_proc:
op_proc = cuda_device_count()
logger.warning(
f'Both mem_required and num_proc of Op[{name}] are not set.'
f'Set the `num_proc` to number of GPUs {op_proc}.')
else:
op_proc = auto_num_proc if auto_num_proc else num_proc

op_proc = max(op_proc, 1)
return op_proc
else:
if num_proc is None:
num_proc = psutil.cpu_count()

op_proc = num_proc
cpu_available = psutil.cpu_count()
mem_available = psutil.virtual_memory().available
Expand Down
103 changes: 101 additions & 2 deletions tests/tools/test_process_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,49 @@
import subprocess
import tempfile
import unittest
import uuid
import yaml

from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase


def run_in_subprocess(cmd):
    """Run ``cmd`` in a shell, echoing its output, and raise on failure.

    Both stdout and stderr of the child process are streamed line by
    line to this process's stdout as they arrive.

    :param cmd: shell command line to execute.
    :raises RuntimeError: if the command exits with a non-zero status;
        the message contains the captured stderr lines.
    """

    def _drain(pipe, collected=None):
        # Read *pipe* until EOF, printing each non-empty line and
        # optionally collecting it for later error reporting.
        while True:
            raw = pipe.readline()
            # readline() returns b'' only at EOF; checking the raw bytes
            # (instead of the stripped text plus poll()) guarantees no
            # output is dropped after the process has already exited.
            if not raw:
                break
            line = raw.decode('utf-8', 'ignore').strip()
            if line:
                print(line)
                if collected is not None:
                    collected.append(line)

    with subprocess.Popen(cmd,
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        _drain(proc.stdout)
        err_lines = []
        _drain(proc.stderr, err_lines)

        return_code = proc.wait()
        if return_code:
            raise RuntimeError(''.join(line + '\n' for line in err_lines))


class ProcessDataTest(DataJuicerTestCaseBase):

def setUp(self):
super().setUp()

self.tmp_dir = tempfile.TemporaryDirectory().name
if not osp.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)
os.makedirs(self.tmp_dir, exist_ok=True)

def tearDown(self):
super().tearDown()
Expand Down Expand Up @@ -66,5 +96,74 @@ def test_status_code_1(self):
self.assertFalse(osp.exists(tmp_out_path))


class ProcessDataRayTest(DataJuicerTestCaseBase):
    """End-to-end test of tools/process_data.py with the ray executor."""

    def setUp(self):
        super().setUp()

        # Use a uuid-suffixed working dir so concurrent test runs on the
        # same machine cannot collide.
        cur_dir = osp.dirname(osp.abspath(__file__))
        self.tmp_dir = osp.join(cur_dir, f'tmp_{uuid.uuid4().hex}')
        os.makedirs(self.tmp_dir, exist_ok=True)

    def tearDown(self):
        super().tearDown()

        if osp.exists(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)

        # Shut down any ray session left behind by the processing run so
        # later tests start from a clean state. Imported lazily to avoid
        # requiring ray for the non-ray tests in this module.
        import ray
        ray.shutdown()

    def test_ray_image(self):
        """Process a small image dataset with a cuda op and a cpu op via ray.

        Writes a config file, runs the processing tool in a subprocess,
        then checks the exported dataset size and per-sample stats.
        """
        tmp_yaml_file = osp.join(self.tmp_dir, 'config_0.yaml')
        tmp_out_path = osp.join(self.tmp_dir, 'output_0.json')
        text_keys = 'text'

        # Demo dataset shipped at the repository root.
        data_path = osp.join(osp.dirname(osp.dirname(osp.dirname(osp.realpath(__file__)))),
                             'demos', 'data', 'demo-dataset-images.jsonl')
        # One GPU-backed op (nsfw filter, mem_required set) followed by a
        # pure-CPU op (aspect ratio filter).
        yaml_config = {
            'dataset_path': data_path,
            'executor_type': 'ray',
            'ray_address': 'auto',
            'text_keys': text_keys,
            'image_key': 'images',
            'export_path': tmp_out_path,
            'process': [
                {
                    'image_nsfw_filter': {
                        'hf_nsfw_model': 'Falconsai/nsfw_image_detection',
                        'trust_remote_code': True,
                        'score_threshold': 0.5,
                        'any_or_all': 'any',
                        'mem_required': '8GB'
                    },
                    'image_aspect_ratio_filter':{
                        'min_ratio': 0.5,
                        'max_ratio': 2.0
                    }
                }
            ]
        }

        with open(tmp_yaml_file, 'w') as file:
            yaml.dump(yaml_config, file)

        run_in_subprocess(f'python tools/process_data.py --config {tmp_yaml_file}')

        self.assertTrue(osp.exists(tmp_out_path))

        # The ray executor exports a directory of json shards; load them
        # all back to validate the result.
        from datasets import load_dataset
        jsonl_files = [os.path.join(tmp_out_path, f) \
                        for f in os.listdir(tmp_out_path) \
                        if f.endswith('.json')]
        dataset = load_dataset(
            'json',
            data_files={'jsonl': jsonl_files})

        # All 3 demo samples should survive the filters, each carrying
        # the stats computed by the filter ops.
        self.assertEqual(len(dataset['jsonl']), 3)
        for item in dataset['jsonl']:
            self.assertIn('aspect_ratios', item['__dj__stats__'])


# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()

0 comments on commit 0125e1f

Please sign in to comment.