Skip to content

Commit

Permalink
Added script to extract data streams log2timeline#1
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Jan 22, 2022
1 parent 6cb3acf commit abc6e95
Show file tree
Hide file tree
Showing 3 changed files with 333 additions and 4 deletions.
103 changes: 103 additions & 0 deletions dfimagetools/data_stream_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
"""Helper to write data streams."""

import os


class DataStreamWriter(object):
  """Data stream writer."""

  # Number of bytes read from the source data stream per iteration.
  _BUFFER_SIZE = 32768

  # Ordinals of the C0 and C1 control characters, which are escaped for
  # display and replaced when sanitizing a path.
  _NON_PRINTABLE_CHARACTERS = list(range(0, 0x20)) + list(range(0x7f, 0xa0))

  # Characters that are backslash-escaped in a display path.
  _ESCAPE_CHARACTERS = {
      '/': '\\/',
      ':': '\\:',
      '\\': '\\\\',
      '|': '\\|'}
  _ESCAPE_CHARACTERS.update({
      value: '\\x{0:02x}'.format(value)
      for value in _NON_PRINTABLE_CHARACTERS})

  # Characters that are not supported in a path by common file systems and
  # that are replaced by an underscore "_" when sanitizing a path.
  _INVALID_PATH_CHARACTERS = [
      os.path.sep, '!', '$', '%', '&', '*', '+', ':', ';', '<', '>', '?', '@',
      '|', '~']
  _INVALID_PATH_CHARACTERS.extend(_NON_PRINTABLE_CHARACTERS)

  def __init__(self):
    """Initializes a data stream writer."""
    super(DataStreamWriter, self).__init__()
    self._display_escape_characters = str.maketrans(self._ESCAPE_CHARACTERS)
    self._invalid_path_characters = str.maketrans({
        value: '_' for value in self._INVALID_PATH_CHARACTERS})

  def GetDisplayPath(
      self, source_path_segments, source_data_stream_name):
    """Retrieves a path to display.

    Args:
      source_path_segments (list[str]): path segments of the source file.
      source_data_stream_name (str): name of the data stream of the source
          file.

    Returns:
      str: display path.
    """
    path_segments = [
        path_segment.translate(self._display_escape_characters)
        for path_segment in source_path_segments]

    # os.path.join() requires at least one argument, hence guard against an
    # empty list of path segments.
    display_path = os.path.join(*path_segments) if path_segments else ''
    if source_data_stream_name:
      display_path = ':'.join([display_path, source_data_stream_name])

    return display_path

  def GetSanitizedPath(
      self, source_path_segments, source_data_stream_name, target_path):
    """Retrieves a sanitized path.

    This function replaces non-printable and other invalid path characters with
    an underscore "_".

    Args:
      source_path_segments (list[str]): path segments of the source file.
      source_data_stream_name (str): name of the data stream of the source
          file.
      target_path (str): path of the target directory.

    Returns:
      str: sanitized path.
    """
    path_segments = [
        path_segment.translate(self._invalid_path_characters)
        for path_segment in source_path_segments]

    destination_path = os.path.join(target_path, *path_segments)
    if source_data_stream_name:
      source_data_stream_name = source_data_stream_name.translate(
          self._invalid_path_characters)
      destination_path = '_'.join([destination_path, source_data_stream_name])

    return destination_path

  def WriteDataStream(self, file_entry, data_stream_name, destination_path):
    """Writes the contents of the source data stream to a destination file.

    Note that this function will overwrite an existing file.

    Args:
      file_entry (dfvfs.FileEntry): file entry whose content is to be written.
      data_stream_name (str): name of the data stream whose content is to be
          written.
      destination_path (str): path of the destination file.
    """
    source_file_object = file_entry.GetFileObject(
        data_stream_name=data_stream_name)
    if source_file_object:
      with open(destination_path, 'wb') as destination_file_object:
        source_file_object.seek(0, os.SEEK_SET)

        data = source_file_object.read(self._BUFFER_SIZE)
        while data:
          destination_file_object.write(data)
          data = source_file_object.read(self._BUFFER_SIZE)
225 changes: 225 additions & 0 deletions tools/extract_data_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Script to extract data streams."""

import argparse
import logging
import os
import sys

from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry

from dfvfs.helpers import command_line
from dfvfs.helpers import volume_scanner
from dfvfs.lib import errors

from dfimagetools import artifact_filters
from dfimagetools import data_stream_writer
from dfimagetools import file_entry_lister
from dfimagetools import helpers
from dfimagetools import resources


def Main():
  """The main program function.

  Returns:
    bool: True if successful or False if not.
  """
  argument_parser = argparse.ArgumentParser(description=(
      'Extracts data streams from a storage media image.'))

  # TODO: add filter group
  argument_parser.add_argument(
      '--artifact_definitions', '--artifact-definitions',
      dest='artifact_definitions', type=str, metavar='PATH', action='store',
      help=('Path to a directory or file containing the artifact definition '
            '.yaml files.'))

  argument_parser.add_argument(
      '--artifact_filters', '--artifact-filters', dest='artifact_filters',
      type=str, default=None, metavar='NAMES', action='store', help=(
          'Comma separated list of names of artifact definitions to extract.'))

  argument_parser.add_argument(
      '--custom_artifact_definitions', '--custom-artifact-definitions',
      dest='custom_artifact_definitions', type=str, metavar='PATH',
      action='store', help=(
          'Path to a directory or file containing custom artifact definition '
          '.yaml files. '))

  # TODO: add output group
  argument_parser.add_argument(
      '-t', '--target', dest='target', action='store', metavar='PATH',
      default=None, help=(
          'target (or destination) path of a directory where the extracted '
          'data streams should be stored.'))

  # TODO: add source group
  argument_parser.add_argument(
      '--back_end', '--back-end', dest='back_end', action='store',
      metavar='NTFS', default=None, help='preferred dfVFS back-end.')

  argument_parser.add_argument(
      '--partitions', '--partition', dest='partitions', action='store',
      type=str, default=None, help=(
          'Define partitions to be processed. A range of partitions can be '
          'defined as: "3..5". Multiple partitions can be defined as: "1,3,5" '
          '(a list of comma separated values). Ranges and lists can also be '
          'combined as: "1,3..5". The first partition is 1. All partitions '
          'can be specified with: "all".'))

  argument_parser.add_argument(
      '--snapshots', '--snapshot', dest='snapshots', action='store', type=str,
      default=None, help=(
          'Define snapshots to be processed. A range of snapshots can be '
          'defined as: "3..5". Multiple snapshots can be defined as: "1,3,5" '
          '(a list of comma separated values). Ranges and lists can also be '
          'combined as: "1,3..5". The first snapshot is 1. All snapshots can '
          'be specified with: "all".'))

  argument_parser.add_argument(
      '--volumes', '--volume', dest='volumes', action='store', type=str,
      default=None, help=(
          'Define volumes to be processed. A range of volumes can be defined '
          'as: "3..5". Multiple volumes can be defined as: "1,3,5" (a list '
          'of comma separated values). Ranges and lists can also be combined '
          'as: "1,3..5". The first volume is 1. All volumes can be specified '
          'with: "all".'))

  argument_parser.add_argument(
      'source', nargs='?', action='store', metavar='image.raw',
      default=None, help='path of the storage media image.')

  options = argument_parser.parse_args()

  if not options.source:
    print('Source value is missing.')
    print('')
    argument_parser.print_help()
    print('')
    return False

  # Artifact filters require artifact definitions to resolve the names.
  if options.artifact_filters:
    if (not options.artifact_definitions and
        not options.custom_artifact_definitions):
      print('[ERROR] artifact filters were specified but no paths to '
            'artifact definitions were provided.')
      print('')
      return False

  # TODO: improve this, for now this script needs at least 1 filter.
  if not options.artifact_filters:
    print('[ERROR] no artifact filters were specified.')
    print('')
    return False

  # Default the target directory to "<source basename>.extracted" in the
  # current working directory.
  target_path = options.target
  if not target_path:
    target_path = '{0:s}.extracted'.format(os.path.basename(options.source))
    target_path = os.path.join(os.getcwd(), target_path)

  if not os.path.exists(target_path):
    os.makedirs(target_path)

  elif not os.path.isdir(target_path):
    print('[ERROR] target path is not a directory.')
    print('')
    return False

  helpers.SetDFVFSBackEnd(options.back_end)

  logging.basicConfig(
      level=logging.INFO, format='[%(levelname)s] %(message)s')

  mediator = command_line.CLIVolumeScannerMediator()

  volume_scanner_options = volume_scanner.VolumeScannerOptions()
  volume_scanner_options.partitions = mediator.ParseVolumeIdentifiersString(
      options.partitions)

  # "none" is a special value to skip processing of snapshots, which
  # ParseVolumeIdentifiersString does not handle.
  if options.snapshots == 'none':
    volume_scanner_options.snapshots = ['none']
  else:
    volume_scanner_options.snapshots = mediator.ParseVolumeIdentifiersString(
        options.snapshots)

  volume_scanner_options.volumes = mediator.ParseVolumeIdentifiersString(
      options.volumes)

  entry_lister = file_entry_lister.FileEntryLister(mediator=mediator)

  try:
    base_path_specs = entry_lister.GetBasePathSpecs(
        options.source, options=volume_scanner_options)
    if not base_path_specs:
      print('No supported file system found in source.')
      print('')
      return False

    registry = artifacts_registry.ArtifactDefinitionsRegistry()
    reader = artifacts_reader.YamlArtifactsReader()

    # The command line help promises support for both a directory and a
    # single file of artifact definitions.
    if options.artifact_definitions:
      if os.path.isdir(options.artifact_definitions):
        registry.ReadFromDirectory(reader, options.artifact_definitions)
      elif os.path.isfile(options.artifact_definitions):
        registry.ReadFromFile(reader, options.artifact_definitions)

    if options.custom_artifact_definitions:
      if os.path.isdir(options.custom_artifact_definitions):
        registry.ReadFromDirectory(
            reader, options.custom_artifact_definitions)
      elif os.path.isfile(options.custom_artifact_definitions):
        registry.ReadFromFile(reader, options.custom_artifact_definitions)

    # TODO: add support for determining environment variables and user
    # accounts.
    system_root_environment_variable = resources.EnvironmentVariable(
        case_sensitive=False, name='SystemRoot', value='C:\\Windows')
    windir_environment_variable = resources.EnvironmentVariable(
        case_sensitive=False, name='WinDir', value='C:\\Windows')

    environment_variables = [
        system_root_environment_variable, windir_environment_variable]

    filter_generator = artifact_filters.ArtifactDefinitionFiltersGenerator(
        registry, environment_variables, [])

    names = options.artifact_filters.split(',')
    find_specs = list(filter_generator.GetFindSpecs(names))

    if not find_specs:
      print('[ERROR] an artifact filter was specified but no corresponding '
            'file system find specifications were generated.')
      print('')
      return False

    stream_writer = data_stream_writer.DataStreamWriter()
    for file_entry, path_segments in entry_lister.ListFileEntriesWithFindSpecs(
        base_path_specs, find_specs):
      # A file entry can have more than one data stream, for example NTFS
      # alternate data streams; extract each one separately.
      for data_stream in file_entry.data_streams:
        display_path = stream_writer.GetDisplayPath(
            path_segments, data_stream.name)
        destination_path = stream_writer.GetSanitizedPath(
            path_segments, data_stream.name, target_path)
        logging.info('Extracting: {0:s} to: {1:s}'.format(
            display_path, destination_path))

        destination_directory = os.path.dirname(destination_path)
        os.makedirs(destination_directory, exist_ok=True)

        stream_writer.WriteDataStream(
            file_entry, data_stream.name, destination_path)

  except errors.ScannerError as exception:
    print('[ERROR] {0!s}'.format(exception), file=sys.stderr)
    print('')
    return False

  except KeyboardInterrupt:
    print('Aborted by user.', file=sys.stderr)
    print('')
    return False

  return True


if __name__ == '__main__':
  # Exit with status 0 on success and 1 on failure.
  sys.exit(0 if Main() else 1)
9 changes: 5 additions & 4 deletions tools/list_file_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def Main():
bool: True if successful or False if not.
"""
argument_parser = argparse.ArgumentParser(description=(
'Lists metadata of file entries in a directory or storage media image.'))
'Lists metadata of file entries in a storage media image.'))

# TODO: add filter group
argument_parser.add_argument(
Expand Down Expand Up @@ -88,7 +88,7 @@ def Main():

argument_parser.add_argument(
'source', nargs='?', action='store', metavar='image.raw',
default=None, help='path of the directory or storage media image.')
default=None, help='path of the storage media image.')

options = argument_parser.parse_args()

Expand All @@ -112,8 +112,6 @@ def Main():
print('[ERROR] artifact filters were specified but no paths to '
'artifact definitions were provided.')
print('')
argument_parser.print_help()
print('')
return False

helpers.SetDFVFSBackEnd(options.back_end)
Expand Down Expand Up @@ -175,6 +173,7 @@ def Main():
if not find_specs:
print('[ERROR] an artifact filter was specified but no corresponding '
'file system find specifications were generated.')
print('')
return False

if find_specs:
Expand All @@ -191,10 +190,12 @@ def Main():

except errors.ScannerError as exception:
print('[ERROR] {0!s}'.format(exception), file=sys.stderr)
print('')
return False

except KeyboardInterrupt:
print('Aborted by user.', file=sys.stderr)
print('')
return False

return True
Expand Down

0 comments on commit abc6e95

Please sign in to comment.