From 9270ecc33e1aa2fa6d19dfc8f3b8ae4e785e7e17 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sat, 30 Mar 2024 21:08:18 +0100 Subject: [PATCH] Changes to make timeliner support date-less log formats #4697 (#4854) --- config/dpkg/control | 2 +- dependencies.ini | 2 +- docs/sources/user/Using-log2timeline.md | 18 ++-- plaso/cli/status_view.py | 4 +- plaso/containers/events.py | 47 +++++++++++ plaso/dependencies.py | 2 +- plaso/engine/timeliner.py | 105 +++++++++++++++--------- requirements.txt | 2 +- tests/cli/log2timeline_tool.py | 22 ++--- tests/cli/psteal_tool.py | 14 ++-- tests/cli/status_view.py | 6 +- tests/containers/events.py | 50 +++++++++-- tests/engine/timeliner.py | 66 ++++++++------- 13 files changed, 226 insertions(+), 114 deletions(-) diff --git a/config/dpkg/control b/config/dpkg/control index c1cd841201..dfe59ff47c 100644 --- a/config/dpkg/control +++ b/config/dpkg/control @@ -17,7 +17,7 @@ Description: Data files for plaso (log2timeline) Package: python3-plaso Architecture: all -Depends: plaso-data (>= ${binary:Version}), libbde-python3 (>= 20220121), libcaes-python3 (>= 20240114), libcreg-python3 (>= 20200725), libesedb-python3 (>= 20220806), libevt-python3 (>= 20191104), libevtx-python3 (>= 20220724), libewf-python3 (>= 20131210), libfcrypto-python3 (>= 20240114), libfsapfs-python3 (>= 20220709), libfsext-python3 (>= 20220829), libfsfat-python3 (>= 20220925), libfshfs-python3 (>= 20220831), libfsntfs-python3 (>= 20211229), libfsxfs-python3 (>= 20220829), libfvde-python3 (>= 20220121), libfwnt-python3 (>= 20210717), libfwsi-python3 (>= 20240225), liblnk-python3 (>= 20230716), libluksde-python3 (>= 20220121), libmodi-python3 (>= 20210405), libmsiecf-python3 (>= 20150314), libolecf-python3 (>= 20151223), libphdi-python3 (>= 20220228), libqcow-python3 (>= 20201213), libregf-python3 (>= 20201002), libscca-python3 (>= 20190605), libsigscan-python3 (>= 20230109), libsmdev-python3 (>= 20140529), libsmraw-python3 (>= 20140612), libvhdi-python3 (>= 20201014), libvmdk-python3 (>= 20140421), libvsapm-python3 (>= 20230506), libvsgpt-python3 (>= 20211115), libvshadow-python3 (>= 20160109), libvslvm-python3 (>= 20160109), python3-acstore (>= 20240121), python3-artifacts (>= 20220219), python3-bencode, python3-certifi (>= 2016.9.26), python3-cffi-backend (>= 1.9.1), python3-chardet (>= 2.0.1), python3-dateutil (>= 1.5), python3-defusedxml (>= 0.5.0), python3-dfdatetime (>= 20221112), python3-dfvfs (>= 20240115), python3-dfwinreg (>= 20240229), python3-dtfabric (>= 20230518), python3-flor (>= 1.1.3), python3-future (>= 0.16.0), python3-idna (>= 2.5), python3-lz4 (>= 0.10.0), python3-opensearch, python3-pefile (>= 2021.5.24), python3-psutil (>= 5.4.3), python3-pyparsing (>= 3.0.0), python3-pytsk3 (>= 20210419), python3-redis (>= 3.4), python3-requests (>= 2.18.0), python3-six (>= 1.1.0), python3-tz, python3-urllib3 (>= 1.21.1), python3-xattr (>= 0.7.2), python3-xlsxwriter (>= 0.9.3), python3-yaml (>= 3.10), python3-yara (>= 3.4.0), python3-zmq (>= 2.1.11), python3-zstd (>= 1.3.0.2), ${misc:Depends} +Depends: plaso-data (>= ${binary:Version}), libbde-python3 (>= 20220121), libcaes-python3 (>= 20240114), libcreg-python3 (>= 20200725), libesedb-python3 (>= 20220806), libevt-python3 (>= 20191104), libevtx-python3 (>= 20220724), libewf-python3 (>= 20131210), libfcrypto-python3 (>= 20240114), libfsapfs-python3 (>= 20220709), libfsext-python3 (>= 20220829), libfsfat-python3 (>= 20220925), libfshfs-python3 (>= 20220831), libfsntfs-python3 (>= 20211229), libfsxfs-python3 (>= 20220829), 
libfvde-python3 (>= 20220121), libfwnt-python3 (>= 20210717), libfwsi-python3 (>= 20240225), liblnk-python3 (>= 20230716), libluksde-python3 (>= 20220121), libmodi-python3 (>= 20210405), libmsiecf-python3 (>= 20150314), libolecf-python3 (>= 20151223), libphdi-python3 (>= 20220228), libqcow-python3 (>= 20201213), libregf-python3 (>= 20201002), libscca-python3 (>= 20190605), libsigscan-python3 (>= 20230109), libsmdev-python3 (>= 20140529), libsmraw-python3 (>= 20140612), libvhdi-python3 (>= 20201014), libvmdk-python3 (>= 20140421), libvsapm-python3 (>= 20230506), libvsgpt-python3 (>= 20211115), libvshadow-python3 (>= 20160109), libvslvm-python3 (>= 20160109), python3-acstore (>= 20240121), python3-artifacts (>= 20220219), python3-bencode, python3-certifi (>= 2016.9.26), python3-cffi-backend (>= 1.9.1), python3-chardet (>= 2.0.1), python3-dateutil (>= 1.5), python3-defusedxml (>= 0.5.0), python3-dfdatetime (>= 20240330), python3-dfvfs (>= 20240115), python3-dfwinreg (>= 20240229), python3-dtfabric (>= 20230518), python3-flor (>= 1.1.3), python3-future (>= 0.16.0), python3-idna (>= 2.5), python3-lz4 (>= 0.10.0), python3-opensearch, python3-pefile (>= 2021.5.24), python3-psutil (>= 5.4.3), python3-pyparsing (>= 3.0.0), python3-pytsk3 (>= 20210419), python3-redis (>= 3.4), python3-requests (>= 2.18.0), python3-six (>= 1.1.0), python3-tz, python3-urllib3 (>= 1.21.1), python3-xattr (>= 0.7.2), python3-xlsxwriter (>= 0.9.3), python3-yaml (>= 3.10), python3-yara (>= 3.4.0), python3-zmq (>= 2.1.11), python3-zstd (>= 1.3.0.2), ${misc:Depends} Description: Python 3 module of plaso (log2timeline) Plaso (log2timeline) is a framework to create super timelines. Its purpose is to extract timestamps from various files found on typical diff --git a/dependencies.ini b/dependencies.ini index 264445be72..cabdc68d7b 100644 --- a/dependencies.ini +++ b/dependencies.ini @@ -52,7 +52,7 @@ version_property: __version__ [dfdatetime] dpkg_name: python3-dfdatetime -minimum_version: 20221112 +minimum_version: 20240330 rpm_name: python3-dfdatetime version_property: __version__ diff --git a/docs/sources/user/Using-log2timeline.md b/docs/sources/user/Using-log2timeline.md index 036ba96c3d..2c0967aa30 100644 --- a/docs/sources/user/Using-log2timeline.md +++ b/docs/sources/user/Using-log2timeline.md @@ -39,9 +39,9 @@ Checking availability and versions of dependencies. [OK] -Source path : /PATH/image.E01 -Source type : storage media image -Processing time : 00:00:00 +Source path : /PATH/image.E01 +Source type : storage media image +Processing time : 00:00:00 Processing started. ``` @@ -49,9 +49,9 @@ Processing started. ```bash plaso - log2timeline version 20210412 -Source path : /PATH/image.E01 -Source type : storage media image -Processing time : 00:04:57 +Source path : /PATH/image.E01 +Source type : storage media image +Processing time : 00:04:57 Tasks: Queued Processing Merging Abandoned Total 0 0 0 0 18675 @@ -186,9 +186,9 @@ every discovered file. To do more targeted extraction a filter file can be used. ```bash $ log2timeline.py -f filter --storage-file timeline.plaso test.vhd -Source path : /PATH/test.vhd -Source type : storage media image -Filter file : filter +Source path : /PATH/test.vhd +Source type : storage media image +Filter file : filter Processing started. ... 
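The dfdatetime minimum version is raised to 20240330 because the timeliner changes later in this patch call the newer NewFromDeltaAndDate method instead of NewFromDeltaAndYear, resolving a delta-only (date-less) timestamp against a year, month and day of month base date. The sketch below mirrors the leap-year case exercised by the tests further down; it is not part of the patch and assumes dfdatetime 20240330 or later is installed.

```python
# Sketch of the NewFromDeltaAndDate usage that motivates the dfdatetime bump;
# not part of the patch, assumes dfdatetime >= 20240330 is installed.
from dfdatetime import time_elements as dfdatetime_time_elements

# A year-less log entry: "February 29, 20:06:31.429876" with no year recorded.
date_time = dfdatetime_time_elements.TimeElementsInMicroseconds(
    is_delta=True, time_elements_tuple=(0, 2, 29, 20, 6, 31, 429876))

# The timeliner supplies a (year, 0, 0) base date for year-less formats, where
# a month and day of month of 0 mean "not set". 2012 is a leap year, so
# February 29 resolves to a valid date.
if date_time.is_delta:
  date_time = date_time.NewFromDeltaAndDate(2012, 0, 0)

print(date_time.CopyToDateTimeString())  # expected: 2012-02-29 20:06:31.429876
```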
diff --git a/plaso/cli/status_view.py b/plaso/cli/status_view.py index e26d6772ca..34e6665792 100644 --- a/plaso/cli/status_view.py +++ b/plaso/cli/status_view.py @@ -237,7 +237,7 @@ def _PrintAnalysisStatusHeader(self, processing_status): f'Storage file\t\t: {self._storage_file_path:s}\n') processing_time = self._FormatProcessingTime(processing_status) - self._output_writer.Write(f'Processing time\t\t: {processing_time:s}\n') + self._output_writer.Write(f'Processing time\t: {processing_time:s}\n') if processing_status and processing_status.events_status: self._PrintEventsStatus(processing_status.events_status) @@ -525,7 +525,7 @@ def PrintExtractionStatusHeader(self, processing_status): self._output_writer.Write(f'Filter file\t\t: {self._filter_file:s}\n') processing_time = self._FormatProcessingTime(processing_status) - self._output_writer.Write(f'Processing time\t\t: {processing_time:s}\n') + self._output_writer.Write(f'Processing time\t: {processing_time:s}\n') self._PrintTasksStatus(processing_status) self._output_writer.Write('\n') diff --git a/plaso/containers/events.py b/plaso/containers/events.py index 01902ec21e..51c89643d0 100644 --- a/plaso/containers/events.py +++ b/plaso/containers/events.py @@ -88,6 +88,7 @@ class DateLessLogHelper(interface.AttributeContainer): Attributes: earliest_date (list[int, int, int]): earliest possible date the event data stream was created. The date is a tuple of year, month and day of month. + granularity (str): granularity of the date-less log format. last_relative_date (list[int, int, int]): last relative date determined by the date-less log helper. The date is a tuple of year, month and day of month. @@ -100,17 +101,25 @@ class DateLessLogHelper(interface.AttributeContainer): SCHEMA = { '_event_data_stream_identifier': 'AttributeContainerIdentifier', 'earliest_date': 'List[int]', + 'granularity': 'str', 'last_relative_date': 'List[int]', 'latest_date': 'List[int]'} _SERIALIZABLE_PROTECTED_ATTRIBUTES = [ '_event_data_stream_identifier'] + # The date-less log format only supports time. + GRANULARITY_NO_DATE = 'd' + + # The date-less log format only supports month and day of month. + GRANULARITY_NO_YEARS = 'y' + def __init__(self): """Initializes a date-less log helper attribute container.""" super(DateLessLogHelper, self).__init__() self._event_data_stream_identifier = None self.earliest_date = None + self.granularity = self.GRANULARITY_NO_YEARS self.last_relative_date = None self.latest_date = None @@ -123,9 +132,22 @@ def CopyFromYearLessLogHelper(self, year_less_log_helper): year_less_log_helper (YearLessLogHelper): year-less log helper. """ self.earliest_date = (year_less_log_helper.earliest_year, 1, 1) + self.granularity = self.GRANULARITY_NO_YEARS self.last_relative_date = (year_less_log_helper.last_relative_year, 0, 0) self.latest_date = (year_less_log_helper.latest_year, 1, 1) + def GetEarliestDate(self): + """Retrieves the earliest date adjusted to the granularity. + + Returns: + tuple[int, int, int]: earliest date as tuple of year, month and day of + month or None if not available. + """ + if self.earliest_date and self.granularity == self.GRANULARITY_NO_YEARS: + return self.earliest_date[0], 0, 0 + + return self.earliest_date + def GetEventDataStreamIdentifier(self): """Retrieves the identifier of the associated event data stream. @@ -137,6 +159,31 @@ def GetEventDataStreamIdentifier(self): """ return self._event_data_stream_identifier + def GetLastRelativeDate(self): + """Retrieves the last relative date adjusted to the granularity. 
+ + Returns: + tuple[int, int, int]: last relative date as tuple of year, month and day + of month or None if not available. + """ + if (self.last_relative_date and + self.granularity == self.GRANULARITY_NO_YEARS): + return self.last_relative_date[0], 0, 0 + + return self.last_relative_date + + def GetLatestDate(self): + """Retrieves the latest date adjusted to the granularity. + + Returns: + tuple[int, int, int]: latest date as tuple of year, month and day of + month or None if not available. + """ + if self.latest_date and self.granularity == self.GRANULARITY_NO_YEARS: + return self.latest_date[0], 0, 0 + + return self.latest_date + def SetEventDataStreamIdentifier(self, event_data_stream_identifier): """Sets the identifier of the associated event data stream. diff --git a/plaso/dependencies.py b/plaso/dependencies.py index 01e14d6ec8..2009fbde38 100644 --- a/plaso/dependencies.py +++ b/plaso/dependencies.py @@ -22,7 +22,7 @@ 'certifi': ('__version__', '2016.9.26', None, True), 'dateutil': ('__version__', '1.5', None, True), 'defusedxml': ('__version__', '0.5.0', None, True), - 'dfdatetime': ('__version__', '20221112', None, True), + 'dfdatetime': ('__version__', '20240330', None, True), 'dfvfs': ('__version__', '20240115', None, True), 'dfwinreg': ('__version__', '20240229', None, True), 'dtfabric': ('__version__', '20230518', None, True), diff --git a/plaso/engine/timeliner.py b/plaso/engine/timeliner.py index d0ee3029f4..88758153f5 100644 --- a/plaso/engine/timeliner.py +++ b/plaso/engine/timeliner.py @@ -46,8 +46,8 @@ def __init__( """ super(EventDataTimeliner, self).__init__() self._attribute_mappings = {} - self._base_years = {} - self._current_year = self._GetCurrentYear() + self._base_dates = {} + self._current_date = self._GetCurrentDate() self._data_location = data_location self._place_holder_event = set() self._preferred_time_zone = None @@ -75,32 +75,34 @@ def _CreateTimeZonePerPathSpec(self, system_configurations): self._time_zone_per_path_spec[path_spec.parent] = ( system_configuration.time_zone) - def _GetBaseYear(self, storage_writer, event_data): - """Retrieves the base year. + def _GetBaseDate(self, storage_writer, event_data): - """Retrieves the base year. + """Retrieves the base date. Args: storage_writer (StorageWriter): storage writer. event_data (EventData): event data. Returns: - int: base year. + tuple[int, int, int]: base date, as a tuple of year, month, day of month. """ # If the preferred year is set, consider it a user override, otherwise try # to determine the year based on the date-less log helper or fall back to # the current year.
if self._preferred_year: - return self._preferred_year + current_date = (self._preferred_year, 1, 1) + else: + current_date = self._current_date event_data_stream_identifier = event_data.GetEventDataStreamIdentifier() if not event_data_stream_identifier: - return self._current_year + return current_date[0], 0, 0 lookup_key = event_data_stream_identifier.CopyToString() - base_year = self._base_years.get(lookup_key, None) - if base_year: - return base_year + base_date = self._base_dates.get(lookup_key, None) + if base_date: + return base_date filter_expression = f'_event_data_stream_identifier == "{lookup_key:s}"' date_less_log_helpers = list(storage_writer.GetAttributeContainers( @@ -108,59 +110,84 @@ def _GetBaseYear(self, storage_writer, event_data): filter_expression=filter_expression)) if not date_less_log_helpers: message = ( - f'missing date-less log helper, defaulting to current year: ' - f'{self._current_year:d}') + f'missing date-less log helper, defaulting to date: ' + f'{current_date[0]:d}-{current_date[1]:d}-{current_date[2]:d}') self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_year = self._current_year + base_date = (current_date[0], 0, 0) else: - earliest_date = date_less_log_helpers[0].earliest_date - last_relative_date = date_less_log_helpers[0].last_relative_date - latest_date = date_less_log_helpers[0].latest_date + date_less_log_helper = date_less_log_helpers[0] + + earliest_date = date_less_log_helper.GetEarliestDate() + last_relative_date = date_less_log_helper.GetLastRelativeDate() + latest_date = date_less_log_helper.GetLatestDate() + + if date_less_log_helper.granularity == ( + date_less_log_helper.GRANULARITY_NO_YEARS): + current_date = (current_date[0], 0, 0) + + if earliest_date is None or last_relative_date is None: + last_date = None + else: + last_date = tuple(map( + lambda earliest, last_relative: earliest + last_relative, + earliest_date, last_relative_date)) if earliest_date is None and latest_date is None: message = ( - f'missing earliest and latest year in date-less log helper, ' - f'defaulting to current year: {self._current_year:d}') + f'missing earliest and latest date in date-less log helper, ' + f'defaulting to date: {current_date[0]:d}-{current_date[1]:d}-' + f'{current_date[2]:d}') self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_year = self._current_year + base_date = current_date - elif earliest_date[0] + last_relative_date[0] < self._current_year: - base_year = earliest_date[0] + elif last_date < current_date: + base_date = earliest_date - elif latest_date[0] < self._current_year: + elif latest_date < current_date: message = ( - f'earliest year: {earliest_date[0]:d} as base year would exceed ' - f'current year: {self._current_year:d} + ' - f'{last_relative_date[0]:d}, using latest year: {latest_date[0]:d}') + f'earliest date: {earliest_date[0]:d}-{earliest_date[1]:d}-' + f'{earliest_date[2]:d} as base date would exceed : ' + f'{current_date[0]:d}-{current_date[1]:d}-{current_date[2]:d} + ' + f'{last_relative_date[0]:d}-{last_relative_date[1]:d}-' + f'{last_relative_date[2]:d}, using latest date: {latest_date[0]:d}-' + f'{latest_date[1]:d}-{latest_date[2]:d}') self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_year = latest_date[0] - last_relative_date[0] + base_date = tuple(map( + lambda latest, last_relative: latest - last_relative, + latest_date, last_relative_date)) else: message = ( - f'earliest year: {earliest_date[0]:d} and latest: year: ' - f'{latest_date[0]:d} as 
base year would exceed current year: ' - f'{self._current_year:d} + {last_relative_date[0]:d}, using ' - f'current year') + f'earliest date: {earliest_date[0]:d}-{earliest_date[1]:d}-' + f'{earliest_date[2]:d} and latest: date: {latest_date[0]:d}-' + f'{latest_date[1]:d}-{latest_date[2]:d} as base date would exceed ' + f'date: {current_date[0]:d}-{current_date[1]:d}-' + f'{current_date[2]:d} + {last_relative_date[0]:d}-' + f'{last_relative_date[1]:d}-{last_relative_date[2]:d}, using date: ' + f'{current_date[0]:d}-{current_date[1]:d}-{current_date[2]:d}') self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_year = self._current_year - last_relative_date[0] + base_date = tuple(map( + lambda current, last_relative: current - last_relative, + current_date, last_relative_date)) - self._base_years[lookup_key] = base_year + self._base_dates[lookup_key] = base_date - return base_year + return base_date - def _GetCurrentYear(self): - """Retrieves current year. + def _GetCurrentDate(self): + """Retrieves current date. Returns: - int: the current year. + tuple[int, int, int]: current date, as a tuple of year, month, day of + month. """ datetime_object = datetime.datetime.now(pytz.UTC) - return datetime_object.year + return datetime_object.year, datetime_object.month, datetime_object.day def _GetEvent( self, storage_writer, event_data, event_data_stream, date_time, @@ -180,10 +207,10 @@ def _GetEvent( """ timestamp = None if date_time.is_delta: - base_year = self._GetBaseYear(storage_writer, event_data) + base_date = self._GetBaseDate(storage_writer, event_data) try: - date_time = date_time.NewFromDeltaAndYear(base_year) + date_time = date_time.NewFromDeltaAndDate(*base_date) except ValueError as exception: self._ProduceTimeliningWarning( storage_writer, event_data, str(exception)) diff --git a/requirements.txt b/requirements.txt index 7b55e74eec..10de57ac8f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ bencode.py certifi >= 2016.9.26 cffi >= 1.9.1 defusedxml >= 0.5.0 -dfdatetime >= 20221112 +dfdatetime >= 20240330 dfvfs >= 20240115 dfwinreg >= 20240229 dtfabric >= 20230518 diff --git a/tests/cli/log2timeline_tool.py b/tests/cli/log2timeline_tool.py index 06e095bfdd..40ca144912 100644 --- a/tests/cli/log2timeline_tool.py +++ b/tests/cli/log2timeline_tool.py @@ -37,7 +37,7 @@ def _CheckOutput(self, output, expected_output): output = output.split('\n') self.assertEqual(output[:3], expected_output[:3]) - self.assertTrue(output[3].startswith('Processing time\t\t: ')) + self.assertTrue(output[3].startswith('Processing time\t: ')) self.assertEqual(output[4:], expected_output[4:]) def _CreateExtractionOptions(self, source_path, password=None): @@ -191,7 +191,7 @@ def testExtractEventsFromSourcesOnDirectory(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: directory', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -224,7 +224,7 @@ def testExtractEventsFromSourcesOnAPFSImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -258,7 +258,7 @@ def testExtractEventsFromSourcesOnBDEImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing 
completed.', @@ -291,7 +291,7 @@ def testExtractEventsFromSourcesOnCompressedDMGImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -324,7 +324,7 @@ def testExtractEventsFromSourcesImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -359,7 +359,7 @@ def testExtractEventsFromSourcesPartitionedImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -394,7 +394,7 @@ def testExtractEventsFromSourcesOnVSSImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -431,7 +431,7 @@ def testExtractEventsFromSourcesOnFile(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: single file', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -468,7 +468,7 @@ def testExtractEventsFromSourcesOnLinkToDirectory(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: directory', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -503,7 +503,7 @@ def testExtractEventsFromSourcesOnLinkToFile(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: single file', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', diff --git a/tests/cli/psteal_tool.py b/tests/cli/psteal_tool.py index b77d3d9eb7..1c81978c02 100644 --- a/tests/cli/psteal_tool.py +++ b/tests/cli/psteal_tool.py @@ -33,7 +33,7 @@ def _CheckOutput(self, output, expected_output): output = output.split('\n') self.assertEqual(output[:3], expected_output[:3]) - self.assertTrue(output[3].startswith('Processing time\t\t: ')) + self.assertTrue(output[3].startswith('Processing time\t: ')) self.assertEqual(output[4:], expected_output[4:]) def testFailWhenOutputAlreadyExists(self): @@ -163,7 +163,7 @@ def testExtractEventsFromSourceDirectory(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: directory', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -207,7 +207,7 @@ def testExtractEventsFromSourceBDEImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -248,7 +248,7 @@ def testExtractEventsFromSourcesImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -291,7 +291,7 @@ def testExtractEventsFromSourcePartitionedImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 
'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -335,7 +335,7 @@ def testExtractEventsFromSourceVSSImage(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: storage media image', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', @@ -380,7 +380,7 @@ def testExtractEventsFromSourceSingleFile(self): '', 'Source path\t\t: {0:s}'.format(options.source), 'Source type\t\t: single file', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', 'Processing started.', 'Processing completed.', diff --git a/tests/cli/status_view.py b/tests/cli/status_view.py index 170c1c9ecc..2f0a55f9dc 100644 --- a/tests/cli/status_view.py +++ b/tests/cli/status_view.py @@ -54,7 +54,7 @@ def _CheckOutput(self, output, expected_output): output = output.split('\n') self.assertEqual(output[:4], expected_output[:4]) - self.assertTrue(output[4].startswith('Processing time\t\t: ')) + self.assertTrue(output[4].startswith('Processing time\t: ')) self.assertEqual(output[5:], expected_output[5:]) # TODO: add tests for _ClearScreen @@ -136,7 +136,7 @@ def testPrintExtractionStatusUpdateWindow(self): '', 'Source path\t\t: /test/source/path', 'Source type\t\t: directory', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', table_header, ('f_identifier ' @@ -174,7 +174,7 @@ def testPrintExtractionStatusUpdateWindow(self): '', 'Source path\t\t: /test/source/path', 'Source type\t\t: directory', - 'Processing time\t\t: 00:00:00', + 'Processing time\t: 00:00:00', '', table_header, ('f_identifier ' diff --git a/tests/containers/events.py b/tests/containers/events.py index 6ed0f09822..1ad30a3138 100644 --- a/tests/containers/events.py +++ b/tests/containers/events.py @@ -9,6 +9,38 @@ from tests import test_lib as shared_test_lib +class DateLessLogHelperTest(shared_test_lib.BaseTestCase): + """Tests for the date-less log helper attribute container.""" + + def testGetAttributeNames(self): + """Tests the GetAttributeNames function.""" + attribute_container = events.DateLessLogHelper() + + expected_attribute_names = [ + '_event_data_stream_identifier', + 'earliest_date', + 'granularity', + 'last_relative_date', + 'latest_date'] + + attribute_names = sorted(attribute_container.GetAttributeNames()) + + self.assertEqual(attribute_names, expected_attribute_names) + + def testGetEventDataStreamIdentifier(self): + """Tests the GetEventDataStreamIdentifier function.""" + attribute_container = events.DateLessLogHelper() + + identifier = attribute_container.GetEventDataStreamIdentifier() + self.assertIsNone(identifier) + + def testSetEventDataStreamIdentifier(self): + """Tests the SetEventDataStreamIdentifier function.""" + attribute_container = events.DateLessLogHelper() + + attribute_container.SetEventDataStreamIdentifier(None) + + class EventValuesHelperTest(shared_test_lib.BaseTestCase): """Tests for the event values helper functions.""" @@ -153,18 +185,20 @@ def testSetEventIdentifier(self): attribute_container.SetEventIdentifier(None) -class DateLessLogHelperTest(shared_test_lib.BaseTestCase): - """Tests for the date-less log helper attribute container.""" +# TODO: the YearLessLogHelper attribute container is kept for backwards +# compatibility remove once storage format 20230327 is obsolete. 
+class YearLessLogHelperTest(shared_test_lib.BaseTestCase): + """Tests for the year-less log helper attribute container.""" def testGetAttributeNames(self): """Tests the GetAttributeNames function.""" - attribute_container = events.DateLessLogHelper() + attribute_container = events.YearLessLogHelper() expected_attribute_names = [ '_event_data_stream_identifier', - 'earliest_date', - 'last_relative_date', - 'latest_date'] + 'earliest_year', + 'last_relative_year', + 'latest_year'] attribute_names = sorted(attribute_container.GetAttributeNames()) @@ -172,14 +206,14 @@ def testGetAttributeNames(self): def testGetEventDataStreamIdentifier(self): """Tests the GetEventDataStreamIdentifier function.""" - attribute_container = events.DateLessLogHelper() + attribute_container = events.YearLessLogHelper() identifier = attribute_container.GetEventDataStreamIdentifier() self.assertIsNone(identifier) def testSetEventDataStreamIdentifier(self): """Tests the SetEventDataStreamIdentifier function.""" - attribute_container = events.DateLessLogHelper() + attribute_container = events.YearLessLogHelper() attribute_container.SetEventDataStreamIdentifier(None) diff --git a/tests/engine/timeliner.py b/tests/engine/timeliner.py index 675e1bd151..f36368f1a8 100644 --- a/tests/engine/timeliner.py +++ b/tests/engine/timeliner.py @@ -55,12 +55,13 @@ class EventDataTimelinerTest(test_lib.EngineTestCase): # pylint: disable=protected-access # pylint: disable=arguments-differ - def _CreateStorageWriter(self, event_data, base_year=None): + def _CreateStorageWriter(self, event_data, base_date=None): """Creates a storage writer object. Args: event_data (EventData): event data. - base_year (Optional[int]): base year. + base_date (Optional[tuple[int, int, int]]): base date, as tuple of year, + month and day of month. Returns: FakeStorageWriter: storage writer. @@ -73,9 +74,9 @@ def _CreateStorageWriter(self, event_data, base_year=None): event_data_stream_identifier = event_data_stream.GetIdentifier() - if base_year: + if base_date: date_less_log_helper = events.DateLessLogHelper() - date_less_log_helper.earliest_date = (base_year, 1, 1) + date_less_log_helper.earliest_date = base_date date_less_log_helper.last_relative_date = (0, 0, 0) date_less_log_helper.SetEventDataStreamIdentifier( @@ -87,24 +88,25 @@ def _CreateStorageWriter(self, event_data, base_year=None): return storage_writer - def testGetBaseYear(self): - """Tests the _GetBaseYear function.""" + def testGetBaseDate(self): + """Tests the _GetBaseDate function.""" event_data_timeliner = timeliner.EventDataTimeliner( data_location=shared_test_lib.TEST_DATA_PATH) - current_year = event_data_timeliner._GetCurrentYear() + current_date = event_data_timeliner._GetCurrentDate() event_data = TestEventData1() event_data.value = 'MyValue' # Test with date-less log helper. - storage_writer = self._CreateStorageWriter(event_data, base_year=2012) + storage_writer = self._CreateStorageWriter( + event_data, base_date=(2012, 3, 30)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. 
+ event_data_timeliner._base_dates = {} - base_year = event_data_timeliner._GetBaseYear(storage_writer, event_data) - self.assertEqual(base_year, 2012) + base_date = event_data_timeliner._GetBaseDate(storage_writer, event_data) + self.assertEqual(base_date, (2012, 0, 0)) number_of_warnings = storage_writer.GetNumberOfAttributeContainers( 'timelining_warning') @@ -113,11 +115,11 @@ def testGetBaseYear(self): # Test missing date-less log helper. storage_writer = self._CreateStorageWriter(event_data) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} - base_year = event_data_timeliner._GetBaseYear(storage_writer, event_data) - self.assertEqual(base_year, current_year) + base_date = event_data_timeliner._GetBaseDate(storage_writer, event_data) + self.assertEqual(base_date, (current_date[0], 0, 0)) number_of_warnings = storage_writer.GetNumberOfAttributeContainers( 'timelining_warning') @@ -125,13 +127,13 @@ def testGetBaseYear(self): # TODO: improve test coverage. - def testGetCurrentYear(self): - """Tests the _GetCurrentYear function.""" + def testGetCurrentDate(self): + """Tests the _GetCurrentDate function.""" event_data_timeliner = timeliner.EventDataTimeliner( data_location=shared_test_lib.TEST_DATA_PATH) - current_year = event_data_timeliner._GetCurrentYear() - self.assertIsNotNone(current_year) + current_date = event_data_timeliner._GetCurrentDate() + self.assertIsNotNone(current_date) def testGetEvent(self): """Tests the _GetEvent function.""" @@ -147,8 +149,8 @@ def testGetEvent(self): date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( time_elements_tuple=(2010, 8, 12, 20, 6, 31, 429876)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} event = event_data_timeliner._GetEvent( storage_writer, event_data, None, date_time, 'Test Time') @@ -161,13 +163,14 @@ def testGetEvent(self): time_elements_tuple=(2010, 8, 12, 20, 6, 31, 429876)) # Test date time delta of February 29 with leap year. - storage_writer = self._CreateStorageWriter(event_data, base_year=2012) + storage_writer = self._CreateStorageWriter( + event_data, base_date=(2012, 1, 1)) date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( is_delta=True, time_elements_tuple=(0, 2, 29, 20, 6, 31, 429876)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} event = event_data_timeliner._GetEvent( storage_writer, event_data, None, date_time, 'Test Time') @@ -180,13 +183,14 @@ def testGetEvent(self): is_delta=True, time_elements_tuple=(1, 8, 12, 20, 6, 31, 429876)) # Test date time delta of February 29 with non-leap year. - storage_writer = self._CreateStorageWriter(event_data, base_year=2013) + storage_writer = self._CreateStorageWriter( + event_data, base_date=(2013, 1, 1)) date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( is_delta=True, time_elements_tuple=(0, 2, 29, 20, 6, 31, 429876)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. 
+ event_data_timeliner._base_dates = {} event = event_data_timeliner._GetEvent( storage_writer, event_data, None, date_time, 'Test Time') @@ -194,14 +198,14 @@ def testGetEvent(self): self.assertIsNotNone(event.date_time) self.assertIsInstance(event.date_time, dfdatetime_semantic_time.InvalidTime) - # Test date time delta without a base year. + # Test date time delta without a base date. storage_writer = self._CreateStorageWriter(event_data) date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( time_elements_tuple=(4, 2, 29, 20, 6, 31, 429876)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} event = event_data_timeliner._GetEvent( storage_writer, event_data, None, date_time, 'Test Time')
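The base date selection in the reworked _GetBaseDate above reduces to element-wise arithmetic on (year, month, day of month) tuples. The standalone sketch below summarizes that logic; it is illustrative only, not part of the patch, omits the storage writer plumbing and the timelining warnings, and assumes the date-less log helper hints are either all present or all absent.

```python
# Illustrative, simplified mirror of the base date selection in _GetBaseDate.
# Dates are (year, month, day of month) tuples compared lexicographically;
# last_relative_date is the largest relative date observed in the log data.
def SelectBaseDate(earliest_date, last_relative_date, latest_date,
                   current_date):
  if earliest_date is None and latest_date is None:
    # No usable hints from the date-less log helper: use the current date.
    return current_date

  # Date reached when the largest relative date is applied to the earliest
  # date observed in the log data.
  last_date = tuple(
      earliest + last_relative
      for earliest, last_relative in zip(earliest_date, last_relative_date))

  if last_date < current_date:
    # Every relative date stays in the past when anchored on the earliest
    # date, so it can be used as-is.
    return earliest_date

  if latest_date < current_date:
    # Anchor on the latest date instead, compensating for the relative date.
    return tuple(
        latest - last_relative
        for latest, last_relative in zip(latest_date, last_relative_date))

  # Otherwise anchor on the current date, compensating for the relative date.
  return tuple(
      current - last_relative
      for current, last_relative in zip(current_date, last_relative_date))


# A year-less log covering 2012 with relative dates of at most one year,
# processed in 2024: the earliest date can be used as-is.
print(SelectBaseDate((2012, 0, 0), (1, 0, 0), (2013, 0, 0), (2024, 0, 0)))
# prints: (2012, 0, 0)
```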