From 083b665ee2d1c7d136096fad325eaa1435e3bad8 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Thu, 12 Dec 2024 21:25:47 +0800
Subject: [PATCH 01/22] + add auto mode for analyzer: load all filters that
 produce stats to analyze the target dataset

---
 data_juicer/config/config.py       | 44 ++++++++++++++++++++++++++----
 data_juicer/core/analyzer.py       |  2 +-
 data_juicer/ops/filter/__init__.py |  7 +++++
 3 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py
index 71f871f10..9e003be3d 100644
--- a/data_juicer/config/config.py
+++ b/data_juicer/config/config.py
@@ -23,7 +23,7 @@
 global_parser = None
 
 
-def init_configs(args: Optional[List[str]] = None):
+def init_configs(args: Optional[List[str]] = None, which_entry: object = None):
     """
     initialize the jsonargparse parser and parse configs from one of:
         1. POSIX-style command line args;
@@ -32,14 +32,23 @@ def init_configs(args: Optional[List[str]] = None):
         4. hard-coded defaults
 
     :param args: list of params, e.g., ['--config', 'cfg.yaml'], default None.
+    :param which_entry: which entry to init configs (executor/analyzer)
     :return: a global cfg object used by the Executor or Analyzer
     """
     parser = ArgumentParser(default_env=True, default_config_files=None)
 
-    parser.add_argument('--config',
-                        action=ActionConfigFile,
-                        help='Path to a dj basic configuration file.',
-                        required=True)
+    # required but mutually exclusive args group
+    required_group = parser.add_mutually_exclusive_group(required=True)
+    required_group.add_argument('--config',
+                                action=ActionConfigFile,
+                                help='Path to a dj basic configuration file.')
+    required_group.add_argument('--auto',
+                                action='store_true',
+                                help='Whether to use an auto analyzing '
+                                'strategy instead of a specific data '
+                                'recipe. If a specific config file is '
+                                'given by --config arg, this arg is '
+                                'disabled. Only available for Analyzer.')
 
     parser.add_argument(
         '--hpo_config',
@@ -339,6 +348,14 @@ def init_configs(args: Optional[List[str]] = None):
 
     try:
         cfg = parser.parse_args(args=args)
+
+        # check the entry
+        from data_juicer.core.analyzer import Analyzer
+        if not isinstance(which_entry, Analyzer) and cfg.auto:
+            err_msg = '--auto argument can only be used for analyzer!'
+ logger.error(err_msg) + raise NotImplementedError(err_msg) + cfg = init_setup_from_cfg(cfg) cfg = update_op_process(cfg, parser) @@ -488,6 +505,16 @@ def init_setup_from_cfg(cfg: Namespace): SpecialTokens.image = cfg.image_special_token SpecialTokens.eoc = cfg.eoc_special_token + # add all filters that produce stats + if cfg.auto: + import pkgutil + + import data_juicer.ops.filter as djfilters + cfg.process = [{ + filter_name: {} + } for _, filter_name, _ in pkgutil.iter_modules(djfilters.__path__) + if filter_name not in djfilters.NON_STATS_FILTERS] + # Apply text_key modification during initializing configs # users can freely specify text_key for different ops using `text_key` # otherwise, set arg text_key of each op to text_keys @@ -631,7 +658,10 @@ def update_op_process(cfg, parser): temp_args = namespace_to_arg_list(temp_cfg, includes=recognized_args, excludes=['config']) - temp_args = ['--config', temp_cfg.config[0].absolute] + temp_args + if temp_cfg.config: + temp_args = ['--config', temp_cfg.config[0].absolute] + temp_args + else: + temp_args = ['--auto'] + temp_args temp_parser.parse_args(temp_args) return cfg @@ -657,6 +687,8 @@ def namespace_to_arg_list(namespace, prefix='', includes=None, excludes=None): def config_backup(cfg: Namespace): + if not cfg.config: + return cfg_path = cfg.config[0].absolute work_dir = cfg.work_dir target_path = os.path.join(work_dir, os.path.basename(cfg_path)) diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py index 2ae4d3511..887de8330 100644 --- a/data_juicer/core/analyzer.py +++ b/data_juicer/core/analyzer.py @@ -33,7 +33,7 @@ def __init__(self, cfg: Optional[Namespace] = None): :param cfg: optional jsonargparse Namespace dict. """ - self.cfg = init_configs() if cfg is None else cfg + self.cfg = init_configs(which_entry=self) if cfg is None else cfg self.work_dir = self.cfg.work_dir diff --git a/data_juicer/ops/filter/__init__.py b/data_juicer/ops/filter/__init__.py index dad6818e1..8cb986b2b 100644 --- a/data_juicer/ops/filter/__init__.py +++ b/data_juicer/ops/filter/__init__.py @@ -63,3 +63,10 @@ 'VideoTaggingFromFramesFilter', 'VideoWatermarkFilter', 'WordRepetitionFilter', 'WordsNumFilter' ] + +NON_STATS_FILTERS = [ + 'specified_field_filter', + 'specified_numeric_field_filter', + 'suffix_filter', + 'video_tagging_from_frames_filter', +] From 662df5ed1755ff35113d70bf765e8dcc42eeda5d Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Fri, 13 Dec 2024 11:04:36 +0800 Subject: [PATCH 02/22] + add default mem_required for those model-based OPs --- configs/config_all.yaml | 2 +- data_juicer/ops/filter/image_aesthetics_filter.py | 2 +- data_juicer/ops/filter/image_nsfw_filter.py | 1 + data_juicer/ops/filter/image_text_matching_filter.py | 1 + data_juicer/ops/filter/image_text_similarity_filter.py | 1 + data_juicer/ops/filter/image_watermark_filter.py | 1 + data_juicer/ops/filter/phrase_grounding_recall_filter.py | 1 + data_juicer/ops/filter/video_aesthetics_filter.py | 2 +- data_juicer/ops/filter/video_frames_text_similarity_filter.py | 1 + data_juicer/ops/filter/video_nsfw_filter.py | 1 + data_juicer/ops/filter/video_tagging_from_frames_filter.py | 1 + data_juicer/ops/filter/video_watermark_filter.py | 1 + data_juicer/ops/mapper/image_captioning_mapper.py | 2 ++ data_juicer/ops/mapper/image_diffusion_mapper.py | 1 + data_juicer/ops/mapper/image_tagging_mapper.py | 1 + data_juicer/ops/mapper/video_captioning_from_audio_mapper.py | 1 + data_juicer/ops/mapper/video_captioning_from_frames_mapper.py | 1 + 
 .../ops/mapper/video_captioning_from_summarizer_mapper.py    | 1 +
 data_juicer/ops/mapper/video_captioning_from_video_mapper.py | 1 +
 data_juicer/ops/mapper/video_tagging_from_audio_mapper.py    | 1 +
 data_juicer/ops/mapper/video_tagging_from_frames_mapper.py   | 1 +
 21 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/configs/config_all.yaml b/configs/config_all.yaml
index 1003c89af..3a88abcc7 100644
--- a/configs/config_all.yaml
+++ b/configs/config_all.yaml
@@ -567,7 +567,7 @@ process:
       vertical_flip: false        # flip frame image vertically (top to bottom).
       reduce_mode: avg            # reduce mode when one text corresponds to multiple videos in a chunk, must be one of ['avg','max', 'min'].
       any_or_all: any             # keep this sample when any/all videos meet the filter condition
-      mem_required: '1GB'         # This operation (Op) utilizes deep neural network models that consume a significant amount of memory for computation, hence the system's available memory might constrains the maximum number of processes that can be launched
+      mem_required: '1500MB'      # This operation (Op) utilizes deep neural network models that consume a significant amount of memory for computation, hence the system's available memory might constrain the maximum number of processes that can be launched
   - video_motion_score_filter:    # Keep samples with video motion scores within a specific range.
       min_score: 0.25             # the minimum motion score to keep samples
      max_score: 10000.0           # the maximum motion score to keep samples
diff --git a/data_juicer/ops/filter/image_aesthetics_filter.py b/data_juicer/ops/filter/image_aesthetics_filter.py
index bbaba15eb..723845a5d 100644
--- a/data_juicer/ops/filter/image_aesthetics_filter.py
+++ b/data_juicer/ops/filter/image_aesthetics_filter.py
@@ -46,7 +46,7 @@ def __init__(self,
         :param args: Extra positional arguments.
         :param kwargs: Extra keyword arguments.
""" - + kwargs.setdefault('mem_required', '1500MB') super().__init__(*args, **kwargs) if hf_scorer_model == '': hf_scorer_model = \ diff --git a/data_juicer/ops/filter/image_nsfw_filter.py b/data_juicer/ops/filter/image_nsfw_filter.py index 603a48518..aea409ec4 100644 --- a/data_juicer/ops/filter/image_nsfw_filter.py +++ b/data_juicer/ops/filter/image_nsfw_filter.py @@ -41,6 +41,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '1GB') super().__init__(*args, **kwargs) self.score_threshold = score_threshold if any_or_all not in ['any', 'all']: diff --git a/data_juicer/ops/filter/image_text_matching_filter.py b/data_juicer/ops/filter/image_text_matching_filter.py index dc36cd68a..6881eccf5 100644 --- a/data_juicer/ops/filter/image_text_matching_filter.py +++ b/data_juicer/ops/filter/image_text_matching_filter.py @@ -52,6 +52,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '1500MB') super().__init__(*args, **kwargs) self.min_score = min_score self.max_score = max_score diff --git a/data_juicer/ops/filter/image_text_similarity_filter.py b/data_juicer/ops/filter/image_text_similarity_filter.py index d43c9bc3f..9a3f9361b 100644 --- a/data_juicer/ops/filter/image_text_similarity_filter.py +++ b/data_juicer/ops/filter/image_text_similarity_filter.py @@ -53,6 +53,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '1500MB') super().__init__(*args, **kwargs) self.min_score = min_score self.max_score = max_score diff --git a/data_juicer/ops/filter/image_watermark_filter.py b/data_juicer/ops/filter/image_watermark_filter.py index 0d9eead6a..b752736a4 100644 --- a/data_juicer/ops/filter/image_watermark_filter.py +++ b/data_juicer/ops/filter/image_watermark_filter.py @@ -45,6 +45,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '500MB') super().__init__(*args, **kwargs) self.prob_threshold = prob_threshold if any_or_all not in ['any', 'all']: diff --git a/data_juicer/ops/filter/phrase_grounding_recall_filter.py b/data_juicer/ops/filter/phrase_grounding_recall_filter.py index 98a2dfb1f..9dec0dc3c 100644 --- a/data_juicer/ops/filter/phrase_grounding_recall_filter.py +++ b/data_juicer/ops/filter/phrase_grounding_recall_filter.py @@ -114,6 +114,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '1GB') super().__init__(*args, **kwargs) self.min_recall = min_recall self.max_recall = max_recall diff --git a/data_juicer/ops/filter/video_aesthetics_filter.py b/data_juicer/ops/filter/video_aesthetics_filter.py index 5e674162d..f65334f56 100644 --- a/data_juicer/ops/filter/video_aesthetics_filter.py +++ b/data_juicer/ops/filter/video_aesthetics_filter.py @@ -73,7 +73,7 @@ def __init__(self, :param args: Extra positional arguments. :param kwargs: Extra keyword arguments. 
""" - + kwargs.setdefault('mem_required', '1500MB') super().__init__(*args, **kwargs) if hf_scorer_model == '': hf_scorer_model = \ diff --git a/data_juicer/ops/filter/video_frames_text_similarity_filter.py b/data_juicer/ops/filter/video_frames_text_similarity_filter.py index 6b3e92641..da793ccf4 100644 --- a/data_juicer/ops/filter/video_frames_text_similarity_filter.py +++ b/data_juicer/ops/filter/video_frames_text_similarity_filter.py @@ -74,6 +74,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '1500MB') super().__init__(*args, **kwargs) self.min_score = min_score self.max_score = max_score diff --git a/data_juicer/ops/filter/video_nsfw_filter.py b/data_juicer/ops/filter/video_nsfw_filter.py index 27bafe1d0..a1dd9d214 100644 --- a/data_juicer/ops/filter/video_nsfw_filter.py +++ b/data_juicer/ops/filter/video_nsfw_filter.py @@ -65,6 +65,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '1GB') super().__init__(*args, **kwargs) self.score_threshold = score_threshold if frame_sampling_method not in ['all_keyframes', 'uniform']: diff --git a/data_juicer/ops/filter/video_tagging_from_frames_filter.py b/data_juicer/ops/filter/video_tagging_from_frames_filter.py index 7c41b5521..8872aab32 100644 --- a/data_juicer/ops/filter/video_tagging_from_frames_filter.py +++ b/data_juicer/ops/filter/video_tagging_from_frames_filter.py @@ -61,6 +61,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '9GB') super().__init__(*args, **kwargs) if contain not in ['any', 'all']: raise ValueError(f'the containing type [{contain}] is not ' diff --git a/data_juicer/ops/filter/video_watermark_filter.py b/data_juicer/ops/filter/video_watermark_filter.py index 2b7e30f8f..959c91e23 100644 --- a/data_juicer/ops/filter/video_watermark_filter.py +++ b/data_juicer/ops/filter/video_watermark_filter.py @@ -69,6 +69,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '500MB') super().__init__(*args, **kwargs) self.prob_threshold = prob_threshold if frame_sampling_method not in ['all_keyframes', 'uniform']: diff --git a/data_juicer/ops/mapper/image_captioning_mapper.py b/data_juicer/ops/mapper/image_captioning_mapper.py index 0bc486193..98bb3ad7c 100644 --- a/data_juicer/ops/mapper/image_captioning_mapper.py +++ b/data_juicer/ops/mapper/image_captioning_mapper.py @@ -81,6 +81,8 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '16GB') + super().__init__(*args, **kwargs) if keep_candidate_mode not in [ diff --git a/data_juicer/ops/mapper/image_diffusion_mapper.py b/data_juicer/ops/mapper/image_diffusion_mapper.py index c53d6f56d..53e315844 100644 --- a/data_juicer/ops/mapper/image_diffusion_mapper.py +++ b/data_juicer/ops/mapper/image_diffusion_mapper.py @@ -91,6 +91,7 @@ def __init__(self, :param hf_img2seq: model name on huggingface to generate caption if caption_key is None. 
""" + kwargs.setdefault('mem_required', '8GB') super().__init__(*args, **kwargs) self._init_parameters = self.remove_extra_parameters(locals()) self.strength = strength diff --git a/data_juicer/ops/mapper/image_tagging_mapper.py b/data_juicer/ops/mapper/image_tagging_mapper.py index d47fbf0ef..e3fc46f1b 100644 --- a/data_juicer/ops/mapper/image_tagging_mapper.py +++ b/data_juicer/ops/mapper/image_tagging_mapper.py @@ -36,6 +36,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '9GB') super().__init__(*args, **kwargs) self.model_key = prepare_model( model_type='recognizeAnything', diff --git a/data_juicer/ops/mapper/video_captioning_from_audio_mapper.py b/data_juicer/ops/mapper/video_captioning_from_audio_mapper.py index 4833409a4..75ffb9b3a 100644 --- a/data_juicer/ops/mapper/video_captioning_from_audio_mapper.py +++ b/data_juicer/ops/mapper/video_captioning_from_audio_mapper.py @@ -32,6 +32,7 @@ def __init__(self, keep_original_sample: bool = True, *args, **kwargs): :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '30GB') super().__init__(*args, **kwargs) AUTOINSTALL.check([ 'transformers', 'transformers_stream_generator', 'einops', diff --git a/data_juicer/ops/mapper/video_captioning_from_frames_mapper.py b/data_juicer/ops/mapper/video_captioning_from_frames_mapper.py index dbf614510..d4c664c5f 100644 --- a/data_juicer/ops/mapper/video_captioning_from_frames_mapper.py +++ b/data_juicer/ops/mapper/video_captioning_from_frames_mapper.py @@ -108,6 +108,7 @@ def __init__( :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '20GB') super().__init__(*args, **kwargs) if keep_candidate_mode not in [ diff --git a/data_juicer/ops/mapper/video_captioning_from_summarizer_mapper.py b/data_juicer/ops/mapper/video_captioning_from_summarizer_mapper.py index b2f4c8139..67eb7e234 100644 --- a/data_juicer/ops/mapper/video_captioning_from_summarizer_mapper.py +++ b/data_juicer/ops/mapper/video_captioning_from_summarizer_mapper.py @@ -81,6 +81,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '40GB') super().__init__(*args, **kwargs) AUTOINSTALL.check([ 'torch', diff --git a/data_juicer/ops/mapper/video_captioning_from_video_mapper.py b/data_juicer/ops/mapper/video_captioning_from_video_mapper.py index 04cd641ab..737626260 100644 --- a/data_juicer/ops/mapper/video_captioning_from_video_mapper.py +++ b/data_juicer/ops/mapper/video_captioning_from_video_mapper.py @@ -108,6 +108,7 @@ def __init__( :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '20GB') super().__init__(*args, **kwargs) if keep_candidate_mode not in [ diff --git a/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py b/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py index 763a3381c..2c32093a5 100644 --- a/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py +++ b/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py @@ -37,6 +37,7 @@ def __init__(self, :param args: extra args :param kwargs: extra args """ + kwargs.setdefault('mem_required', '500MB') super().__init__(*args, **kwargs) AUTOINSTALL.check(['torchaudio']) self.model_key = prepare_model(model_type='huggingface', diff --git a/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py b/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py index 26227738b..d4995d3f6 100644 --- 
a/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py
+++ b/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py
@@ -55,6 +55,7 @@ def __init__(self,
         :param args: extra args
         :param kwargs: extra args
         """
+        kwargs.setdefault('mem_required', '9GB')
         super().__init__(*args, **kwargs)
         if frame_sampling_method not in ['all_keyframes', 'uniform']:
             raise ValueError(

From 926c3dacb9d7bab92c496cc54a8ed82146f853f1 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Fri, 13 Dec 2024 16:02:09 +0800
Subject: [PATCH 03/22] - support wordcloud drawing for str or str list fields
 in stats

- support setting the number of samples to be analyzed in auto mode. It's 1k
  by default.

---
 data_juicer/analysis/column_wise_analysis.py | 69 +++++++++++++++-----
 data_juicer/config/config.py                 |  6 ++
 data_juicer/core/analyzer.py                 |  4 ++
 environments/minimal_requires.txt            |  1 +
 4 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/data_juicer/analysis/column_wise_analysis.py b/data_juicer/analysis/column_wise_analysis.py
index 775b42683..825d9b4dd 100644
--- a/data_juicer/analysis/column_wise_analysis.py
+++ b/data_juicer/analysis/column_wise_analysis.py
@@ -4,6 +4,7 @@
 import matplotlib.pyplot as plt
 import pandas as pd
 from tqdm import tqdm
+from wordcloud import WordCloud
 
 from data_juicer.utils.constant import Fields
 
@@ -145,33 +146,39 @@ def analyze(self, show_percentiles=False, show=False, skip_export=False):
             else:
                 axes = [None] * num_subcol
 
-            # draw histogram
-            self.draw_hist(axes[0],
-                           data,
-                           os.path.join(self.output_path,
-                                        f'{column_name}-hist.png'),
-                           percentiles=percentiles)
-
-            # draw box
-            self.draw_box(axes[1],
-                          data,
-                          os.path.join(self.output_path,
-                                       f'{column_name}-box.png'),
-                          percentiles=percentiles)
+            if not skip_export:
+                # draw histogram
+                self.draw_hist(axes[0],
+                               data,
+                               os.path.join(self.output_path,
+                                            f'{column_name}-hist.png'),
+                               percentiles=percentiles)
+
+                # draw box
+                self.draw_box(axes[1],
+                              data,
+                              os.path.join(self.output_path,
+                                           f'{column_name}-box.png'),
+                              percentiles=percentiles)
         else:
             # object (string) or string list -- only draw histogram for
             # this stat
             if self.save_stats_in_one_file:
-                axes = subfig.subplots(1, 1)
+                axes = subfig.subplots(1, num_subcol)
             else:
-                axes = None
+                axes = [None] * num_subcol
 
             if not skip_export:
                 self.draw_hist(
-                    axes, data,
+                    axes[0], data,
                     os.path.join(self.output_path,
                                  f'{column_name}-hist.png'))
 
+                self.draw_wordcloud(
+                    axes[1], data,
+                    os.path.join(self.output_path,
+                                 f'{column_name}-wordcloud.png'))
+
         # add a title to the figure of this stat
         if self.save_stats_in_one_file:
             subfig.suptitle(f'{data.name}',
@@ -297,3 +304,33 @@ def draw_box(self, ax, data, save_path, percentiles=None, show=False):
             # accumulated overlapped figures in different draw_xxx function
             # calling
             ax.clear()
+
+    def draw_wordcloud(self, ax, data, save_path, show=False):
+        word_list = data.tolist()
+        word_nums = {}
+        for w in word_list:
+            if w in word_nums:
+                word_nums[w] += 1
+            else:
+                word_nums[w] = 1
+
+        wc = WordCloud(width=400, height=320)
+        wc.generate_from_frequencies(word_nums)
+
+        if ax is None:
+            ax = plt.figure(figsize=(20, 16))
+        else:
+            ax.imshow(wc, interpolation='bilinear')
+            ax.axis('off')
+
+        if not self.save_stats_in_one_file:
+            # save into file
+            wc.to_file(save_path)
+
+        if show:
+            plt.show()
+        else:
+            # if no showing, we need to clear this axes to avoid
+            # accumulated overlapped figures in different draw_xxx function
+            # calling
+            ax.clear()
diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py
index 9e003be3d..914acd2ff 100644
--- a/data_juicer/config/config.py
+++ b/data_juicer/config/config.py
@@ -50,6 +50,12 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None):
                                 'given by --config arg, this arg is '
                                 'disabled. Only available for Analyzer.')
 
+    parser.add_argument('--auto_num',
+                        type=PositiveInt,
+                        default=1000,
+                        help='The number of samples to be analyzed '
+                        'automatically. It\'s 1000 by default.')
+
     parser.add_argument(
         '--hpo_config',
         type=str,
diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py
index 887de8330..10cae5aac 100644
--- a/data_juicer/core/analyzer.py
+++ b/data_juicer/core/analyzer.py
@@ -87,6 +87,10 @@ def run(self,
         if load_data_np is None:
             load_data_np = self.cfg.np
         dataset = self.formatter.load_dataset(load_data_np, self.cfg)
+        if self.cfg.auto:
+            # if it's auto analysis, only analyze for a minor part of the input
+            # dataset to save time and computing resource
+            dataset = dataset.take(self.cfg.auto_num)
 
         # extract processes
         logger.info('Preparing process operators...')
diff --git a/environments/minimal_requires.txt b/environments/minimal_requires.txt
index 414458edc..71aa0ba38 100644
--- a/environments/minimal_requires.txt
+++ b/environments/minimal_requires.txt
@@ -33,3 +33,4 @@ pydantic>=2.0
 Pillow
 fastapi[standard]>=0.100
 httpx
+wordcloud

From 27347c0593fbb748416a71c85d0801283daddb41 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Fri, 13 Dec 2024 16:22:21 +0800
Subject: [PATCH 04/22] - take the minimum of dataset length and auto num

---
 data_juicer/core/analyzer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py
index 10cae5aac..63e512d41 100644
--- a/data_juicer/core/analyzer.py
+++ b/data_juicer/core/analyzer.py
@@ -90,7 +90,7 @@ def run(self,
         if self.cfg.auto:
             # if it's auto analysis, only analyze for a minor part of the input
             # dataset to save time and computing resource
-            dataset = dataset.take(self.cfg.auto_num)
+            dataset = dataset.take(min(len(dataset), self.cfg.auto_num))
 
         # extract processes
         logger.info('Preparing process operators...')

From d19f92f7c8175379fe146d1ea1296e1e10035eca Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Fri, 13 Dec 2024 16:23:37 +0800
Subject: [PATCH 05/22] * update default export path

---
 data_juicer/config/config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py
index 914acd2ff..d36ac2b54 100644
--- a/data_juicer/config/config.py
+++ b/data_juicer/config/config.py
@@ -112,7 +112,7 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None):
     parser.add_argument(
         '--export_path',
         type=str,
-        default='./outputs/hello_world.jsonl',
+        default='./outputs/hello_world/hello_world.jsonl',
         help='Path to export and save the output processed dataset. The '
        'directory to store the processed dataset will be the work '
         'directory of this process.')

From fbd672659732b2ff6654574f67f0f090e68e5efc Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Fri, 13 Dec 2024 16:43:07 +0800
Subject: [PATCH 06/22] * set version limit for wandb to avoid exception

---
 environments/dev_requires.txt     | 2 +-
 environments/sandbox_requires.txt | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/environments/dev_requires.txt b/environments/dev_requires.txt
index 0ecd058c4..44dd79158 100644
--- a/environments/dev_requires.txt
+++ b/environments/dev_requires.txt
@@ -4,4 +4,4 @@ sphinx
 sphinx-autobuild
 sphinx_rtd_theme
 recommonmark
-wandb
+wandb<=0.19.0
diff --git a/environments/sandbox_requires.txt b/environments/sandbox_requires.txt
index 7f1d27a25..6a1791cf8 100644
--- a/environments/sandbox_requires.txt
+++ b/environments/sandbox_requires.txt
@@ -1,5 +1,4 @@
 torch>=1.11.0
-wandb
 fire
 pyspark
 # vbench-related

From 9f9f85b26af189ae4afc68545f46df1eff219eb7 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Fri, 13 Dec 2024 16:56:40 +0800
Subject: [PATCH 07/22] + add docs for auto mode

---
 README.md    | 5 +++++
 README_ZH.md | 4 ++++
 2 files changed, 9 insertions(+)

diff --git a/README.md b/README.md
index eb34e17ba..ee389e845 100644
--- a/README.md
+++ b/README.md
@@ -317,6 +317,11 @@ python tools/analyze_data.py --config configs/demo/analyzer.yaml
 
 # use command line tool
 dj-analyze --config configs/demo/analyzer.yaml
+
+# you can also use auto mode to avoid writing a recipe. It will analyze a small
+# part (e.g. 1000 samples, specified by argument `auto_num`) of your dataset
+# with all Filters that produce stats.
+dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
 ```
 
 - **Note:** Analyzer only computes stats of Filter ops. So extra Mapper or Deduplicator ops will be ignored in the analysis process.
diff --git a/README_ZH.md b/README_ZH.md
index 905a4e1a2..f1e98f098 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -295,6 +295,10 @@ python tools/analyze_data.py --config configs/demo/analyzer.yaml
 
 # 使用命令行工具
 dj-analyze --config configs/demo/analyzer.yaml
+
+# 你也可以使用"自动"模式来避免写一个新的数据菜谱。它会使用全部可产出统计信息的 Filter 来分析
+# 你的数据集的一小部分(如1000条样本,可通过 `auto_num` 参数指定)
+dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
 ```
 
 * **注意**:Analyzer 只计算 Filter 算子的状态,其他的算子(例如 Mapper 和 Deduplicator)会在分析过程中被忽略。

From 566eb5b16461d2d9d074ebe4a52ccc0cd78cf9a9 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Mon, 16 Dec 2024 19:23:56 +0800
Subject: [PATCH 08/22] + support t-test for Measure

---
 data_juicer/analysis/measure.py | 108 ++++++++++++++++++++++++++++++++
 1 file changed, 108 insertions(+)

diff --git a/data_juicer/analysis/measure.py b/data_juicer/analysis/measure.py
index fe54cdabd..a6d9e1212 100644
--- a/data_juicer/analysis/measure.py
+++ b/data_juicer/analysis/measure.py
@@ -1,9 +1,13 @@
+import numpy as np
+
 from data_juicer.utils.lazy_loader import LazyLoader
 
 torch = LazyLoader('torch', 'torch')
 td = LazyLoader('td', 'torch.distributions')
 F = LazyLoader('F', 'torch.nn.functional')
 
+stats = LazyLoader('stats', 'scipy.stats')
+
 
 class Measure(object):
     """Base class for Measure distribution.
@@ -48,6 +52,15 @@ def _convert_to_categorical(self, p):
         else:
             return td.Categorical(torch.tensor(p))
 
+    def _convert_to_ndarray(self, p):
+        """
+        Convert input data to a numpy ndarray.
+        :param p: input data, now support
+            [`scalar`,`list`, `tuple`, `torch binary file`, and `Categorical`].
+        :return: numpy ndarray
+        """
+        return self._convert_to_tensor(p).numpy()
+
 
 class KLDivMeasure(Measure):
     """
@@ -108,3 +121,98 @@ class EntropyMeasure(Measure):
     def measure(self, p):
         p = self._convert_to_categorical(p)
         return p.entropy()
+
+
+class RelatedTTestMeasure(Measure):
+    """
+    Measure T-Test for two related distributions on their histograms over the
+    same bins.
+
+    For continuous features or distributions, the input could be dataset stats
+    list.
+    For discrete features or distributions, the input could be the tags or the
+    categories list.
+    """
+    name = 't-test'
+
+    @staticmethod
+    def stats_to_hist(self, p, q):
+        # get common maximum number of data samples, and max/min values
+        max_data_num = max(len(p), len(q))
+        min_val = min(min(p), min(q))
+        max_val = max(max(p), max(q))
+
+        # get a recommended number of bins
+        if max_data_num >= 100:
+            rec_bins = int(np.sqrt(max_data_num))
+        else:
+            rec_bins = None
+
+        # get the common bin edges
+        common_p = p + [min_val, max_val]
+        hist_p, bin_edges = np.histogram(common_p, bins=rec_bins)
+        # restore the hist of the original p
+        hist_p[0] -= 1
+        hist_p[-1] -= 1
+        # get the hist of the original q using the common bin edges
+        hist_q, _ = np.histogram(q, bins=bin_edges)
+        return hist_p, hist_q, bin_edges
+
+    @staticmethod
+    def category_to_hist(self, p, q):
+
+        def flatten_list(lst):
+            res = []
+            for s in lst:
+                if isinstance(s, list):
+                    res.extend(flatten_list(s))
+                else:
+                    res.append(s)
+            return res
+
+        # flatten the list
+        p = flatten_list(p)
+        q = flatten_list(q)
+
+        # get the common categories
+        cat_p = set(p)
+        cat_q = set(q)
+        cat_common = cat_p.union(cat_q)
+
+        # get category distributions
+        count_p = {cat: 0 for cat in cat_common}
+        count_q = {cat: 0 for cat in cat_common}
+        for cat in p:
+            count_p[cat] += 1
+        for cat in q:
+            count_q[cat] += 1
+
+        # only keep distribution values sorted by counts
+        sorted_cat = list(count_p.items())
+        sorted_cat.sort(key=lambda it: it[1], reverse=True)
+        sorted_cat = [it[0] for it in sorted_cat]
+        # get the value dist
+        hist_p = [count_p[cat] for cat in sorted_cat]
+        hist_q = [count_p[cat] for cat in sorted_cat]
+
+        return hist_p, hist_q, count_p, count_q, sorted_cat
+
+    def measure(self, p, q):
+        """
+        :param p: the first feature or distribution.
+            (stats/tags/categories)
+        :param q: the second feature or distribution.
+            (stats/tags/categories)
+        :return: the T-Test results object -- ([ref](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats._result_classes.TtestResult.html#scipy.stats._result_classes.TtestResult))  # noqa: E501
+        """
+        ele = p[0]
+        while isinstance(ele, list):
+            ele = ele[0]
+        if isinstance(ele, str):
+            # discrete tags or categories
+            hist_p, hist_q = self.category_to_hist(p, q)[:2]
+        else:
+            # continuous stats
+            hist_p, hist_q = self.stats_to_hist(p, q)[:2]
+
+        # compute the t-test and pval for hist_p and hist_q
+        ttest_res = stats.ttest_rel(hist_p, hist_q)
+        return ttest_res

From 7b8ee5c602bc89a1744ca71ce55c8d68fff3165a Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Mon, 16 Dec 2024 20:46:42 +0800
Subject: [PATCH 09/22] * fix some bugs

---
 data_juicer/analysis/measure.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/data_juicer/analysis/measure.py b/data_juicer/analysis/measure.py
index a6d9e1212..115b8c735 100644
--- a/data_juicer/analysis/measure.py
+++ b/data_juicer/analysis/measure.py
@@ -136,7 +136,10 @@ class RelatedTTestMeasure(Measure):
     name = 't-test'
 
     @staticmethod
-    def stats_to_hist(self, p, q):
+    def stats_to_hist(p, q):
+        p = np.array(p)
+        q = np.array(q)
+
         # get common maximum number of data samples, and max/min values
         max_data_num = max(len(p), len(q))
         min_val = min(min(p), min(q))
@@ -149,7 +152,7 @@ def stats_to_hist(self, p, q):
             rec_bins = None
 
         # get the common bin edges
-        common_p = p + [min_val, max_val]
+        common_p = np.append(p, [min_val, max_val])
         hist_p, bin_edges = np.histogram(common_p, bins=rec_bins)
         # restore the hist of the original p
         hist_p[0] -= 1
@@ -159,7 +162,7 @@ def stats_to_hist(self, p, q):
         return hist_p, hist_q, bin_edges
 
     @staticmethod
-    def category_to_hist(self, p, q):
+    def category_to_hist(p, q):
 
         def flatten_list(lst):
             res = []
@@ -193,7 +196,7 @@ def flatten_list(lst):
         sorted_cat = [it[0] for it in sorted_cat]
         # get the value dist
         hist_p = [count_p[cat] for cat in sorted_cat]
-        hist_q = [count_p[cat] for cat in sorted_cat]
+        hist_q = [count_q[cat] for cat in sorted_cat]
 
         return hist_p, hist_q, count_p, count_q, sorted_cat

From 601d9a25768de85951c9cac31364385b0afc89f3 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Tue, 17 Dec 2024 17:01:45 +0800
Subject: [PATCH 10/22] - support analyzing a dataset object

- optimize the logic of loading filters that produce stats and updating
  attributes of OPs

---
 data_juicer/config/config.py | 80 ++++++++++++++++++++++--------------
 data_juicer/core/analyzer.py |  7 +++-
 2 files changed, 56 insertions(+), 31 deletions(-)

diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py
index d36ac2b54..004f87028 100644
--- a/data_juicer/config/config.py
+++ b/data_juicer/config/config.py
@@ -290,6 +290,21 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None):
         help='Number of samples extracted by tracer to show the dataset '
         'difference before and after a op. Only available when '
         'open_tracer is true.')
+    parser.add_argument(
+        '--open_insight_mining',
+        type=bool,
+        default=False,
+        help='Whether to open insight mining to trace the OP-wise stats/tags '
+        'changes during process. It might take more time when opening '
+        'insight mining.')
+    parser.add_argument(
+        '--filter_list_to_mine',
+        type=List[str],
+        default=[],
+        help='Which Filters will be applied on the dataset to mine the '
+        'insights in their stats changes. If it\'s empty, all Filters '
+        'that produce stats will be involved. 
Only available when ' + 'filter_list_to_mine is true.') parser.add_argument( '--op_fusion', type=bool, @@ -513,13 +528,7 @@ def init_setup_from_cfg(cfg: Namespace): # add all filters that produce stats if cfg.auto: - import pkgutil - - import data_juicer.ops.filter as djfilters - cfg.process = [{ - filter_name: {} - } for _, filter_name, _ in pkgutil.iter_modules(djfilters.__path__) - if filter_name not in djfilters.NON_STATS_FILTERS] + cfg.process = load_filters_with_stats() # Apply text_key modification during initializing configs # users can freely specify text_key for different ops using `text_key` @@ -528,34 +537,45 @@ def init_setup_from_cfg(cfg: Namespace): text_key = cfg.text_keys[0] else: text_key = cfg.text_keys - for op in cfg.process: + op_attrs = { + 'text_key': text_key, + 'image_key': cfg.image_key, + 'audio_key': cfg.audio_key, + 'video_key': cfg.video_key, + 'num_proc': cfg.np, + 'turbo': cfg.turbo, + } + cfg.process = update_op_attr(cfg.process, op_attrs) + + return cfg + + +def load_filters_with_stats(): + import pkgutil + + import data_juicer.ops.filter as djfilters + return [{ + filter_name: {} + } for _, filter_name, _ in pkgutil.iter_modules(djfilters.__path__) + if filter_name not in djfilters.NON_STATS_FILTERS] + + +def update_op_attr(op_list: list, attr_dict: dict = None): + if not attr_dict: + return op_list + updated_op_list = [] + for op in op_list: for op_name in op: args = op[op_name] if args is None: - args = { - 'text_key': text_key, - 'image_key': cfg.image_key, - 'audio_key': cfg.audio_key, - 'video_key': cfg.video_key, - 'num_proc': cfg.np, - 'turbo': cfg.turbo, - } + args = attr_dict else: - if 'text_key' not in args or args['text_key'] is None: - args['text_key'] = text_key - if 'image_key' not in args or args['image_key'] is None: - args['image_key'] = cfg.image_key - if 'audio_key' not in args or args['audio_key'] is None: - args['audio_key'] = cfg.audio_key - if 'video_key' not in args or args['video_key'] is None: - args['video_key'] = cfg.video_key - if 'num_proc' not in args or args['num_proc'] is None: - args['num_proc'] = cfg.np - if 'turbo' not in args or args['turbo'] is None: - args['turbo'] = cfg.turbo + for key in attr_dict: + if key not in args or args[key] is None: + args[key] = attr_dict[key] op[op_name] = args - - return cfg + updated_op_list.append(op) + return updated_op_list def _collect_config_info_from_class_docs(configurable_ops, parser): diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py index 63e512d41..c529229aa 100644 --- a/data_juicer/core/analyzer.py +++ b/data_juicer/core/analyzer.py @@ -1,6 +1,7 @@ import os from typing import Optional +from datasets import Dataset from jsonargparse import Namespace from loguru import logger from pydantic import PositiveInt @@ -13,6 +14,7 @@ from data_juicer.utils import cache_utils from .adapter import Adapter +from .data import NestedDataset from .exporter import Exporter @@ -71,12 +73,14 @@ def __init__(self, cfg: Optional[Namespace] = None): self.analysis_path = os.path.join(self.cfg.work_dir, 'analysis') def run(self, + dataset: Optional[Dataset, NestedDataset] = None, load_data_np: Optional[PositiveInt] = None, skip_export: bool = False, skip_return: bool = False): """ Running the dataset analysis pipeline. + :param dataset: a Dataset object to be analyzed. :param load_data_np: number of workers when loading the dataset. :param skip_export: whether export the results into disk :param skip_return: skip return for API called. 
@@ -86,7 +90,8 @@ def run(self, logger.info('Loading dataset from data formatter...') if load_data_np is None: load_data_np = self.cfg.np - dataset = self.formatter.load_dataset(load_data_np, self.cfg) + if dataset is None: + dataset = self.formatter.load_dataset(load_data_np, self.cfg) if self.cfg.auto: # if it's auto analysis, only analyze for a minor part of the input # dataset to save time and computing resource From 34f2ab6bd8d8b1ffb5b75bb01f90b3304a11ba05 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Tue, 17 Dec 2024 17:59:47 +0800 Subject: [PATCH 11/22] - support analysis on tags in meta --- data_juicer/analysis/column_wise_analysis.py | 20 ++++++++++++++------ data_juicer/analysis/overall_analysis.py | 17 ++++++++++++++--- data_juicer/utils/constant.py | 6 +++++- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/data_juicer/analysis/column_wise_analysis.py b/data_juicer/analysis/column_wise_analysis.py index 825d9b4dd..1a13a4606 100644 --- a/data_juicer/analysis/column_wise_analysis.py +++ b/data_juicer/analysis/column_wise_analysis.py @@ -6,7 +6,7 @@ from tqdm import tqdm from wordcloud import WordCloud -from data_juicer.utils.constant import Fields +from data_juicer.utils.constant import DEFAULT_PREFIX, Fields from .overall_analysis import OverallAnalysis @@ -70,6 +70,13 @@ def __init__(self, stats into one image file """ self.stats = pd.DataFrame(dataset[Fields.stats]) + self.meta = pd.DataFrame(dataset[Fields.meta]) + # remove non-tag columns + meta_columns = self.meta.columns + for col_name in meta_columns: + if not col_name.startswith(DEFAULT_PREFIX) \ + and not col_name.endswith('_tags__'): + self.meta = self.meta.drop(col_name, axis=1) self.output_path = output_path if not os.path.exists(self.output_path): os.makedirs(self.output_path) @@ -101,8 +108,9 @@ def analyze(self, show_percentiles=False, show=False, skip_export=False): width_unit = 4 height_unit = 6 - columns = self.stats.columns - num = len(columns) + stats_and_meta = pd.concat([self.stats, self.meta], axis=1) + all_columns = stats_and_meta.columns + num = len(all_columns) # get the recommended "best" number of columns and rows rec_row, rec_col, grid_indexes = get_row_col(num, num_subcol) @@ -115,9 +123,9 @@ def analyze(self, show_percentiles=False, show=False, skip_export=False): fig = plt.figure(figsize=(rec_width, rec_height), layout='constrained') subfigs = fig.subfigures(rec_row, rec_col, wspace=0.01) - for i, column_name in enumerate(tqdm(columns.to_list(), - desc='Column')): - data = self.stats[column_name] + for i, column_name in enumerate( + tqdm(all_columns.to_list(), desc='Column')): + data = stats_and_meta[column_name] # explode data to flatten inner list data = data.explode().infer_objects() grid = grid_indexes[i] diff --git a/data_juicer/analysis/overall_analysis.py b/data_juicer/analysis/overall_analysis.py index 04eefb178..9f7e78291 100644 --- a/data_juicer/analysis/overall_analysis.py +++ b/data_juicer/analysis/overall_analysis.py @@ -5,7 +5,7 @@ from loguru import logger from tqdm import tqdm -from data_juicer.utils.constant import Fields +from data_juicer.utils.constant import DEFAULT_PREFIX, Fields def _single_column_analysis(col, *args, **kwargs): @@ -25,6 +25,13 @@ def __init__(self, dataset, output_path): :param output_path: path to store the analysis results. 
""" self.stats = pd.DataFrame(dataset[Fields.stats]) + self.meta = pd.DataFrame(dataset[Fields.meta]) + # remove non-tag columns + meta_columns = self.meta.columns + for col_name in meta_columns: + if not col_name.startswith(DEFAULT_PREFIX) \ + and not col_name.endswith('_tags__'): + self.meta = self.meta.drop(col_name, axis=1) self.output_path = output_path if not os.path.exists(self.output_path): os.makedirs(self.output_path) @@ -71,10 +78,14 @@ def analyze(self, percentiles=[], num_proc=1, skip_export=False): # merge default and customized percentiles and get overall information percentiles = list(set(percentiles + self.default_percentiles)) + # merge stats and meta + stats_and_meta = pd.concat([self.stats, self.meta], axis=1) + all_columns = stats_and_meta.columns + results = [] pool = Pool(num_proc) - for col_name in self.stats.columns: - this_col = self.refine_single_column(self.stats[col_name]) + for col_name in all_columns: + this_col = self.refine_single_column(stats_and_meta[col_name]) res = pool.apply_async(_single_column_analysis, kwds={ 'col': this_col, diff --git a/data_juicer/utils/constant.py b/data_juicer/utils/constant.py index 8bc78ad5a..300672a58 100644 --- a/data_juicer/utils/constant.py +++ b/data_juicer/utils/constant.py @@ -16,13 +16,17 @@ class Fields(object): context = DEFAULT_PREFIX + 'context__' suffix = DEFAULT_PREFIX + 'suffix__' - video_frames = DEFAULT_PREFIX + 'video_frames__' + # tags # video_frame_tags video_frame_tags = DEFAULT_PREFIX + 'video_frame_tags__' + # video_audio_tags video_audio_tags = DEFAULT_PREFIX + 'video_audio_tags__' # image_tags image_tags = DEFAULT_PREFIX + 'image_tags__' + # video_frames + video_frames = DEFAULT_PREFIX + 'video_frames__' + # the name of the original file from which this sample was derived. 
source_file = DEFAULT_PREFIX + 'source_file__' From 8531a01a670be9f9feb3b026f12f522695f9372a Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Tue, 17 Dec 2024 19:46:35 +0800 Subject: [PATCH 12/22] - support analysis with tagging OPs --- data_juicer/config/config.py | 7 ++++--- data_juicer/core/analyzer.py | 10 +++++++--- data_juicer/ops/__init__.py | 17 +++++++++++++++++ data_juicer/ops/filter/__init__.py | 7 ------- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py index 004f87028..dbc731d6a 100644 --- a/data_juicer/config/config.py +++ b/data_juicer/config/config.py @@ -553,11 +553,12 @@ def init_setup_from_cfg(cfg: Namespace): def load_filters_with_stats(): import pkgutil - import data_juicer.ops.filter as djfilters + import data_juicer.ops.filter as djfilter + from data_juicer.ops import NON_STATS_FILTERS return [{ filter_name: {} - } for _, filter_name, _ in pkgutil.iter_modules(djfilters.__path__) - if filter_name not in djfilters.NON_STATS_FILTERS] + } for _, filter_name, _ in pkgutil.iter_modules(djfilter.__path__) + if filter_name not in NON_STATS_FILTERS] def update_op_attr(op_list: list, attr_dict: dict = None): diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py index c529229aa..74d26a7ce 100644 --- a/data_juicer/core/analyzer.py +++ b/data_juicer/core/analyzer.py @@ -9,7 +9,7 @@ from data_juicer.analysis import ColumnWiseAnalysis, OverallAnalysis from data_juicer.config import init_configs from data_juicer.format import load_formatter -from data_juicer.ops import Filter, load_ops +from data_juicer.ops import TAGGING_OPS, Filter, load_ops from data_juicer.ops.op_fusion import fuse_operators from data_juicer.utils import cache_utils @@ -122,9 +122,13 @@ def run(self, dataset = dataset.process(op, work_dir=self.work_dir) op.process = original_process stats_collected = True + elif op._name in TAGGING_OPS: + dataset = dataset.process(op, work_dir=self.work_dir) + stats_collected = True if not stats_collected: - logger.warning('No stats collected. Please add some Filter ops to ' - 'the process list in configs.') + logger.warning( + 'No stats/meta collected. Please add some Filter OPs or ' + 'Tagging OPs to the process list in configs.') return dataset # 3. 
data export diff --git a/data_juicer/ops/__init__.py b/data_juicer/ops/__init__.py index c7ab44c25..659581654 100644 --- a/data_juicer/ops/__init__.py +++ b/data_juicer/ops/__init__.py @@ -9,4 +9,21 @@ 'Mapper', 'Deduplicator', 'Selector', + 'NON_STATS_FILTERS', + 'TAGGING_OPS', ] + +# Filters that don't produce any stats +NON_STATS_FILTERS = { + 'specified_field_filter', + 'specified_numeric_field_filter', + 'suffix_filter', + 'video_tagging_from_frames_filter', +} + +# OPs that will produce tags in meta +TAGGING_OPS = { + 'video_tagging_from_frames_filter', + 'video_tagging_from_audio_mapper', + 'image_tagging_mapper', +} diff --git a/data_juicer/ops/filter/__init__.py b/data_juicer/ops/filter/__init__.py index 8cb986b2b..dad6818e1 100644 --- a/data_juicer/ops/filter/__init__.py +++ b/data_juicer/ops/filter/__init__.py @@ -63,10 +63,3 @@ 'VideoTaggingFromFramesFilter', 'VideoWatermarkFilter', 'WordRepetitionFilter', 'WordsNumFilter' ] - -NON_STATS_FILTERS = [ - 'specified_field_filter', - 'specified_numeric_field_filter', - 'suffix_filter', - 'video_tagging_from_frames_filter', -] From 4d6b7014ae86a4784a54eacd8b142c6985c44f40 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Wed, 18 Dec 2024 10:05:46 +0800 Subject: [PATCH 13/22] - move tags into the meta field --- data_juicer/config/config.py | 2 +- data_juicer/core/analyzer.py | 2 +- data_juicer/ops/__init__.py | 21 ++----------------- data_juicer/ops/base_op.py | 13 ++++++++++++ .../ops/filter/specified_field_filter.py | 7 +++++-- .../filter/specified_numeric_field_filter.py | 8 +++++-- data_juicer/ops/filter/suffix_filter.py | 7 +++++-- .../video_tagging_from_frames_filter.py | 5 ++++- .../ops/mapper/image_tagging_mapper.py | 10 +++++---- .../mapper/video_tagging_from_audio_mapper.py | 11 ++++++---- .../video_tagging_from_frames_mapper.py | 10 +++++---- 11 files changed, 56 insertions(+), 40 deletions(-) diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py index dbc731d6a..1de3b7a10 100644 --- a/data_juicer/config/config.py +++ b/data_juicer/config/config.py @@ -558,7 +558,7 @@ def load_filters_with_stats(): return [{ filter_name: {} } for _, filter_name, _ in pkgutil.iter_modules(djfilter.__path__) - if filter_name not in NON_STATS_FILTERS] + if filter_name not in NON_STATS_FILTERS.modules] def update_op_attr(op_list: list, attr_dict: dict = None): diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py index 74d26a7ce..69ab8d8f3 100644 --- a/data_juicer/core/analyzer.py +++ b/data_juicer/core/analyzer.py @@ -122,7 +122,7 @@ def run(self, dataset = dataset.process(op, work_dir=self.work_dir) op.process = original_process stats_collected = True - elif op._name in TAGGING_OPS: + elif op._name in TAGGING_OPS.modules: dataset = dataset.process(op, work_dir=self.work_dir) stats_collected = True if not stats_collected: diff --git a/data_juicer/ops/__init__.py b/data_juicer/ops/__init__.py index 659581654..f512ab89a 100644 --- a/data_juicer/ops/__init__.py +++ b/data_juicer/ops/__init__.py @@ -1,6 +1,6 @@ from . 
import deduplicator, filter, mapper, selector -from .base_op import (OPERATORS, UNFORKABLE, Deduplicator, Filter, Mapper, - Selector) +from .base_op import (NON_STATS_FILTERS, OPERATORS, TAGGING_OPS, UNFORKABLE, + Deduplicator, Filter, Mapper, Selector) from .load import load_ops __all__ = [ @@ -9,21 +9,4 @@ 'Mapper', 'Deduplicator', 'Selector', - 'NON_STATS_FILTERS', - 'TAGGING_OPS', ] - -# Filters that don't produce any stats -NON_STATS_FILTERS = { - 'specified_field_filter', - 'specified_numeric_field_filter', - 'suffix_filter', - 'video_tagging_from_frames_filter', -} - -# OPs that will produce tags in meta -TAGGING_OPS = { - 'video_tagging_from_frames_filter', - 'video_tagging_from_audio_mapper', - 'image_tagging_mapper', -} diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py index 72618a6bb..d7d78d8e2 100644 --- a/data_juicer/ops/base_op.py +++ b/data_juicer/ops/base_op.py @@ -14,6 +14,8 @@ OPERATORS = Registry('Operators') UNFORKABLE = Registry('Unforkable') +NON_STATS_FILTERS = Registry('Non-stats Filters') +TAGGING_OPS = Registry('Tagging Operators') def convert_list_dict_to_dict_list(samples): @@ -216,6 +218,17 @@ def run(self, dataset): from data_juicer.core.data import NestedDataset if not isinstance(dataset, NestedDataset): dataset = NestedDataset(dataset) + if self._name in TAGGING_OPS.modules \ + and Fields.meta not in dataset.features: + from data_juicer.core.data import add_same_content_to_new_column + dataset = dataset.map(add_same_content_to_new_column, + fn_kwargs={ + 'new_column_name': Fields.meta, + 'initial_value': {} + }, + num_proc=self.runtime_np(), + batch_size=self.batch_size, + desc='Adding new column for meta') return dataset def empty_history(self): diff --git a/data_juicer/ops/filter/specified_field_filter.py b/data_juicer/ops/filter/specified_field_filter.py index 86aff2426..41addf8da 100644 --- a/data_juicer/ops/filter/specified_field_filter.py +++ b/data_juicer/ops/filter/specified_field_filter.py @@ -1,9 +1,12 @@ from typing import List -from ..base_op import OPERATORS, Filter +from ..base_op import NON_STATS_FILTERS, OPERATORS, Filter +OP_NAME = 'specified_field_filter' -@OPERATORS.register_module('specified_field_filter') + +@NON_STATS_FILTERS.register_module(OP_NAME) +@OPERATORS.register_module(OP_NAME) class SpecifiedFieldFilter(Filter): """ Filter based on specified field information. diff --git a/data_juicer/ops/filter/specified_numeric_field_filter.py b/data_juicer/ops/filter/specified_numeric_field_filter.py index 693be3392..c7a1d301a 100644 --- a/data_juicer/ops/filter/specified_numeric_field_filter.py +++ b/data_juicer/ops/filter/specified_numeric_field_filter.py @@ -1,6 +1,6 @@ import sys -from ..base_op import OPERATORS, Filter +from ..base_op import NON_STATS_FILTERS, OPERATORS, Filter def is_number(s): @@ -13,7 +13,11 @@ def is_number(s): return False -@OPERATORS.register_module('specified_numeric_field_filter') +OP_NAME = 'specified_numeric_field_filter' + + +@NON_STATS_FILTERS.register_module(OP_NAME) +@OPERATORS.register_module(OP_NAME) class SpecifiedNumericFieldFilter(Filter): """ Filter based on specified numeric field information. 
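The `NON_STATS_FILTERS` and `TAGGING_OPS` registries introduced above replace the hard-coded name lists from the earlier commits: each OP now declares its own classification at its definition site via a decorator, and the config/analyzer code only performs membership checks against `registry.modules`. A minimal sketch of how this registry pattern works, using a simplified stand-in for the project's actual `Registry` class (the real one lives in `data_juicer.utils.registry`; names and details here are illustrative only):

```python
class Registry:
    """Simplified stand-in for the Registry used in base_op.py."""

    def __init__(self, name):
        self.name = name
        self._modules = {}

    @property
    def modules(self):
        # mapping of registered module names to classes
        return self._modules

    def register_module(self, module_name):
        # used as a class decorator, e.g.
        # @NON_STATS_FILTERS.register_module('suffix_filter')
        def _register(cls):
            self._modules[module_name] = cls
            return cls

        return _register


NON_STATS_FILTERS = Registry('Non-stats Filters')


@NON_STATS_FILTERS.register_module('suffix_filter')
class SuffixFilter:
    pass


# membership checks like those in config.py and analyzer.py
assert 'suffix_filter' in NON_STATS_FILTERS.modules
```

Because one class can carry several such decorators, a single OP can be both a non-stats filter and a tagging OP (as `video_tagging_from_frames_filter` is in this patch), and no central list has to be kept in sync by hand.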
diff --git a/data_juicer/ops/filter/suffix_filter.py b/data_juicer/ops/filter/suffix_filter.py index ea7868399..7aaca53a7 100644 --- a/data_juicer/ops/filter/suffix_filter.py +++ b/data_juicer/ops/filter/suffix_filter.py @@ -2,10 +2,13 @@ from data_juicer.utils.constant import Fields -from ..base_op import OPERATORS, Filter +from ..base_op import NON_STATS_FILTERS, OPERATORS, Filter +OP_NAME = 'suffix_filter' -@OPERATORS.register_module('suffix_filter') + +@NON_STATS_FILTERS.register_module(OP_NAME) +@OPERATORS.register_module(OP_NAME) class SuffixFilter(Filter): """Filter to keep samples with specified suffix.""" diff --git a/data_juicer/ops/filter/video_tagging_from_frames_filter.py b/data_juicer/ops/filter/video_tagging_from_frames_filter.py index 8872aab32..bd17bc838 100644 --- a/data_juicer/ops/filter/video_tagging_from_frames_filter.py +++ b/data_juicer/ops/filter/video_tagging_from_frames_filter.py @@ -5,7 +5,8 @@ from data_juicer.utils.constant import Fields -from ..base_op import OPERATORS, UNFORKABLE, Filter +from ..base_op import (NON_STATS_FILTERS, OPERATORS, TAGGING_OPS, UNFORKABLE, + Filter) from ..mapper.video_tagging_from_frames_mapper import \ VideoTaggingFromFramesMapper from ..op_fusion import LOADED_VIDEOS @@ -13,6 +14,8 @@ OP_NAME = 'video_tagging_from_frames_filter' +@NON_STATS_FILTERS.register_module(OP_NAME) +@TAGGING_OPS.register_module(OP_NAME) @UNFORKABLE.register_module(OP_NAME) @OPERATORS.register_module(OP_NAME) @LOADED_VIDEOS.register_module(OP_NAME) diff --git a/data_juicer/ops/mapper/image_tagging_mapper.py b/data_juicer/ops/mapper/image_tagging_mapper.py index e3fc46f1b..dc2099b78 100644 --- a/data_juicer/ops/mapper/image_tagging_mapper.py +++ b/data_juicer/ops/mapper/image_tagging_mapper.py @@ -7,7 +7,7 @@ from data_juicer.utils.mm_utils import load_data_with_context, load_image from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, UNFORKABLE, Mapper +from ..base_op import OPERATORS, TAGGING_OPS, UNFORKABLE, Mapper from ..op_fusion import LOADED_IMAGES torch = LazyLoader('torch', 'torch') @@ -16,6 +16,7 @@ OP_NAME = 'image_tagging_mapper' +@TAGGING_OPS.register_module(OP_NAME) @UNFORKABLE.register_module(OP_NAME) @OPERATORS.register_module(OP_NAME) @LOADED_IMAGES.register_module(OP_NAME) @@ -47,12 +48,13 @@ def __init__(self, def process_single(self, sample, rank=None, context=False): # check if it's generated already - if self.tag_field_name in sample: + if self.tag_field_name in sample[Fields.meta]: return sample # there is no image in this sample if self.image_key not in sample or not sample[self.image_key]: - sample[self.tag_field_name] = np.array([[]], dtype=np.str_) + sample[Fields.meta][self.tag_field_name] = np.array([[]], + dtype=np.str_) return sample # load images @@ -75,5 +77,5 @@ def process_single(self, sample, rank=None, context=False): sorted_word_list = [item for item, _ in word_count.most_common()] image_tags.append(np.array(sorted_word_list, dtype=np.str_)) - sample[self.tag_field_name] = image_tags + sample[Fields.meta][self.tag_field_name] = image_tags return sample diff --git a/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py b/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py index 2c32093a5..7302953f2 100644 --- a/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py +++ b/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py @@ -6,13 +6,14 @@ from data_juicer.utils.mm_utils import extract_audio_from_video from data_juicer.utils.model_utils import get_model, 
prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, TAGGING_OPS, Mapper torch = LazyLoader('torch', 'torch') OP_NAME = 'video_tagging_from_audio_mapper' +@TAGGING_OPS.register_module(OP_NAME) @OPERATORS.register_module(OP_NAME) class VideoTaggingFromAudioMapper(Mapper): """Mapper to generate video tags from audio streams extracted by video @@ -50,12 +51,13 @@ def __init__(self, def process_single(self, sample, rank=None): # check if it's generated already - if self.tag_field_name in sample: + if self.tag_field_name in sample[Fields.meta]: return sample # there is no video in this sample if self.video_key not in sample or not sample[self.video_key]: - sample[self.tag_field_name] = np.array([], dtype=np.str_) + sample[Fields.meta][self.tag_field_name] = np.array([], + dtype=np.str_) return sample # load video paths @@ -90,5 +92,6 @@ def process_single(self, sample, rank=None): predicted_tag_id = torch.argmax(logits, dim=-1).item() predicted_tag = model.config.id2label[predicted_tag_id] video_audio_tags.append(predicted_tag) - sample[self.tag_field_name] = np.array(video_audio_tags, dtype=np.str_) + sample[Fields.meta][self.tag_field_name] = np.array(video_audio_tags, + dtype=np.str_) return sample diff --git a/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py b/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py index d4995d3f6..31927e1b2 100644 --- a/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py +++ b/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py @@ -10,7 +10,7 @@ load_data_with_context, load_video) from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, UNFORKABLE, Mapper +from ..base_op import OPERATORS, TAGGING_OPS, UNFORKABLE, Mapper from ..op_fusion import LOADED_VIDEOS ram = LazyLoader('ram', 'ram') @@ -19,6 +19,7 @@ OP_NAME = 'video_tagging_from_frames_mapper' +@TAGGING_OPS.register_module(OP_NAME) @UNFORKABLE.register_module(OP_NAME) @OPERATORS.register_module(OP_NAME) @LOADED_VIDEOS.register_module(OP_NAME) @@ -73,12 +74,13 @@ def __init__(self, def process_single(self, sample, rank=None, context=False): # check if it's generated already - if self.tag_field_name in sample: + if self.tag_field_name in sample[Fields.meta]: return sample # there is no video in this sample if self.video_key not in sample or not sample[self.video_key]: - sample[self.tag_field_name] = np.array([[]], dtype=np.str_) + sample[Fields.meta][self.tag_field_name] = np.array([[]], + dtype=np.str_) return sample # load videos @@ -115,5 +117,5 @@ def process_single(self, sample, rank=None, context=False): for vid_key in videos: close_video(videos[vid_key]) - sample[self.tag_field_name] = video_tags + sample[Fields.meta][self.tag_field_name] = video_tags return sample From 35aa6bdf8d61878e4bb9de4edcab50736b12a576 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Wed, 18 Dec 2024 17:49:19 +0800 Subject: [PATCH 14/22] - do not tell tags using their suffix - suppress the error/exceptions in Monitor due to the termination of the main process - exported stats file includes meta field in exporter --- data_juicer/analysis/column_wise_analysis.py | 3 +-- data_juicer/analysis/overall_analysis.py | 3 +-- data_juicer/config/config.py | 23 +++++++++++-------- data_juicer/core/analyzer.py | 24 +++++++++++++------- data_juicer/core/executor.py | 1 + data_juicer/core/exporter.py | 11 ++++++--- data_juicer/core/monitor.py | 8 ++++++- data_juicer/ops/base_op.py | 5 +++- data_juicer/utils/constant.py | 2 +- 9 
files changed, 52 insertions(+), 28 deletions(-) diff --git a/data_juicer/analysis/column_wise_analysis.py b/data_juicer/analysis/column_wise_analysis.py index 1a13a4606..1b9945ebf 100644 --- a/data_juicer/analysis/column_wise_analysis.py +++ b/data_juicer/analysis/column_wise_analysis.py @@ -74,8 +74,7 @@ def __init__(self, # remove non-tag columns meta_columns = self.meta.columns for col_name in meta_columns: - if not col_name.startswith(DEFAULT_PREFIX) \ - and not col_name.endswith('_tags__'): + if not col_name.startswith(DEFAULT_PREFIX): self.meta = self.meta.drop(col_name, axis=1) self.output_path = output_path if not os.path.exists(self.output_path): diff --git a/data_juicer/analysis/overall_analysis.py b/data_juicer/analysis/overall_analysis.py index 9f7e78291..696b25946 100644 --- a/data_juicer/analysis/overall_analysis.py +++ b/data_juicer/analysis/overall_analysis.py @@ -29,8 +29,7 @@ def __init__(self, dataset, output_path): # remove non-tag columns meta_columns = self.meta.columns for col_name in meta_columns: - if not col_name.startswith(DEFAULT_PREFIX) \ - and not col_name.endswith('_tags__'): + if not col_name.startswith(DEFAULT_PREFIX): self.meta = self.meta.drop(col_name, axis=1) self.output_path = output_path if not os.path.exists(self.output_path): diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py index 1de3b7a10..9169f9dc0 100644 --- a/data_juicer/config/config.py +++ b/data_juicer/config/config.py @@ -298,13 +298,14 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None): 'changes during process. It might take more time when opening ' 'insight mining.') parser.add_argument( - '--filter_list_to_mine', + '--op_list_to_mine', type=List[str], default=[], - help='Which Filters will be applied on the dataset to mine the ' - 'insights in their stats changes. If it\'s empty, all Filters ' - 'that produce stats will be involved. Only available when ' - 'filter_list_to_mine is true.') + help='Which OPs will be applied on the dataset to mine the insights ' + 'in their stats changes. Only those OPs that produce stats or ' + 'meta are valid. If it\'s empty, all OPs that produce stats and ' + 'meta will be involved. 
Only available when open_insight_mining '
+        'is true.')
     parser.add_argument(
         '--op_fusion',
         type=bool,
@@ -528,7 +529,7 @@ def init_setup_from_cfg(cfg: Namespace):
 
     # add all filters that produce stats
     if cfg.auto:
-        cfg.process = load_filters_with_stats()
+        cfg.process = load_ops_with_stats_meta()
 
     # Apply text_key modification during initializing configs
     # users can freely specify text_key for different ops using `text_key`
@@ -550,15 +551,17 @@ def init_setup_from_cfg(cfg: Namespace):
     return cfg
 
 
-def load_filters_with_stats():
+def load_ops_with_stats_meta():
     import pkgutil
 
     import data_juicer.ops.filter as djfilter
-    from data_juicer.ops import NON_STATS_FILTERS
-    return [{
+    from data_juicer.ops import NON_STATS_FILTERS, TAGGING_OPS
+    stats_filters = [{
         filter_name: {}
     } for _, filter_name, _ in pkgutil.iter_modules(djfilter.__path__)
-      if filter_name not in NON_STATS_FILTERS.modules]
+                     if filter_name not in NON_STATS_FILTERS.modules]
+    meta_ops = [{op_name: {}} for op_name in TAGGING_OPS.modules]
+    return stats_filters + meta_ops
 
 
 def update_op_attr(op_list: list, attr_dict: dict = None):
diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py
index 69ab8d8f3..d9ac586e9 100644
--- a/data_juicer/core/analyzer.py
+++ b/data_juicer/core/analyzer.py
@@ -1,5 +1,5 @@
 import os
-from typing import Optional
+from typing import Optional, Union
 
 from datasets import Dataset
 from jsonargparse import Namespace
@@ -9,7 +9,7 @@
 from data_juicer.analysis import ColumnWiseAnalysis, OverallAnalysis
 from data_juicer.config import init_configs
 from data_juicer.format import load_formatter
-from data_juicer.ops import TAGGING_OPS, Filter, load_ops
+from data_juicer.ops import NON_STATS_FILTERS, TAGGING_OPS, Filter, load_ops
 from data_juicer.ops.op_fusion import fuse_operators
 from data_juicer.utils import cache_utils
@@ -73,7 +73,7 @@ def __init__(self, cfg: Optional[Namespace] = None):
         self.analysis_path = os.path.join(self.cfg.work_dir, 'analysis')
 
     def run(self,
-            dataset: Optional[Dataset, NestedDataset] = None,
+            dataset: Union[Dataset, NestedDataset] = None,
             load_data_np: Optional[PositiveInt] = None,
             skip_export: bool = False,
             skip_return: bool = False):
@@ -87,11 +87,13 @@ def run(self,
         :return: analyzed dataset.
         """
         # 1. format data
-        logger.info('Loading dataset from data formatter...')
         if load_data_np is None:
             load_data_np = self.cfg.np
         if dataset is None:
+            logger.info('Loading dataset from data formatter...')
             dataset = self.formatter.load_dataset(load_data_np, self.cfg)
+        else:
+            logger.info(f'Using existing dataset {dataset}')
         if self.cfg.auto:
             # if it's auto analysis, only analyze for a minor part of the input
             # dataset to save time and computing resource
@@ -116,20 +118,26 @@ def run(self,
         logger.info('Computing the stats of dataset...')
         stats_collected = False
         for op in ops:
-            if isinstance(op, Filter):
+            if isinstance(op, Filter) \
+                    and op._name not in NON_STATS_FILTERS.modules:
                 original_process = op.process
                 op.process = None
-                dataset = dataset.process(op, work_dir=self.work_dir)
+                dataset = dataset.process(op,
+                                          work_dir=self.work_dir,
+                                          open_monitor=self.cfg.open_monitor)
                 op.process = original_process
                 stats_collected = True
             elif op._name in TAGGING_OPS.modules:
-                dataset = dataset.process(op, work_dir=self.work_dir)
+                dataset = dataset.process(op,
+                                          work_dir=self.work_dir,
+                                          open_monitor=self.cfg.open_monitor)
                 stats_collected = True
         if not stats_collected:
             logger.warning(
                 'No stats/meta collected. Please add some Filter OPs or '
                 'Tagging OPs to the process list in configs.')
-            return dataset
+            if not skip_return:
+                return dataset
 
         # 3. data export
         logger.info('Exporting dataset to disk...')
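
For reference, the `--auto` recipe that `load_ops_with_stats_meta()` assembles above is just a list of `{op_name: {}}` dicts. A hedged sketch of the same enumeration (the exclusion and tagging sets below are illustrative stand-ins for `NON_STATS_FILTERS.modules` and `TAGGING_OPS.modules`):

```python
# Sketch of the auto-mode recipe construction shown above: every filter
# module becomes an empty-arg OP entry unless it is registered as a
# non-stats filter; tagging OPs are appended afterwards.
import pkgutil

import data_juicer.ops.filter as djfilter

NON_STATS = {'suffix_filter', 'specified_field_filter'}   # stand-in set
TAGGING = {'image_tagging_mapper',                        # stand-in set
           'video_tagging_from_frames_mapper',
           'video_tagging_from_audio_mapper'}

stats_filters = [{name: {}}
                 for _, name, _ in pkgutil.iter_modules(djfilter.__path__)
                 if name not in NON_STATS]
meta_ops = [{name: {}} for name in TAGGING]
process_list = stats_filters + meta_ops                   # -> cfg.process
print(process_list[:3])
```
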
diff --git a/data_juicer/core/executor.py b/data_juicer/core/executor.py
index f78059247..7f0d93a66 100644
--- a/data_juicer/core/executor.py
+++ b/data_juicer/core/executor.py
@@ -199,6 +199,7 @@ def run(self,
                 exporter=self.exporter,
                 checkpointer=self.ckpt_manager,
                 tracer=self.tracer,
+                adapter=self.adapter,
                 open_monitor=self.cfg.open_monitor,
             )
             tend = time()
diff --git a/data_juicer/core/exporter.py b/data_juicer/core/exporter.py
index 72b555d34..dbdb4fb9f 100644
--- a/data_juicer/core/exporter.py
+++ b/data_juicer/core/exporter.py
@@ -106,10 +106,15 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
         :param export_stats: whether to export stats of dataset.
         :return:
         """
-        if Fields.stats in dataset.features and export_stats:
+        if export_stats:
             # export stats of datasets into a single file.
             logger.info('Exporting computed stats into a single file...')
-            ds_stats = dataset.select_columns(Fields.stats)
+            export_columns = []
+            if Fields.stats in dataset.features:
+                export_columns.append(Fields.stats)
+            if Fields.meta in dataset.features:
+                export_columns.append(Fields.meta)
+            ds_stats = dataset.select_columns(export_columns)
             stats_file = export_path.replace('.' + suffix, '_stats.jsonl')
             Exporter.to_jsonl(
                 ds_stats,
@@ -119,7 +124,7 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
         if self.export_ds:
             # fetch the corresponding export method according to the suffix
             if not self.keep_stats_in_res_ds:
-                extra_fields = {Fields.stats}
+                extra_fields = {Fields.stats, Fields.meta}
                 feature_fields = set(dataset.features.keys())
                 removed_fields = extra_fields.intersection(feature_fields)
                 dataset = dataset.remove_columns(removed_fields)
diff --git a/data_juicer/core/monitor.py b/data_juicer/core/monitor.py
index 0210e3732..d5fdee241 100644
--- a/data_juicer/core/monitor.py
+++ b/data_juicer/core/monitor.py
@@ -15,7 +15,13 @@ def resource_monitor(mdict, interval):
     while True:
         this_states.append(Monitor.monitor_current_resources())
         time.sleep(interval)
-        if mdict['stop']:
+        try:
+            stop_sign = mdict['stop']
+        except (BrokenPipeError, FileNotFoundError):
+            # mdict crashes because the main process has already been
+            # terminated; that is not a failure of the monitor itself
+            return
+        if stop_sign:
             break
     mdict['resource'] = this_states
diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py
index d7d78d8e2..482a56182 100644
--- a/data_juicer/ops/base_op.py
+++ b/data_juicer/ops/base_op.py
@@ -218,6 +218,7 @@ def run(self, dataset):
         from data_juicer.core.data import NestedDataset
         if not isinstance(dataset, NestedDataset):
             dataset = NestedDataset(dataset)
+        # add meta field for OPs that produce tags
         if self._name in TAGGING_OPS.modules \
                 and Fields.meta not in dataset.features:
             from data_juicer.core.data import add_same_content_to_new_column
@@ -394,7 +395,9 @@ def process_single(self, sample):
 
     def run(self, dataset, *, exporter=None, tracer=None, reduce=True):
         dataset = super(Filter, self).run(dataset)
-        if Fields.stats not in dataset.features:
+        # add stats field for Filters that produce stats
+        if self._name not in NON_STATS_FILTERS.modules \
+                and Fields.stats not in dataset.features:
             from data_juicer.core.data import add_same_content_to_new_column
             dataset = dataset.map(add_same_content_to_new_column,
                                   fn_kwargs={
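
The `run()` hunks above key their behavior off `TAGGING_OPS.modules` and `NON_STATS_FILTERS.modules`. The `Registry` class itself is not part of this patch series, so the following is only a minimal sketch of the decorate-then-query pattern, not data-juicer's actual implementation:

```python
# Minimal sketch of the registry pattern relied on above: decorating a
# class records it under its op name, and membership checks drive the
# conditional stats/meta column initialization in base_op.run().
class Registry:
    """Illustrative stand-in for data-juicer's OP registry."""

    def __init__(self, name):
        self.name = name
        self._modules = {}

    @property
    def modules(self):
        return self._modules

    def register_module(self, module_name):
        # used as a decorator: @REGISTRY.register_module('op_name')
        def _register(cls):
            self._modules[module_name] = cls
            return cls
        return _register


NON_STATS_FILTERS = Registry('non_stats_filters')
TAGGING_OPS = Registry('tagging_ops')


@NON_STATS_FILTERS.register_module('suffix_filter')
class SuffixFilter:  # stand-in for the real Filter subclass
    pass


# the membership checks used in base_op.py above:
print('suffix_filter' in NON_STATS_FILTERS.modules)  # True -> skip stats init
print('suffix_filter' in TAGGING_OPS.modules)        # False -> no meta init
```
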
diff --git a/data_juicer/utils/constant.py b/data_juicer/utils/constant.py
index 300672a58..17d4369a1 100644
--- a/data_juicer/utils/constant.py
+++ b/data_juicer/utils/constant.py
@@ -16,7 +16,7 @@ class Fields(object):
     context = DEFAULT_PREFIX + 'context__'
     suffix = DEFAULT_PREFIX + 'suffix__'
 
-    # tags
+    # tags in meta
     # video_frame_tags
     video_frame_tags = DEFAULT_PREFIX + 'video_frame_tags__'
     # video_audio_tags

From 85e1392a0192f6ea3371d27d4d6daf9ba1c613f7 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Wed, 18 Dec 2024 19:11:58 +0800
Subject: [PATCH 15/22] - add insight mining

---
 data_juicer/analysis/column_wise_analysis.py |  5 +-
 data_juicer/analysis/measure.py              |  5 +-
 data_juicer/core/adapter.py                  | 85 +++++++++++++++++++-
 data_juicer/core/data.py                     | 30 ++++++-
 4 files changed, 113 insertions(+), 12 deletions(-)

diff --git a/data_juicer/analysis/column_wise_analysis.py b/data_juicer/analysis/column_wise_analysis.py
index 1b9945ebf..ce5b3617d 100644
--- a/data_juicer/analysis/column_wise_analysis.py
+++ b/data_juicer/analysis/column_wise_analysis.py
@@ -217,10 +217,7 @@ def draw_hist(self, ax, data, save_path, percentiles=None, show=False):
         """
         # recommended number of bins
         data_num = len(data)
-        if data_num >= 100:
-            rec_bins = int(math.sqrt(len(data)))
-        else:
-            rec_bins = None
+        rec_bins = max(int(math.sqrt(data_num)), 10)
 
         # if ax is None, using plot method in pandas
         if ax is None:
diff --git a/data_juicer/analysis/measure.py b/data_juicer/analysis/measure.py
index 115b8c735..8c67330e8 100644
--- a/data_juicer/analysis/measure.py
+++ b/data_juicer/analysis/measure.py
@@ -146,10 +146,7 @@ def stats_to_hist(p, q):
         max_val = max(max(p), max(q))
 
         # get a recommended number of bins
-        if max_data_num >= 100:
-            rec_bins = int(np.sqrt(max_data_num))
-        else:
-            rec_bins = None
+        rec_bins = max(int(np.sqrt(max_data_num)), 10)
 
         # get the common bin edges
         common_p = np.append(p, [min_val, max_val])
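
Both hunks above replace the old "no recommendation below 100 samples" branch with a square-root rule clamped from below. A quick numeric check of the new behavior:

```python
# The recommended-bin rule introduced above: sqrt(n) bins, never fewer
# than 10, with no None fallback for small samples anymore.
import math


def rec_bins(data_num: int) -> int:
    return max(int(math.sqrt(data_num)), 10)


print(rec_bins(25))     # 10 -- small datasets now get a usable bin count
print(rec_bins(100))    # 10
print(rec_bins(10000))  # 100
```
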
diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py
index 5ab6e6ec8..4698f6d73 100644
--- a/data_juicer/core/adapter.py
+++ b/data_juicer/core/adapter.py
@@ -1,8 +1,14 @@
-from datasets import concatenate_datasets
+import json
+import os
+from copy import deepcopy
+
+from datasets import Dataset, concatenate_datasets
 from datasets.config import DEFAULT_MAX_BATCH_SIZE
 
+from data_juicer.analysis.measure import RelatedTTestMeasure
 from data_juicer.core.monitor import Monitor
 from data_juicer.ops import UNFORKABLE
+from data_juicer.utils.constant import Fields
 from data_juicer.utils.process_utils import setup_mp
 
 
@@ -12,6 +18,11 @@ class Adapter:
 
     def __init__(self, cfg: dict):
         self.cfg = cfg
+
+        # insight mining related
+        self.enable_insight_mining = self.cfg.open_insight_mining
+
+        # resource probe related
         self.idle_resources = Monitor.monitor_current_resources()
 
     @staticmethod
@@ -177,3 +188,75 @@ def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9):
             batch_size_per_op.append(bs_this_op)
 
         return batch_size_per_op
+
+    def analyze_small_batch(self, dataset, current_state):
+        # prepare analyzer config
+        new_cfg = deepcopy(self.cfg)
+        # check ops to mine
+        new_cfg.auto = True
+        new_cfg.config = None
+        if len(new_cfg.op_list_to_mine) > 0:
+            new_cfg.process = [{
+                op_name: {}
+            } for op_name in new_cfg.op_list_to_mine]
+        # update work dir
+        new_cfg.work_dir = os.path.join(new_cfg.work_dir, 'insight_mining',
+                                        current_state)
+        new_cfg.export_path = os.path.join(new_cfg.work_dir,
+                                           f'{current_state}.jsonl')
+        # close insight mining and monitor for inner analysis
+        new_cfg.open_insight_mining = False
+        new_cfg.open_monitor = False
+
+        # init the analyzer
+        from data_juicer.core.analyzer import Analyzer
+        analyzer = Analyzer(new_cfg)
+
+        # remove existing stats and meta in dataset
+        target_fields = {Fields.stats, Fields.meta}
+        target_fields = target_fields.intersection(set(dataset.features))
+        if len(target_fields) > 0:
+            dataset = dataset.remove_columns(list(target_fields))
+        analyzer.run(dataset, skip_return=True)
+
+    def insight_mining(self, pval_th=0.05):
+        work_dir = os.path.join(self.cfg.work_dir, 'insight_mining')
+        res_order = [
+            d for d in os.listdir(work_dir)
+            if os.path.isdir(os.path.join(work_dir, d))
+        ]
+        res_order.sort()
+
+        # collect analysis results
+        analysis_results = {}
+        for res_dir in res_order:
+            res = Dataset.from_json(
+                os.path.join(work_dir, res_dir,
+                             f'{res_dir}_stats.jsonl')).flatten()
+            analysis_results[res_dir] = res
+
+        # distribution change significance analysis
+        ttest_measure = RelatedTTestMeasure()
+
+        sig_res = {}
+        # i = 0 is the original dataset
+        for i in range(1, len(res_order)):
+            prev_res = analysis_results[res_order[i - 1]]
+            curr_res = analysis_results[res_order[i]]
+
+            # only consider common stats and meta
+            common_features = list(
+                set(prev_res.features).intersection(set(curr_res.features)))
+            curr_sig_res = {}
+            for feat in common_features:
+                ttest_res = ttest_measure(prev_res[feat], curr_res[feat])
+                curr_sig_res[feat] = {
+                    't-statistic': ttest_res.statistic,
+                    'p-value': ttest_res.pvalue,
+                    'significant':
+                    True if ttest_res.pvalue < pval_th else False,
+                }
+            sig_res[res_order[i]] = curr_sig_res
+
+        with open(os.path.join(work_dir, 'insight_mining.json'), 'w') as out:
+            json.dump(sig_res, out)
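
`insight_mining()` above delegates the per-feature significance test to `RelatedTTestMeasure`, whose internals are only partially visible in this series. The sketch below approximates the idea directly with scipy: histogram both stat lists over shared bins (cf. `stats_to_hist` in measure.py) and run a related-samples t-test on the bin counts. All helper names here are illustrative:

```python
# Approximate sketch of the per-feature check behind insight_mining():
# compare one stat's distribution before/after an OP via a paired t-test
# on histograms computed over common bin edges.
import numpy as np
from scipy import stats


def significant_change(prev, curr, pval_th=0.05):
    prev = np.asarray(prev, dtype=float)
    curr = np.asarray(curr, dtype=float)
    # shared support and the sqrt bin rule clamped to >= 10, as above
    lo, hi = min(prev.min(), curr.min()), max(prev.max(), curr.max())
    bins = max(int(np.sqrt(max(len(prev), len(curr)))), 10)
    edges = np.linspace(lo, hi, bins + 1)
    hist_prev, _ = np.histogram(prev, bins=edges)
    hist_curr, _ = np.histogram(curr, bins=edges)
    res = stats.ttest_rel(hist_prev, hist_curr)  # related-samples t-test
    return {
        't-statistic': float(res.statistic),
        'p-value': float(res.pvalue),
        'significant': bool(res.pvalue < pval_th),
    }


rng = np.random.default_rng(0)
before = rng.normal(100, 15, 1000)  # e.g. a text-length stat
after = before[before > 95]         # the same stat after a filter OP
print(significant_change(before, after))
```
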
diff --git a/data_juicer/core/data.py b/data_juicer/core/data.py
index 361f6e8a0..d0f8083e1 100644
--- a/data_juicer/core/data.py
+++ b/data_juicer/core/data.py
@@ -172,6 +172,7 @@ def process(
         exporter=None,
         checkpointer=None,
         tracer=None,
+        adapter=None,
         open_monitor=True,
     ):
         if operators is None:
@@ -185,9 +186,19 @@ def process(
         if open_monitor:
             resource_util_list = []
 
+        # whether to enable insight mining
+        enable_insight_mining = adapter.enable_insight_mining \
+            if adapter else False
+        # record the analysis results of the original dataset
+        if enable_insight_mining:
+            logger.info('Analyze small batch for the original dataset for '
+                        'insight mining...')
+            adapter.analyze_small_batch(self, '0_original')
+
         dataset = self
+        op_num = len(operators)
         try:
-            for op in operators:
+            for idx, op in enumerate(operators, start=1):
                 mp_context = ['forkserver', 'spawn'] if (
                     op.use_cuda() or op._name in unforkable_operators) else None
@@ -211,8 +222,16 @@ def process(
                 if open_monitor:
                     resource_util_list.append(resource_util_per_op)
                 end = time()
-                logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
-                            f'Left {len(dataset)} samples.')
+                logger.info(
+                    f'[{idx}/{op_num}] OP [{op._name}] Done in '
+                    f'{end - start:.3f}s. Left {len(dataset)} samples.')
+
+                # record the analysis results of the current dataset
+                if enable_insight_mining:
+                    logger.info(
+                        f'Analyze small batch for the current dataset after '
+                        f'OP [{op._name}] for insight mining...')
+                    adapter.analyze_small_batch(dataset, f'{idx}_{op._name}')
         except:  # noqa: E722
             logger.error(f'An error occurred during Op [{op._name}].')
             traceback.print_exc()
@@ -223,6 +242,7 @@
                             'last op...')
                 dataset.cleanup_cache_files()
                 checkpointer.save_ckpt(dataset)
+        # make summarization on the monitor results
         if work_dir and open_monitor:
             # get the analyzed version
             resource_util_list = Monitor.analyze_resource_util_list(
@@ -234,6 +254,10 @@
                 json.dump(resource_util_list, out)
             Monitor.draw_resource_util_graph(resource_util_list,
                                              monitor_dir)
+        # make summarization on the insight mining results
+        if work_dir and enable_insight_mining:
+            logger.info('Insight mining for each OP...')
+            adapter.insight_mining()
         return dataset
 
     def update_args(self, args, kargs, is_filter=False):

From e3d7b8bd6bfa9203994c72b817eaf8f70aa4da44 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Thu, 19 Dec 2024 21:02:53 +0800
Subject: [PATCH 16/22] * resolve the bugs when running insight mining in
 multiprocessing mode

---
 data_juicer/core/adapter.py      | 39 ++++++++++++++++++--------
 data_juicer/utils/cache_utils.py | 47 ++++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+), 11 deletions(-)

diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py
index 4698f6d73..398f4b5be 100644
--- a/data_juicer/core/adapter.py
+++ b/data_juicer/core/adapter.py
@@ -8,6 +8,7 @@
 from data_juicer.analysis.measure import RelatedTTestMeasure
 from data_juicer.core.monitor import Monitor
 from data_juicer.ops import UNFORKABLE
+from data_juicer.utils.cache_utils import dataset_cache_control
 from data_juicer.utils.constant import Fields
 from data_juicer.utils.process_utils import setup_mp
 
@@ -119,25 +120,21 @@ def adapt_workloads(self, dataset, operators):
 
         return bs_per_op
 
+    @dataset_cache_control(on=True)
     def probe_small_batch(self, dataset, operators):
         """
         Perform small batch pre-execution to probe available resources,
         current load and estimated OP speed, returning load factors and speed
         ranks for each OP.
 
-        Notice: the probe should be run with cache enabled.
+        Notice: the probe should be run with cache enabled to avoid removing
+        the cache files of the input dataset.
 
         :param dataset: The dataset to pre-execute small batch on
         :param operators: The OP list to be pre-execution and probe
         :return: A list of probe results for each OP and the length of data
            batch to probe.
        """
-        # record the cache state and enable the cache
-        from datasets import (disable_caching, enable_caching,
-                              is_caching_enabled)
-        previous_state = is_caching_enabled()
-        if not previous_state:
-            enable_caching()
 
         # take a small batch
         data_batch = self.take_batch(dataset, self.cfg)
@@ -146,10 +143,6 @@ def probe_small_batch(self, dataset, operators):
         # analyze resource utilization
         analysis_res = Monitor.analyze_resource_util_list(resource_util_list)
 
-        # if the cache is disabled before, disable it again
-        if not previous_state:
-            disable_caching()
-
         return analysis_res, len(data_batch)
 
     def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9):
@@ -182,7 +182,21 @@
 
         return batch_size_per_op
 
+    @dataset_cache_control(on=True)
     def analyze_small_batch(self, dataset, current_state):
+        """
+        Perform small batch analysis to probe the current OP-wise stats/meta
+        distributions. The analyzed results will be stored in the directory
+        `{work_dir}/insight_mining`.
+
+        Notice: the probe should be run with cache enabled to avoid removing
+        the cache files of the input dataset.
+
+        :param dataset: The dataset to analyze small batch on
+        :param current_state: A string to indicate the current state of the
+            input dataset. It usually consists of the index of the OP that
+            was just applied and that OP's name, e.g. "1_text_length_filter".
+        """
         # prepare analyzer config
         new_cfg = deepcopy(self.cfg)
         # check ops to mine
@@ -220,6 +227,16 @@ def analyze_small_batch(self, dataset, current_state):
         analyzer.run(dataset, skip_return=True)
 
     def insight_mining(self, pval_th=0.05):
+        """
+        Mine insights from the OP-wise analysis results. For now, we use a
+        t-test to check the significance of stats/meta changes before and
+        after each OP processing. If the p-value is less than a given
+        threshold (usually 0.05), we consider the stats/meta changes
+        significant. The insight mining results will be stored in the file
+        `{work_dir}/insight_mining/insight_mining.json`.
+
+        :param pval_th: the threshold of p-value.
+        """
         work_dir = os.path.join(self.cfg.work_dir, 'insight_mining')
         res_order = [
             d for d in os.listdir(work_dir)
             if os.path.isdir(os.path.join(work_dir, d))
         ]
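
The `@dataset_cache_control(on=True)` decorator applied above is implemented in the cache_utils.py hunk that follows; usage amounts to forcing the global HF-datasets caching flag inside a scope and restoring the caller's setting afterwards. A small usage sketch (assuming data-juicer is importable):

```python
# Usage sketch for the cache-control helpers added in the next hunk:
# force caching on inside a scope or function, then restore the previous
# global state no matter what it was.
from datasets import is_caching_enabled

from data_juicer.utils.cache_utils import (DatasetCacheControl,
                                           dataset_cache_control)

print('before:', is_caching_enabled())

with DatasetCacheControl(on=True):
    # probe/analysis work that must not drop the input dataset's cache
    print('inside:', is_caching_enabled())  # True

print('after:', is_caching_enabled())       # restored


@dataset_cache_control(on=True)
def probe_like_function():
    # same guarantee for a whole function body, as used on
    # probe_small_batch() and analyze_small_batch() above
    return is_caching_enabled()


print(probe_like_function())                 # True
```
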
""" - # record the cache state and enable the cache - from datasets import (disable_caching, enable_caching, - is_caching_enabled) - previous_state = is_caching_enabled() - if not previous_state: - enable_caching() # take a small batch data_batch = self.take_batch(dataset, self.cfg) @@ -146,10 +143,6 @@ def probe_small_batch(self, dataset, operators): # analyze resource utilization analysis_res = Monitor.analyze_resource_util_list(resource_util_list) - # if the cache is disabled before, disable it again - if not previous_state: - disable_caching() - return analysis_res, len(data_batch) def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9): @@ -189,7 +182,21 @@ def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9): return batch_size_per_op + @dataset_cache_control(on=True) def analyze_small_batch(self, dataset, current_state): + """ + Perform small batch analysis to probe the current OP-wise stats/meta + distributions. The analyzed results will be stored in the directory + `{work_dir}/insight_mining`. + + Notice: the probe should be run with cache enabled to avoid removing + the cache files of the input dataset. + + :param dataset: The dataset to analyze small batch on + :param current_state: A string to indicate the current state of the + input dataset. It usually consists of a number of the index of the + OP processed just now and the OP name, e.g. "1_text_length_filter". + """ # prepare analyzer config new_cfg = deepcopy(self.cfg) # check ops to mine @@ -220,6 +227,16 @@ def analyze_small_batch(self, dataset, current_state): analyzer.run(dataset, skip_return=True) def insight_mining(self, pval_th=0.05): + """ + Mining the insights from the OP-wise analysis results. For now, we use + T-Test to check the significance of stats/meta changes before and after + each OP processing. If the p-value is less than a given threshold + (usually 0.05), we think the stats/meta changes are significant. The + insight mining results will be stored in the file + `{work_dir}/insight_mining/insight_mining.json`. + + :param pval_th: the threshold of p-value. + """ work_dir = os.path.join(self.cfg.work_dir, 'insight_mining') res_order = [ d for d in os.listdir(work_dir) diff --git a/data_juicer/utils/cache_utils.py b/data_juicer/utils/cache_utils.py index 7d815db2c..51138d7ed 100644 --- a/data_juicer/utils/cache_utils.py +++ b/data_juicer/utils/cache_utils.py @@ -1,4 +1,7 @@ import os +from functools import wraps + +from datasets import disable_caching, enable_caching, is_caching_enabled # Default cache location DEFAULT_CACHE_HOME = '~/.cache' @@ -21,3 +24,47 @@ DEFAULT_DATA_JUICER_MODELS_CACHE) CACHE_COMPRESS = None + + +class DatasetCacheControl: + """Define a range that change the cache state temporarily.""" + + def __init__(self, on: bool = False): + self.on = on + + def __enter__(self): + """ + Record the original cache state and turn it to the target state. + """ + self.previous_state = is_caching_enabled() + if self.on: + enable_caching() + else: + disable_caching() + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Restore the original cache state. + """ + if self.previous_state: + enable_caching() + else: + disable_caching() + + +def dataset_cache_control(on): + """ + A more easy-to-use decorator for functions that need to control the cache + state temporarily. 
+ """ + + def dataset_cache_decorator(func): + + @wraps(func) + def wrapped_function(*args, **kwargs): + with DatasetCacheControl(on=on): + return func(*args, **kwargs) + + return wrapped_function + + return dataset_cache_decorator From 16ca358d400868e702a2d46946a5db6e29d2d356 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Fri, 20 Dec 2024 09:44:24 +0800 Subject: [PATCH 17/22] * update unittests --- .../video_tagging_from_frames_filter.py | 2 +- .../test_video_tagging_from_frames_filter.py | 4 +- tests/ops/mapper/test_image_tagging_mapper.py | 99 ++++---- .../test_video_tagging_from_audio_mapper.py | 2 +- .../test_video_tagging_from_frames_mapper.py | 214 ++++++++++-------- 5 files changed, 174 insertions(+), 147 deletions(-) diff --git a/data_juicer/ops/filter/video_tagging_from_frames_filter.py b/data_juicer/ops/filter/video_tagging_from_frames_filter.py index bd17bc838..2436d886c 100644 --- a/data_juicer/ops/filter/video_tagging_from_frames_filter.py +++ b/data_juicer/ops/filter/video_tagging_from_frames_filter.py @@ -94,7 +94,7 @@ def compute_stats_single(self, sample, rank=None, context=False): return sample def process_single(self, sample, rank=None): - video_tags = sample[self.tag_field_name] + video_tags = sample[Fields.meta][self.tag_field_name] if len(video_tags) <= 0: return True diff --git a/tests/ops/filter/test_video_tagging_from_frames_filter.py b/tests/ops/filter/test_video_tagging_from_frames_filter.py index bc4f67fb4..37cdf2eb9 100644 --- a/tests/ops/filter/test_video_tagging_from_frames_filter.py +++ b/tests/ops/filter/test_video_tagging_from_frames_filter.py @@ -21,8 +21,8 @@ def _run_video_tagging_from_frames_filter(self, target_list, num_proc=1): dataset = Dataset.from_list(source_list) - dataset = dataset.map(op.compute_stats) - dataset = dataset.filter(op.process) + dataset = dataset.map(op.compute_stats, num_proc=num_proc) + dataset = dataset.filter(op.process, num_proc=num_proc) dataset = dataset.select_columns(column_names=['text', 'videos']) res_list = dataset.to_list() self.assertEqual(res_list, target_list) diff --git a/tests/ops/mapper/test_image_tagging_mapper.py b/tests/ops/mapper/test_image_tagging_mapper.py index 9ec3e4d22..24f50190d 100644 --- a/tests/ops/mapper/test_image_tagging_mapper.py +++ b/tests/ops/mapper/test_image_tagging_mapper.py @@ -38,23 +38,26 @@ def test(self): }] tgt_list = [{ 'images': [self.img1_path], - Fields.image_tags: [[ - 'bed', 'bedcover', 'bedroom', 'bedding', 'lamp', 'ceiling', - 'chair', 'pillar', 'comfort', 'side table', 'floor', - 'hardwood floor', 'headboard', 'linen', 'mattress', - 'nightstand', 'picture frame', 'pillow', 'room', 'wall lamp', - 'stool', 'white', 'window', 'wood floor']], + Fields.meta: { + Fields.image_tags: [[ + 'bed', 'bedcover', 'bedroom', 'bedding', 'lamp', 'ceiling', + 'chair', 'pillar', 'comfort', 'side table', 'floor', + 'hardwood floor', 'headboard', 'linen', 'mattress', + 'nightstand', 'picture frame', 'pillow', 'room', 'wall lamp', + 'stool', 'white', 'window', 'wood floor']]}, }, { 'images': [self.img2_path], - Fields.image_tags: [[ - 'advertisement', 'back', 'bus', 'car', 'city bus', - 'city street', 'curb', 'decker bus', 'drive', 'license plate', - 'road', 'street scene', 'tour bus', 'travel', 'white']], + Fields.meta: { + Fields.image_tags: [[ + 'advertisement', 'back', 'bus', 'car', 'city bus', + 'city street', 'curb', 'decker bus', 'drive', 'license plate', + 'road', 'street scene', 'tour bus', 'travel', 'white']]}, }, { 'images': [self.img3_path], - Fields.image_tags: [[ - 'alley', 
'black', 'building', 'catch', 'person', 'pavement', - 'photo', 'rain', 'road', 'umbrella', 'walk', 'woman']], + Fields.meta: { + Fields.image_tags: [[ + 'alley', 'black', 'building', 'catch', 'person', 'pavement', + 'photo', 'rain', 'road', 'umbrella', 'walk', 'woman']]}, }] op = ImageTaggingMapper() self._run_image_tagging_mapper(op, ds_list, tgt_list) @@ -67,13 +70,15 @@ def test_no_images(self): }] tgt_list = [{ 'images': [], - Fields.image_tags: [[]], + Fields.meta: { + Fields.image_tags: [[]]}, }, { 'images': [self.img2_path], - Fields.image_tags: [[ - 'advertisement', 'back', 'bus', 'car', 'city bus', - 'city street', 'curb', 'decker bus', 'drive', 'license plate', - 'road', 'street scene', 'tour bus', 'travel', 'white']], + Fields.meta: { + Fields.image_tags: [[ + 'advertisement', 'back', 'bus', 'car', 'city bus', + 'city street', 'curb', 'decker bus', 'drive', 'license plate', + 'road', 'street scene', 'tour bus', 'travel', 'white']]}, }] op = ImageTaggingMapper() self._run_image_tagging_mapper(op, ds_list, tgt_list) @@ -90,23 +95,26 @@ def test_specified_tag_field_name(self): }] tgt_list = [{ 'images': [self.img1_path], - tag_field_name: [[ - 'bed', 'bedcover', 'bedroom', 'bedding', 'lamp', 'ceiling', - 'chair', 'pillar', 'comfort', 'side table', 'floor', - 'hardwood floor', 'headboard', 'linen', 'mattress', - 'nightstand', 'picture frame', 'pillow', 'room', 'wall lamp', - 'stool', 'white', 'window', 'wood floor']], + Fields.meta: { + tag_field_name: [[ + 'bed', 'bedcover', 'bedroom', 'bedding', 'lamp', 'ceiling', + 'chair', 'pillar', 'comfort', 'side table', 'floor', + 'hardwood floor', 'headboard', 'linen', 'mattress', + 'nightstand', 'picture frame', 'pillow', 'room', 'wall lamp', + 'stool', 'white', 'window', 'wood floor']]}, }, { 'images': [self.img2_path], - tag_field_name: [[ - 'advertisement', 'back', 'bus', 'car', 'city bus', - 'city street', 'curb', 'decker bus', 'drive', 'license plate', - 'road', 'street scene', 'tour bus', 'travel', 'white']], + Fields.meta: { + tag_field_name: [[ + 'advertisement', 'back', 'bus', 'car', 'city bus', + 'city street', 'curb', 'decker bus', 'drive', 'license plate', + 'road', 'street scene', 'tour bus', 'travel', 'white']]}, }, { 'images': [self.img3_path], - tag_field_name: [[ - 'alley', 'black', 'building', 'catch', 'person', 'pavement', - 'photo', 'rain', 'road', 'umbrella', 'walk', 'woman']], + Fields.meta: { + tag_field_name: [[ + 'alley', 'black', 'building', 'catch', 'person', 'pavement', + 'photo', 'rain', 'road', 'umbrella', 'walk', 'woman']]}, }] op = ImageTaggingMapper(tag_field_name=tag_field_name) self._run_image_tagging_mapper(op, ds_list, tgt_list) @@ -126,23 +134,26 @@ def test_multi_process(self): }] tgt_list = [{ 'images': [self.img1_path], - Fields.image_tags: [[ - 'bed', 'bedcover', 'bedroom', 'bedding', 'lamp', 'ceiling', - 'chair', 'pillar', 'comfort', 'side table', 'floor', - 'hardwood floor', 'headboard', 'linen', 'mattress', - 'nightstand', 'picture frame', 'pillow', 'room', 'wall lamp', - 'stool', 'white', 'window', 'wood floor']], + Fields.meta: { + Fields.image_tags: [[ + 'bed', 'bedcover', 'bedroom', 'bedding', 'lamp', 'ceiling', + 'chair', 'pillar', 'comfort', 'side table', 'floor', + 'hardwood floor', 'headboard', 'linen', 'mattress', + 'nightstand', 'picture frame', 'pillow', 'room', 'wall lamp', + 'stool', 'white', 'window', 'wood floor']]}, }, { 'images': [self.img2_path], - Fields.image_tags: [[ - 'advertisement', 'back', 'bus', 'car', 'city bus', - 'city street', 'curb', 'decker bus', 'drive', 'license 
plate', - 'road', 'street scene', 'tour bus', 'travel', 'white']], + Fields.meta: { + Fields.image_tags: [[ + 'advertisement', 'back', 'bus', 'car', 'city bus', + 'city street', 'curb', 'decker bus', 'drive', 'license plate', + 'road', 'street scene', 'tour bus', 'travel', 'white']]}, }, { 'images': [self.img3_path], - Fields.image_tags: [[ - 'alley', 'black', 'building', 'catch', 'person', 'pavement', - 'photo', 'rain', 'road', 'umbrella', 'walk', 'woman']], + Fields.meta: { + Fields.image_tags: [[ + 'alley', 'black', 'building', 'catch', 'person', 'pavement', + 'photo', 'rain', 'road', 'umbrella', 'walk', 'woman']]}, }] op = ImageTaggingMapper() self._run_image_tagging_mapper(op, diff --git a/tests/ops/mapper/test_video_tagging_from_audio_mapper.py b/tests/ops/mapper/test_video_tagging_from_audio_mapper.py index 8bbf05933..134712f71 100644 --- a/tests/ops/mapper/test_video_tagging_from_audio_mapper.py +++ b/tests/ops/mapper/test_video_tagging_from_audio_mapper.py @@ -32,7 +32,7 @@ def _run_video_tagging_from_audio_mapper(self, num_proc=1): dataset = Dataset.from_list(source_list) dataset = dataset.map(op.process, num_proc=num_proc) - res_list = dataset.select_columns([tag_field_name])[tag_field_name] + res_list = dataset.flatten().select_columns([f'{Fields.meta}.{tag_field_name}'])[f'{Fields.meta}.{tag_field_name}'] self.assertEqual(res_list, target_list) def test(self): diff --git a/tests/ops/mapper/test_video_tagging_from_frames_mapper.py b/tests/ops/mapper/test_video_tagging_from_frames_mapper.py index 4484df754..7d8f3853c 100644 --- a/tests/ops/mapper/test_video_tagging_from_frames_mapper.py +++ b/tests/ops/mapper/test_video_tagging_from_frames_mapper.py @@ -46,30 +46,33 @@ def test(self): 'text': f'{SpecialTokens.video} 白色的小羊站在一旁讲话。旁边还有两只灰色猫咪和一只拉着灰狼的猫咪。', 'videos': [self.vid1_path], - Fields.video_frame_tags: [[ - 'animal', 'ray', 'text', 'writing', 'yellow', 'game', - 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', - 'sky' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'animal', 'ray', 'text', 'writing', 'yellow', 'game', + 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', + 'sky' + ]]} }, { 'text': f'{SpecialTokens.video} 身穿白色上衣的男子,拿着一个东西,拍打自己的胃部。{SpecialTokens.eoc}', 'videos': [self.vid2_path], - Fields.video_frame_tags: [[ - 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', - 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', - 'ball', 'person' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', + 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', + 'ball', 'person' + ]]} }, { 'text': f'{SpecialTokens.video} 两个长头发的女子正坐在一张圆桌前讲话互动。 {SpecialTokens.eoc}', 'videos': [self.vid3_path], - Fields.video_frame_tags: [[ - 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', - 'conversation', 'round table', 'closet', 'computer', 'girl', - 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', - 'selfie', 'stand' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', + 'conversation', 'round table', 'closet', 'computer', 'girl', + 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', + 'selfie', 'stand' + ]]} }] op = VideoTaggingFromFramesMapper() self._run_video_tagging_from_frames_mapper(op, ds_list, tgt_list) @@ -92,11 +95,12 @@ def test_no_video(self): 'text': f'{SpecialTokens.video} 身穿白色上衣的男子,拿着一个东西,拍打自己的胃部。{SpecialTokens.eoc}', 'videos': [self.vid2_path], - Fields.video_frame_tags: 
[[ - 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', - 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', - 'ball', 'person' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', + 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', + 'ball', 'person' + ]]} }] op = VideoTaggingFromFramesMapper() self._run_video_tagging_from_frames_mapper(op, ds_list, tgt_list) @@ -120,30 +124,33 @@ def test_specified_tag_field_name(self): 'text': f'{SpecialTokens.video} 白色的小羊站在一旁讲话。旁边还有两只灰色猫咪和一只拉着灰狼的猫咪。', 'videos': [self.vid1_path], - tag_field_name: [[ - 'animal', 'ray', 'text', 'writing', 'yellow', 'game', - 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', - 'sky' - ]] + Fields.meta: { + tag_field_name: [[ + 'animal', 'ray', 'text', 'writing', 'yellow', 'game', + 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', + 'sky' + ]]} }, { 'text': f'{SpecialTokens.video} 身穿白色上衣的男子,拿着一个东西,拍打自己的胃部。{SpecialTokens.eoc}', 'videos': [self.vid2_path], - tag_field_name: [[ - 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', - 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', - 'ball', 'person' - ]] + Fields.meta: { + tag_field_name: [[ + 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', + 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', + 'ball', 'person' + ]]} }, { 'text': f'{SpecialTokens.video} 两个长头发的女子正坐在一张圆桌前讲话互动。 {SpecialTokens.eoc}', 'videos': [self.vid3_path], - tag_field_name: [[ - 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', - 'conversation', 'round table', 'closet', 'computer', 'girl', - 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', - 'selfie', 'stand' - ]] + Fields.meta: { + tag_field_name: [[ + 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', + 'conversation', 'round table', 'closet', 'computer', 'girl', + 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', + 'selfie', 'stand' + ]]} }] op = VideoTaggingFromFramesMapper(tag_field_name=tag_field_name) self._run_video_tagging_from_frames_mapper(op, ds_list, tgt_list) @@ -165,30 +172,33 @@ def test_uniform(self): 'text': f'{SpecialTokens.video} 白色的小羊站在一旁讲话。旁边还有两只灰色猫咪和一只拉着灰狼的猫咪。', 'videos': [self.vid1_path], - Fields.video_frame_tags: [[ - 'cartoon', 'animal', 'anime', 'game', 'screenshot', - 'video game', 'cartoon character', 'robe', 'ray', 'text', - 'writing', 'yellow', 'doll', 'tail', 'sky', 'person']] + Fields.meta: { + Fields.video_frame_tags: [[ + 'cartoon', 'animal', 'anime', 'game', 'screenshot', + 'video game', 'cartoon character', 'robe', 'ray', 'text', + 'writing', 'yellow', 'doll', 'tail', 'sky', 'person']]} }, { 'text': f'{SpecialTokens.video} 身穿白色上衣的男子,拿着一个东西,拍打自己的胃部。{SpecialTokens.eoc}', 'videos': [self.vid2_path], - Fields.video_frame_tags: [[ - 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', - 'hand', 'catch', 'bulletin board', 'Wii', 'cotton candy', - 'tennis racket', 'blind', 'game controller', 'remote', 'stand', - 'video game', 'Wii controller', 'play', 'baseball uniform', - 'toy', 'green']] + Fields.meta: { + Fields.video_frame_tags: [[ + 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', + 'hand', 'catch', 'bulletin board', 'Wii', 'cotton candy', + 'tennis racket', 'blind', 'game controller', 'remote', 'stand', + 'video game', 'Wii controller', 'play', 'baseball uniform', + 'toy', 'green']]} }, { 'text': f'{SpecialTokens.video} 两个长头发的女子正坐在一张圆桌前讲话互动。 {SpecialTokens.eoc}', 'videos': [self.vid3_path], - 
Fields.video_frame_tags: [[ - 'table', 'sit', 'woman', 'bookshelf', 'conversation', 'person', - 'round table', 'computer', 'girl', 'man', 'closet', 'laptop', - 'stand', 'computer screen', 'talk', 'room', 'stool', 'hand', - 'point' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'table', 'sit', 'woman', 'bookshelf', 'conversation', 'person', + 'round table', 'computer', 'girl', 'man', 'closet', 'laptop', + 'stand', 'computer screen', 'talk', 'room', 'stool', 'hand', + 'point' + ]]} }] op = VideoTaggingFromFramesMapper(frame_sampling_method='uniform', frame_num=10) @@ -216,30 +226,33 @@ def test_multi_process(self): 'text': f'{SpecialTokens.video} 白色的小羊站在一旁讲话。旁边还有两只灰色猫咪和一只拉着灰狼的猫咪。', 'videos': [self.vid1_path], - Fields.video_frame_tags: [[ - 'animal', 'ray', 'text', 'writing', 'yellow', 'game', - 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', - 'sky' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'animal', 'ray', 'text', 'writing', 'yellow', 'game', + 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', + 'sky' + ]]} }, { 'text': f'{SpecialTokens.video} 身穿白色上衣的男子,拿着一个东西,拍打自己的胃部。{SpecialTokens.eoc}', 'videos': [self.vid2_path], - Fields.video_frame_tags: [[ - 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', - 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', - 'ball', 'person' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', + 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', + 'ball', 'person' + ]]} }, { 'text': f'{SpecialTokens.video} 两个长头发的女子正坐在一张圆桌前讲话互动。 {SpecialTokens.eoc}', 'videos': [self.vid3_path], - Fields.video_frame_tags: [[ - 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', - 'conversation', 'round table', 'closet', 'computer', 'girl', - 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', - 'selfie', 'stand' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', + 'conversation', 'round table', 'closet', 'computer', 'girl', + 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', + 'selfie', 'stand' + ]]} }] op = VideoTaggingFromFramesMapper() self._run_video_tagging_from_frames_mapper(op, @@ -268,44 +281,47 @@ def test_multi_chunk(self): 'text': f'{SpecialTokens.video} 白色的小羊站在一旁讲话。旁边还有两只灰色猫咪和一只拉着灰狼的猫咪。{SpecialTokens.eoc}{SpecialTokens.video} 身穿白色上衣的男子,拿着一个东西,拍打自己的胃部。', 'videos': [self.vid1_path, self.vid2_path], - Fields.video_frame_tags: - [[ - 'animal', 'ray', 'text', 'writing', 'yellow', 'game', - 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', - 'sky' - ], [ - 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', - 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', - 'ball', 'person' - ]] + Fields.meta: { + Fields.video_frame_tags: + [[ + 'animal', 'ray', 'text', 'writing', 'yellow', 'game', + 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', + 'sky' + ], [ + 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', + 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', + 'ball', 'person' + ]]} }, { 'text': f'{SpecialTokens.video} 身穿白色上衣的男子,拿着一个东西,拍打自己的胃部。{SpecialTokens.eoc}{SpecialTokens.video} 两个长头发的女子正坐在一张圆桌前讲话互动。 {SpecialTokens.eoc}', 'videos': [self.vid2_path, self.vid3_path], - Fields.video_frame_tags: [[ - 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', - 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', - 'ball', 'person' - ], [ - 'woman', 'table', 'sit', 'person', 
'laptop', 'bookshelf', - 'conversation', 'round table', 'closet', 'computer', 'girl', - 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', - 'selfie', 'stand' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'man', 'shirt', 't shirt', 't-shirt', 'wear', 'white', 'boy', + 'catch', 'hand', 'blind', 'cotton candy', 'tennis racket', + 'ball', 'person' + ], [ + 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', + 'conversation', 'round table', 'closet', 'computer', 'girl', + 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', + 'selfie', 'stand' + ]]} }, { 'text': f'{SpecialTokens.video} 白色的小羊站在一旁讲话。旁边还有两只灰色猫咪和一只拉着灰狼的猫咪。{SpecialTokens.eoc}{SpecialTokens.video} 两个长头发的女子正坐在一张圆桌前讲话互动。 {SpecialTokens.eoc}', 'videos': [self.vid1_path, self.vid3_path], - Fields.video_frame_tags: [[ - 'animal', 'ray', 'text', 'writing', 'yellow', 'game', - 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', - 'sky' - ], [ - 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', - 'conversation', 'round table', 'closet', 'computer', 'girl', - 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', - 'selfie', 'stand' - ]] + Fields.meta: { + Fields.video_frame_tags: [[ + 'animal', 'ray', 'text', 'writing', 'yellow', 'game', + 'screenshot', 'cartoon', 'cartoon character', 'person', 'robe', + 'sky' + ], [ + 'woman', 'table', 'sit', 'person', 'laptop', 'bookshelf', + 'conversation', 'round table', 'closet', 'computer', 'girl', + 'man', 'stool', 'computer screen', 'laugh', 'cabinet', 'hand', + 'selfie', 'stand' + ]]} }] op = VideoTaggingFromFramesMapper() self._run_video_tagging_from_frames_mapper(op, ds_list, tgt_list) From dfb0bca99cd8cb649f31f517e8bc3df9ea1378fa Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Fri, 20 Dec 2024 09:56:26 +0800 Subject: [PATCH 18/22] * update unittests --- tests/ops/filter/test_video_tagging_from_frames_filter.py | 4 ++++ tests/ops/mapper/test_image_tagging_mapper.py | 3 +++ tests/ops/mapper/test_video_tagging_from_audio_mapper.py | 3 +++ tests/ops/mapper/test_video_tagging_from_frames_mapper.py | 3 +++ 4 files changed, 13 insertions(+) diff --git a/tests/ops/filter/test_video_tagging_from_frames_filter.py b/tests/ops/filter/test_video_tagging_from_frames_filter.py index 37cdf2eb9..4018136ec 100644 --- a/tests/ops/filter/test_video_tagging_from_frames_filter.py +++ b/tests/ops/filter/test_video_tagging_from_frames_filter.py @@ -6,6 +6,7 @@ from data_juicer.ops.filter.video_tagging_from_frames_filter import \ VideoTaggingFromFramesFilter from data_juicer.utils.mm_utils import SpecialTokens +from data_juicer.utils.constant import Fields from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase class VideoTaggingFromFramesFilterTest(DataJuicerTestCaseBase): @@ -21,6 +22,9 @@ def _run_video_tagging_from_frames_filter(self, target_list, num_proc=1): dataset = Dataset.from_list(source_list) + if Fields.meta not in dataset.features: + dataset = dataset.add_column(name=Fields.meta, + column=[{}] * dataset.num_rows) dataset = dataset.map(op.compute_stats, num_proc=num_proc) dataset = dataset.filter(op.process, num_proc=num_proc) dataset = dataset.select_columns(column_names=['text', 'videos']) diff --git a/tests/ops/mapper/test_image_tagging_mapper.py b/tests/ops/mapper/test_image_tagging_mapper.py index 24f50190d..d2bbddec2 100644 --- a/tests/ops/mapper/test_image_tagging_mapper.py +++ b/tests/ops/mapper/test_image_tagging_mapper.py @@ -24,6 +24,9 @@ def _run_image_tagging_mapper(self, target_list, num_proc=1): dataset = 
Dataset.from_list(source_list)
+        if Fields.meta not in dataset.features:
+            dataset = dataset.add_column(name=Fields.meta,
+                                         column=[{}] * dataset.num_rows)
         dataset = dataset.map(op.process, num_proc=num_proc, with_rank=True)
         res_list = dataset.to_list()
         self.assertEqual(res_list, target_list)
diff --git a/tests/ops/mapper/test_video_tagging_from_audio_mapper.py b/tests/ops/mapper/test_video_tagging_from_audio_mapper.py
index 134712f71..00a376170 100644
--- a/tests/ops/mapper/test_video_tagging_from_audio_mapper.py
+++ b/tests/ops/mapper/test_video_tagging_from_audio_mapper.py
@@ -31,6 +31,9 @@ def _run_video_tagging_from_audio_mapper(self,
                                              tag_field_name=Fields.video_audio_tags,
                                              num_proc=1):
         dataset = Dataset.from_list(source_list)
+        if Fields.meta not in dataset.features:
+            dataset = dataset.add_column(name=Fields.meta,
+                                         column=[{}] * dataset.num_rows)
         dataset = dataset.map(op.process, num_proc=num_proc)
         res_list = dataset.flatten().select_columns([f'{Fields.meta}.{tag_field_name}'])[f'{Fields.meta}.{tag_field_name}']
         self.assertEqual(res_list, target_list)
diff --git a/tests/ops/mapper/test_video_tagging_from_frames_mapper.py b/tests/ops/mapper/test_video_tagging_from_frames_mapper.py
index 7d8f3853c..af4146206 100644
--- a/tests/ops/mapper/test_video_tagging_from_frames_mapper.py
+++ b/tests/ops/mapper/test_video_tagging_from_frames_mapper.py
@@ -25,6 +25,9 @@ def _run_video_tagging_from_frames_mapper(self,
                                               target_list,
                                               num_proc=1):
         dataset = Dataset.from_list(source_list)
+        if Fields.meta not in dataset.features:
+            dataset = dataset.add_column(name=Fields.meta,
+                                         column=[{}] * dataset.num_rows)
         dataset = dataset.map(op.process, num_proc=num_proc)
         res_list = dataset.to_list()
         self.assertEqual(res_list, target_list)

From f8b953907f3d38d4a1452b8a621dcf8c51594bff Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Fri, 20 Dec 2024 10:06:15 +0800
Subject: [PATCH 19/22] * update unittests

---
 tests/ops/mapper/test_video_tagging_from_frames_mapper.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/ops/mapper/test_video_tagging_from_frames_mapper.py b/tests/ops/mapper/test_video_tagging_from_frames_mapper.py
index af4146206..31fc04c3b 100644
--- a/tests/ops/mapper/test_video_tagging_from_frames_mapper.py
+++ b/tests/ops/mapper/test_video_tagging_from_frames_mapper.py
@@ -93,7 +93,8 @@ def test_no_video(self):
             'text':
             f'白色的小羊站在一旁讲话。旁边还有两只灰色猫咪和一只拉着灰狼的猫咪。',
             'videos': [],
-            Fields.video_frame_tags: [[]]
+            Fields.meta: {
+                Fields.video_frame_tags: [[]]}
         }, {
             'text':
             f'{SpecialTokens.video} 身穿白色上衣的男子,拿着一个东西,拍打自己的胃部。{SpecialTokens.eoc}',

From 45259e55f0de9f2b3afb7dc57c751f954c132b9f Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Fri, 20 Dec 2024 10:59:48 +0800
Subject: [PATCH 20/22] * update readme for analyzer

---
 README.md    | 4 +++-
 README_ZH.md | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index d891ac332..586869b0a 100644
--- a/README.md
+++ b/README.md
@@ -340,7 +340,9 @@ dj-analyze --config configs/demo/analyzer.yaml
 dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000]
 ```
 
-- **Note:** Analyzer only compute stats of Filter ops. So extra Mapper or Deduplicator ops will be ignored in the analysis process.
+- **Note:** Analyzer only computes stats for Filters that produce stats and for other OPs that produce tags/categories in meta. Other OPs will be ignored in the analysis process. We use the following registries to decorate OPs:
+  - `NON_STATS_FILTERS`: decorate Filters that **DO NOT** produce any stats.
+ - `TAGGING_OPS`: decorate OPs that **DO** produce tags/categories in meta field. ### Data Visualization diff --git a/README_ZH.md b/README_ZH.md index 01633731b..42612964a 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -316,7 +316,9 @@ dj-analyze --config configs/demo/analyzer.yaml dj-analyze --auto --dataset_path xx.jsonl [--auto_num 1000] ``` -* **注意**:Analyzer 只计算 Filter 算子的状态,其他的算子(例如 Mapper 和 Deduplicator)会在分析过程中被忽略。 +* **注意**:Analyzer 只用于能在 stats 字段里产出统计信息的 Filter 算子和能在 meta 字段里产出 tags 或类别标签的其他算子。除此之外的其他的算子会在分析过程中被忽略。我们使用以下两种注册器来装饰相关的算子: + * `NON_STATS_FILTERS`:装饰那些**不能**产出任何统计信息的 Filter 算子。 + * `TAGGING_OPS`:装饰那些能在 meta 字段中产出 tags 或类别标签的算子。 ### 数据可视化 From 51f53dc48186a7815c5a7a20c919c76063e1da49 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Fri, 20 Dec 2024 15:49:34 +0800 Subject: [PATCH 21/22] * use more detailed key --- data_juicer/core/adapter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py index 398f4b5be..64fd622f0 100644 --- a/data_juicer/core/adapter.py +++ b/data_juicer/core/adapter.py @@ -268,7 +268,8 @@ def insight_mining(self, pval_th=0.05): for feat in common_features: ttest_res = ttest_measure(prev_res[feat], curr_res[feat]) curr_sig_res[feat] = { - 't-statistic': ttest_res.statistic, + 't-statistic (standardized mean difference)': + ttest_res.statistic, 'p-value': ttest_res.pvalue, 'significant': True if ttest_res.pvalue < pval_th else False, From 58001ca937ff86abc6121215b8aed920bdf8363e Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Fri, 20 Dec 2024 15:50:50 +0800 Subject: [PATCH 22/22] + add reference --- data_juicer/analysis/measure.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/data_juicer/analysis/measure.py b/data_juicer/analysis/measure.py index 8c67330e8..bd97e811c 100644 --- a/data_juicer/analysis/measure.py +++ b/data_juicer/analysis/measure.py @@ -128,6 +128,9 @@ class RelatedTTestMeasure(Measure): Measure T-Test for two related distributions on their histogram of the same bins. + Ref: + https://en.wikipedia.org/wiki/Student%27s_t-test + For continuous features or distributions, the input could be dataset stats list. For discrete features or distributions, the input could be the tags or the