From 2bbf16c3b9acdbd5feb2c521177f676d069942a6 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Fri, 29 Mar 2024 10:54:53 +0100 Subject: [PATCH] Changes to make timeliner support date-less log formats #4697 --- plaso/containers/events.py | 10 ++++ plaso/engine/timeliner.py | 106 +++++++++++++++++++++++-------------- tests/containers/events.py | 50 ++++++++++++++--- tests/engine/timeliner.py | 64 +++++++++++----------- 4 files changed, 153 insertions(+), 77 deletions(-) diff --git a/plaso/containers/events.py b/plaso/containers/events.py index 01902ec21e..3663f5bd17 100644 --- a/plaso/containers/events.py +++ b/plaso/containers/events.py @@ -88,6 +88,7 @@ class DateLessLogHelper(interface.AttributeContainer): Attributes: earliest_date (list[int, int, int]): earliest possible date the event data stream was created. The date is a tuple of year, month and day of month. + granularity (str): granularity of the date-less log format. last_relative_date (list[int, int, int]): last relative date determined by the date-less log helper. The date is a tuple of year, month and day of month. @@ -100,17 +101,25 @@ class DateLessLogHelper(interface.AttributeContainer): SCHEMA = { '_event_data_stream_identifier': 'AttributeContainerIdentifier', 'earliest_date': 'List[int]', + 'granularity': 'str', 'last_relative_date': 'List[int]', 'latest_date': 'List[int]'} _SERIALIZABLE_PROTECTED_ATTRIBUTES = [ '_event_data_stream_identifier'] + # The date-less log format only supports time. + GRANULARITY_NO_DATE = 'd' + + # The date-less log format only supports month and day of month. + GRANULARITY_NO_YEARS = 'y' + def __init__(self): """Initializes a date-less log helper attribute container.""" super(DateLessLogHelper, self).__init__() self._event_data_stream_identifier = None self.earliest_date = None + self.granularity = self.GRANULARITY_NO_YEARS self.last_relative_date = None self.latest_date = None @@ -123,6 +132,7 @@ def CopyFromYearLessLogHelper(self, year_less_log_helper): year_less_log_helper (YearLessLogHelper): year-less log helper. """ self.earliest_date = (year_less_log_helper.earliest_year, 1, 1) + self.granularity = self.GRANULARITY_NO_YEARS self.last_relative_date = (year_less_log_helper.last_relative_year, 0, 0) self.latest_date = (year_less_log_helper.latest_year, 1, 1) diff --git a/plaso/engine/timeliner.py b/plaso/engine/timeliner.py index d0ee3029f4..628b940384 100644 --- a/plaso/engine/timeliner.py +++ b/plaso/engine/timeliner.py @@ -46,8 +46,8 @@ def __init__( """ super(EventDataTimeliner, self).__init__() self._attribute_mappings = {} - self._base_years = {} - self._current_year = self._GetCurrentYear() + self._base_dates = {} + self._current_date = self._GetCurrentDate() self._data_location = data_location self._place_holder_event = set() self._preferred_time_zone = None @@ -75,32 +75,32 @@ def _CreateTimeZonePerPathSpec(self, system_configurations): self._time_zone_per_path_spec[path_spec.parent] = ( system_configuration.time_zone) - def _GetBaseYear(self, storage_writer, event_data): - """Retrieves the base year. + def _GetBaseDate(self, storage_writer, event_data): + """Retrieves the base date. Args: storage_writer (StorageWriter): storage writer. event_data (EventData): event data. Returns: - int: base year. + tuple[int, int, int]: base date, as a tuple of year, month, day of month. """ # If preferred year is set considered it a user override, otherwise try # to determine the year based on the date-less log helper or fallback to # the current year. if self._preferred_year: - return self._preferred_year + return (self._preferred_year, 1, 1) event_data_stream_identifier = event_data.GetEventDataStreamIdentifier() if not event_data_stream_identifier: - return self._current_year + return self._current_date lookup_key = event_data_stream_identifier.CopyToString() - base_year = self._base_years.get(lookup_key, None) - if base_year: - return base_year + base_date = self._base_dates.get(lookup_key, None) + if base_date: + return base_date filter_expression = f'_event_data_stream_identifier == "{lookup_key:s}"' date_less_log_helpers = list(storage_writer.GetAttributeContainers( @@ -108,59 +108,86 @@ def _GetBaseYear(self, storage_writer, event_data): filter_expression=filter_expression)) if not date_less_log_helpers: message = ( - f'missing date-less log helper, defaulting to current year: ' - f'{self._current_year:d}') + f'missing date-less log helper, defaulting to current date: ' + f'{self._current_date[0]:d}-{self._current_date[1]:d}-' + f'{self._current_date[2]:d}') self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_year = self._current_year + base_date = self._current_date else: - earliest_date = date_less_log_helpers[0].earliest_date - last_relative_date = date_less_log_helpers[0].last_relative_date - latest_date = date_less_log_helpers[0].latest_date + date_less_log_helper = date_less_log_helpers[0] + + earliest_date = date_less_log_helper.earliest_date + last_relative_date = date_less_log_helper.last_relative_date + latest_date = date_less_log_helper.latest_date + current_date = self._current_date + + if date_less_log_helper.granularity == ( + date_less_log_helper.GRANULARITY_NO_YEARS): + if earliest_date: + earliest_date = (earliest_date[0], 0, 0) + if last_relative_date: + last_relative_date = (last_relative_date[0], 0, 0) + if latest_date: + latest_date = (latest_date[0], 0, 0) + + current_date = (current_date[0], 0, 0) if earliest_date is None and latest_date is None: message = ( - f'missing earliest and latest year in date-less log helper, ' - f'defaulting to current year: {self._current_year:d}') + f'missing earliest and latest date in date-less log helper, ' + f'defaulting to current date: {current_date[0]:d}-' + f'{current_date[1]:d}-{current_date[2]:d}') self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_year = self._current_year + base_date = current_date - elif earliest_date[0] + last_relative_date[0] < self._current_year: - base_year = earliest_date[0] + elif earliest_date[0] + last_relative_date[0] < current_date[0]: + base_date = (earliest_date[0], 1, 1) - elif latest_date[0] < self._current_year: + elif latest_date[0] < current_date[0]: message = ( - f'earliest year: {earliest_date[0]:d} as base year would exceed ' - f'current year: {self._current_year:d} + ' - f'{last_relative_date[0]:d}, using latest year: {latest_date[0]:d}') + f'earliest date: {earliest_date[0]:d}-{earliest_date[1]:d}-' + f'{earliest_date[2]:d} as base date would exceed current date: ' + f'{current_date[0]:d}-{current_date[1]:d}-{current_date[2]:d} + ' + f'{last_relative_date[0]:d}-{last_relative_date[1]:d}-' + f'{last_relative_date[2]:d}, using latest date: {latest_date[0]:d}-' + f'{latest_date[1]:d}-{latest_date[2]:d}') self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_year = latest_date[0] - last_relative_date[0] + base_date = tuple(map( + lambda latest, last_relative: latest - last_relative, + latest_date, last_relative_date)) else: message = ( - f'earliest year: {earliest_date[0]:d} and latest: year: ' - f'{latest_date[0]:d} as base year would exceed current year: ' - f'{self._current_year:d} + {last_relative_date[0]:d}, using ' - f'current year') + f'earliest date: {earliest_date[0]:d}-{earliest_date[1]:d}-' + f'{earliest_date[2]:d} and latest: date: {latest_date[0]:d}-' + f'{latest_date[1]:d}-{latest_date[2]:d} as base date would exceed ' + f'current date: {current_date[0]:d}-{current_date[1]:d}-' + f'{current_date[2]:d} + {last_relative_date[0]:d}-' + f'{last_relative_date[1]:d}-{last_relative_date[2]:d}, using ' + f'current date') self._ProduceTimeliningWarning(storage_writer, event_data, message) - base_year = self._current_year - last_relative_date[0] + base_date = tuple(map( + lambda current, last_relative: current - last_relative, + current_date, last_relative_date)) - self._base_years[lookup_key] = base_year + self._base_dates[lookup_key] = base_date - return base_year + return base_date - def _GetCurrentYear(self): - """Retrieves current year. + def _GetCurrentDate(self): + """Retrieves current date. Returns: - int: the current year. + tuple[int, int, int]: current date, as a tuple of year, month, day of + month. """ datetime_object = datetime.datetime.now(pytz.UTC) - return datetime_object.year + return datetime_object.year, datetime_object.month, datetime_object.day def _GetEvent( self, storage_writer, event_data, event_data_stream, date_time, @@ -180,10 +207,11 @@ def _GetEvent( """ timestamp = None if date_time.is_delta: - base_year = self._GetBaseYear(storage_writer, event_data) + base_date = self._GetBaseDate(storage_writer, event_data) try: - date_time = date_time.NewFromDeltaAndYear(base_year) + # TODO: change dfDateTime to support NewFromDeltaAndDate. + date_time = date_time.NewFromDeltaAndYear(base_date[0]) except ValueError as exception: self._ProduceTimeliningWarning( storage_writer, event_data, str(exception)) diff --git a/tests/containers/events.py b/tests/containers/events.py index 6ed0f09822..1ad30a3138 100644 --- a/tests/containers/events.py +++ b/tests/containers/events.py @@ -9,6 +9,38 @@ from tests import test_lib as shared_test_lib +class DateLessLogHelperTest(shared_test_lib.BaseTestCase): + """Tests for the date-less log helper attribute container.""" + + def testGetAttributeNames(self): + """Tests the GetAttributeNames function.""" + attribute_container = events.DateLessLogHelper() + + expected_attribute_names = [ + '_event_data_stream_identifier', + 'earliest_date', + 'granularity', + 'last_relative_date', + 'latest_date'] + + attribute_names = sorted(attribute_container.GetAttributeNames()) + + self.assertEqual(attribute_names, expected_attribute_names) + + def testGetEventDataStreamIdentifier(self): + """Tests the GetEventDataStreamIdentifier function.""" + attribute_container = events.DateLessLogHelper() + + identifier = attribute_container.GetEventDataStreamIdentifier() + self.assertIsNone(identifier) + + def testSetEventDataStreamIdentifier(self): + """Tests the SetEventDataStreamIdentifier function.""" + attribute_container = events.DateLessLogHelper() + + attribute_container.SetEventDataStreamIdentifier(None) + + class EventValuesHelperTest(shared_test_lib.BaseTestCase): """Tests for the event values helper functions.""" @@ -153,18 +185,20 @@ def testSetEventIdentifier(self): attribute_container.SetEventIdentifier(None) -class DateLessLogHelperTest(shared_test_lib.BaseTestCase): - """Tests for the date-less log helper attribute container.""" +# TODO: the YearLessLogHelper attribute container is kept for backwards +# compatibility remove once storage format 20230327 is obsolete. +class YearLessLogHelperTest(shared_test_lib.BaseTestCase): + """Tests for the year-less log helper attribute container.""" def testGetAttributeNames(self): """Tests the GetAttributeNames function.""" - attribute_container = events.DateLessLogHelper() + attribute_container = events.YearLessLogHelper() expected_attribute_names = [ '_event_data_stream_identifier', - 'earliest_date', - 'last_relative_date', - 'latest_date'] + 'earliest_year', + 'last_relative_year', + 'latest_year'] attribute_names = sorted(attribute_container.GetAttributeNames()) @@ -172,14 +206,14 @@ def testGetAttributeNames(self): def testGetEventDataStreamIdentifier(self): """Tests the GetEventDataStreamIdentifier function.""" - attribute_container = events.DateLessLogHelper() + attribute_container = events.YearLessLogHelper() identifier = attribute_container.GetEventDataStreamIdentifier() self.assertIsNone(identifier) def testSetEventDataStreamIdentifier(self): """Tests the SetEventDataStreamIdentifier function.""" - attribute_container = events.DateLessLogHelper() + attribute_container = events.YearLessLogHelper() attribute_container.SetEventDataStreamIdentifier(None) diff --git a/tests/engine/timeliner.py b/tests/engine/timeliner.py index 675e1bd151..a600e21d8b 100644 --- a/tests/engine/timeliner.py +++ b/tests/engine/timeliner.py @@ -55,12 +55,13 @@ class EventDataTimelinerTest(test_lib.EngineTestCase): # pylint: disable=protected-access # pylint: disable=arguments-differ - def _CreateStorageWriter(self, event_data, base_year=None): + def _CreateStorageWriter(self, event_data, base_date=None): """Creates a storage writer object. Args: event_data (EventData): event data. - base_year (Optional[int]): base year. + base_date (Optional[tuple[int, int, int]]): base date, as tuple of year, + month and day of month. Returns: FakeStorageWriter: storage writer. @@ -73,9 +74,9 @@ def _CreateStorageWriter(self, event_data, base_year=None): event_data_stream_identifier = event_data_stream.GetIdentifier() - if base_year: + if base_date: date_less_log_helper = events.DateLessLogHelper() - date_less_log_helper.earliest_date = (base_year, 1, 1) + date_less_log_helper.earliest_date = base_date date_less_log_helper.last_relative_date = (0, 0, 0) date_less_log_helper.SetEventDataStreamIdentifier( @@ -87,24 +88,25 @@ def _CreateStorageWriter(self, event_data, base_year=None): return storage_writer - def testGetBaseYear(self): - """Tests the _GetBaseYear function.""" + def testGetBaseDate(self): + """Tests the _GetBaseDate function.""" event_data_timeliner = timeliner.EventDataTimeliner( data_location=shared_test_lib.TEST_DATA_PATH) - current_year = event_data_timeliner._GetCurrentYear() + current_date = event_data_timeliner._GetCurrentDate() event_data = TestEventData1() event_data.value = 'MyValue' # Test with date-less log helper. - storage_writer = self._CreateStorageWriter(event_data, base_year=2012) + storage_writer = self._CreateStorageWriter( + event_data, base_date=(2012, 1, 1)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} - base_year = event_data_timeliner._GetBaseYear(storage_writer, event_data) - self.assertEqual(base_year, 2012) + base_date = event_data_timeliner._GetBaseDate(storage_writer, event_data) + self.assertEqual(base_date, (2012, 1, 1)) number_of_warnings = storage_writer.GetNumberOfAttributeContainers( 'timelining_warning') @@ -113,11 +115,11 @@ def testGetBaseYear(self): # Test missing date-less log helper. storage_writer = self._CreateStorageWriter(event_data) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} - base_year = event_data_timeliner._GetBaseYear(storage_writer, event_data) - self.assertEqual(base_year, current_year) + base_date = event_data_timeliner._GetBaseDate(storage_writer, event_data) + self.assertEqual(base_date, current_date) number_of_warnings = storage_writer.GetNumberOfAttributeContainers( 'timelining_warning') @@ -126,12 +128,12 @@ def testGetBaseYear(self): # TODO: improve test coverage. def testGetCurrentYear(self): - """Tests the _GetCurrentYear function.""" + """Tests the _GetCurrentDate function.""" event_data_timeliner = timeliner.EventDataTimeliner( data_location=shared_test_lib.TEST_DATA_PATH) - current_year = event_data_timeliner._GetCurrentYear() - self.assertIsNotNone(current_year) + current_date = event_data_timeliner._GetCurrentDate() + self.assertIsNotNone(current_date) def testGetEvent(self): """Tests the _GetEvent function.""" @@ -147,8 +149,8 @@ def testGetEvent(self): date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( time_elements_tuple=(2010, 8, 12, 20, 6, 31, 429876)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} event = event_data_timeliner._GetEvent( storage_writer, event_data, None, date_time, 'Test Time') @@ -161,13 +163,14 @@ def testGetEvent(self): time_elements_tuple=(2010, 8, 12, 20, 6, 31, 429876)) # Test date time delta of February 29 with leap year. - storage_writer = self._CreateStorageWriter(event_data, base_year=2012) + storage_writer = self._CreateStorageWriter( + event_data, base_date=(2012, 1, 1)) date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( is_delta=True, time_elements_tuple=(0, 2, 29, 20, 6, 31, 429876)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} event = event_data_timeliner._GetEvent( storage_writer, event_data, None, date_time, 'Test Time') @@ -180,13 +183,14 @@ def testGetEvent(self): is_delta=True, time_elements_tuple=(1, 8, 12, 20, 6, 31, 429876)) # Test date time delta of February 29 with non-leap year. - storage_writer = self._CreateStorageWriter(event_data, base_year=2013) + storage_writer = self._CreateStorageWriter( + event_data, base_date=(2013, 1, 1)) date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( is_delta=True, time_elements_tuple=(0, 2, 29, 20, 6, 31, 429876)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} event = event_data_timeliner._GetEvent( storage_writer, event_data, None, date_time, 'Test Time') @@ -194,14 +198,14 @@ def testGetEvent(self): self.assertIsNotNone(event.date_time) self.assertIsInstance(event.date_time, dfdatetime_semantic_time.InvalidTime) - # Test date time delta without a base year. + # Test date time delta without a base date. storage_writer = self._CreateStorageWriter(event_data) date_time = dfdatetime_time_elements.TimeElementsInMicroseconds( time_elements_tuple=(4, 2, 29, 20, 6, 31, 429876)) - # Ensure to reset the timeliner base years cache. - event_data_timeliner._base_years = {} + # Ensure to reset the timeliner base dates cache. + event_data_timeliner._base_dates = {} event = event_data_timeliner._GetEvent( storage_writer, event_data, None, date_time, 'Test Time')