From 4fcab02e8109896841f160678d7db552ad9dab4f Mon Sep 17 00:00:00 2001 From: Jason <52063018+dfjxs@users.noreply.github.com> Date: Fri, 25 Mar 2022 18:03:31 +1100 Subject: [PATCH] Fix population of partition path_spec from location (#1028) * Fix population of partition path_spec from location * Fix population of partition path_spec from location --- turbinia/evidence.py | 9 +++---- turbinia/processors/partitions.py | 7 +++--- turbinia/workers/fsstat.py | 15 ++++++------ turbinia/workers/fsstat_test.py | 6 ++--- turbinia/workers/partitions.py | 40 +++++++++++++++++++++---------- 5 files changed, 47 insertions(+), 30 deletions(-) diff --git a/turbinia/evidence.py b/turbinia/evidence.py index 3b924980d..0e562d6fb 100644 --- a/turbinia/evidence.py +++ b/turbinia/evidence.py @@ -578,14 +578,13 @@ class DiskPartition(RawDisk): partition_offset (int): Offset of the partition in bytes. partition_size (int): Size of the partition in bytes. path_spec (dfvfs.PathSpec): Partition path spec. - type_indicator (str): Partition type indicator from dfVFS. """ POSSIBLE_STATES = [EvidenceState.ATTACHED, EvidenceState.MOUNTED] def __init__( self, partition_location=None, partition_offset=None, partition_size=None, - lv_uuid=None, path_spec=None, type_indicator=None, *args, **kwargs): + lv_uuid=None, path_spec=None, *args, **kwargs): """Initialization for raw volume evidence object.""" self.partition_location = partition_location @@ -593,7 +592,6 @@ def __init__( self.partition_size = partition_size self.lv_uuid = lv_uuid self.path_spec = path_spec - self.type_indicator = type_indicator super(DiskPartition, self).__init__(*args, **kwargs) # This Evidence needs to have a parent @@ -614,7 +612,10 @@ def _preprocess(self, _, required_states): path_specs, self.partition_location) if path_spec: self.path_spec = path_spec - self.type_indicator = path_spec.type_indicator + else: + raise TurbiniaException( + 'Could not find path_spec for location {0:s}'.format( + self.partition_location)) # In attaching a partition, we create a new loopback device using the # partition offset and size. diff --git a/turbinia/processors/partitions.py b/turbinia/processors/partitions.py index 8f8e45010..4ddc68eb4 100644 --- a/turbinia/processors/partitions.py +++ b/turbinia/processors/partitions.py @@ -90,14 +90,15 @@ def GetPathSpecByLocation(path_specs, location): for path_spec in path_specs: child_path_spec = path_spec fs_location = getattr(path_spec, 'location', None) + if fs_location and fs_location == location: + return child_path_spec while path_spec.HasParent(): type_indicator = path_spec.type_indicator if type_indicator in dfvfs_definitions.VOLUME_SYSTEM_TYPE_INDICATORS: - if fs_location in ('\\', '/'): - fs_location = getattr(path_spec, 'location', None) + fs_location = getattr(path_spec, 'location', None) break path_spec = path_spec.parent if fs_location == location: return child_path_spec - log.warn('Could not find path_spec for location {0:s}'.format(location)) + log.error('Could not find path_spec for location {0:s}'.format(location)) return None diff --git a/turbinia/workers/fsstat.py b/turbinia/workers/fsstat.py index 6339c2cfa..d94b2218c 100644 --- a/turbinia/workers/fsstat.py +++ b/turbinia/workers/fsstat.py @@ -39,13 +39,14 @@ def run(self, evidence, result): """ fsstat_output = os.path.join(self.output_dir, 'fsstat.txt') - # Since fsstat does not support XFS, we won't run it when we know the - # partition is XFS. - - # Note: an evidence object that was serialized will not have a path_spec - # because it is removed prior to JSON serialization in evidence.py:243 - if evidence.type_indicator == "XFS": - message = 'Not running fsstat since partition is XFS' + if evidence.path_spec is None: + message = 'Could not run fsstat since partition does not have a path_spec' + result.log(message) + result.close(self, success=False, status=message) + # Since fsstat does not support some filesystems, we won't run it when we + # know the partition is not supported. + elif evidence.path_spec.type_indicator in ("APFS", "XFS"): + message = 'Not running fsstat since partition is not supported' result.log(message) result.close(self, success=True, status=message) else: diff --git a/turbinia/workers/fsstat_test.py b/turbinia/workers/fsstat_test.py index 295dc4efb..f8805decf 100644 --- a/turbinia/workers/fsstat_test.py +++ b/turbinia/workers/fsstat_test.py @@ -39,7 +39,7 @@ def setUp(self): def testFsstatRun(self, mock_evidence): """Test fsstat task run.""" self.task.execute = mock.MagicMock(return_value=0) - mock_evidence.type_indicator = 'EXT' + mock_evidence.path_spec.type_indicator = 'EXT' result = self.task.run(mock_evidence, self.result) # Ensure execute method is being called. @@ -49,7 +49,7 @@ def testFsstatRun(self, mock_evidence): # Test for XFS self.task.execute.reset_mock() - mock_evidence.type_indicator = 'XFS' + mock_evidence.path_spec.type_indicator = 'XFS' result = self.task.run(mock_evidence, self.result) # Ensure execute method is not being called. @@ -65,7 +65,7 @@ def testFsstatRunNoPathSpec(self, mock_evidence): result = self.task.run(mock_evidence, self.result) # Ensure execute method is not being called. - self.task.execute.assert_called_once() + self.task.execute.assert_not_called() # Ensure run method returns a TurbiniaTaskResult instance. self.assertIsInstance(result, TurbiniaTaskResult) diff --git a/turbinia/workers/partitions.py b/turbinia/workers/partitions.py index 6e8c99cdf..401f32b35 100644 --- a/turbinia/workers/partitions.py +++ b/turbinia/workers/partitions.py @@ -40,6 +40,27 @@ class PartitionEnumerationTask(TurbiniaTask): REQUIRED_STATES = [EvidenceState.ATTACHED] + def _GetLocation(self, path_spec): + """Retrieve the best location for a partition. + + Args: + path_spec (dfvfs.PathSpec): dfVFS path spec. + + Returns: + The location attribute for the partition. + """ + location = getattr(path_spec, 'location', None) + if location and location not in ('\\', '/'): + return location + while path_spec.HasParent(): + path_spec = path_spec.parent + new_location = getattr(path_spec, 'location', None) + if new_location and new_location not in ('\\', '/'): + type_indicator = path_spec.type_indicator + if type_indicator in dfvfs_definitions.VOLUME_SYSTEM_TYPE_INDICATORS: + return new_location + return location + def _ProcessPartition(self, path_spec): """Generate RawDiskPartition from a PathSpec. @@ -53,18 +74,16 @@ def _ProcessPartition(self, path_spec): status_report = [] fs_path_spec = path_spec - fs_location = None - partition_location = None + location = None volume_index = None partition_index = None partition_offset = None partition_size = None lv_uuid = None - type_indicator = None # File system location / identifier is_lvm = False - fs_location = getattr(path_spec, 'location', None) + location = self._GetLocation(path_spec) while path_spec.HasParent(): type_indicator = path_spec.type_indicator if type_indicator == dfvfs_definitions.TYPE_INDICATOR_APFS_CONTAINER: @@ -74,10 +93,6 @@ def _ProcessPartition(self, path_spec): if type_indicator in (dfvfs_definitions.TYPE_INDICATOR_GPT, dfvfs_definitions.TYPE_INDICATOR_LVM, dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION): - if fs_location in ('\\', '/'): - # Partition location / identifier - fs_location = getattr(path_spec, 'location', None) - partition_location = getattr(path_spec, 'location', None) # Partition index partition_index = getattr(path_spec, 'part_index', None) @@ -90,7 +105,7 @@ def _ProcessPartition(self, path_spec): volume_system = gpt_volume_system.GPTVolumeSystem() try: volume_system.Open(path_spec) - volume_identifier = partition_location.replace('/', '') + volume_identifier = location.replace('/', '') volume = volume_system.GetVolumeByIdentifier(volume_identifier) if is_lvm: @@ -108,7 +123,7 @@ def _ProcessPartition(self, path_spec): path_spec = path_spec.parent - status_report.append(fmt.heading5('{0!s}:'.format(fs_location))) + status_report.append(fmt.heading5('{0!s}:'.format(location))) status_report.append( fmt.bullet('Filesystem: {0!s}'.format(fs_path_spec.type_indicator))) if volume_index is not None: @@ -126,9 +141,8 @@ def _ProcessPartition(self, path_spec): # Not setting path_spec here as it will need to be generated for each task partition_evidence = DiskPartition( - partition_location=fs_location, partition_offset=partition_offset, - partition_size=partition_size, lv_uuid=lv_uuid, - type_indicator=type_indicator) + partition_location=location, partition_offset=partition_offset, + partition_size=partition_size, lv_uuid=lv_uuid) return partition_evidence, status_report