Skip to content
This repository has been archived by the owner on Jan 18, 2024. It is now read-only.

Commit

Permalink
Use wcmatch.glob.globfilter for path_utils
Browse files Browse the repository at this point in the history
  • Loading branch information
ethho committed Dec 9, 2023
1 parent 0ba1b24 commit b6cf232
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 142 deletions.
110 changes: 13 additions & 97 deletions datajoint_file_validator/path_utils.py
Original file line number Diff line number Diff line change
@@ -1,106 +1,22 @@
import fnmatch
from typing import List
import glob
import os.path
from itertools import product
from .snapshot import FileMetadata, Snapshot
from wcmatch import glob


# Cross-Python dictionary views on the keys
if hasattr(dict, "viewkeys"):
# Python 2
def _viewkeys(d):
return d.viewkeys()
def find_matching_paths(
    filenames, patterns, flags=(glob.GLOBSTAR | glob.K | glob.X), **kw
):
    """
    Select the entries of ``filenames`` that match ``patterns``.
    ---
    filenames: iterable of str
        Candidate paths to filter.
    patterns: str or list of str
        Glob pattern(s), handed to ``glob.globfilter`` unchanged.
    flags: int
        Bitmask of ``wcmatch.glob`` flags. Defaults to
        ``GLOBSTAR | K | X`` (``K``/``X`` are wcmatch's short aliases,
        MARK and MATCHBASE per the wcmatch docs -- confirm against the
        installed version).
    **kw:
        Any further keyword arguments, forwarded to ``glob.globfilter``.

    Returns whatever ``glob.globfilter`` returns for these arguments.
    """
    # Fold the explicit flags into the pass-through options so the
    # delegate receives a single keyword mapping.
    options = dict(kw, flags=flags)
    return glob.globfilter(filenames, patterns, **options)

else:
# Python 3
def _viewkeys(d):
return d.keys()

def find_matching_files_gen(snapshot: Snapshot, patterns):
    """
    Lazily yield the files in ``snapshot`` whose path matches ``patterns``.
    ---
    snapshot: Snapshot
        Iterable of file records; each record must support
        ``.get('path')``. It is iterated twice, so it must be
        re-iterable.
    patterns:
        Pattern(s) forwarded unchanged to ``find_matching_paths``.

    Yields the matching file records in snapshot order.
    """
    # Resolve the matching paths exactly once. The previous version passed
    # a one-shot generator of paths into a per-file filter condition, so the
    # generator was exhausted after the first file and every later file was
    # compared against an empty result.
    matched_paths = set(
        find_matching_paths([file.get('path') for file in snapshot], patterns)
    )
    for file in snapshot:
        if file.get('path') in matched_paths:
            yield file

def _in_trie(trie, path):
"""Determine if path is completely in trie"""
current = trie
for elem in path:
try:
current = current[elem]
except KeyError:
return False
return None in current


def find_matching_paths_generator(paths, pattern):
    """
    Lazily produce the paths that match the pattern.
    ---
    paths: list of str
        List of paths to search.
    pattern: str
        Pattern to match. It is normalised with ``os.path.normpath`` and
        matched one path component at a time: components without glob
        magic must match exactly, the others are matched with
        ``fnmatch.filter``. A wildcard never spans a separator, so ``**``
        behaves like ``*`` here.

    Returns a generator of normalised path strings. Only paths that were
    listed in ``paths`` in full are produced; bare prefixes of longer
    paths are not. When nothing matches the generator is simply empty.

    Adapted from
    https://stackoverflow.com/questions/27726545/python-glob-but-against-a-list-of-strings-rather-than-the-filesystem
    """
    subpatterns = os.path.normpath(pattern).split(os.sep)

    # Build a trie out of path elements; efficiently search on prefixes.
    path_trie = {}
    for path in paths:
        if os.altsep:  # normalise
            path = path.replace(os.altsep, os.sep)
        _, path = os.path.splitdrive(path)
        node = path_trie
        for elem in path.split(os.sep):
            node = node.setdefault(elem, {})
        node.setdefault(None, None)  # sentinel: a listed path ends here

    # Walk the trie one pattern component at a time. Each frontier entry
    # pairs the components matched so far with the trie node they lead to,
    # so no cross product of per-level names has to be re-checked later.
    frontier = [((), path_trie)]
    for subpattern in subpatterns:
        advanced = []
        if not glob.has_magic(subpattern):
            # Plain element: it must be present verbatim.
            for parts, node in frontier:
                if subpattern in node:
                    advanced.append((parts + (subpattern,), node[subpattern]))
        else:
            for parts, node in frontier:
                # The None sentinel is not a name; handing it to fnmatch
                # raises TypeError, so drop it before matching.
                names = [name for name in node if name is not None]
                for name in fnmatch.filter(names, subpattern):
                    advanced.append((parts + (name,), node[name]))
        frontier = advanced
        if not frontier:
            # Nothing survived this level; no deeper level can match.
            break

    return (
        os.path.normpath(os.sep.join(parts))
        for parts, node in frontier
        if None in node
    )


def find_matching_paths(paths, pattern) -> set:
    """
    Produce the set of paths that match the pattern.
    ---
    paths: list of str
        List of paths to search.
    pattern: str
        Pattern to match.

    Returns a set (not a list): the results of
    ``find_matching_paths_generator`` collected without duplicates and in
    no particular order.
    """
    return set(find_matching_paths_generator(paths, pattern))


def find_matching_files(snapshot: Snapshot, pattern: str):
    """
    Return the files in ``snapshot`` whose path matches ``pattern``.
    ---
    snapshot: Snapshot
        Iterable of file records indexable by ``"path"``. It is iterated
        twice, so it must be re-iterable.
    pattern: str
        Pattern forwarded to ``find_matching_paths_generator``.

    Returns a list of the matching file records, in snapshot order.
    """
    paths = [file["path"] for file in snapshot]
    # Collect the normalised matches in a set so each membership test below
    # is a hash lookup instead of a scan over a list.
    matched = {
        os.path.normpath(path)
        for path in find_matching_paths_generator(paths, pattern)
    }
    return [file for file in snapshot if os.path.normpath(file["path"]) in matched]
def find_matching_files(snapshot: Snapshot, path: str):
    """
    Return the files in ``snapshot`` that match ``path`` as a list.
    ---
    snapshot: Snapshot
        File records to filter; passed to ``find_matching_files_gen``.
    path: str
        Forwarded to ``find_matching_files_gen`` as its pattern argument.

    Returns a list holding everything ``find_matching_files_gen`` yields.
    """
    # The generator function has to be called with the arguments; the
    # previous code passed the function object itself to list(), which
    # raises TypeError because a function is not iterable.
    return list(find_matching_files_gen(snapshot, path))
3 changes: 0 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
import pytest



118 changes: 76 additions & 42 deletions tests/test_path_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import pytest
from datajoint_file_validator.path_utils import find_matching_files, find_matching_paths
from datajoint_file_validator.path_utils import find_matching_paths


@pytest.fixture
Expand Down Expand Up @@ -49,48 +49,82 @@ def filename0_paths(filename0_snapshot):

@pytest.fixture
def example0_paths():
return ['./',
'./2021-10-02',
'./2021-10-02/subject1_frame1.png',
'./2021-10-02/subject1_frame2.png',
'./2021-10-02/obs.md',
'./2021-10-02/subject1_frame3.png',
'./2021-10-02/subject1_frame7.png',
'./2021-10-02/subject1_frame0.png',
'./2021-10-02/foo',
'./2021-10-02/foo/bar.txt',
'./2021-10-02/subject1_frame4.png',
'./2021-10-02/subject1_frame6.png',
'./2021-10-02/subject1_frame5.png',
'./obs.md',
'./2021-10-01',
'./2021-10-01/subject1_frame1.png',
'./2021-10-01/subject1_frame2.png',
'./2021-10-01/obs.txt',
'./2021-10-01/subject1_frame3.png',
'./2021-10-01/subject1_frame0.png',
'./2021-10-01/subject1_frame4.png',
'./2021-10-01/subject1_frame5.png',
'./README.txt']
"""
flags = (glob.GLOBSTAR | glob.K | glob.X)
glob.glob('**', flags=flags)
"""
return set(
[
"2021-10-02/",
"2021-10-02/subject1_frame1.png",
"2021-10-02/subject1_frame2.png",
"2021-10-02/obs.md",
"2021-10-02/subject1_frame3.png",
"2021-10-02/subject1_frame7.png",
"2021-10-02/subject1_frame0.png",
"2021-10-02/foo/",
"2021-10-02/foo/bar.txt",
"2021-10-02/subject1_frame4.png",
"2021-10-02/subject1_frame6.png",
"2021-10-02/subject1_frame5.png",
"obs.md",
"2021-10-01/",
"2021-10-01/subject1_frame1.png",
"2021-10-01/subject1_frame2.png",
"2021-10-01/obs.txt",
"2021-10-01/subject1_frame3.png",
"2021-10-01/subject1_frame0.png",
"2021-10-01/subject1_frame4.png",
"2021-10-01/subject1_frame5.png",
"README.txt",
]
)

class TestFindMatchingPaths:

def test_same_after_star_star(self, filename0_paths):
paths = filename0_paths
assert paths == find_matching_paths(paths, "**")
def test_example0_paths(example0_paths):
assert set(find_matching_paths(example0_paths, "**")) == example0_paths
assert set(find_matching_paths(example0_paths, ["**"])) == example0_paths
assert not set(find_matching_paths(example0_paths, "./**"))
assert not set(find_matching_paths(example0_paths, "./*"))

def test_so_example(self):
paths = ['/foo/bar/baz', '/spam/eggs/baz', '/foo/bar/bar']
assert find_matching_paths(paths, '/foo/bar/*') == set(['/foo/bar/baz', '/foo/bar/bar'])
assert find_matching_paths(paths, '/*/bar/b*') == set(['/foo/bar/baz', '/foo/bar/bar'])
assert find_matching_paths(paths, '/*/[be]*/b*') == set(['/foo/bar/baz', '/foo/bar/bar', '/spam/eggs/baz'])
assert not find_matching_paths(paths, '/*/[xq]*/b*')
assert find_matching_paths(paths, '/**') == paths
assert find_matching_paths(paths, '/*/**') == paths
assert find_matching_paths(paths, '/**/*') == paths
assert find_matching_paths(paths, '**/*') == paths
assert find_matching_paths(paths, '**/**') == paths
assert set(find_matching_paths(example0_paths, "**.md")) == {
"obs.md",
"2021-10-02/obs.md",
}
assert not set(find_matching_paths(example0_paths, "./**.md"))
assert set(find_matching_paths(example0_paths, "**.txt")) == {
"2021-10-01/obs.txt",
"2021-10-02/foo/bar.txt",
"README.txt",
}
assert set(find_matching_paths(example0_paths, "*/*/*.txt")) == {
"2021-10-02/foo/bar.txt",
}
assert set(find_matching_paths(example0_paths, "*/**/*.txt")) == {
"2021-10-01/obs.txt",
"2021-10-02/foo/bar.txt",
}

def test_find_matching_paths(self, filename0_paths):
paths = filename0_paths
assert paths == find_matching_paths(paths, "/**")
assert set(
find_matching_paths(example0_paths, "2021-10-0*/subject1_frame*.png")
) == {
"2021-10-01/subject1_frame0.png",
"2021-10-01/subject1_frame1.png",
"2021-10-01/subject1_frame2.png",
"2021-10-01/subject1_frame3.png",
"2021-10-01/subject1_frame4.png",
"2021-10-01/subject1_frame5.png",
"2021-10-02/subject1_frame0.png",
"2021-10-02/subject1_frame1.png",
"2021-10-02/subject1_frame2.png",
"2021-10-02/subject1_frame3.png",
"2021-10-02/subject1_frame4.png",
"2021-10-02/subject1_frame5.png",
"2021-10-02/subject1_frame6.png",
"2021-10-02/subject1_frame7.png",
}

assert set(find_matching_paths(example0_paths, "*/")) == {
"2021-10-01/",
"2021-10-02/",
}

0 comments on commit b6cf232

Please sign in to comment.