Skip to content

Commit

Permalink
Worked on script to extract data streams log2timeline#1
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Aug 5, 2023
1 parent af6c344 commit 942fcaa
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 42 deletions.
25 changes: 9 additions & 16 deletions dfimagetools/data_stream_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,23 @@

import os

from dfimagetools import definitions


class DataStreamWriter(object):
"""Data stream writer."""

_BUFFER_SIZE = 32768

_NON_PRINTABLE_CHARACTERS = list(range(0, 0x20)) + list(range(0x7f, 0xa0))

_ESCAPE_CHARACTERS = {
'/': '\\/',
':': '\\:',
'\\': '\\\\',
'|': '\\|'}
_ESCAPE_CHARACTERS.update({
value: f'\\x{value:02x}' for value in _NON_PRINTABLE_CHARACTERS})
# Class constant that defines the default read buffer size.
_READ_BUFFER_SIZE = 16 * 1024 * 1024

_INVALID_PATH_CHARACTERS = [
os.path.sep, '!', '$', '%', '&', '*', '+', ':', ';', '<', '>', '?', '@',
'|', '~']
_INVALID_PATH_CHARACTERS.extend(_NON_PRINTABLE_CHARACTERS)
_INVALID_PATH_CHARACTERS.extend(definitions.NON_PRINTABLE_CHARACTERS.keys())

def __init__(self):
"""Initializes a data stream writer."""
super(DataStreamWriter, self).__init__()
self._display_escape_characters = str.maketrans(self._ESCAPE_CHARACTERS)
self._invalid_path_characters = str.maketrans({
value: '_' for value in self._INVALID_PATH_CHARACTERS})

Expand All @@ -43,7 +35,8 @@ def GetDisplayPath(
str: display path.
"""
path_segments = [
path_segment.translate(self._display_escape_characters)
path_segment.translate(
definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)
for path_segment in source_path_segments]

display_path = '/'.join(path_segments)
Expand Down Expand Up @@ -96,7 +89,7 @@ def WriteDataStream(self, file_entry, data_stream_name, destination_path):
with open(destination_path, 'wb') as destination_file_object:
source_file_object.seek(0, os.SEEK_SET)

data = source_file_object.read(self._BUFFER_SIZE)
data = source_file_object.read(self._READ_BUFFER_SIZE)
while data:
destination_file_object.write(data)
data = source_file_object.read(self._BUFFER_SIZE)
data = source_file_object.read(self._READ_BUFFER_SIZE)
3 changes: 1 addition & 2 deletions dfimagetools/file_entry_lister.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ class FileEntryLister(volume_scanner.VolumeScanner):
'C:\\Windows',
'C:\\WINNT',
'C:\\WTSRV',
'C:\\WINNT35',
])
'C:\\WINNT35'])

def __init__(self, mediator=None, use_aliases=True):
"""Initializes a file entry lister.
Expand Down
45 changes: 21 additions & 24 deletions tests/data_stream_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,28 @@ class DataStreamWriterTest(test_lib.BaseTestCase):

# pylint: disable=protected-access

def testGetSanitizedPath(self):
"""Tests the GetSanitizedPath function."""
path = self._GetTestFilePath(['image.qcow2'])
self._SkipIfPathNotExists(path)

path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=path)
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_QCOW, parent=path_spec)
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location='/passwords.txt',
parent=path_spec)

file_system = resolver.Resolver.OpenFileSystem(path_spec)
file_entry = resolver.Resolver.OpenFileEntry(path_spec)

test_lister = file_entry_lister.FileEntryLister()
file_entries = list(test_lister._ListFileEntry(
file_system, file_entry, ['']))

self.assertEqual(len(file_entries), 1)
_FILE_DATA = b'\n'.join([
b'place,user,password',
b'bank,joesmith,superrich',
b'alarm system,-,1234',
b'treasure chest,-,1111',
b'uber secret laire,admin,admin',
b''])

def testGetDisplayPath(self):
"""Tests the GetDisplayPath function."""
test_data_stream_writer = data_stream_writer.DataStreamWriter()

_, path_segments = file_entries[0]
display_path = test_data_stream_writer.GetDisplayPath(
['passwords.txt'], 'data_stream')
self.assertEqual(display_path, 'passwords.txt:data_stream')

def testGetSanitizedPath(self):
"""Tests the GetSanitizedPath function."""
test_data_stream_writer = data_stream_writer.DataStreamWriter()

sanitized_path = test_data_stream_writer.GetSanitizedPath(
path_segments, '', '')
['passwords.txt'], '', '')
self.assertEqual(sanitized_path, 'passwords.txt')

def testWriteDataStream(self):
Expand Down Expand Up @@ -80,7 +74,10 @@ def testWriteDataStream(self):
destination_path = os.path.join(temp_directory, 'passwords.txt')
test_data_stream_writer.WriteDataStream(file_entry, '', destination_path)

# TODO: check contents of exported file
with open(destination_path, 'rb') as file_object:
file_data = file_object.read()

self.assertEqual(file_data, self._FILE_DATA)


if __name__ == '__main__':
Expand Down

0 comments on commit 942fcaa

Please sign in to comment.