Skip to content

Commit

Permalink
Fix population of partition path_spec from location (#1028)
Browse files Browse the repository at this point in the history
* Fix population of partition path_spec from location

* Fix population of partition path_spec from location
  • Loading branch information
dfjxs authored Mar 25, 2022
1 parent e10c812 commit 4fcab02
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 30 deletions.
9 changes: 5 additions & 4 deletions turbinia/evidence.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,22 +578,20 @@ class DiskPartition(RawDisk):
partition_offset (int): Offset of the partition in bytes.
partition_size (int): Size of the partition in bytes.
path_spec (dfvfs.PathSpec): Partition path spec.
type_indicator (str): Partition type indicator from dfVFS.
"""

POSSIBLE_STATES = [EvidenceState.ATTACHED, EvidenceState.MOUNTED]

def __init__(
self, partition_location=None, partition_offset=None, partition_size=None,
lv_uuid=None, path_spec=None, type_indicator=None, *args, **kwargs):
lv_uuid=None, path_spec=None, *args, **kwargs):
"""Initialization for raw volume evidence object."""

self.partition_location = partition_location
self.partition_offset = partition_offset
self.partition_size = partition_size
self.lv_uuid = lv_uuid
self.path_spec = path_spec
self.type_indicator = type_indicator
super(DiskPartition, self).__init__(*args, **kwargs)

# This Evidence needs to have a parent
Expand All @@ -614,7 +612,10 @@ def _preprocess(self, _, required_states):
path_specs, self.partition_location)
if path_spec:
self.path_spec = path_spec
self.type_indicator = path_spec.type_indicator
else:
raise TurbiniaException(
'Could not find path_spec for location {0:s}'.format(
self.partition_location))

# In attaching a partition, we create a new loopback device using the
# partition offset and size.
Expand Down
7 changes: 4 additions & 3 deletions turbinia/processors/partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ def GetPathSpecByLocation(path_specs, location):
for path_spec in path_specs:
child_path_spec = path_spec
fs_location = getattr(path_spec, 'location', None)
if fs_location and fs_location == location:
return child_path_spec
while path_spec.HasParent():
type_indicator = path_spec.type_indicator
if type_indicator in dfvfs_definitions.VOLUME_SYSTEM_TYPE_INDICATORS:
if fs_location in ('\\', '/'):
fs_location = getattr(path_spec, 'location', None)
fs_location = getattr(path_spec, 'location', None)
break
path_spec = path_spec.parent
if fs_location == location:
return child_path_spec
log.warn('Could not find path_spec for location {0:s}'.format(location))
log.error('Could not find path_spec for location {0:s}'.format(location))
return None
15 changes: 8 additions & 7 deletions turbinia/workers/fsstat.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ def run(self, evidence, result):
"""
fsstat_output = os.path.join(self.output_dir, 'fsstat.txt')

# Since fsstat does not support XFS, we won't run it when we know the
# partition is XFS.

# Note: an evidence object that was serialized will not have a path_spec
# because it is removed prior to JSON serialization in evidence.py:243
if evidence.type_indicator == "XFS":
message = 'Not running fsstat since partition is XFS'
if evidence.path_spec is None:
message = 'Could not run fsstat since partition does not have a path_spec'
result.log(message)
result.close(self, success=False, status=message)
# Since fsstat does not support some filesystems, we won't run it when we
# know the partition is not supported.
elif evidence.path_spec.type_indicator in ("APFS", "XFS"):
message = 'Not running fsstat since partition is not supported'
result.log(message)
result.close(self, success=True, status=message)
else:
Expand Down
6 changes: 3 additions & 3 deletions turbinia/workers/fsstat_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def setUp(self):
def testFsstatRun(self, mock_evidence):
"""Test fsstat task run."""
self.task.execute = mock.MagicMock(return_value=0)
mock_evidence.type_indicator = 'EXT'
mock_evidence.path_spec.type_indicator = 'EXT'
result = self.task.run(mock_evidence, self.result)

# Ensure execute method is being called.
Expand All @@ -49,7 +49,7 @@ def testFsstatRun(self, mock_evidence):

# Test for XFS
self.task.execute.reset_mock()
mock_evidence.type_indicator = 'XFS'
mock_evidence.path_spec.type_indicator = 'XFS'
result = self.task.run(mock_evidence, self.result)

# Ensure execute method is not being called.
Expand All @@ -65,7 +65,7 @@ def testFsstatRunNoPathSpec(self, mock_evidence):
result = self.task.run(mock_evidence, self.result)

# Ensure execute method is not being called.
self.task.execute.assert_called_once()
self.task.execute.assert_not_called()
# Ensure run method returns a TurbiniaTaskResult instance.
self.assertIsInstance(result, TurbiniaTaskResult)

Expand Down
40 changes: 27 additions & 13 deletions turbinia/workers/partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@ class PartitionEnumerationTask(TurbiniaTask):

REQUIRED_STATES = [EvidenceState.ATTACHED]

def _GetLocation(self, path_spec):
"""Retrieve the best location for a partition.
Args:
path_spec (dfvfs.PathSpec): dfVFS path spec.
Returns:
The location attribute for the partition.
"""
location = getattr(path_spec, 'location', None)
if location and location not in ('\\', '/'):
return location
while path_spec.HasParent():
path_spec = path_spec.parent
new_location = getattr(path_spec, 'location', None)
if new_location and new_location not in ('\\', '/'):
type_indicator = path_spec.type_indicator
if type_indicator in dfvfs_definitions.VOLUME_SYSTEM_TYPE_INDICATORS:
return new_location
return location

def _ProcessPartition(self, path_spec):
"""Generate RawDiskPartition from a PathSpec.
Expand All @@ -53,18 +74,16 @@ def _ProcessPartition(self, path_spec):
status_report = []

fs_path_spec = path_spec
fs_location = None
partition_location = None
location = None
volume_index = None
partition_index = None
partition_offset = None
partition_size = None
lv_uuid = None
type_indicator = None

# File system location / identifier
is_lvm = False
fs_location = getattr(path_spec, 'location', None)
location = self._GetLocation(path_spec)
while path_spec.HasParent():
type_indicator = path_spec.type_indicator
if type_indicator == dfvfs_definitions.TYPE_INDICATOR_APFS_CONTAINER:
Expand All @@ -74,10 +93,6 @@ def _ProcessPartition(self, path_spec):
if type_indicator in (dfvfs_definitions.TYPE_INDICATOR_GPT,
dfvfs_definitions.TYPE_INDICATOR_LVM,
dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION):
if fs_location in ('\\', '/'):
# Partition location / identifier
fs_location = getattr(path_spec, 'location', None)
partition_location = getattr(path_spec, 'location', None)
# Partition index
partition_index = getattr(path_spec, 'part_index', None)

Expand All @@ -90,7 +105,7 @@ def _ProcessPartition(self, path_spec):
volume_system = gpt_volume_system.GPTVolumeSystem()
try:
volume_system.Open(path_spec)
volume_identifier = partition_location.replace('/', '')
volume_identifier = location.replace('/', '')
volume = volume_system.GetVolumeByIdentifier(volume_identifier)

if is_lvm:
Expand All @@ -108,7 +123,7 @@ def _ProcessPartition(self, path_spec):

path_spec = path_spec.parent

status_report.append(fmt.heading5('{0!s}:'.format(fs_location)))
status_report.append(fmt.heading5('{0!s}:'.format(location)))
status_report.append(
fmt.bullet('Filesystem: {0!s}'.format(fs_path_spec.type_indicator)))
if volume_index is not None:
Expand All @@ -126,9 +141,8 @@ def _ProcessPartition(self, path_spec):

# Not setting path_spec here as it will need to be generated for each task
partition_evidence = DiskPartition(
partition_location=fs_location, partition_offset=partition_offset,
partition_size=partition_size, lv_uuid=lv_uuid,
type_indicator=type_indicator)
partition_location=location, partition_offset=partition_offset,
partition_size=partition_size, lv_uuid=lv_uuid)

return partition_evidence, status_report

Expand Down

0 comments on commit 4fcab02

Please sign in to comment.