From bb74a71561eb4f22732eaf14b8d1976fea758b59 Mon Sep 17 00:00:00 2001 From: Rob van der Leek <5324924+robvanderleek@users.noreply.github.com> Date: Sat, 6 Apr 2024 20:12:44 +0200 Subject: [PATCH] Refactored code --- codelimit/commands/report.py | 7 +++- codelimit/common/Scanner.py | 64 +++++++++++++++++---------------- codelimit/common/gsm/matcher.py | 42 +++++++++++----------- codelimit/utils.py | 31 +++++++++------- 4 files changed, 79 insertions(+), 65 deletions(-) diff --git a/codelimit/commands/report.py b/codelimit/commands/report.py index 8c81016..b5795b6 100644 --- a/codelimit/commands/report.py +++ b/codelimit/commands/report.py @@ -22,7 +22,7 @@ def report_command(path: Path, full: bool): if not report: print("[red]No cached report found in current folder[/red]") raise typer.Exit(code=1) - stdout = Console() + units = report.all_report_units_sorted_by_length_asc(30) if len(units) == 0: print( @@ -33,6 +33,11 @@ def report_command(path: Path, full: bool): report_units = units else: report_units = units[0:REPORT_LENGTH] + print_functions(root, units, report_units, full) + + +def print_functions(root, units, report_units, full): + stdout = Console() for unit in report_units: file_path = unit.file if root is None else root.joinpath(unit.file) stdout.print( diff --git a/codelimit/common/Scanner.py b/codelimit/common/Scanner.py index 23d7fc0..0154c08 100644 --- a/codelimit/common/Scanner.py +++ b/codelimit/common/Scanner.py @@ -42,7 +42,6 @@ def scan_codebase(path: Path, cached_report: Union[Report, None] = None) -> Code print_header(cached_report, path) scan_totals = ScanTotals() with Live(refresh_per_second=2) as live: - def add_file_entry(entry: SourceFileEntry): scan_totals.add(entry) table = ScanResultTable(scan_totals) @@ -83,10 +82,10 @@ def print_footer(scan_totals: ScanTotals): def _scan_folder( - codebase: Codebase, - folder: Path, - cached_report: Union[Report, None] = None, - add_file_entry: Union[Callable[[SourceFileEntry], None], None] = None, + codebase: Codebase, + folder: Path, + cached_report: Union[Report, None] = None, + add_file_entry: Union[Callable[[SourceFileEntry], None], None] = None, ): gitignore = _read_gitignore(folder) for root, dirs, files in os.walk(folder.absolute()): @@ -95,7 +94,7 @@ def _scan_folder( for file in files: rel_path = Path(os.path.join(root, file)).relative_to(folder.absolute()) if is_excluded(rel_path) or ( - gitignore is not None and is_excluded_by_gitignore(rel_path, gitignore) + gitignore is not None and is_excluded_by_gitignore(rel_path, gitignore) ): continue try: @@ -103,7 +102,7 @@ def _scan_folder( language = lexer.__class__.name file_path = os.path.join(root, file) if language in LanguageName: - file_entry = _add_file( + file_entry = _scan_file( codebase, lexer, folder, file_path, cached_report ) if add_file_entry: @@ -112,12 +111,12 @@ def _scan_folder( pass -def _add_file( - codebase: Codebase, - lexer: Lexer, - root: Path, - path: str, - cached_report: Union[Report, None] = None, +def _scan_file( + codebase: Codebase, + lexer: Lexer, + root: Path, + path: str, + cached_report: Union[Report, None] = None, ) -> SourceFileEntry: checksum = calculate_checksum(path) rel_path = relpath(path, root) @@ -136,25 +135,28 @@ def _add_file( cached_entry.loc, cached_entry.measurements(), ) - codebase.add_file(entry) - return entry else: - with open(path) as f: - code = f.read() - all_tokens = lex(lexer, code, False) - code_tokens = filter_tokens(all_tokens) - file_loc = count_lines(code_tokens) - language_name = lexer.__class__.name - language = load_language_by_name(language_name) - if language: - measurements = scan_file(all_tokens, language) - else: - measurements = [] - entry = SourceFileEntry( - rel_path, checksum, lexer.__class__.name, file_loc, measurements - ) - codebase.add_file(entry) - return entry + entry = _analyze_file(path, rel_path, checksum, lexer) + codebase.add_file(entry) + return entry + + +def _analyze_file(path, rel_path, checksum, lexer): + with open(path) as f: + code = f.read() + all_tokens = lex(lexer, code, False) + code_tokens = filter_tokens(all_tokens) + file_loc = count_lines(code_tokens) + language_name = lexer.__class__.name + language = load_language_by_name(language_name) + if language: + measurements = scan_file(all_tokens, language) + else: + measurements = [] + entry = SourceFileEntry( + rel_path, checksum, lexer.__class__.name, file_loc, measurements + ) + return entry def scan_file(tokens: list[Token], language: Language) -> list[Measurement]: diff --git a/codelimit/common/gsm/matcher.py b/codelimit/common/gsm/matcher.py index 3c5c62a..ff727f0 100644 --- a/codelimit/common/gsm/matcher.py +++ b/codelimit/common/gsm/matcher.py @@ -1,4 +1,5 @@ import copy +from dataclasses import dataclass from typing import TypeVar from codelimit.common.gsm.Expression import ( @@ -7,8 +8,8 @@ nfa_to_dfa, Expression, ) -from codelimit.common.gsm.operator.Operator import Operator from codelimit.common.gsm.Pattern import Pattern +from codelimit.common.gsm.operator.Operator import Operator from codelimit.common.gsm.utils import render_automata T = TypeVar("T") @@ -55,41 +56,42 @@ def starts_with(expression: Expression, sequence: list) -> Pattern | None: return None +@dataclass +class FindState: + matches: list[Pattern] + active_patterns: list[Pattern] + next_state_patterns: list[Pattern] + + def find_all(expression: Expression, sequence: list) -> list[Pattern]: - nfa = expression_to_nfa(expression) - dfa = nfa_to_dfa(nfa) - matches = [] - active_patterns = [] - last_match_idx = -1 + dfa = nfa_to_dfa(expression_to_nfa(expression)) + fs = FindState([], [], []) for idx, item in enumerate(sequence): - dfa_copy = copy.deepcopy(dfa) - active_patterns.append(Pattern(idx, dfa_copy)) - next_state_patterns = [] - for pattern in active_patterns: - if pattern.start < last_match_idx: + fs.active_patterns.append(Pattern(idx, copy.deepcopy(dfa))) + fs.next_state_patterns = [] + for pattern in fs.active_patterns: + if fs.matches and pattern.start < fs.matches[-1].end: continue if len(pattern.state.transition) == 0 and pattern.is_accepting(): pattern.end = idx - matches.append(pattern) - last_match_idx = idx + fs.matches.append(pattern) continue for transition in pattern.state.transition: if transition[0].accept(item): pattern.tokens.append(item) pattern.state = transition[1] - next_state_patterns.append(pattern) + fs.next_state_patterns.append(pattern) break else: if pattern.is_accepting(): pattern.end = idx - matches.append(pattern) - last_match_idx = idx - active_patterns = next_state_patterns - for pattern in active_patterns: + fs.matches.append(pattern) + fs.active_patterns = fs.next_state_patterns + for pattern in fs.active_patterns: if pattern.is_accepting(): pattern.end = len(sequence) - matches.append(pattern) - return matches + fs.matches.append(pattern) + return fs.matches def nfa_match(expression: Expression, sequence: list): diff --git a/codelimit/utils.py b/codelimit/utils.py index ced9bdd..bc88ba4 100644 --- a/codelimit/utils.py +++ b/codelimit/utils.py @@ -22,13 +22,27 @@ def read_cached_report(path: Path) -> Optional[Report]: def upload_report( report: Report, repository: str, branch: str, url: str, token: str ) -> None: + result = api_post_report(report, branch, repository, url, token) + if result.ok: + typer.secho("Upload successful!", fg="green") + else: + error_message = "Upload unsuccessful: " + if result.text: + error_message += result.text + else: + error_message += str(result.status_code) + typer.secho(error_message, fg="red") + raise typer.Exit(code=1) + + +def api_post_report(report, branch, repository, url, token): data_template = ( f'{{{{"repository": "{repository}", "branch": "{branch}", "report":{{}}}}}}' ) with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - transient=True, + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + transient=True, ) as progress: progress.add_task(description=f"Uploading report to {url}", total=None) result = requests.post( @@ -41,13 +55,4 @@ def upload_report( "Authorization": f"Bearer {token}", }, ) - if result.ok: - typer.secho("Upload successful!", fg="green") - else: - error_message = "Upload unsuccessful: " - if result.text: - error_message += result.text - else: - error_message += str(result.status_code) - typer.secho(error_message, fg="red") - raise typer.Exit(code=1) + return result