Commit 6e6409d: add ray testcase
pan-x-c committed May 7, 2024
1 parent accba06 commit 6e6409d
Showing 4 changed files with 142 additions and 93 deletions.
127 changes: 65 additions & 62 deletions data_juicer/core/ray_executor.py
@@ -102,6 +102,70 @@ def get_num_gpus(self, op, op_proc):
        proc_per_gpu = op_proc / cuda_device_count()
        # e.g. 4 procs for an op on a 2-GPU node: proc_per_gpu = 2, so each
        # worker reserves 1.0 / 2 = 0.5 GPU and two workers share one device
        return 1.0 / proc_per_gpu

    def run_op(self, op, op_cfg, dataset):
        op_name, op_args = list(op_cfg.items())[0]
        op_cls = OPERATORS.modules[op_name]
        op_proc = calculate_np(self.cfg.np, op, op_name)
        num_gpus = self.get_num_gpus(op, op_proc)
        # stateful ops (e.g. model-backed or GPU ops) run as an actor pool;
        # stateless ops run as plain Ray tasks
        use_actor = op.use_actor() or num_gpus
        try:
            if isinstance(op, Mapper):
                if op.is_batched_op():
                    if use_actor:
                        dataset = dataset.map_batches(
                            op_cls,
                            compute=ActorPoolStrategy(),
                            concurrency=op_proc,
                            fn_constructor_kwargs=op_args,
                            batch_format='pyarrow',
                            num_gpus=num_gpus,
                            batch_size=1)
                        # The batch size here is the same as in data.py
                    else:
                        dataset = dataset.map_batches(
                            partial(ray_batch_mapper_wrapper,
                                    fn=op.process),
                            batch_format='pyarrow',
                            num_gpus=num_gpus,
                            batch_size=1)
                        # The batch size here is the same as in data.py
                else:
                    if use_actor:
                        dataset = dataset.map(
                            op_cls,
                            compute=ActorPoolStrategy(),
                            concurrency=op_proc,
                            fn_constructor_kwargs=op_args,
                            num_gpus=num_gpus)
                    else:
                        dataset = dataset.map(op.process,
                                              num_gpus=num_gpus)
            elif isinstance(op, Filter):
                if use_actor:
                    dataset = dataset.map(op_cls,
                                          compute=ActorPoolStrategy(),
                                          concurrency=op_proc,
                                          fn_constructor_kwargs=op_args,
                                          num_gpus=num_gpus)
                else:
                    dataset = dataset.map(op.compute_stats,
                                          num_gpus=num_gpus)
                if op.stats_export_path is not None:
                    dataset.write_json(op.stats_export_path,
                                       force_ascii=False)
                dataset = dataset.filter(op.process)
            else:
                logger.error(
                    'Ray executor only supports Filter and Mapper OPs for '
                    'now')
                raise NotImplementedError
            # the caller rebinds its dataset to this return value
            return dataset
        except:  # noqa: E722
            logger.error(f'An error occurred during Op [{op_name}].')
            import traceback
            traceback.print_exc()
            exit(1)

def run(self, load_data_np=None):
"""
Running the dataset process pipeline.
@@ -140,68 +204,7 @@ def process_batch_arrow(table: pa.Table) -> pa.Table:
        logger.info('Processing data...')
        tstart = time.time()
        for op_cfg, op in zip(self.process_list, self.ops):
            dataset = self.run_op(op, op_cfg, dataset)

        # 4. data export
        logger.info('Exporting dataset to disk...')
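The refactored `run_op` concentrates the per-operator dispatch in one place: batched Mappers go through `map_batches` (on an actor pool when the op is stateful or reserves GPUs), plain Mappers and Filters go through `map`, and Filters additionally export stats and then `filter`. Below is a minimal, self-contained sketch of the same Ray Data pattern, with toy functions standing in for data-juicer OPs; all names in it are illustrative, not part of the repo:

    import ray

    ray.init(ignore_reinit_error=True)
    ds = ray.data.from_items([{'text': 'abc123'}, {'text': '!!!'}])

    def add_length(row):        # stands in for Mapper.process
        row['length'] = len(row['text'])
        return row

    def keep_alnum(row):        # stands in for Filter.process
        return row['text'].isalnum()

    ds = ds.map(add_length)     # Mapper path (non-batched, no actor pool)
    ds = ds.filter(keep_alnum)  # Filter path
    print(ds.take_all())        # [{'text': 'abc123', 'length': 6}]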
49 changes: 40 additions & 9 deletions data_juicer/utils/unittest_utils.py
@@ -2,7 +2,9 @@
import shutil
import unittest

import numpy
import ray.data as rd
import pyarrow as pa
from datasets import Dataset

from data_juicer.ops import Filter
@@ -50,30 +52,59 @@ def tearDownClass(cls, hf_model_name=None) -> None:
            print('CLEAN all TRANSFORMERS_CACHE')
            shutil.rmtree(transformers.TRANSFORMERS_CACHE)

    def generate_dataset(self, data):
        """Generate a dataset for the executor selected by
        ``self.current_tag``: "standalone" or "ray"."""
        if self.current_tag.startswith('standalone'):
            return Dataset.from_list(data)
        elif self.current_tag.startswith('ray'):
            dataset = rd.from_items(data)
            if Fields.stats not in dataset.columns(fetch_if_missing=False):

                def process_batch_arrow(table: pa.Table) -> pa.Table:
                    # append one empty stats dict per sample at the Arrow level
                    new_column_data = [{} for _ in range(len(table))]
                    new_table = table.append_column(Fields.stats,
                                                    [new_column_data])
                    return new_table

                dataset = dataset.map_batches(process_batch_arrow,
                                              batch_format='pyarrow')
            return dataset
        else:
            raise ValueError('Unsupported type')

    def run_single_op(self, dataset, op, column_names):
        """Run a single operator in the executor selected by
        ``self.current_tag``."""
        if self.current_tag.startswith('standalone'):
            if isinstance(op, Filter) and Fields.stats not in dataset.features:
                dataset = dataset.add_column(name=Fields.stats,
                                             column=[{}] * dataset.num_rows)
            dataset = dataset.map(op.compute_stats)
            dataset = dataset.filter(op.process)
            dataset = dataset.select_columns(column_names=column_names)
            return dataset.to_list()
        elif self.current_tag.startswith('ray'):
            dataset = dataset.map(op.compute_stats)
            dataset = dataset.filter(op.process)
            dataset = dataset.to_pandas().get(column_names)
            if dataset is None:
                return []
            return dataset.to_dict(orient='records')
        else:
            raise ValueError('Unsupported type')

    def assertDatasetEqual(self, first, second):

        def convert_record(rec):
            for key in rec.keys():
                # Convert incomparable `list`/`ndarray` to comparable `tuple`
                if isinstance(rec[key], (numpy.ndarray, list)):
                    rec[key] = tuple(rec[key])
            return rec

        first = [convert_record(d) for d in first]
        second = [convert_record(d) for d in second]
        first = sorted(first, key=lambda x: tuple(sorted(x.items())))
        second = sorted(second, key=lambda x: tuple(sorted(x.items())))
        return self.assertEqual(first, second)
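The Ray branch of `generate_dataset` pre-populates an empty per-sample stats column at the Arrow level, since a Ray dataset has no `add_column` helper like the HuggingFace one used in the standalone branch. A self-contained illustration of that `append_column` trick; the column name '__dj__stats__' is assumed here to be the value of `Fields.stats`:

    import pyarrow as pa

    table = pa.table({'text': ['a', 'b']})
    # One empty stats dict per row, appended as a single chunk.
    stats = [{} for _ in range(len(table))]
    table = table.append_column('__dj__stats__', [stats])
    print(table.schema)  # text: string, __dj__stats__: struct<>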
12 changes: 6 additions & 6 deletions tests/ops/filter/test_alphanumeric_filter.py
@@ -38,10 +38,10 @@ def test_case(self):
        }, {
            'text': 'emoji表情测试下😊,😸31231\n'
        }]
        dataset = self.generate_dataset(ds_list)
        op = AlphanumericFilter(min_ratio=0.2, max_ratio=0.9)
        result = self.run_single_op(dataset, op, ["text"])
        self.assertDatasetEqual(result, tgt_list)

    @TEST_TAG("standalone", "ray")
    def test_token_case(self):
@@ -66,10 +66,10 @@ def test_token_case(self):
        }, {
            'text': 'Do you need a cup of coffee?'
        }]
        dataset = self.generate_dataset(ds_list)
        op = AlphanumericFilter(tokenization=True, min_ratio=1.5)
        result = self.run_single_op(dataset, op, ["text"])
        self.assertDatasetEqual(result, tgt_list)


if __name__ == '__main__':
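With these changes one test body covers both executors: `TEST_TAG` parameterizes the case over the listed backends, and the base-class helpers branch on `self.current_tag`. A hedged sketch of a new test following the same pattern; the op import path is real, the test itself is illustrative:

    import unittest

    from data_juicer.ops.filter.alphanumeric_filter import AlphanumericFilter
    from data_juicer.utils.unittest_utils import (DataJuicerTestCaseBase,
                                                  TEST_TAG)

    class MyAlphanumericTest(DataJuicerTestCaseBase):

        @TEST_TAG("standalone", "ray")
        def test_keep_alnum(self):
            ds_list = [{'text': 'abc123'}, {'text': '***'}]
            tgt_list = [{'text': 'abc123'}]
            dataset = self.generate_dataset(ds_list)    # backend from tag
            op = AlphanumericFilter(min_ratio=0.5)
            result = self.run_single_op(dataset, op, ['text'])
            self.assertDatasetEqual(result, tgt_list)

    if __name__ == '__main__':
        unittest.main()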
47 changes: 31 additions & 16 deletions tests/ops/filter/test_audio_duration_filter.py
@@ -5,7 +5,7 @@

from data_juicer.ops.filter.audio_duration_filter import AudioDurationFilter
from data_juicer.utils.constant import Fields
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, TEST_TAG


class AudioDurationFilterTest(DataJuicerTestCaseBase):
@@ -30,6 +30,7 @@ def _run_audio_duration_filter(self,
        res_list = dataset.to_list()
        self.assertEqual(res_list, target_list)

    @TEST_TAG("standalone", "ray")
    def test_default_filter(self):

        ds_list = [{
@@ -46,10 +47,13 @@ def test_default_filter(self):
        }, {
            'audios': [self.aud3_path]
        }]
        dataset = self.generate_dataset(ds_list)
        op = AudioDurationFilter()
        result = self.run_single_op(dataset, op, [op.audio_key])
        self.assertDatasetEqual(result, tgt_list)


    @TEST_TAG("standalone", "ray")
    def test_filter_long_audios(self):

        ds_list = [{
@@ -60,10 +64,12 @@ def test_filter_long_audios(self):
            'audios': [self.aud3_path]
        }]
        tgt_list = [{'audios': [self.aud1_path]}]
        dataset = self.generate_dataset(ds_list)
        op = AudioDurationFilter(max_duration=10)
        result = self.run_single_op(dataset, op, [op.audio_key])
        self.assertDatasetEqual(result, tgt_list)

    @TEST_TAG("standalone", "ray")
    def test_filter_short_audios(self):

        ds_list = [{
@@ -74,10 +80,12 @@ def test_filter_short_audios(self):
            'audios': [self.aud3_path]
        }]
        tgt_list = [{'audios': [self.aud3_path]}]
        dataset = self.generate_dataset(ds_list)
        op = AudioDurationFilter(min_duration=60)
        result = self.run_single_op(dataset, op, [op.audio_key])
        self.assertDatasetEqual(result, tgt_list)

    @TEST_TAG("standalone", "ray")
    def test_filter_audios_within_range(self):

        ds_list = [{
@@ -88,12 +96,13 @@ def test_filter_audios_within_range(self):
            'audios': [self.aud3_path]
        }]
        tgt_list = [{'audios': [self.aud2_path]}]
        dataset = self.generate_dataset(ds_list)
        op = AudioDurationFilter(min_duration=10, max_duration=20)
        result = self.run_single_op(dataset, op, [op.audio_key])
        self.assertDatasetEqual(result, tgt_list)

    @TEST_TAG("standalone", "ray")
    def test_any(self):

        ds_list = [{
            'audios': [self.aud1_path, self.aud2_path]
        }, {
@@ -106,12 +115,15 @@ def test_any(self):
        }, {
            'audios': [self.aud2_path, self.aud3_path]
        }]
        dataset = self.generate_dataset(ds_list)
        op = AudioDurationFilter(min_duration=10,
                                 max_duration=20,
                                 any_or_all='any')
        result = self.run_single_op(dataset, op, [op.audio_key])
        self.assertDatasetEqual(result, tgt_list)

    @TEST_TAG("standalone", "ray")
    def test_all(self):

        ds_list = [{
@@ -122,12 +134,14 @@ def test_all(self):
            'audios': [self.aud1_path, self.aud3_path]
        }]
        tgt_list = []
        dataset = self.generate_dataset(ds_list)
        op = AudioDurationFilter(min_duration=10,
                                 max_duration=20,
                                 any_or_all='all')
        result = self.run_single_op(dataset, op, [op.audio_key])
        self.assertDatasetEqual(result, tgt_list)

    @TEST_TAG("standalone", "ray")
    def test_filter_in_parallel(self):

        ds_list = [{
@@ -138,9 +152,10 @@ def test_filter_in_parallel(self):
            'audios': [self.aud3_path]
        }]
        tgt_list = [{'audios': [self.aud2_path]}]
        dataset = self.generate_dataset(ds_list)
        op = AudioDurationFilter(min_duration=10, max_duration=20)
        result = self.run_single_op(dataset, op, [op.audio_key])
        self.assertDatasetEqual(result, tgt_list)


if __name__ == '__main__':
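The converted tests compare results with `assertDatasetEqual` rather than `assertEqual` because the Ray path round-trips through Arrow and pandas, which can hand back `numpy.ndarray` values and rows in a different order. A standalone illustration of why that normalization is needed, on toy data not taken from the repo:

    import numpy

    a = [{'audios': numpy.array(['x.wav', 'y.wav'])}]
    b = [{'audios': ['x.wav', 'y.wav']}]

    def normalize(records):
        # Mirror assertDatasetEqual: lists/arrays -> tuples, then sort rows.
        out = [{k: tuple(v) if isinstance(v, (list, numpy.ndarray)) else v
                for k, v in rec.items()} for rec in records]
        return sorted(out, key=lambda x: tuple(sorted(x.items())))

    # Plain `a == b` raises ValueError: the elementwise ndarray comparison
    # has no single truth value. The normalized records compare cleanly.
    assert normalize(a) == normalize(b)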
