From 6cb3acf3d3a38ef285a5de2fc1f0e7f1560931bb Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sat, 22 Jan 2022 09:36:00 +0100 Subject: [PATCH] Added artifact filters support to list file entries script --- .github/workflows/test_docker.yml | 4 +- .github/workflows/test_tox.yml | 2 +- .pylintrc | 2 +- appveyor.yml | 2 +- config/appveyor/install.ps1 | 2 +- config/dpkg/control | 2 +- dependencies.ini | 6 + dfimagetools/artifact_filters.py | 115 +++++++++++ dfimagetools/file_entry_lister.py | 114 +++++++---- dfimagetools/path_resolver.py | 291 ++++++++++++++++++++++++++++ dfimagetools/resources.py | 66 +++++++ requirements.txt | 1 + setup.cfg | 1 + test_data/artifacts/artifacts.yaml | 32 +++ tests/artifact_filters.py | 197 +++++++++++++++++++ tests/path_resover.py | 301 +++++++++++++++++++++++++++++ tools/list_file_entries.py | 89 ++++++++- 17 files changed, 1172 insertions(+), 55 deletions(-) create mode 100644 dfimagetools/artifact_filters.py create mode 100644 dfimagetools/path_resolver.py create mode 100644 dfimagetools/resources.py create mode 100644 test_data/artifacts/artifacts.yaml create mode 100644 tests/artifact_filters.py create mode 100644 tests/path_resover.py diff --git a/.github/workflows/test_docker.yml b/.github/workflows/test_docker.yml index 60d59d3..c6e4af5 100644 --- a/.github/workflows/test_docker.yml +++ b/.github/workflows/test_docker.yml @@ -17,7 +17,7 @@ jobs: - name: Install dependencies run: | dnf copr -y enable @gift/dev - dnf install -y @development-tools python3 python3-devel libbde-python3 libewf-python3 libfsapfs-python3 libfsext-python3 libfshfs-python3 libfsntfs-python3 libfsxfs-python3 libfvde-python3 libfwnt-python3 libluksde-python3 libmodi-python3 libqcow-python3 libsigscan-python3 libsmdev-python3 libsmraw-python3 libvhdi-python3 libvmdk-python3 libvsgpt-python3 libvshadow-python3 libvslvm-python3 python3-cffi python3-cryptography python3-dfdatetime python3-dfvfs python3-dtfabric python3-idna python3-mock python3-pbr python3-pytsk3 python3-pyxattr python3-pyyaml python3-setuptools python3-six + dnf install -y @development-tools python3 python3-devel libbde-python3 libewf-python3 libfsapfs-python3 libfsext-python3 libfshfs-python3 libfsntfs-python3 libfsxfs-python3 libfvde-python3 libfwnt-python3 libluksde-python3 libmodi-python3 libqcow-python3 libsigscan-python3 libsmdev-python3 libsmraw-python3 libvhdi-python3 libvmdk-python3 libvsgpt-python3 libvshadow-python3 libvslvm-python3 python3-artifacts python3-cffi python3-cryptography python3-dfdatetime python3-dfvfs python3-dtfabric python3-idna python3-mock python3-pbr python3-pytsk3 python3-pyxattr python3-pyyaml python3-setuptools python3-six - name: Run tests env: LANG: C.utf8 @@ -57,7 +57,7 @@ jobs: run: | add-apt-repository -y ppa:gift/dev apt-get update -q - apt-get install -y build-essential python3 python3-dev libbde-python3 libewf-python3 libfsapfs-python3 libfsext-python3 libfshfs-python3 libfsntfs-python3 libfsxfs-python3 libfvde-python3 libfwnt-python3 libluksde-python3 libmodi-python3 libqcow-python3 libsigscan-python3 libsmdev-python3 libsmraw-python3 libvhdi-python3 libvmdk-python3 libvsgpt-python3 libvshadow-python3 libvslvm-python3 python3-cffi-backend python3-cryptography python3-dfdatetime python3-dfvfs python3-distutils python3-dtfabric python3-idna python3-mock python3-pbr python3-pytsk3 python3-pyxattr python3-setuptools python3-six python3-yaml + apt-get install -y build-essential python3 python3-dev libbde-python3 libewf-python3 libfsapfs-python3 libfsext-python3 libfshfs-python3 libfsntfs-python3 libfsxfs-python3 libfvde-python3 libfwnt-python3 libluksde-python3 libmodi-python3 libqcow-python3 libsigscan-python3 libsmdev-python3 libsmraw-python3 libvhdi-python3 libvmdk-python3 libvsgpt-python3 libvshadow-python3 libvslvm-python3 python3-artifacts python3-cffi-backend python3-cryptography python3-dfdatetime python3-dfvfs python3-distutils python3-dtfabric python3-idna python3-mock python3-pbr python3-pytsk3 python3-pyxattr python3-setuptools python3-six python3-yaml - name: Run tests env: LANG: en_US.UTF-8 diff --git a/.github/workflows/test_tox.yml b/.github/workflows/test_tox.yml index c16d6f9..86dda36 100644 --- a/.github/workflows/test_tox.yml +++ b/.github/workflows/test_tox.yml @@ -47,7 +47,7 @@ jobs: add-apt-repository -y ppa:deadsnakes/ppa add-apt-repository -y ppa:gift/dev apt-get update -q - apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv libbde-python3 libewf-python3 libfsapfs-python3 libfsext-python3 libfshfs-python3 libfsntfs-python3 libfsxfs-python3 libfvde-python3 libfwnt-python3 libluksde-python3 libmodi-python3 libqcow-python3 libsigscan-python3 libsmdev-python3 libsmraw-python3 libvhdi-python3 libvmdk-python3 libvsgpt-python3 libvshadow-python3 libvslvm-python3 python3-cffi-backend python3-cryptography python3-dfdatetime python3-dfvfs python3-distutils python3-dtfabric python3-idna python3-mock python3-pbr python3-pip python3-pytsk3 python3-pyxattr python3-setuptools python3-six python3-yaml + apt-get install -y build-essential git libffi-dev python${{ matrix.python-version }} python${{ matrix.python-version }}-dev python${{ matrix.python-version }}-venv libbde-python3 libewf-python3 libfsapfs-python3 libfsext-python3 libfshfs-python3 libfsntfs-python3 libfsxfs-python3 libfvde-python3 libfwnt-python3 libluksde-python3 libmodi-python3 libqcow-python3 libsigscan-python3 libsmdev-python3 libsmraw-python3 libvhdi-python3 libvmdk-python3 libvsgpt-python3 libvshadow-python3 libvslvm-python3 python3-artifacts python3-cffi-backend python3-cryptography python3-dfdatetime python3-dfvfs python3-distutils python3-dtfabric python3-idna python3-mock python3-pbr python3-pip python3-pytsk3 python3-pyxattr python3-setuptools python3-six python3-yaml - name: Install tox run: | python3 -m pip install tox diff --git a/.pylintrc b/.pylintrc index 94e5a23..8d2e6f3 100644 --- a/.pylintrc +++ b/.pylintrc @@ -7,7 +7,7 @@ # A comma-separated list of package or module names from where C extensions may # be loaded. Extensions are loading into the active Python interpreter and may # run arbitrary code. -extension-pkg-allow-list=pybde,pyewf,pyfsapfs,pyfsext,pyfshfs,pyfsntfs,pyfsxfs,pyfvde,pyfwnt,pyluksde,pymodi,pyqcow,pysigscan,pysmdev,pysmraw,pytsk3,pyvhdi,pyvmdk,pyvsgpt,pyvshadow,pyvslvm +extension-pkg-allow-list=pybde,pyewf,pyfsapfs,pyfsext,pyfshfs,pyfsntfs,pyfsxfs,pyfvde,pyfwnt,pyluksde,pymodi,pyqcow,pysigscan,pysmdev,pysmraw,pytsk3,pyvhdi,pyvmdk,pyvsgpt,pyvshadow,pyvslvm,xattr # A comma-separated list of package or module names from where C extensions may # be loaded. Extensions are loading into the active Python interpreter and may diff --git a/appveyor.yml b/appveyor.yml index 80b0106..3c31a3e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -17,7 +17,7 @@ environment: HOMEBREW_NO_INSTALL_CLEANUP: 1 install: -- cmd: "%PYTHON%\\python.exe -m pip install -U pip setuptools wheel" +- cmd: "%PYTHON%\\python.exe -m pip install -U pip setuptools twine wheel" - cmd: "%PYTHON%\\python.exe -m pip install pywin32 WMI" - cmd: "%PYTHON%\\python.exe %PYTHON%\\Scripts\\pywin32_postinstall.py -install" - ps: If ($isWindows) { .\config\appveyor\install.ps1 } diff --git a/config/appveyor/install.ps1 b/config/appveyor/install.ps1 index ae57a93..8a4191b 100644 --- a/config/appveyor/install.ps1 +++ b/config/appveyor/install.ps1 @@ -1,6 +1,6 @@ # Script to set up tests on AppVeyor Windows. -$Dependencies = "PyYAML cffi cryptography dfdatetime dfvfs dtfabric idna libbde libewf libfsapfs libfsext libfshfs libfsntfs libfsxfs libfvde libfwnt libluksde libmodi libqcow libsigscan libsmdev libsmraw libvhdi libvmdk libvsgpt libvshadow libvslvm mock pbr pytsk3 six xattr" +$Dependencies = "PyYAML artifacts cffi cryptography dfdatetime dfvfs dtfabric idna libbde libewf libfsapfs libfsext libfshfs libfsntfs libfsxfs libfvde libfwnt libluksde libmodi libqcow libsigscan libsmdev libsmraw libvhdi libvmdk libvsgpt libvshadow libvslvm mock pbr pytsk3 six xattr" $Dependencies = ${Dependencies} -split " " $Output = Invoke-Expression -Command "git clone https://github.com/log2timeline/l2tdevtools.git ..\l2tdevtools 2>&1" diff --git a/config/dpkg/control b/config/dpkg/control index 747d83b..2d93068 100644 --- a/config/dpkg/control +++ b/config/dpkg/control @@ -9,7 +9,7 @@ Homepage: https://github.com/log2timeline/dfimagetools Package: python3-dfimagetools Architecture: all -Depends: libbde-python3 (>= 20140531), libewf-python3 (>= 20131210), libfsapfs-python3 (>= 20201107), libfsext-python3 (>= 20210721), libfshfs-python3 (>= 20210722), libfsntfs-python3 (>= 20200921), libfsxfs-python3 (>= 20210726), libfvde-python3 (>= 20160719), libfwnt-python3 (>= 20210717), libluksde-python3 (>= 20200101), libmodi-python3 (>= 20210405), libqcow-python3 (>= 20201213), libsigscan-python3 (>= 20191221), libsmdev-python3 (>= 20140529), libsmraw-python3 (>= 20140612), libvhdi-python3 (>= 20201014), libvmdk-python3 (>= 20140421), libvsgpt-python3 (>= 20210207), libvshadow-python3 (>= 20160109), libvslvm-python3 (>= 20160109), python3-cffi-backend (>= 1.9.1), python3-cryptography (>= 2.0.2), python3-dfdatetime (>= 20211225), python3-dfvfs (>= 20211227), python3-dtfabric (>= 20170524), python3-idna (>= 2.5), python3-pytsk3 (>= 20210419), python3-pyxattr (>= 0.7.2), python3-yaml (>= 3.10), ${misc:Depends} +Depends: libbde-python3 (>= 20140531), libewf-python3 (>= 20131210), libfsapfs-python3 (>= 20201107), libfsext-python3 (>= 20210721), libfshfs-python3 (>= 20210722), libfsntfs-python3 (>= 20200921), libfsxfs-python3 (>= 20210726), libfvde-python3 (>= 20160719), libfwnt-python3 (>= 20210717), libluksde-python3 (>= 20200101), libmodi-python3 (>= 20210405), libqcow-python3 (>= 20201213), libsigscan-python3 (>= 20191221), libsmdev-python3 (>= 20140529), libsmraw-python3 (>= 20140612), libvhdi-python3 (>= 20201014), libvmdk-python3 (>= 20140421), libvsgpt-python3 (>= 20210207), libvshadow-python3 (>= 20160109), libvslvm-python3 (>= 20160109), python3-artifacts (>= 20211205), python3-cffi-backend (>= 1.9.1), python3-cryptography (>= 2.0.2), python3-dfdatetime (>= 20211225), python3-dfvfs (>= 20211227), python3-dtfabric (>= 20170524), python3-idna (>= 2.5), python3-pytsk3 (>= 20210419), python3-pyxattr (>= 0.7.2), python3-yaml (>= 3.10), ${misc:Depends} Description: Python 3 module of dfImageTools Collection of tools to process storage media images. diff --git a/dependencies.ini b/dependencies.ini index f567d6b..2c4b631 100644 --- a/dependencies.ini +++ b/dependencies.ini @@ -1,3 +1,9 @@ +[artifacts] +dpkg_name: python3-artifacts +minimum_version: 20211205 +rpm_name: python3-artifacts +version_property: __version__ + [cffi] skip_check: true dpkg_name: python3-cffi-backend diff --git a/dfimagetools/artifact_filters.py b/dfimagetools/artifact_filters.py new file mode 100644 index 0000000..be17d67 --- /dev/null +++ b/dfimagetools/artifact_filters.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +"""Helper for filtering based on artifact definitions.""" + +import logging + +from artifacts import definitions as artifacts_definitions + +from dfvfs.helpers import file_system_searcher as dfvfs_file_system_searcher + +from dfimagetools import path_resolver + + +class ArtifactDefinitionFiltersGenerator(object): + """Generator of filters based on artifact definitions.""" + + def __init__(self, artifacts_registry, environment_variables, user_accounts): + """Initializes an artifact definition filters generator. + + Args: + artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifact + definitions registry. + environment_variables (list[EnvironmentVariable]): environment variables. + user_accounts (list[UserAccount]): user accounts. + """ + super(ArtifactDefinitionFiltersGenerator, self).__init__() + self._artifacts_registry = artifacts_registry + self._environment_variables = environment_variables + self._path_resolver = path_resolver.PathResolver() + self._user_accounts = user_accounts + + def _BuildFindSpecsFromArtifactDefinition(self, name): + """Builds find specifications from an artifact definition. + + Args: + name (str): name of the artifact definition. + + Yields: + dfvfs.FindSpec: file system (dfVFS) find specification. + """ + definition = self._artifacts_registry.GetDefinitionByName(name) + if not definition: + logging.warning('Undefined artifact definition: {0:s}'.format(name)) + else: + for source in definition.sources: + source_type = source.type_indicator + if source_type not in ( + artifacts_definitions.TYPE_INDICATOR_ARTIFACT_GROUP, + artifacts_definitions.TYPE_INDICATOR_FILE): + logging.warning(( + 'Unsupported source type: {0:s} in artifact definition: ' + '{1:s}"').format(source_type, name)) + continue + + if source_type == artifacts_definitions.TYPE_INDICATOR_ARTIFACT_GROUP: + for source_name in set(source.names): + for find_spec in self._BuildFindSpecsFromArtifactDefinition( + source_name): + yield find_spec + + elif source_type == artifacts_definitions.TYPE_INDICATOR_FILE: + for source_path in set(source.paths): + for find_spec in self._BuildFindSpecsFromFileSourcePath( + source_path, source.separator): + yield find_spec + + def _BuildFindSpecsFromFileSourcePath(self, source_path, path_separator): + """Builds find specifications from a file source type. + + Args: + source_path (str): file system path defined by the source. + path_separator (str): file system path segment separator. + + Yields: + dfvfs.FindSpec: file system (dfVFS) find specification. + """ + for path_glob in self._path_resolver.ExpandGlobStars( + source_path, path_separator): + + for path in self._path_resolver.ExpandUsersVariable( + path_glob, path_separator, self._user_accounts): + + if '%' in path: + path = self._path_resolver.ExpandEnvironmentVariables( + path, path_separator, self._environment_variables) + + if not path.startswith(path_separator): + logging.warning(( + 'The path filter must be defined as an absolute path: ' + '"{0:s}"').format(path)) + continue + + try: + find_spec = dfvfs_file_system_searcher.FindSpec( + case_sensitive=False, location_glob=path, + location_separator=path_separator) + except ValueError as exception: + logging.error(( + 'Unable to build find specification for path: "{0:s}" with ' + 'error: {1!s}').format(path, exception)) + continue + + yield find_spec + + def GetFindSpecs(self, names): + """Retrieves find specifications based on artifact definitions. + + Args: + names (list[str]): names of the artifact definitions to filter on. + + Yields: + dfvfs.FindSpec: file system (dfVFS) find specification. + """ + for name in set(names): + for find_spec in self._BuildFindSpecsFromArtifactDefinition(name): + yield find_spec diff --git a/dfimagetools/file_entry_lister.py b/dfimagetools/file_entry_lister.py index aeeb287..8d0913a 100644 --- a/dfimagetools/file_entry_lister.py +++ b/dfimagetools/file_entry_lister.py @@ -4,24 +4,17 @@ import logging import re -from dfvfs.analyzer import analyzer -from dfvfs.analyzer import fvde_analyzer_helper +from dfvfs.helpers import file_system_searcher from dfvfs.helpers import volume_scanner from dfvfs.lib import definitions as dfvfs_definitions -from dfvfs.resolver import resolver +from dfvfs.path import factory as dfvfs_path_spec_factory +from dfvfs.resolver import resolver as dfvfs_resolver from dfvfs.volume import factory as dfvfs_volume_system_factory from dfimagetools import bodyfile from dfimagetools import decorators -try: - # Disable experimental FVDE support. - analyzer.Analyzer.DeregisterHelper(fvde_analyzer_helper.FVDEAnalyzerHelper()) -except KeyError: - pass - - class FileEntryLister(volume_scanner.VolumeScanner): """File entry lister.""" @@ -37,6 +30,47 @@ def __init__(self, mediator=None): self._bodyfile_generator = bodyfile.BodyfileGenerator() self._list_only_files = False + def _GetBasePathSegments(self, base_path_spec): + """Retrieves the base path segments. + + Args: + base_path_specs (list[dfvfs.PathSpec]): source path specification. + + Returns: + list[str]: path segments. + """ + if not base_path_spec.HasParent() or not base_path_spec.parent: + return [''] + + if base_path_spec.parent.type_indicator in ( + dfvfs_definitions.TYPE_INDICATOR_APFS_CONTAINER, + dfvfs_definitions.TYPE_INDICATOR_GPT, + dfvfs_definitions.TYPE_INDICATOR_LVM): + volume_system = dfvfs_volume_system_factory.Factory.NewVolumeSystem( + base_path_spec.parent.type_indicator) + volume_system.Open(base_path_spec.parent) + + volume = volume_system.GetVolumeByIdentifier( + base_path_spec.parent.location[1:]) + + if base_path_spec.parent.type_indicator == ( + dfvfs_definitions.TYPE_INDICATOR_GPT): + volume_identifier_prefix = 'gpt' + else: + volume_identifier_prefix = volume_system.VOLUME_IDENTIFIER_PREFIX + + volume_identifier = volume.GetAttribute('identifier') + + volume_path_segment = '{0:s}{{{1:s}}}'.format( + volume_identifier_prefix, volume_identifier.value) + return ['', volume_path_segment] + + if base_path_spec.parent.type_indicator == ( + dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION): + return base_path_spec.parent.location.split('/') + + return [''] + def _GetPathSpecificationString(self, path_spec): """Retrieves a printable string representation of the path specification. @@ -97,14 +131,14 @@ def ListFileEntries(self, base_path_specs): """Lists file entries in the base path specification. Args: - base_path_specs (list[dfvfs.PathSpec]): source path specification. + base_path_specs (list[dfvfs.PathSpec]): source path specifications. Yields: tuple[dfvfs.FileEntry, list[str]]: file entry and path segments. """ for base_path_spec in base_path_specs: - file_system = resolver.Resolver.OpenFileSystem(base_path_spec) - file_entry = resolver.Resolver.OpenFileEntry(base_path_spec) + file_system = dfvfs_resolver.Resolver.OpenFileSystem(base_path_spec) + file_entry = dfvfs_resolver.Resolver.OpenFileEntry(base_path_spec) if file_entry is None: path_specification_string = self._GetPathSpecificationString( base_path_spec) @@ -112,34 +146,38 @@ def ListFileEntries(self, base_path_specs): path_specification_string)) return - base_path_segments = [''] - if base_path_spec.HasParent() and base_path_spec.parent: - if base_path_spec.parent.type_indicator in ( - dfvfs_definitions.TYPE_INDICATOR_APFS_CONTAINER, - dfvfs_definitions.TYPE_INDICATOR_GPT, - dfvfs_definitions.TYPE_INDICATOR_LVM): - volume_system = dfvfs_volume_system_factory.Factory.NewVolumeSystem( - base_path_spec.parent.type_indicator) - volume_system.Open(base_path_spec.parent) + base_path_segments = self._GetBasePathSegments(base_path_spec) + for result in self._ListFileEntry( + file_system, file_entry, base_path_segments): + yield result - volume = volume_system.GetVolumeByIdentifier( - base_path_spec.parent.location[1:]) + def ListFileEntriesWithFindSpecs(self, base_path_specs, find_specs): + """Lists file entries in the base path specification. - if base_path_spec.parent.type_indicator == ( - dfvfs_definitions.TYPE_INDICATOR_GPT): - volume_identifier_prefix = 'gpt' - else: - volume_identifier_prefix = volume_system.VOLUME_IDENTIFIER_PREFIX + Args: + base_path_specs (list[dfvfs.PathSpec]): source path specification. + find_specs (list[dfvfs.FindSpec]): find specifications. - volume_identifier = volume.GetAttribute('identifier') + Yields: + tuple[dfvfs.FileEntry, list[str]]: file entry and path segments. + """ + for base_path_spec in base_path_specs: + file_system = dfvfs_resolver.Resolver.OpenFileSystem(base_path_spec) - base_path_segments = ['', '{0:s}{{{1:s}}}'.format( - volume_identifier_prefix, volume_identifier.value)] + if dfvfs_path_spec_factory.Factory.IsSystemLevelTypeIndicator( + base_path_spec.type_indicator): + mount_point = base_path_spec + else: + mount_point = base_path_spec.parent - elif base_path_spec.parent.type_indicator == ( - dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION): - base_path_segments = base_path_spec.parent.location.split('/') + base_path_segments = self._GetBasePathSegments(base_path_spec) - for result in self._ListFileEntry( - file_system, file_entry, base_path_segments): - yield result + searcher = file_system_searcher.FileSystemSearcher( + file_system, mount_point) + for path_spec in searcher.Find(find_specs=find_specs): + file_entry = dfvfs_resolver.Resolver.OpenFileEntry(path_spec) + path_segments = file_system.SplitPath(path_spec.location) + + full_path_segments = list(base_path_segments) + full_path_segments.extend(path_segments) + yield file_entry, full_path_segments diff --git a/dfimagetools/path_resolver.py b/dfimagetools/path_resolver.py new file mode 100644 index 0000000..258e5ac --- /dev/null +++ b/dfimagetools/path_resolver.py @@ -0,0 +1,291 @@ +# -*- coding: utf-8 -*- +"""Helper for resolving paths.""" + +import logging + + +class PathResolver(object): + """Path resolver.""" + + _GLOBSTAR_RECURSION_LIMIT = 10 + + _PATH_EXPANSIONS_PER_USERS_VARIABLE = { + '%%users.appdata%%': [ + ['%%users.userprofile%%', 'AppData', 'Roaming'], + ['%%users.userprofile%%', 'Application Data']], + '%%users.localappdata%%': [ + ['%%users.userprofile%%', 'AppData', 'Local'], + ['%%users.userprofile%%', 'Local Settings', 'Application Data']], + '%%users.localappdata_low%%': [ + ['%%users.userprofile%%', 'AppData', 'LocalLow']], + '%%users.temp%%': [ + ['%%users.localappdata%%', 'Temp']]} + + _USER_DIRECTORY_VARIABLES = ( + '%%users.homedir%%', '%%users.userprofile%%') + + _WINDOWS_DRIVE_INDICATORS = ( + '%%environ_systemdrive%%', '%systemdrive%') + + def _ExpandEnvironmentVariablesInPathSegments( + self, path_segments, environment_variables): + """Expands environment variables in path segments. + + Args: + path_segments (list[str]): path segments with environment variables. + environment_variables (list[EnvironmentVariable]): environment variables. + + Returns: + list[str]: path segments with environment variables expanded. + """ + if environment_variables is None: + environment_variables = [] + + lookup_table = {} + if environment_variables: + for environment_variable in environment_variables: + attribute_name = environment_variable.name.upper() + attribute_value = environment_variable.value + if not isinstance(attribute_value, str): + continue + + lookup_table[attribute_name] = attribute_value + + # Make a copy of path_segments since this loop can change it. + for index, path_segment in enumerate(list(path_segments)): + if (len(path_segment) <= 2 or not path_segment.startswith('%') or + not path_segment.endswith('%')): + continue + + path_segment_upper_case = path_segment.upper() + if path_segment_upper_case.startswith('%%ENVIRON_'): + lookup_key = path_segment_upper_case[10:-2] + else: + lookup_key = path_segment_upper_case[1:-1] + path_segment = lookup_table.get(lookup_key, path_segment) + path_segment = path_segment.split('\\') + + expanded_path_segments = list(path_segments[:index]) + expanded_path_segments.extend(path_segment) + expanded_path_segments.extend(path_segments[index + 1:]) + + path_segments = expanded_path_segments + + if self._IsWindowsDrivePathSegment(path_segments[0]): + path_segments[0] = '' + + return path_segments + + def _ExpandUserDirectoryVariableInPathSegments( + self, path_segments, path_separator, user_accounts): + """Expands a user directory variable in path segments. + + This method expands an artifact definition user directory variable such as + %%users.homedir%% or %%users.userprofile%%. + + Args: + path_segments (list[str]): path segments. + path_separator (str): path segment separator. + user_accounts (list[UserAccount]): user accounts. + + Returns: + list[str]: paths returned for user accounts without a drive indicator. + """ + if not path_segments: + return [] + + user_paths = [] + + first_path_segment = path_segments[0].lower() + if first_path_segment not in self._USER_DIRECTORY_VARIABLES: + if self._IsWindowsDrivePathSegment(path_segments[0]): + path_segments[0] = '' + + user_path = path_separator.join(path_segments) + user_paths.append(user_path) + + elif not user_accounts: + user_path = path_separator.join(path_segments) + user_paths.append(user_path) + + else: + for user_account in user_accounts: + if not user_account.user_directory: + continue + + user_path_segments = user_account.user_directory.split( + user_account.user_directory_path_separator) + + if self._IsWindowsDrivePathSegment(user_path_segments[0]): + user_path_segments[0] = '' + + # Prevent concatenating two consecutive path segment separators. + if not user_path_segments[-1]: + user_path_segments.pop() + + user_path_segments.extend(path_segments[1:]) + + user_path = path_separator.join(user_path_segments) + user_paths.append(user_path) + + return user_paths + + def _ExpandUsersVariableInPathSegments( + self, path_segments, path_separator, user_accounts): + """Expands a users variable in path segments. + + This method expands an artifact definition user variable such as + %%users.appdata%% or %%users.temp%%. + + Args: + path_segments (list[str]): path segments. + path_separator (str): path segment separator. + user_accounts (list[UserAccount]): user accounts. + + Returns: + list[str]: paths for which the users variables have been expanded. + """ + if not path_segments: + return [] + + path_segments_lower = [ + path_segment.lower() for path_segment in path_segments] + + if path_segments_lower[0] in self._USER_DIRECTORY_VARIABLES: + return self._ExpandUserDirectoryVariableInPathSegments( + path_segments, path_separator, user_accounts) + + path_expansions = self._PATH_EXPANSIONS_PER_USERS_VARIABLE.get( + path_segments[0], None) + + if path_expansions: + expanded_paths = [] + + for path_expansion in path_expansions: + expanded_path_segments = list(path_expansion) + expanded_path_segments.extend(path_segments[1:]) + + paths = self._ExpandUsersVariableInPathSegments( + expanded_path_segments, path_separator, user_accounts) + expanded_paths.extend(paths) + + return expanded_paths + + if self._IsWindowsDrivePathSegment(path_segments[0]): + path_segments[0] = '' + + # TODO: add support for %%users.username%% + path = path_separator.join(path_segments) + return [path] + + def _IsWindowsDrivePathSegment(self, path_segment): + """Determines if the path segment contains a Windows Drive indicator. + + A drive indicator can be a drive letter, %SystemDrive% or the artifact + definition environment variable %%environ_systemdrive%%. + + Args: + path_segment (str): path segment. + + Returns: + bool: True if the path segment contains a Windows Drive indicator. + """ + if (len(path_segment) == 2 and path_segment[1] == ':' and + path_segment[0].isalpha()): + return True + + path_segment_lower = path_segment.lower() + return path_segment_lower in self._WINDOWS_DRIVE_INDICATORS + + def ExpandEnvironmentVariables( + self, path, path_separator, environment_variables): + """Expands environment variables. + + Args: + path (str): path with environment variables. + path_separator (str): path segment separator. + environment_variables (list[EnvironmentVariable]): environment variables. + + Returns: + str: path with environment variables expanded. + """ + path_segments = path.split(path_separator) + path_segments = self._ExpandEnvironmentVariablesInPathSegments( + path_segments, environment_variables) + return path_separator.join(path_segments) + + def ExpandGlobStars(self, path, path_separator): + """Expands globstars "**". + + A globstar "**" will recursively match all files and zero or more + directories and subdirectories. + + By default the maximum recursion depth is 10 subdirectories, a numeric + values after the globstar, such as "**5", can be used to define the maximum + recursion depth. + + Args: + path (str): path with globstars. + path_separator (str): path segment separator. + + Returns: + str: path with seperate globs for every globstar. + """ + expanded_paths = [] + + path_segments = path.split(path_separator) + last_segment_index = len(path_segments) - 1 + for segment_index, path_segment in enumerate(path_segments): + recursion_depth = None + if path_segment.startswith('**'): + if len(path_segment) == 2: + recursion_depth = 10 + else: + try: + recursion_depth = int(path_segment[2:], 10) + except (TypeError, ValueError): + logging.warning(( + 'Globstar with suffix "{0:s}" in path "{1:s}" not ' + 'supported.').format(path_segment, path)) + + elif '**' in path_segment: + logging.warning(( + 'Globstar with prefix "{0:s}" in path "{1:s}" not ' + 'supported.').format(path_segment, path)) + + if recursion_depth is not None: + if (recursion_depth <= 1 or + recursion_depth > self._GLOBSTAR_RECURSION_LIMIT): + logging.warning(( + 'Globstar "{0:s}" in path "{1:s}" exceed recursion maximum ' + 'recursion depth, limiting to: {2:d}.').format( + path_segment, path, self._GLOBSTAR_RECURSION_LIMIT)) + recursion_depth = self._GLOBSTAR_RECURSION_LIMIT + + next_segment_index = segment_index + 1 + for expanded_path_segment in [ + ['*'] * depth for depth in range(1, recursion_depth + 1)]: + expanded_path_segments = list(path_segments[:segment_index]) + expanded_path_segments.extend(expanded_path_segment) + if next_segment_index <= last_segment_index: + expanded_path_segments.extend(path_segments[next_segment_index:]) + + expanded_path = path_separator.join(expanded_path_segments) + expanded_paths.append(expanded_path) + + return expanded_paths or [path] + + def ExpandUsersVariable(self, path, path_separator, user_accounts): + """Expands a users variable, such as %%users.appdata%%. + + Args: + path (str): path with users variable. + path_separator (str): path segment separator. + user_accounts (list[UserAccount]): user accounts. + + Returns: + list[str]: paths for which the users variables have been expanded. + """ + path_segments = path.split(path_separator) + return self._ExpandUsersVariableInPathSegments( + path_segments, path_separator, user_accounts) diff --git a/dfimagetools/resources.py b/dfimagetools/resources.py new file mode 100644 index 0000000..2f0fb78 --- /dev/null +++ b/dfimagetools/resources.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +"""Various resource classes.""" + + +class EnvironmentVariable(object): + """Environment variable. + + Attributes: + case_sensitive (bool): True if environment variable name is case sensitive. + name (str): environment variable name such as "SystemRoot" as in + "%SystemRoot%" or "HOME" as in "$HOME". + value (str): environment variable value such as "C:\\Windows" or + "/home/user". + """ + + def __init__(self, case_sensitive=True, name=None, value=None): + """Initializes an environment variable. + + Args: + case_sensitive (Optional[bool]): True if environment variable name + is case sensitive. + name (Optional[str]): environment variable name. + value (Optional[str]): environment variable value. + """ + super(EnvironmentVariable, self).__init__() + self.case_sensitive = case_sensitive + self.name = name + self.value = value + + +class UserAccount(object): + """User account. + + Attributes: + full_name (str): name describing the user. + group_identifier (str): identifier of the primary group the user is part of. + identifier (str): user identifier. + user_directory (str): path of the user (or home or profile) directory. + user_directory_path_separator (str): path segment separator of the user + directory. + username (str): name uniquely identifying the user. + """ + + def __init__( + self, full_name=None, group_identifier=None, identifier=None, + user_directory=None, user_directory_path_separator='/', username=None): + """Initializes a user account. + + Args: + full_name (Optional[str]): name describing the user. + group_identifier (Optional[str]): identifier of the primary group + the user is part of. + identifier (Optional[str]): user identifier. + user_directory (Optional[str]): path of the user (or home or profile) + directory. + user_directory_path_separator (Optional[str]): path segment separator of + the user directory. + username (Optional[str]): name uniquely identifying the user. + """ + super(UserAccount, self).__init__() + self.full_name = full_name + self.group_identifier = group_identifier + self.identifier = identifier + self.user_directory = user_directory + self.user_directory_path_separator = user_directory_path_separator + self.username = username diff --git a/requirements.txt b/requirements.txt index 5c7760f..619a713 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ pip >= 7.0.0 PyYAML >= 3.10 +artifacts >= 20211205 cffi >= 1.9.1 cryptography >= 2.0.2 dfdatetime >= 20211225 diff --git a/setup.cfg b/setup.cfg index db6721d..0e45d7d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,6 +35,7 @@ requires = libbde-python3 >= 20140531 libvsgpt-python3 >= 20210207 libvshadow-python3 >= 20160109 libvslvm-python3 >= 20160109 + python3-artifacts >= 20211205 python3-cffi >= 1.9.1 python3-cryptography >= 2.0.2 python3-dfdatetime >= 20211225 diff --git a/test_data/artifacts/artifacts.yaml b/test_data/artifacts/artifacts.yaml new file mode 100644 index 0000000..8472a00 --- /dev/null +++ b/test_data/artifacts/artifacts.yaml @@ -0,0 +1,32 @@ +# Artifact definitions. + +name: TestGroup1 +doc: Test group artifact definition +sources: +- type: ARTIFACT_GROUP + attributes: + names: + - 'TestFile1' + - 'TestFile2' +labels: [System] +supported_os: [Windows] +--- +name: TestFile1 +doc: Test file artifact definition +sources: +- type: FILE + attributes: + paths: ['%%users.homedir%%/AUTHORS'] + separator: '/' +labels: [System] +supported_os: [Linux] +--- +name: TestFile2 +doc: Test file artifact definition +sources: +- type: FILE + attributes: + paths: ['%%environ_systemroot%%\test_data\*.evtx'] + separator: '\' +labels: [System] +supported_os: [Windows] diff --git a/tests/artifact_filters.py b/tests/artifact_filters.py new file mode 100644 index 0000000..88935c5 --- /dev/null +++ b/tests/artifact_filters.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Tests for the helper for filtering based on artifact definitions.""" + +import unittest + +from artifacts import reader as artifacts_reader +from artifacts import registry as artifacts_registry + +from dfimagetools import artifact_filters +from dfimagetools import resources + +from tests import test_lib + + +class ArtifactDefinitionFiltersGeneratorTest(test_lib.BaseTestCase): + """Tests for the artifact definition filters generator.""" + + # pylint: disable=protected-access + + def testBuildFindSpecsFromArtifactDefinition(self): + """Tests the GetFindSpecs function.""" + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + test_artifacts_path = self._GetTestFilePath(['artifacts']) + self._SkipIfPathNotExists(test_artifacts_path) + + registry.ReadFromDirectory(reader, test_artifacts_path) + + environment_variables = [resources.EnvironmentVariable( + case_sensitive=False, name='SystemRoot', value='C:\\Windows')] + + # Test file artifact definition type. + test_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, environment_variables, []) + find_specs = list(test_generator._BuildFindSpecsFromArtifactDefinition( + 'TestFile2')) + + # Should build 1 find_spec. + self.assertEqual(len(find_specs), 1) + + # Location segments should be equivalent to \Windows\test_data\*.evtx. + # Underscores are not escaped in regular expressions in supported versions + # of Python 3. See https://bugs.python.org/issue2650. + expected_location_segments = ['Windows', 'test_data', '.*\\.evtx'] + + self.assertEqual( + find_specs[0]._location_segments, expected_location_segments) + + # Test group artifact definition type. + test_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, environment_variables, []) + find_specs = list(test_generator._BuildFindSpecsFromArtifactDefinition( + 'TestGroup1')) + + # Should build 1 find_spec. + self.assertEqual(len(find_specs), 1) + + def testBuildFindSpecsFromFileSourcePath(self): + """Tests the _BuildFindSpecsFromFileSourcePath function.""" + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + test_artifacts_path = self._GetTestFilePath(['artifacts']) + self._SkipIfPathNotExists(test_artifacts_path) + + registry.ReadFromDirectory(reader, test_artifacts_path) + + # Test expansion of environment variables. + environment_variables = [resources.EnvironmentVariable( + case_sensitive=False, name='SystemRoot', value='C:\\Windows')] + + test_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, environment_variables, []) + find_specs = list(test_generator._BuildFindSpecsFromFileSourcePath( + '%%environ_systemroot%%\\test_data\\*.evtx', '\\')) + + # Should build 1 find_spec. + self.assertEqual(len(find_specs), 1) + + # Location segments should be equivalent to \Windows\test_data\*.evtx. + # Underscores are not escaped in regular expressions in supported versions + # of Python 3. See https://bugs.python.org/issue2650. + expected_location_segments = ['Windows', 'test_data', '.*\\.evtx'] + + self.assertEqual( + find_specs[0]._location_segments, expected_location_segments) + + # Test expansion of globs. + test_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, [], []) + find_specs = list(test_generator._BuildFindSpecsFromFileSourcePath( + '\\test_data\\**', '\\')) + + # Glob expansion should by default recurse ten levels. + self.assertEqual(len(find_specs), 10) + + # Last entry in find_specs list should be 10 levels of depth. + # Underscores are not escaped in regular expressions in supported versions + # of Python 3. See https://bugs.python.org/issue2650 + expected_location_segments = ['test_data'] + + expected_location_segments.extend([ + '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*', '.*']) + + self.assertEqual( + find_specs[9]._location_segments, expected_location_segments) + + # Test expansion of user home directories + test_user1 = resources.UserAccount( + user_directory='/homes/testuser1', username='testuser1') + test_user2 = resources.UserAccount( + user_directory='/home/testuser2', username='testuser2') + + test_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, [], [test_user1, test_user2]) + find_specs = list(test_generator._BuildFindSpecsFromFileSourcePath( + '%%users.homedir%%/.thumbnails/**3', '/')) + + # 6 find specs should be created for testuser1 and testuser2. + self.assertEqual(len(find_specs), 6) + + # Last entry in find_specs list should be testuser2 with a depth of 3 + expected_location_segments = [ + 'home', 'testuser2', '\\.thumbnails', '.*', '.*', '.*'] + self.assertEqual( + find_specs[5]._location_segments, expected_location_segments) + + # Test Windows path with profile directories and globs with a depth of 4. + test_user1 = resources.UserAccount( + user_directory='C:\\Users\\testuser1', + user_directory_path_separator='\\', username='testuser1') + test_user2 = resources.UserAccount( + user_directory='%SystemDrive%\\Users\\testuser2', + user_directory_path_separator='\\', username='testuser2') + + test_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, [], [test_user1, test_user2]) + find_specs = list(test_generator._BuildFindSpecsFromFileSourcePath( + '%%users.userprofile%%\\AppData\\**4', '\\')) + + # 8 find specs should be created for testuser1 and testuser2. + self.assertEqual(len(find_specs), 8) + + # Last entry in find_specs list should be testuser2, with a depth of 4. + expected_location_segments = [ + 'Users', 'testuser2', 'AppData', '.*', '.*', '.*', '.*'] + self.assertEqual( + find_specs[7]._location_segments, expected_location_segments) + + test_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, [], [test_user1, test_user2]) + find_specs = list(test_generator._BuildFindSpecsFromFileSourcePath( + '%%users.localappdata%%\\Microsoft\\**4', '\\')) + + # 16 find specs should be created for testuser1 and testuser2. + self.assertEqual(len(find_specs), 16) + + # Last entry in find_specs list should be testuser2, with a depth of 4. + expected_location_segments = [ + 'Users', 'testuser2', 'Local\\ Settings', 'Application\\ Data', + 'Microsoft', '.*', '.*', '.*', '.*'] + self.assertEqual( + find_specs[15]._location_segments, expected_location_segments) + + def testGetFindSpecs(self): + """Tests the GetFindSpecs function.""" + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + test_artifacts_path = self._GetTestFilePath(['artifacts']) + self._SkipIfPathNotExists(test_artifacts_path) + + registry.ReadFromDirectory(reader, test_artifacts_path) + + environment_variables = [resources.EnvironmentVariable( + case_sensitive=False, name='SystemRoot', value='C:\\Windows')] + + test_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, environment_variables, []) + find_specs = list(test_generator.GetFindSpecs(['TestFile2'])) + + # Should build 1 find_spec. + self.assertEqual(len(find_specs), 1) + + # Location segments should be equivalent to \Windows\test_data\*.evtx. + # Underscores are not escaped in regular expressions in supported versions + # of Python 3. See https://bugs.python.org/issue2650. + expected_location_segments = ['Windows', 'test_data', '.*\\.evtx'] + + self.assertEqual( + find_specs[0]._location_segments, expected_location_segments) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/path_resover.py b/tests/path_resover.py new file mode 100644 index 0000000..6b61bfe --- /dev/null +++ b/tests/path_resover.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Tests for the helper for resolving paths.""" + +import unittest + +from dfimagetools import path_resolver +from dfimagetools import resources + +from tests import test_lib + + +class PathResolverTest(test_lib.BaseTestCase): + """Tests for the path resolver.""" + + # pylint: disable=protected-access + + def testExpandEnvironmentVariablesInPathSegments(self): + """Tests the _ExpandEnvironmentVariablesInPathSegments function.""" + test_resolver = path_resolver.PathResolver() + + environment_variables = [] + + environment_variable = resources.EnvironmentVariable( + case_sensitive=False, name='allusersappdata', + value='C:\\Documents and Settings\\All Users\\Application Data') + environment_variables.append(environment_variable) + + environment_variable = resources.EnvironmentVariable( + case_sensitive=False, name='allusersprofile', + value='C:\\Documents and Settings\\All Users') + environment_variables.append(environment_variable) + + environment_variable = resources.EnvironmentVariable( + case_sensitive=False, name='SystemRoot', value='C:\\Windows') + environment_variables.append(environment_variable) + + expected_path_segments = [ + '', 'Documents and Settings', 'All Users', 'Application Data', + 'Apache Software Foundation'] + + path_segments = test_resolver._ExpandEnvironmentVariablesInPathSegments( + ['%AllUsersAppData%', 'Apache Software Foundation'], + environment_variables) + self.assertEqual(path_segments, expected_path_segments) + + expected_path_segments = [ + '', 'Documents and Settings', 'All Users', 'Start Menu', 'Programs', + 'Startup'] + + path_segments = test_resolver._ExpandEnvironmentVariablesInPathSegments( + ['%AllUsersProfile%', 'Start Menu', 'Programs', 'Startup'], + environment_variables) + self.assertEqual(path_segments, expected_path_segments) + + path_segments = test_resolver._ExpandEnvironmentVariablesInPathSegments( + ['%SystemRoot%', 'System32'], environment_variables) + self.assertEqual(path_segments, ['', 'Windows', 'System32']) + + path_segments = test_resolver._ExpandEnvironmentVariablesInPathSegments( + ['C:', 'Windows', 'System32'], environment_variables) + self.assertEqual(path_segments, ['', 'Windows', 'System32']) + + path_segments = test_resolver._ExpandEnvironmentVariablesInPathSegments( + ['%SystemRoot%', 'System32'], None) + self.assertEqual(path_segments, ['%SystemRoot%', 'System32']) + + path_segments = test_resolver._ExpandEnvironmentVariablesInPathSegments( + ['%Bogus%', 'System32'], environment_variables) + self.assertEqual(path_segments, ['%Bogus%', 'System32']) + + path_segments = test_resolver._ExpandEnvironmentVariablesInPathSegments( + ['%%environ_systemroot%%', 'System32'], environment_variables) + self.assertEqual(path_segments, ['', 'Windows', 'System32']) + + # Test non-string environment variable. + environment_variables = [] + + environment_variable = resources.EnvironmentVariable( + case_sensitive=False, name='SystemRoot', value=('bogus', 0)) + environment_variables.append(environment_variable) + + path_segments = test_resolver._ExpandEnvironmentVariablesInPathSegments( + ['%SystemRoot%', 'System32'], environment_variables) + self.assertEqual(path_segments, ['%SystemRoot%', 'System32']) + + def testExpandUserDirectoryVariableInPathSegments(self): + """Tests the _ExpandUserDirectoryVariableInPathSegments function.""" + test_resolver = path_resolver.PathResolver() + + user_account_artifact1 = resources.UserAccount( + user_directory='/home/Test1', username='Test1') + user_account_artifact2 = resources.UserAccount( + user_directory='/Users/Test2', username='Test2') + user_account_artifact3 = resources.UserAccount(username='Test3') + + user_accounts = [ + user_account_artifact1, user_account_artifact2, user_account_artifact3] + + path_segments = ['%%users.homedir%%', '.bashrc'] + expanded_paths = test_resolver._ExpandUserDirectoryVariableInPathSegments( + path_segments, '/', user_accounts) + + expected_expanded_paths = [ + '/home/Test1/.bashrc', + '/Users/Test2/.bashrc'] + self.assertEqual(expanded_paths, expected_expanded_paths) + + path_segments = ['%%users.homedir%%', '.bashrc'] + expanded_paths = test_resolver._ExpandUserDirectoryVariableInPathSegments( + path_segments, '/', []) + + expected_expanded_paths = [ + '%%users.homedir%%/.bashrc'] + self.assertEqual(expanded_paths, expected_expanded_paths) + + user_account_artifact1 = resources.UserAccount( + user_directory='C:\\Users\\Test1', user_directory_path_separator='\\', + username='Test1') + user_account_artifact2 = resources.UserAccount( + user_directory='%SystemDrive%\\Users\\Test2', + user_directory_path_separator='\\', username='Test2') + + user_accounts = [user_account_artifact1, user_account_artifact2] + + path_segments = ['%%users.userprofile%%', 'Profile'] + expanded_paths = test_resolver._ExpandUserDirectoryVariableInPathSegments( + path_segments, '\\', user_accounts) + + expected_expanded_paths = [ + '\\Users\\Test1\\Profile', + '\\Users\\Test2\\Profile'] + self.assertEqual(expanded_paths, expected_expanded_paths) + + path_segments = ['C:', 'Temp'] + expanded_paths = test_resolver._ExpandUserDirectoryVariableInPathSegments( + path_segments, '\\', user_accounts) + + expected_expanded_paths = ['\\Temp'] + self.assertEqual(expanded_paths, expected_expanded_paths) + + path_segments = ['C:', 'Temp', '%%users.userprofile%%'] + expanded_paths = test_resolver._ExpandUserDirectoryVariableInPathSegments( + path_segments, '\\', user_accounts) + + expected_expanded_paths = ['\\Temp\\%%users.userprofile%%'] + self.assertEqual(expanded_paths, expected_expanded_paths) + + def testExpandUsersVariableInPathSegments(self): + """Tests the _ExpandUsersVariableInPathSegments function.""" + test_resolver = path_resolver.PathResolver() + + user_account_artifact1 = resources.UserAccount( + identifier='1000', user_directory='C:\\Users\\Test1', + user_directory_path_separator='\\', username='Test1') + user_account_artifact2 = resources.UserAccount( + identifier='1001', user_directory='%SystemDrive%\\Users\\Test2', + user_directory_path_separator='\\', username='Test2') + + user_accounts = [user_account_artifact1, user_account_artifact2] + + path_segments = ['%%users.appdata%%', 'Microsoft', 'Windows', 'Recent'] + expanded_paths = test_resolver._ExpandUsersVariableInPathSegments( + path_segments, '\\', user_accounts) + + expected_expanded_paths = [ + '\\Users\\Test1\\AppData\\Roaming\\Microsoft\\Windows\\Recent', + '\\Users\\Test1\\Application Data\\Microsoft\\Windows\\Recent', + '\\Users\\Test2\\AppData\\Roaming\\Microsoft\\Windows\\Recent', + '\\Users\\Test2\\Application Data\\Microsoft\\Windows\\Recent'] + self.assertEqual(sorted(expanded_paths), expected_expanded_paths) + + path_segments = ['C:', 'Windows'] + expanded_paths = test_resolver._ExpandUsersVariableInPathSegments( + path_segments, '\\', user_accounts) + + expected_expanded_paths = ['\\Windows'] + self.assertEqual(sorted(expanded_paths), expected_expanded_paths) + + def testIsWindowsDrivePathSegment(self): + """Tests the _IsWindowsDrivePathSegment function.""" + test_resolver = path_resolver.PathResolver() + + result = test_resolver._IsWindowsDrivePathSegment('C:') + self.assertTrue(result) + + result = test_resolver._IsWindowsDrivePathSegment('%SystemDrive%') + self.assertTrue(result) + + result = test_resolver._IsWindowsDrivePathSegment('%%environ_systemdrive%%') + self.assertTrue(result) + + result = test_resolver._IsWindowsDrivePathSegment('Windows') + self.assertFalse(result) + + def testExpandEnvironmentVariables(self): + """Tests the ExpandEnvironmentVariables function.""" + test_resolver = path_resolver.PathResolver() + + environment_variables = [] + + environment_variable = resources.EnvironmentVariable( + case_sensitive=False, name='SystemRoot', value='C:\\Windows') + environment_variables.append(environment_variable) + + expanded_path = test_resolver.ExpandEnvironmentVariables( + '%SystemRoot%\\System32', '\\', environment_variables) + self.assertEqual(expanded_path, '\\Windows\\System32') + + def testExpandGlobStars(self): + """Tests the ExpandGlobStars function.""" + test_resolver = path_resolver.PathResolver() + + paths = test_resolver.ExpandGlobStars('/etc/sysconfig/**', '/') + + self.assertEqual(len(paths), 10) + + expected_paths = sorted([ + '/etc/sysconfig/*', + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*']) + self.assertEqual(sorted(paths), expected_paths) + + # Test globstar with recursion depth of 4. + paths = test_resolver.ExpandGlobStars('/etc/sysconfig/**4', '/') + + self.assertEqual(len(paths), 4) + + expected_paths = sorted([ + '/etc/sysconfig/*', + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*']) + self.assertEqual(sorted(paths), expected_paths) + + # Test globstar with unsupported recursion depth of 99. + paths = test_resolver.ExpandGlobStars('/etc/sysconfig/**99', '/') + + self.assertEqual(len(paths), 10) + + expected_paths = sorted([ + '/etc/sysconfig/*', + '/etc/sysconfig/*/*', + '/etc/sysconfig/*/*/*', + '/etc/sysconfig/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*', + '/etc/sysconfig/*/*/*/*/*/*/*/*/*/*']) + self.assertEqual(sorted(paths), expected_paths) + + # Test globstar with prefix. + paths = test_resolver.ExpandGlobStars('/etc/sysconfig/my**', '/') + + self.assertEqual(len(paths), 1) + + self.assertEqual(paths, ['/etc/sysconfig/my**']) + + # Test globstar with suffix. + paths = test_resolver.ExpandGlobStars('/etc/sysconfig/**.exe', '/') + + self.assertEqual(len(paths), 1) + + self.assertEqual(paths, ['/etc/sysconfig/**.exe']) + + def testExpandUsersVariable(self): + """Tests the ExpandUsersVariable function.""" + test_resolver = path_resolver.PathResolver() + + user_account_artifact1 = resources.UserAccount( + user_directory='C:\\Users\\Test1', user_directory_path_separator='\\', + username='Test1') + user_account_artifact2 = resources.UserAccount( + user_directory='%SystemDrive%\\Users\\Test2', + user_directory_path_separator='\\', username='Test2') + + user_accounts = [user_account_artifact1, user_account_artifact2] + + path = '%%users.appdata%%\\Microsoft\\Windows\\Recent' + expanded_paths = test_resolver.ExpandUsersVariable( + path, '\\', user_accounts) + + expected_expanded_paths = [ + '\\Users\\Test1\\AppData\\Roaming\\Microsoft\\Windows\\Recent', + '\\Users\\Test1\\Application Data\\Microsoft\\Windows\\Recent', + '\\Users\\Test2\\AppData\\Roaming\\Microsoft\\Windows\\Recent', + '\\Users\\Test2\\Application Data\\Microsoft\\Windows\\Recent'] + self.assertEqual(sorted(expanded_paths), expected_expanded_paths) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/list_file_entries.py b/tools/list_file_entries.py index fb72a0c..fcf1e22 100755 --- a/tools/list_file_entries.py +++ b/tools/list_file_entries.py @@ -6,13 +6,18 @@ import logging import sys +from artifacts import reader as artifacts_reader +from artifacts import registry as artifacts_registry + from dfvfs.helpers import command_line from dfvfs.helpers import volume_scanner from dfvfs.lib import errors +from dfimagetools import artifact_filters from dfimagetools import bodyfile from dfimagetools import file_entry_lister from dfimagetools import helpers +from dfimagetools import resources def Main(): @@ -24,15 +29,36 @@ def Main(): argument_parser = argparse.ArgumentParser(description=( 'Lists metadata of file entries in a directory or storage media image.')) + # TODO: add filter group argument_parser.add_argument( - '--back_end', '--back-end', dest='back_end', action='store', - metavar='NTFS', default=None, help='preferred dfVFS back-end.') + '--artifact_definitions', '--artifact-definitions', + dest='artifact_definitions', type=str, metavar='PATH', action='store', + help=('Path to a directory or file containing the artifact definition ' + '.yaml files.')) + + argument_parser.add_argument( + '--artifact_filters', '--artifact-filters', dest='artifact_filters', + type=str, default=None, metavar='NAMES', action='store', help=( + 'Comma separated list of names of artifact definitions to extract.')) + + argument_parser.add_argument( + '--custom_artifact_definitions', '--custom-artifact-definitions', + dest='custom_artifact_definitions', type=str, metavar='PATH', + action='store', help=( + 'Path to a directory or file containing custom artifact definition ' + '.yaml files. ')) + # TODO: add output group argument_parser.add_argument( '--output_format', '--output-format', dest='output_format', action='store', metavar='FORMAT', default='bodyfile', help=( 'output format, default is bodyfile.')) + # TODO: add source group + argument_parser.add_argument( + '--back_end', '--back-end', dest='back_end', action='store', + metavar='NTFS', default=None, help='preferred dfVFS back-end.') + argument_parser.add_argument( '--partitions', '--partition', dest='partitions', action='store', type=str, default=None, help=( @@ -80,6 +106,16 @@ def Main(): print('') return False + if options.artifact_filters: + if (not options.artifact_definitions and + not options.custom_artifact_definitions): + print('[ERROR] artifact filters were specified but no paths to ' + 'artifact definitions were provided.') + print('') + argument_parser.print_help() + print('') + return False + helpers.SetDFVFSBackEnd(options.back_end) logging.basicConfig( @@ -101,7 +137,6 @@ def Main(): options.volumes) entry_lister = file_entry_lister.FileEntryLister(mediator=mediator) - return_value = True try: base_path_specs = entry_lister.GetBasePathSpecs( @@ -111,24 +146,58 @@ def Main(): print('') return False + find_specs = [] + if options.artifact_filters: + registry = artifacts_registry.ArtifactDefinitionsRegistry() + reader = artifacts_reader.YamlArtifactsReader() + + if options.artifact_definitions: + registry.ReadFromDirectory(reader, options.artifact_definitions) + if options.custom_artifact_definitions: + registry.ReadFromDirectory(reader, options.custom_artifact_definitions) + + # TODO: add support for determining environment variables and user + # accounts. + system_root_environment_variable = resources.EnvironmentVariable( + case_sensitive=False, name='SystemRoot', value='C:\\Windows') + windir_environment_variable = resources.EnvironmentVariable( + case_sensitive=False, name='WinDir', value='C:\\Windows') + + environment_variables = [ + system_root_environment_variable, windir_environment_variable] + + filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator( + registry, environment_variables, []) + + names = options.artifact_filters.split(',') + find_specs = list(filter_generator.GetFindSpecs(names)) + + if not find_specs: + print('[ERROR] an artifact filter was specified but no corresponding ' + 'file system find specifications were generated.') + return False + + if find_specs: + file_entries_generator = entry_lister.ListFileEntriesWithFindSpecs( + base_path_specs, find_specs) + else: + file_entries_generator = entry_lister.ListFileEntries(base_path_specs) + bodyfile_generator = bodyfile.BodyfileGenerator() - for file_entry, path_segments in entry_lister.ListFileEntries( - base_path_specs): + for file_entry, path_segments in file_entries_generator: for bodyfile_entry in bodyfile_generator.GetEntries( file_entry, path_segments): print(bodyfile_entry) except errors.ScannerError as exception: - return_value = False - print('[ERROR] {0!s}'.format(exception), file=sys.stderr) + return False except KeyboardInterrupt: - return_value = False - print('Aborted by user.', file=sys.stderr) + return False - return return_value + return True if __name__ == '__main__':