Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#113 Remove temporary directory in the output of a git analysis #168

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ Version 1.8.1, 2024-07-xx
`#157 <https://github.com/roskakori/pygount/issues/157>`_).
* Development: Change default branch to main (issue
`#160 <https://github.com/roskakori/pygount/issues/160>`_).
* Removed deprecated code: (contributed by Marco Gambone and Niels Vanden Bussche, issue
* Remove temporary directory in the output of a git analysis (contributed by
Isabel Beckenbach, issue `#113 <https://github.com/roskakori/pygount/issues/113>`_).
* Remove deprecated code: (contributed by Marco Gambone and Niels Vanden Bussche, issue
`#47 <https://github.com/roskakori/pygount/issues/47>`_).

Version 1.8.0, 2024-05-13
Expand Down
53 changes: 36 additions & 17 deletions pygount/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import os
import re
from dataclasses import dataclass
from enum import Enum
from io import SEEK_CUR, BufferedIOBase, IOBase, RawIOBase, TextIOBase
from typing import Iterator, List, Optional, Pattern, Sequence, Set, Tuple, Union
Expand Down Expand Up @@ -146,6 +147,13 @@ class SourceState(Enum):
_SUFFIX_TO_FALLBACK_LEXER_MAP[_oracle_suffix] = pygments.lexers.get_lexer_by_name("plpgsql")


@dataclass(frozen=True)
class PathData:
    """
    A source file selected for analysis together with the group it is
    reported under.

    If the file came from a cloned remote git repository, ``tmp_dir`` holds
    the temporary checkout folder so it can be stripped from the reported
    path; for local files it remains ``None``.
    """

    # Path to the source file; for git clones this still contains the
    # temporary checkout folder prefix, which analysis strips using tmp_dir.
    source_path: str
    # Group the file is reported under (typically the project folder name).
    group: str
    # Temporary folder the file was extracted to; set only for files cloned
    # from a git URL, None for local files.
    tmp_dir: Optional[str] = None


class DuplicatePool:
"""
A pool that collects information about potential duplicate files.
Expand Down Expand Up @@ -223,7 +231,11 @@ def __init__(

@staticmethod
def from_state(
source_path: str, group: str, state: SourceState, state_info: Optional[str] = None
source_path: str,
group: str,
state: SourceState,
state_info: Optional[str] = None,
tmp_dir: Optional[str] = None,
) -> "SourceAnalysis":
"""
Factory method to create a :py:class:`SourceAnalysis` with all counts
Expand All @@ -233,8 +245,9 @@ def from_state(
assert group is not None
assert state != SourceState.analyzed, "use from() for analyzable sources"
SourceAnalysis._check_state_info(state, state_info)
reduced_path = source_path.split(tmp_dir)[-1].lstrip(os.sep) if tmp_dir else source_path
return SourceAnalysis(
path=source_path,
path=reduced_path,
language=f"__{state.name}__",
group=group,
code=0,
Expand Down Expand Up @@ -263,6 +276,7 @@ def from_file(
duplicate_pool: Optional[DuplicatePool] = None,
file_handle: Optional[IOBase] = None,
merge_embedded_language: bool = False,
tmp_dir: Optional[str] = None,
) -> "SourceAnalysis":
"""
Factory method to create a :py:class:`SourceAnalysis` by analyzing
Expand All @@ -284,6 +298,8 @@ def from_file(
:param merge_embedded_language: If pygments detects a base and embedded language, the source
code counts towards the base language. For example: "JavaScript+Lasso" counts as
"JavaScript".
:param tmp_dir: If a temporary directory was created, strip it from the path name. This happens
right now only for git repositories.
"""
assert encoding is not None

Expand Down Expand Up @@ -355,8 +371,9 @@ def from_file(
if mark_to_check in line_parts:
mark_to_increment = mark_to_check
mark_to_count_map[mark_to_increment] += 1
reduced_path = source_path.split(tmp_dir)[-1].lstrip(os.sep) if tmp_dir else source_path
result = SourceAnalysis(
path=source_path,
path=reduced_path,
language=language,
group=group,
code=mark_to_count_map["c"],
Expand Down Expand Up @@ -549,7 +566,7 @@ def _is_path_to_skip(self, name, is_folder) -> bool:
regexps_to_skip = self._folder_regexps_to_skip if is_folder else self._name_regexps_to_skip
return any(path_name_to_skip_regex.match(name) is not None for path_name_to_skip_regex in regexps_to_skip)

def _paths_and_group_to_analyze_in(self, folder, group) -> Tuple[str, str]:
def _paths_and_group_to_analyze_in(self, folder, group, tmp_dir) -> PathData:
assert folder is not None
assert group is not None

Expand All @@ -560,11 +577,11 @@ def _paths_and_group_to_analyze_in(self, folder, group) -> Tuple[str, str]:
if self._is_path_to_skip(os.path.basename(path), is_folder):
_log.debug("skip due to matching skip pattern: %s", path)
elif is_folder:
yield from self._paths_and_group_to_analyze_in(path, group)
yield from self._paths_and_group_to_analyze_in(path, group, tmp_dir)
else:
yield path, group
yield PathData(source_path=path, group=group, tmp_dir=tmp_dir)

def _paths_and_group_to_analyze(self, path_to_analyse_pattern, group=None) -> Iterator[Tuple[str, str]]:
def _paths_and_group_to_analyze(self, path_to_analyse_pattern, group=None, tmp_dir=None) -> Iterator[PathData]:
for path_to_analyse in glob.glob(path_to_analyse_pattern):
if os.path.islink(path_to_analyse):
_log.debug("skip link: %s", path_to_analyse)
Expand All @@ -580,15 +597,15 @@ def _paths_and_group_to_analyze(self, path_to_analyse_pattern, group=None) -> It
if actual_group == "":
# Compensate for trailing path separator.
actual_group = os.path.basename(os.path.dirname(path_to_analyse))
yield from self._paths_and_group_to_analyze_in(path_to_analyse_pattern, actual_group)
yield from self._paths_and_group_to_analyze_in(path_to_analyse_pattern, actual_group, tmp_dir)
else:
if actual_group is None:
actual_group = os.path.dirname(path_to_analyse)
if actual_group == "":
actual_group = os.path.basename(os.path.dirname(os.path.abspath(path_to_analyse)))
yield path_to_analyse, actual_group
yield PathData(source_path=path_to_analyse, group=actual_group, tmp_dir=tmp_dir)

def _source_paths_and_groups_to_analyze(self, source_patterns_to_analyze) -> List[Tuple[str, str]]:
def _source_paths_and_groups_to_analyze(self, source_patterns_to_analyze) -> List[PathData]:
assert source_patterns_to_analyze is not None
result = []
# NOTE: We could avoid initializing `source_pattern_to_analyze` here by moving the `try` inside
Expand All @@ -602,28 +619,30 @@ def _source_paths_and_groups_to_analyze(self, source_patterns_to_analyze) -> Lis
self._git_storages.append(git_storage)
git_storage.extract()
# TODO#113: Find a way to exclude the ugly temp folder from the source path.
result.extend(self._paths_and_group_to_analyze(git_storage.temp_folder))
result.extend(
self._paths_and_group_to_analyze(git_storage.temp_folder, tmp_dir=git_storage.temp_folder)
)
else:
result.extend(self._paths_and_group_to_analyze(source_pattern_to_analyze))
except OSError as error:
assert source_pattern_to_analyze is not None
raise OSError(f'cannot scan "{source_pattern_to_analyze}" for source files: {error}') from error
result = sorted(set(result))
result = sorted(set(result), key=lambda data: (data.source_path, data.group))
return result

def source_paths(self) -> Iterator[str]:
def source_paths(self) -> Iterator[PathData]:
"""
Paths to source code files matching all the conditions for this scanner.
"""
source_paths_and_groups_to_analyze = self._source_paths_and_groups_to_analyze(self.source_patterns)

for source_path, group in source_paths_and_groups_to_analyze:
suffix = os.path.splitext(source_path)[1].lstrip(".")
for path_data in source_paths_and_groups_to_analyze:
suffix = os.path.splitext(path_data.source_path)[1].lstrip(".")
is_suffix_to_analyze = any(suffix_regexp.match(suffix) for suffix_regexp in self.suffixes)
if is_suffix_to_analyze:
yield source_path, group
yield path_data
else:
_log.info("skip due to suffix: %s", source_path)
_log.info("skip due to suffix: %s", path_data.source_path)


_LANGUAGE_TO_WHITE_WORDS_MAP = {"batchfile": {"@"}, "python": {"pass"}, "sql": {"begin", "end"}}
Expand Down
7 changes: 4 additions & 3 deletions pygount/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,16 +353,17 @@ def execute(self):
disable=not writer.has_to_track_progress, transient=True
) as progress:
try:
for source_path, group in progress.track(source_paths_and_groups_to_analyze):
for path_data in progress.track(source_paths_and_groups_to_analyze):
writer.add(
pygount.analysis.SourceAnalysis.from_file(
source_path,
group,
path_data.source_path,
path_data.group,
self.default_encoding,
self.fallback_encoding,
generated_regexes=self._generated_regexs,
duplicate_pool=duplicate_pool,
merge_embedded_language=self.has_to_merge_embedded_languages,
tmp_dir=path_data.tmp_dir,
)
)
finally:
Expand Down
13 changes: 7 additions & 6 deletions tests/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ def test_can_find_python_files(self):
scanner = analysis.SourceScanner([PYGOUNT_SOURCE_FOLDER], "py")
actual_paths = list(scanner.source_paths())
assert actual_paths != []
for python_path, _ in actual_paths:
actual_suffix = os.path.splitext(python_path)[1]
for path_data in actual_paths:
actual_suffix = os.path.splitext(path_data.source_path)[1]
assert actual_suffix == ".py"

def test_can_skip_dot_folder(self):
Expand All @@ -61,23 +61,24 @@ def test_can_skip_dot_folder(self):
self.create_temp_file(relative_path_to_skip, "skip = 2", do_create_folder=True)

scanner = analysis.SourceScanner([project_folder])
scanned_names = [os.path.basename(source_path) for source_path, _ in scanner.source_paths()]
scanned_names = [os.path.basename(path_data.source_path) for path_data in scanner.source_paths()]
assert scanned_names == [name_to_include]

def test_can_find_python_files_in_dot(self):
scanner = analysis.SourceScanner(["."], "py")
actual_paths = list(scanner.source_paths())
assert actual_paths != []
for python_path, _ in actual_paths:
actual_suffix = os.path.splitext(python_path)[1]
for path_data in actual_paths:
actual_suffix = os.path.splitext(path_data.source_path)[1]
assert actual_suffix == ".py"

def test_can_find_files_from_mixed_cloned_git_remote_url_and_local(self):
git_remote_url = "https://github.com/roskakori/pygount.git"
with analysis.SourceScanner([git_remote_url, PYGOUNT_SOURCE_FOLDER]) as scanner:
actual_paths = list(scanner.source_paths())
assert actual_paths != []
assert actual_paths[0][1] != actual_paths[-1][1]
assert actual_paths[0].source_path != actual_paths[-1].source_path
assert actual_paths[-1].tmp_dir is not None


class AnalysisTest(unittest.TestCase):
Expand Down
Loading