From 573c7cfa0d15f770a395efa44724db2031681e29 Mon Sep 17 00:00:00 2001 From: Evgeny Maslov Date: Wed, 3 Feb 2021 13:40:08 +0300 Subject: [PATCH 1/8] Parallel failed version --- .../dataflow/collect_task.py | 221 ++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 veniq/dataset_collection/dataflow/collect_task.py diff --git a/veniq/dataset_collection/dataflow/collect_task.py b/veniq/dataset_collection/dataflow/collect_task.py new file mode 100644 index 00000000..69c743af --- /dev/null +++ b/veniq/dataset_collection/dataflow/collect_task.py @@ -0,0 +1,221 @@ +import hashlib +import os +import re +from argparse import ArgumentParser +from functools import partial +from pathlib import Path + +import javalang +from javalang.parse import parse +import d6tflow +import pandas as pd + +import d6tcollect + +# from veniq.dataset_collection.augmentation import InvocationType +from pebble import ProcessPool +from tqdm import tqdm + +from veniq.ast_framework import AST, ASTNodeType +from veniq.utils.ast_builder import build_ast +from veniq.utils.encoding_detector import read_text_with_autodetected_encoding + +d6tcollect.submit = False + +# columns = [ +# 'project_id', +# 'original_filename', +# 'class_name', +# 'invocation_line_string', +# 'invocation_line_number_in_original_file', +# 'target_method', +# 'target_method_start_line', +# 'extract_method', +# 'extract_method_start_line', +# 'extract_method_end_line', +# 'output_filename', +# 'is_valid_ast', +# 'insertion_start', +# 'insertion_end', +# 'ncss_target', +# 'ncss_extracted', +# 'do_nothing', +# 'ONE_LINE_FUNCTION', +# 'NO_IGNORED_CASES' +# ] + [x for x in InvocationType.list_types()] + + +class TaskAggregatorJavaFiles(d6tflow.tasks.TaskCache): + dir_to_search = d6tflow.Parameter() + dir_to_save = d6tflow.Parameter() + system_cores_qty = d6tflow.IntParameter() + + def run(self): + test_files = set(Path(self.dir_to_search).glob('**/*Test*.java')) + not_test_files = set(Path(self.dir_to_search).glob('**/*.java')) + files_without_tests = list(not_test_files.difference(test_files)) + if not files_without_tests: + raise Exception("No java files were found") + + full_dataset_folder = Path(self.dir_to_save) / 'full_dataset' + self.output_dir = full_dataset_folder / 'output_files' + self.input_dir = full_dataset_folder / 'input_files' + # df = pd.DataFrame(columns=columns) + results = [] + with ProcessPool(system_cores_qty) as executor: + + future = executor.map(TaskPreprocessJavaFile.run, timeout=200, ) + result = future.result() + + # each 100 cycles we dump the results + iteration_cycle = 1000 + iteration_number = 0 + for filename in tqdm(files_without_tests): + try: + print(iteration_number) + iteration_number+=1 + single_file_features = next(result) + print(single_file_features) + single_file_features.outputLoad(cached=False) + results.append(single_file_features) + except Exception as e: + print(e) + # for x in files_without_tests: + # # d = {j: '' for j in columns} + # # d.update({'original_filename': x}) + # # df = df.append(d, ignore_index=True) + # # self.save({'file': str(x)}) + # a = yield TaskPreprocessJavaFile(file=str(x)) + # results.append(a) + self.save({"results": results}) + + +# @d6tflow.requires({'file': TaskAggregatorJavaFiles}) +class TaskPreprocessJavaFile(d6tflow.tasks.TaskCache): + file = d6tflow.Parameter() + + # target_class = d6tflow.targets.TaskPickle + # target_ext = 'pkl' + # persist = ['data'] + def _remove_comments(self, string: str): + pattern = r"(\".*?\"|\'.*?\')|(/\*.*?\*/|//[^\r\n]*$)" + # first 
group captures quoted strings (double or single) + # second group captures comments (//single-line or /* multi-line */) + regex = re.compile(pattern, re.MULTILINE | re.DOTALL) + + def _replacer(match): + # if the 2nd group (capturing comments) is not None, + # it means we have captured a non-quoted (real) comment string. + if match.group(2) is not None: + # so we will return empty to remove the comment + return "" + else: # otherwise, we will return the 1st group + return match.group(1) # captured quoted-string + + return regex.sub(_replacer, string) + + # def save_text_to_new_file( + # self, + # input_dir: Path, + # text: str, filename: Path): + # # need to avoid situation when filenames are the same + # hash_path = hashlib.sha256(str(filename.parent).encode('utf-8')).hexdigest() + # dst_filename = input_dir / f'{filename.stem}_{hash_path}.java' + # # if not dst_filename.parent.exists(): + # # dst_filename.parent.mkdir(parents=True) + # # if not dst_filename.exists(): + # # with open(dst_filename, 'w', encoding='utf-8') as w: + # # w.write(text) + # + # return dst_filename, text + + @classmethod + def run(self): + # file = self.inputLoad()['file']['file'] + print(self.file) + # file = files['file'].absolute() + # print(f'TaskPreprocessJavaFile {df.shape[0]}') + original_text = read_text_with_autodetected_encoding(str(self.file)) + # remove comments + text_without_comments = self._remove_comments(original_text) + # remove whitespaces + text = "\n".join([ll.rstrip() for ll in text_without_comments.splitlines() if ll.strip()]) + try: + ast = AST.build_from_javalang(parse(text)) + self.save({'text': text, 'ast': ast}) + # return {'text': text, 'ast': ast} + except javalang.parser.JavaSyntaxError: + pass + # for _, x in df.iterrows(): + # # print(x) + # x['output_file'] = 'XXXX' + # original_text = read_text_with_autodetected_encoding(str(self.file)) + # # remove comments + # text_without_comments = self._remove_comments(original_text) + # # remove whitespaces + # text = "\n".join([ll.rstrip() for ll in text_without_comments.splitlines() if ll.strip()]) + # ast = AST.build_from_javalang(parse(text)) + # + # self.save() + + +#@d6tflow.requires({'input1': TaskPreprocessJavaFile}) +# class TaskFindEM(d6tflow.tasks.TaskJson): +# # ast_tree = AST +# # text = d6tflow.Parameter() +# +# def __init__(self, *args, **kwargs): +# kwargs_ = {k: v for k, v in kwargs.items() if k in self.get_param_names()} +# super().__init__(*args, **kwargs_) +# +# def run(self): +# input1 = self.inputLoad()['input1'] +# +# class_name = [x for x in input1['ast_tree'].get_proxy_nodes(ASTNodeType.CLASS_DECLARATION)][0] +# #print(class_name) +# self.save({"class_name": class_name}) + + +if __name__ == '__main__': + # Intelligently rerun workflow after changing parameters + system_cores_qty = os.cpu_count() or 1 + parser = ArgumentParser() + parser.add_argument( + "-d", + "--dir", + required=True, + help="File path to JAVA source code for methods augmentations" + ) + parser.add_argument( + "-o", "--output", + help="Path for file with output results", + default='augmented_data' + ) + parser.add_argument( + "--jobs", + "-j", + type=int, + default=system_cores_qty - 1, + help="Number of processes to spawn. " + "By default one less than number of cores. " + "Be careful to raise it above, machine may stop responding while creating dataset.", + ) + parser.add_argument( + "-z", "--zip", + action='store_true', + help="To zip input and output files." 
+ ) + parser.add_argument( + "-s", "--small_dataset_size", + help="Number of files in small dataset", + default=100, + type=int, + ) + + args = parser.parse_args() + + d6tflow.preview(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) + d6tflow.run(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) + data = TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs).outputLoad(cached=False) + print(data) +# See PyCharm help at https://www.jetbrains.com/help/pycharm/ From dfee4c6368cb6a38fef400cc62c959ebaccb3775 Mon Sep 17 00:00:00 2001 From: Evgeny Maslov Date: Wed, 3 Feb 2021 13:43:50 +0300 Subject: [PATCH 2/8] Non-parallel version --- .../dataflow/collect_task.py | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/veniq/dataset_collection/dataflow/collect_task.py b/veniq/dataset_collection/dataflow/collect_task.py index 69c743af..e84cdaa1 100644 --- a/veniq/dataset_collection/dataflow/collect_task.py +++ b/veniq/dataset_collection/dataflow/collect_task.py @@ -62,31 +62,29 @@ def run(self): self.input_dir = full_dataset_folder / 'input_files' # df = pd.DataFrame(columns=columns) results = [] - with ProcessPool(system_cores_qty) as executor: - - future = executor.map(TaskPreprocessJavaFile.run, timeout=200, ) - result = future.result() - - # each 100 cycles we dump the results - iteration_cycle = 1000 - iteration_number = 0 - for filename in tqdm(files_without_tests): - try: - print(iteration_number) - iteration_number+=1 - single_file_features = next(result) - print(single_file_features) - single_file_features.outputLoad(cached=False) - results.append(single_file_features) - except Exception as e: - print(e) - # for x in files_without_tests: - # # d = {j: '' for j in columns} - # # d.update({'original_filename': x}) - # # df = df.append(d, ignore_index=True) - # # self.save({'file': str(x)}) - # a = yield TaskPreprocessJavaFile(file=str(x)) - # results.append(a) + # with ProcessPool(system_cores_qty) as executor: + # + # future = executor.map(TaskPreprocessJavaFile.run, timeout=200, ) + # result = future.result() + # + # # each 100 cycles we dump the results + # iteration_cycle = 1000 + # iteration_number = 0 + # for filename in tqdm(files_without_tests): + # try: + # # print(filename) + # single_file_features = next(result) + # if single_file_features: + # results.append(single_file_features) + # except Exception as e: + # print(e) + for x in files_without_tests: + # d = {j: '' for j in columns} + # d.update({'original_filename': x}) + # df = df.append(d, ignore_index=True) + # self.save({'file': str(x)}) + a = yield TaskPreprocessJavaFile(file=str(x)) + results.append(a) self.save({"results": results}) @@ -129,7 +127,7 @@ def _replacer(match): # # return dst_filename, text - @classmethod + # @classmethod def run(self): # file = self.inputLoad()['file']['file'] print(self.file) @@ -143,7 +141,6 @@ def run(self): try: ast = AST.build_from_javalang(parse(text)) self.save({'text': text, 'ast': ast}) - # return {'text': text, 'ast': ast} except javalang.parser.JavaSyntaxError: pass # for _, x in df.iterrows(): From b513ef235b0faf4b282641d28ceb2c86e977dbc8 Mon Sep 17 00:00:00 2001 From: Evgeny Maslov Date: Wed, 3 Feb 2021 15:37:41 +0300 Subject: [PATCH 3/8] Global parallel --- .../dataflow/collect_task.py | 127 ++++++++++++------ 1 file changed, 86 insertions(+), 41 deletions(-) diff --git 
a/veniq/dataset_collection/dataflow/collect_task.py b/veniq/dataset_collection/dataflow/collect_task.py index e84cdaa1..cb949e7a 100644 --- a/veniq/dataset_collection/dataflow/collect_task.py +++ b/veniq/dataset_collection/dataflow/collect_task.py @@ -46,45 +46,48 @@ class TaskAggregatorJavaFiles(d6tflow.tasks.TaskCache): - dir_to_search = d6tflow.Parameter() - dir_to_save = d6tflow.Parameter() - system_cores_qty = d6tflow.IntParameter() + # dir_to_search = d6tflow.Parameter() + # dir_to_save = d6tflow.Parameter() + # system_cores_qty = d6tflow.IntParameter() + # files_without_tests = d6tflow.ListParameter() + file = d6tflow.Parameter() def run(self): - test_files = set(Path(self.dir_to_search).glob('**/*Test*.java')) - not_test_files = set(Path(self.dir_to_search).glob('**/*.java')) - files_without_tests = list(not_test_files.difference(test_files)) - if not files_without_tests: - raise Exception("No java files were found") - - full_dataset_folder = Path(self.dir_to_save) / 'full_dataset' - self.output_dir = full_dataset_folder / 'output_files' - self.input_dir = full_dataset_folder / 'input_files' - # df = pd.DataFrame(columns=columns) - results = [] - # with ProcessPool(system_cores_qty) as executor: - # - # future = executor.map(TaskPreprocessJavaFile.run, timeout=200, ) - # result = future.result() + # test_files = set(Path(self.dir_to_search).glob('**/*Test*.java')) + # not_test_files = set(Path(self.dir_to_search).glob('**/*.java')) + # files_without_tests = list(not_test_files.difference(test_files)) + # if not files_without_tests: + # raise Exception("No java files were found") # - # # each 100 cycles we dump the results - # iteration_cycle = 1000 - # iteration_number = 0 - # for filename in tqdm(files_without_tests): - # try: - # # print(filename) - # single_file_features = next(result) - # if single_file_features: - # results.append(single_file_features) - # except Exception as e: - # print(e) - for x in files_without_tests: - # d = {j: '' for j in columns} - # d.update({'original_filename': x}) - # df = df.append(d, ignore_index=True) - # self.save({'file': str(x)}) - a = yield TaskPreprocessJavaFile(file=str(x)) - results.append(a) + # full_dataset_folder = Path(self.dir_to_save) / 'full_dataset' + # self.output_dir = full_dataset_folder / 'output_files' + # self.input_dir = full_dataset_folder / 'input_files' + # # df = pd.DataFrame(columns=columns) + # results = [] + # # with ProcessPool(system_cores_qty) as executor: + # # + # # future = executor.map(TaskPreprocessJavaFile.run, timeout=200, ) + # # result = future.result() + # # + # # # each 100 cycles we dump the results + # # iteration_cycle = 1000 + # # iteration_number = 0 + # # for filename in tqdm(files_without_tests): + # # try: + # # # print(filename) + # # single_file_features = next(result) + # # if single_file_features: + # # results.append(single_file_features) + # # except Exception as e: + # # print(e) + results = [] + # for x in self.files_without_tests: + # # d = {j: '' for j in columns} + # # d.update({'original_filename': x}) + # # df = df.append(d, ignore_index=True) + # # self.save({'file': str(x)}) + a = yield TaskPreprocessJavaFile(file=str(self.file)) + results.append(a) self.save({"results": results}) @@ -173,6 +176,48 @@ def run(self): # self.save({"class_name": class_name}) +def retunrn(i): + if i == 1: + return [] + + +def get_files(dir_to_search, dir_to_save, system_cores_qty): + test_files = set(Path(dir_to_search).glob('**/*Test*.java')) + not_test_files = 
set(Path(dir_to_search).glob('**/*.java')) + files_without_tests = list(not_test_files.difference(test_files)) + if not files_without_tests: + raise Exception("No java files were found") + + full_dataset_folder = Path(dir_to_save) / 'full_dataset' + output_dir = full_dataset_folder / 'output_files' + input_dir = full_dataset_folder / 'input_files' + # df = pd.DataFrame(columns=columns) + results = [] + with ProcessPool(system_cores_qty) as executor: + + # d6tflow.preview(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) + # d6tflow.run(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) + # data = TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs).outputLoad(cached=False) + tasks = [TaskAggregatorJavaFiles(file=str(x)) for x in files_without_tests] + future = executor.map(d6tflow.run, tasks, timeout=200, ) + # future = executor.map(retunrn, [x for x in range(4)], ) + result = future.result() + + # each 100 cycles we dump the results + iteration_cycle = 1000 + iteration_number = 0 + for filename in tqdm(files_without_tests): + try: + # print(filename) + next(result) + data = TaskAggregatorJavaFiles(file=str(filename)).outputLoad() + if data: + results.append(data) + except Exception as e: + print(e) + print(results) + + if __name__ == '__main__': # Intelligently rerun workflow after changing parameters system_cores_qty = os.cpu_count() or 1 @@ -210,9 +255,9 @@ def run(self): ) args = parser.parse_args() - - d6tflow.preview(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) - d6tflow.run(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) - data = TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs).outputLoad(cached=False) - print(data) + get_files(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs) + # d6tflow.preview(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) + # d6tflow.run(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) + # data = TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs).outputLoad(cached=False) + # print(data) # See PyCharm help at https://www.jetbrains.com/help/pycharm/ From eb729b49854256783b39f5e5856af79d5157eff4 Mon Sep 17 00:00:00 2001 From: Evgeny Maslov Date: Wed, 3 Feb 2021 18:07:33 +0300 Subject: [PATCH 4/8] Save files and preprocessed --- .../dataflow/collect_task.py | 226 +++++++----------- 1 file changed, 92 insertions(+), 134 deletions(-) diff --git a/veniq/dataset_collection/dataflow/collect_task.py b/veniq/dataset_collection/dataflow/collect_task.py index cb949e7a..20694a29 100644 --- a/veniq/dataset_collection/dataflow/collect_task.py +++ b/veniq/dataset_collection/dataflow/collect_task.py @@ -22,82 +22,36 @@ d6tcollect.submit = False -# columns = [ -# 'project_id', -# 'original_filename', -# 'class_name', -# 'invocation_line_string', -# 'invocation_line_number_in_original_file', -# 'target_method', -# 'target_method_start_line', -# 'extract_method', -# 'extract_method_start_line', -# 'extract_method_end_line', -# 'output_filename', -# 'is_valid_ast', -# 'insertion_start', -# 'insertion_end', -# 'ncss_target', -# 'ncss_extracted', -# 'do_nothing', -# 'ONE_LINE_FUNCTION', -# 
'NO_IGNORED_CASES' -# ] + [x for x in InvocationType.list_types()] - -class TaskAggregatorJavaFiles(d6tflow.tasks.TaskCache): - # dir_to_search = d6tflow.Parameter() - # dir_to_save = d6tflow.Parameter() - # system_cores_qty = d6tflow.IntParameter() +class TaskAggregatorJavaFiles(d6tflow.tasks.TaskCSVPandas): + dir_to_search = d6tflow.Parameter() + dir_to_save = d6tflow.Parameter() + system_cores_qty = d6tflow.IntParameter() # files_without_tests = d6tflow.ListParameter() - file = d6tflow.Parameter() - - def run(self): - # test_files = set(Path(self.dir_to_search).glob('**/*Test*.java')) - # not_test_files = set(Path(self.dir_to_search).glob('**/*.java')) - # files_without_tests = list(not_test_files.difference(test_files)) - # if not files_without_tests: - # raise Exception("No java files were found") - # - # full_dataset_folder = Path(self.dir_to_save) / 'full_dataset' - # self.output_dir = full_dataset_folder / 'output_files' - # self.input_dir = full_dataset_folder / 'input_files' - # # df = pd.DataFrame(columns=columns) - # results = [] - # # with ProcessPool(system_cores_qty) as executor: - # # - # # future = executor.map(TaskPreprocessJavaFile.run, timeout=200, ) - # # result = future.result() - # # - # # # each 100 cycles we dump the results - # # iteration_cycle = 1000 - # # iteration_number = 0 - # # for filename in tqdm(files_without_tests): - # # try: - # # # print(filename) - # # single_file_features = next(result) - # # if single_file_features: - # # results.append(single_file_features) - # # except Exception as e: - # # print(e) - results = [] - # for x in self.files_without_tests: - # # d = {j: '' for j in columns} - # # d.update({'original_filename': x}) - # # df = df.append(d, ignore_index=True) - # # self.save({'file': str(x)}) - a = yield TaskPreprocessJavaFile(file=str(self.file)) - results.append(a) - self.save({"results": results}) - - -# @d6tflow.requires({'file': TaskAggregatorJavaFiles}) -class TaskPreprocessJavaFile(d6tflow.tasks.TaskCache): - file = d6tflow.Parameter() + # file = d6tflow.Parameter() + + columns = [ + 'project_id', + 'original_filename', + 'class_name', + 'invocation_line_string', + 'invocation_line_number_in_original_file', + 'target_method', + 'target_method_start_line', + 'extract_method', + 'extract_method_start_line', + 'extract_method_end_line', + 'output_filename', + 'is_valid_ast', + 'insertion_start', + 'insertion_end', + 'ncss_target', + 'ncss_extracted', + 'do_nothing', + 'ONE_LINE_FUNCTION', + 'NO_IGNORED_CASES' + ] # + [x for x in InvocationType.list_types()] - # target_class = d6tflow.targets.TaskPickle - # target_ext = 'pkl' - # persist = ['data'] def _remove_comments(self, string: str): pattern = r"(\".*?\"|\'.*?\')|(/\*.*?\*/|//[^\r\n]*$)" # first group captures quoted strings (double or single) @@ -115,70 +69,75 @@ def _replacer(match): return regex.sub(_replacer, string) - # def save_text_to_new_file( - # self, - # input_dir: Path, - # text: str, filename: Path): - # # need to avoid situation when filenames are the same - # hash_path = hashlib.sha256(str(filename.parent).encode('utf-8')).hexdigest() - # dst_filename = input_dir / f'{filename.stem}_{hash_path}.java' - # # if not dst_filename.parent.exists(): - # # dst_filename.parent.mkdir(parents=True) - # # if not dst_filename.exists(): - # # with open(dst_filename, 'w', encoding='utf-8') as w: - # # w.write(text) - # - # return dst_filename, text - - # @classmethod - def run(self): - # file = self.inputLoad()['file']['file'] - print(self.file) - # file = 
files['file'].absolute() - # print(f'TaskPreprocessJavaFile {df.shape[0]}') - original_text = read_text_with_autodetected_encoding(str(self.file)) + def _preprocess(self, file): + original_text = read_text_with_autodetected_encoding(str(file)) # remove comments text_without_comments = self._remove_comments(original_text) # remove whitespaces text = "\n".join([ll.rstrip() for ll in text_without_comments.splitlines() if ll.strip()]) - try: - ast = AST.build_from_javalang(parse(text)) - self.save({'text': text, 'ast': ast}) - except javalang.parser.JavaSyntaxError: - pass - # for _, x in df.iterrows(): - # # print(x) - # x['output_file'] = 'XXXX' - # original_text = read_text_with_autodetected_encoding(str(self.file)) - # # remove comments - # text_without_comments = self._remove_comments(original_text) - # # remove whitespaces - # text = "\n".join([ll.rstrip() for ll in text_without_comments.splitlines() if ll.strip()]) + # try: # ast = AST.build_from_javalang(parse(text)) - # - # self.save() + # return {'text': text, 'ast': ast} + # except Exception: + # pass + + return text + def _save_text_to_new_file(self, input_dir: Path, text: str, filename: Path) -> Path: + # need to avoid situation when filenames are the same + hash_path = hashlib.sha256(str(filename.parent).encode('utf-8')).hexdigest() + dst_filename = input_dir / f'{filename.stem}_{hash_path}.java' + if not dst_filename.parent.exists(): + dst_filename.parent.mkdir(parents=True) + if not dst_filename.exists(): + with open(dst_filename, 'w', encoding='utf-8') as w: + w.write(text) -#@d6tflow.requires({'input1': TaskPreprocessJavaFile}) -# class TaskFindEM(d6tflow.tasks.TaskJson): -# # ast_tree = AST -# # text = d6tflow.Parameter() -# -# def __init__(self, *args, **kwargs): -# kwargs_ = {k: v for k, v in kwargs.items() if k in self.get_param_names()} -# super().__init__(*args, **kwargs_) -# -# def run(self): -# input1 = self.inputLoad()['input1'] -# -# class_name = [x for x in input1['ast_tree'].get_proxy_nodes(ASTNodeType.CLASS_DECLARATION)][0] -# #print(class_name) -# self.save({"class_name": class_name}) + return dst_filename + def run(self): + test_files = set(Path(self.dir_to_search).glob('**/*Test*.java')) + not_test_files = set(Path(self.dir_to_search).glob('**/*.java')) + files_without_tests = list(not_test_files.difference(test_files)) + if not files_without_tests: + raise Exception("No java files were found") + + full_dataset_folder = Path(self.dir_to_save) / 'full_dataset' + if not full_dataset_folder.exists(): + full_dataset_folder.mkdir(parents=True) + self.output_dir = full_dataset_folder / 'output_files' + if not self.output_dir.exists(): + self.output_dir.mkdir(parents=True) + self.input_dir = full_dataset_folder / 'input_files' + if not self.input_dir.exists(): + self.input_dir.mkdir(parents=True) + df = pd.DataFrame(columns=['original_filename']) + with ProcessPool(system_cores_qty) as executor: + future = executor.map(self._preprocess, files_without_tests, timeout=200, ) + result = future.result() + for filename in tqdm(files_without_tests): + try: + # print(filename) + # print(2) + text = next(result) + # print(1) + # print(single_file_features) + if text: + df = df.append( + {'original_filename': self._save_text_to_new_file(self.input_dir, text, filename)}, + ignore_index=True + ) + except Exception as e: + print(str(e)) + # for x in self.files_without_tests: + # # d = {j: '' for j in columns} + # # d.update({'original_filename': x}) + # # df = df.append(d, ignore_index=True) + # # self.save({'file': str(x)}) + # 
a = yield TaskPreprocessJavaFile(file=str(self.file)) + # results.append(a) -def retunrn(i): - if i == 1: - return [] + self.save(data=df) def get_files(dir_to_search, dir_to_save, system_cores_qty): @@ -255,9 +214,8 @@ def get_files(dir_to_search, dir_to_save, system_cores_qty): ) args = parser.parse_args() - get_files(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs) - # d6tflow.preview(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) - # d6tflow.run(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) - # data = TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs).outputLoad(cached=False) - # print(data) -# See PyCharm help at https://www.jetbrains.com/help/pycharm/ + # get_files(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs) + d6tflow.preview(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) + d6tflow.run(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) + data = TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs).outputLoad(cached=False) + print(data) From 535d502a41e65b5e139aa36c72ab5ed6d7e27f86 Mon Sep 17 00:00:00 2001 From: Evgeny Maslov Date: Thu, 4 Feb 2021 17:48:40 +0300 Subject: [PATCH 5/8] Paralllel version but inside task --- .../dataset_collection/dataflow/collectEM.py | 77 +++++++++++++ veniq/dataset_collection/dataflow/main.py | 64 ++++++++++ .../{collect_task.py => preprocess.py} | 109 +----------------- 3 files changed, 144 insertions(+), 106 deletions(-) create mode 100644 veniq/dataset_collection/dataflow/collectEM.py create mode 100644 veniq/dataset_collection/dataflow/main.py rename veniq/dataset_collection/dataflow/{collect_task.py => preprocess.py} (51%) diff --git a/veniq/dataset_collection/dataflow/collectEM.py b/veniq/dataset_collection/dataflow/collectEM.py new file mode 100644 index 00000000..78634c1b --- /dev/null +++ b/veniq/dataset_collection/dataflow/collectEM.py @@ -0,0 +1,77 @@ +from collections import defaultdict +from typing import List, Dict + +import d6tcollect +import d6tflow +# from veniq.dataset_collection.augmentation import InvocationType +from pebble import ProcessPool +from tqdm import tqdm + +from veniq.ast_framework import AST +from veniq.ast_framework import ASTNodeType, ASTNode +from veniq.dataset_collection.augmentation import collect_info_about_functions_without_params +from veniq.dataset_collection.dataflow.preprocess import TaskAggregatorJavaFiles +from veniq.utils.ast_builder import build_ast + +d6tcollect.submit = False + + +@d6tflow.requires({'csv': TaskAggregatorJavaFiles}) +class TaskFindEM(d6tflow.tasks.TaskPickle): + dir_to_search = d6tflow.Parameter() + dir_to_save = d6tflow.Parameter() + system_cores_qty = d6tflow.IntParameter() + + def _find_EMs(self, row): + result_dict = {} + try: + ast = AST.build_from_javalang(build_ast(row['original_filename'])) + classes_declaration = [ + ast.get_subtree(node) + for node in ast.get_root().types + if node.node_type == ASTNodeType.CLASS_DECLARATION + ] + method_declarations: Dict[str, List[ASTNode]] = defaultdict(list) + for class_ast in classes_declaration: + class_declaration = class_ast.get_root() + collect_info_about_functions_without_params(class_declaration, method_declarations) + + methods_list = list(class_declaration.methods) 
+ list(class_declaration.constructors) + for method_node in methods_list: + target_node = ast.get_subtree(method_node) + for method_invoked in target_node.get_proxy_nodes( + ASTNodeType.METHOD_INVOCATION): + extracted_m_decl = method_declarations.get(method_invoked.member, []) + if len(extracted_m_decl) == 1: + result_dict[method_invoked.line] = [target_node, method_invoked, extracted_m_decl] + # print({'em_list': result_dict, 'ast': ast}) + if result_dict: + print(f' ZHOPA {result_dict}') + return [{'em_list': result_dict, 'ast': ast}] + else: + return {} + except Exception: + pass + + return {} + + def run(self): + csv = self.inputLoad()['csv'] + lst = [] + with ProcessPool(self.system_cores_qty) as executor: + rows = [x for _, x in csv.iterrows()] + future = executor.map(self._find_EMs, rows, timeout=200, ) + result = future.result() + for filename in tqdm(rows): + try: + res = next(result) + print(f'HH {res}') + if res: + em_list = res.get('em_list') + # print(res) + if em_list: + lst.append({'original_filename': filename, 'em_list': em_list}) + except Exception as e: + print(str(e)) + print(lst) + self.save({"data": lst}) diff --git a/veniq/dataset_collection/dataflow/main.py b/veniq/dataset_collection/dataflow/main.py new file mode 100644 index 00000000..52fd15c8 --- /dev/null +++ b/veniq/dataset_collection/dataflow/main.py @@ -0,0 +1,64 @@ +import os +from argparse import ArgumentParser + +import d6tcollect +import d6tflow + +from veniq.dataset_collection.dataflow.collectEM import TaskFindEM +from veniq.dataset_collection.dataflow.preprocess import TaskAggregatorJavaFiles + +d6tcollect.submit = False + +if __name__ == '__main__': + system_cores_qty = os.cpu_count() or 1 + parser = ArgumentParser() + parser.add_argument( + "-d", + "--dir", + required=True, + help="File path to JAVA source code for methods augmentations" + ) + parser.add_argument( + "-o", "--output", + help="Path for file with output results", + default='augmented_data' + ) + parser.add_argument( + "--jobs", + "-j", + type=int, + default=system_cores_qty - 1, + help="Number of processes to spawn. " + "By default one less than number of cores. " + "Be careful to raise it above, machine may stop responding while creating dataset.", + ) + parser.add_argument( + "-z", "--zip", + action='store_true', + help="To zip input and output files." 
+ ) + parser.add_argument( + "-s", "--small_dataset_size", + help="Number of files in small dataset", + default=100, + type=int, + ) + + args = parser.parse_args() + d6tflow.preview( + TaskFindEM( + dir_to_search=args.dir, + dir_to_save=args.output, + system_cores_qty=args.jobs)) + d6tflow.run( + TaskFindEM( + dir_to_search=args.dir, + dir_to_save=args.output, + system_cores_qty=args.jobs + )) + data = TaskFindEM( + dir_to_search=args.dir, + dir_to_save=args.output, + system_cores_qty=args.jobs).outputLoad(cached=False) + + print(data) diff --git a/veniq/dataset_collection/dataflow/collect_task.py b/veniq/dataset_collection/dataflow/preprocess.py similarity index 51% rename from veniq/dataset_collection/dataflow/collect_task.py rename to veniq/dataset_collection/dataflow/preprocess.py index 20694a29..385f1b81 100644 --- a/veniq/dataset_collection/dataflow/collect_task.py +++ b/veniq/dataset_collection/dataflow/preprocess.py @@ -1,23 +1,14 @@ import hashlib -import os import re -from argparse import ArgumentParser -from functools import partial from pathlib import Path -import javalang -from javalang.parse import parse +import d6tcollect import d6tflow import pandas as pd - -import d6tcollect - # from veniq.dataset_collection.augmentation import InvocationType from pebble import ProcessPool from tqdm import tqdm -from veniq.ast_framework import AST, ASTNodeType -from veniq.utils.ast_builder import build_ast from veniq.utils.encoding_detector import read_text_with_autodetected_encoding d6tcollect.submit = False @@ -27,8 +18,6 @@ class TaskAggregatorJavaFiles(d6tflow.tasks.TaskCSVPandas): dir_to_search = d6tflow.Parameter() dir_to_save = d6tflow.Parameter() system_cores_qty = d6tflow.IntParameter() - # files_without_tests = d6tflow.ListParameter() - # file = d6tflow.Parameter() columns = [ 'project_id', @@ -112,110 +101,18 @@ def run(self): if not self.input_dir.exists(): self.input_dir.mkdir(parents=True) df = pd.DataFrame(columns=['original_filename']) - with ProcessPool(system_cores_qty) as executor: + with ProcessPool(self.system_cores_qty) as executor: future = executor.map(self._preprocess, files_without_tests, timeout=200, ) result = future.result() for filename in tqdm(files_without_tests): try: - # print(filename) - # print(2) text = next(result) - # print(1) - # print(single_file_features) if text: df = df.append( - {'original_filename': self._save_text_to_new_file(self.input_dir, text, filename)}, + {'original_filename': self._save_text_to_new_file(self.input_dir, text, filename).absolute()}, ignore_index=True ) except Exception as e: print(str(e)) - # for x in self.files_without_tests: - # # d = {j: '' for j in columns} - # # d.update({'original_filename': x}) - # # df = df.append(d, ignore_index=True) - # # self.save({'file': str(x)}) - # a = yield TaskPreprocessJavaFile(file=str(self.file)) - # results.append(a) self.save(data=df) - - -def get_files(dir_to_search, dir_to_save, system_cores_qty): - test_files = set(Path(dir_to_search).glob('**/*Test*.java')) - not_test_files = set(Path(dir_to_search).glob('**/*.java')) - files_without_tests = list(not_test_files.difference(test_files)) - if not files_without_tests: - raise Exception("No java files were found") - - full_dataset_folder = Path(dir_to_save) / 'full_dataset' - output_dir = full_dataset_folder / 'output_files' - input_dir = full_dataset_folder / 'input_files' - # df = pd.DataFrame(columns=columns) - results = [] - with ProcessPool(system_cores_qty) as executor: - - # 
d6tflow.preview(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) - # d6tflow.run(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) - # data = TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs).outputLoad(cached=False) - tasks = [TaskAggregatorJavaFiles(file=str(x)) for x in files_without_tests] - future = executor.map(d6tflow.run, tasks, timeout=200, ) - # future = executor.map(retunrn, [x for x in range(4)], ) - result = future.result() - - # each 100 cycles we dump the results - iteration_cycle = 1000 - iteration_number = 0 - for filename in tqdm(files_without_tests): - try: - # print(filename) - next(result) - data = TaskAggregatorJavaFiles(file=str(filename)).outputLoad() - if data: - results.append(data) - except Exception as e: - print(e) - print(results) - - -if __name__ == '__main__': - # Intelligently rerun workflow after changing parameters - system_cores_qty = os.cpu_count() or 1 - parser = ArgumentParser() - parser.add_argument( - "-d", - "--dir", - required=True, - help="File path to JAVA source code for methods augmentations" - ) - parser.add_argument( - "-o", "--output", - help="Path for file with output results", - default='augmented_data' - ) - parser.add_argument( - "--jobs", - "-j", - type=int, - default=system_cores_qty - 1, - help="Number of processes to spawn. " - "By default one less than number of cores. " - "Be careful to raise it above, machine may stop responding while creating dataset.", - ) - parser.add_argument( - "-z", "--zip", - action='store_true', - help="To zip input and output files." - ) - parser.add_argument( - "-s", "--small_dataset_size", - help="Number of files in small dataset", - default=100, - type=int, - ) - - args = parser.parse_args() - # get_files(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs) - d6tflow.preview(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) - d6tflow.run(TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs)) - data = TaskAggregatorJavaFiles(dir_to_search=args.dir, dir_to_save=args.output, system_cores_qty=args.jobs).outputLoad(cached=False) - print(data) From 72109cfb8ae7455422d7f945e1b5476746a44c5f Mon Sep 17 00:00:00 2001 From: Evgeny Maslov Date: Thu, 4 Feb 2021 18:40:28 +0300 Subject: [PATCH 6/8] Now joblib is used --- .../dataset_collection/dataflow/collectEM.py | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/veniq/dataset_collection/dataflow/collectEM.py b/veniq/dataset_collection/dataflow/collectEM.py index 78634c1b..736459ca 100644 --- a/veniq/dataset_collection/dataflow/collectEM.py +++ b/veniq/dataset_collection/dataflow/collectEM.py @@ -6,6 +6,8 @@ # from veniq.dataset_collection.augmentation import InvocationType from pebble import ProcessPool from tqdm import tqdm +import numpy as np +from joblib import Parallel, delayed from veniq.ast_framework import AST from veniq.ast_framework import ASTNodeType, ASTNode @@ -17,7 +19,7 @@ @d6tflow.requires({'csv': TaskAggregatorJavaFiles}) -class TaskFindEM(d6tflow.tasks.TaskPickle): +class TaskFindEM(d6tflow.tasks.TaskCache): dir_to_search = d6tflow.Parameter() dir_to_save = d6tflow.Parameter() system_cores_qty = d6tflow.IntParameter() @@ -58,20 +60,9 @@ def _find_EMs(self, row): def run(self): csv = self.inputLoad()['csv'] lst = [] - with 
ProcessPool(self.system_cores_qty) as executor: - rows = [x for _, x in csv.iterrows()] - future = executor.map(self._find_EMs, rows, timeout=200, ) - result = future.result() - for filename in tqdm(rows): - try: - res = next(result) - print(f'HH {res}') - if res: - em_list = res.get('em_list') - # print(res) - if em_list: - lst.append({'original_filename': filename, 'em_list': em_list}) - except Exception as e: - print(str(e)) - print(lst) - self.save({"data": lst}) + print('csv') + rows = [x for _, x in csv.iterrows()] + + with Parallel(n_jobs=2, require='sharedmem') as parallel: + results = parallel((delayed(self._find_EMs)(a) for a in rows)) + self.save({"data": [x for x in results if x]}) From 4d6eb45e90b1b95ac4fd1cccdebc9c6a3fd89e10 Mon Sep 17 00:00:00 2001 From: Evgeny Maslov Date: Thu, 4 Feb 2021 18:44:49 +0300 Subject: [PATCH 7/8] Fix --- .../dataset_collection/dataflow/collectEM.py | 5 -- veniq/dataset_collection/dataflow/main.py | 8 ++-- .../dataset_collection/dataflow/preprocess.py | 48 +++++++++---------- 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/veniq/dataset_collection/dataflow/collectEM.py b/veniq/dataset_collection/dataflow/collectEM.py index 736459ca..02015f7e 100644 --- a/veniq/dataset_collection/dataflow/collectEM.py +++ b/veniq/dataset_collection/dataflow/collectEM.py @@ -4,9 +4,6 @@ import d6tcollect import d6tflow # from veniq.dataset_collection.augmentation import InvocationType -from pebble import ProcessPool -from tqdm import tqdm -import numpy as np from joblib import Parallel, delayed from veniq.ast_framework import AST @@ -59,8 +56,6 @@ def _find_EMs(self, row): def run(self): csv = self.inputLoad()['csv'] - lst = [] - print('csv') rows = [x for _, x in csv.iterrows()] with Parallel(n_jobs=2, require='sharedmem') as parallel: diff --git a/veniq/dataset_collection/dataflow/main.py b/veniq/dataset_collection/dataflow/main.py index 52fd15c8..cace3ba5 100644 --- a/veniq/dataset_collection/dataflow/main.py +++ b/veniq/dataset_collection/dataflow/main.py @@ -5,7 +5,6 @@ import d6tflow from veniq.dataset_collection.dataflow.collectEM import TaskFindEM -from veniq.dataset_collection.dataflow.preprocess import TaskAggregatorJavaFiles d6tcollect.submit = False @@ -57,8 +56,9 @@ system_cores_qty=args.jobs )) data = TaskFindEM( - dir_to_search=args.dir, - dir_to_save=args.output, - system_cores_qty=args.jobs).outputLoad(cached=False) + dir_to_search=args.dir, + dir_to_save=args.output, + system_cores_qty=args.jobs + ).outputLoad(cached=False) print(data) diff --git a/veniq/dataset_collection/dataflow/preprocess.py b/veniq/dataset_collection/dataflow/preprocess.py index 385f1b81..42a1203b 100644 --- a/veniq/dataset_collection/dataflow/preprocess.py +++ b/veniq/dataset_collection/dataflow/preprocess.py @@ -20,26 +20,26 @@ class TaskAggregatorJavaFiles(d6tflow.tasks.TaskCSVPandas): system_cores_qty = d6tflow.IntParameter() columns = [ - 'project_id', - 'original_filename', - 'class_name', - 'invocation_line_string', - 'invocation_line_number_in_original_file', - 'target_method', - 'target_method_start_line', - 'extract_method', - 'extract_method_start_line', - 'extract_method_end_line', - 'output_filename', - 'is_valid_ast', - 'insertion_start', - 'insertion_end', - 'ncss_target', - 'ncss_extracted', - 'do_nothing', - 'ONE_LINE_FUNCTION', - 'NO_IGNORED_CASES' - ] # + [x for x in InvocationType.list_types()] + 'project_id', + 'original_filename', + 'class_name', + 'invocation_line_string', + 'invocation_line_number_in_original_file', + 'target_method', + 
'target_method_start_line', + 'extract_method', + 'extract_method_start_line', + 'extract_method_end_line', + 'output_filename', + 'is_valid_ast', + 'insertion_start', + 'insertion_end', + 'ncss_target', + 'ncss_extracted', + 'do_nothing', + 'ONE_LINE_FUNCTION', + 'NO_IGNORED_CASES' + ] # + [x for x in InvocationType.list_types()] def _remove_comments(self, string: str): pattern = r"(\".*?\"|\'.*?\')|(/\*.*?\*/|//[^\r\n]*$)" @@ -64,11 +64,6 @@ def _preprocess(self, file): text_without_comments = self._remove_comments(original_text) # remove whitespaces text = "\n".join([ll.rstrip() for ll in text_without_comments.splitlines() if ll.strip()]) - # try: - # ast = AST.build_from_javalang(parse(text)) - # return {'text': text, 'ast': ast} - # except Exception: - # pass return text @@ -109,7 +104,8 @@ def run(self): text = next(result) if text: df = df.append( - {'original_filename': self._save_text_to_new_file(self.input_dir, text, filename).absolute()}, + {'original_filename': self._save_text_to_new_file(self.input_dir, text, + filename).absolute()}, ignore_index=True ) except Exception as e: From 8f9d89d4b5034c19226772e406fc8ea87f77c3c6 Mon Sep 17 00:00:00 2001 From: lyriccoder Date: Mon, 8 Feb 2021 15:50:46 +0300 Subject: [PATCH 8/8] Update collectEM.py --- veniq/dataset_collection/dataflow/collectEM.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/veniq/dataset_collection/dataflow/collectEM.py b/veniq/dataset_collection/dataflow/collectEM.py index 02015f7e..15dd4b5a 100644 --- a/veniq/dataset_collection/dataflow/collectEM.py +++ b/veniq/dataset_collection/dataflow/collectEM.py @@ -45,7 +45,7 @@ def _find_EMs(self, row): result_dict[method_invoked.line] = [target_node, method_invoked, extracted_m_decl] # print({'em_list': result_dict, 'ast': ast}) if result_dict: - print(f' ZHOPA {result_dict}') + print(f' FFF {result_dict}') return [{'em_list': result_dict, 'ast': ast}] else: return {}
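
Note on the comment-stripping regex carried through the patches above: the pattern in TaskAggregatorJavaFiles._remove_comments relies on group order — group 1 matches quoted strings first, so "//" or "/* */" sequences inside string literals are never mistaken for comments, while group 2 matches real single-line and block comments and is dropped. Below is a minimal standalone sketch of that behaviour; it reuses the exact pattern from the patches, and the java_snippet input is an invented example, not a file from the repository.

    import re

    # Same pattern as TaskAggregatorJavaFiles._remove_comments:
    # group 1 = quoted strings, group 2 = // and /* ... */ comments.
    pattern = r"(\".*?\"|\'.*?\')|(/\*.*?\*/|//[^\r\n]*$)"
    regex = re.compile(pattern, re.MULTILINE | re.DOTALL)

    def remove_comments(source: str) -> str:
        def replacer(match):
            if match.group(2) is not None:
                # A real comment matched: drop it.
                return ""
            # A string literal matched: return it unchanged, so the "//"
            # inside the URL below survives.
            return match.group(1)
        return regex.sub(replacer, source)

    java_snippet = (
        'int x = 1; // trailing comment\n'
        'String s = "http://example.com"; /* block\n'
        'comment */ int y = 2;\n'
    )
    print(remove_comments(java_snippet))
    # Both comments are removed; the string literal containing "//" is kept.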