-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(handler): Add support for EWF format #582
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
import io | ||
import math | ||
import zlib | ||
from enum import Enum | ||
from pathlib import Path | ||
|
||
from dissect.cstruct import Instance | ||
from structlog import get_logger | ||
|
||
from unblob.file_utils import ( | ||
Endian, | ||
File, | ||
FileSystem, | ||
InvalidInputFormat, | ||
StructParser, | ||
iterate_file, | ||
) | ||
from unblob.models import Extractor, HexString, StructHandler, ValidChunk | ||
|
||
logger = get_logger() | ||
|
||
C_DEFINITIONS = r""" | ||
typedef struct ewf_header { | ||
char magic[8]; | ||
uint8 field_start; | ||
uint16 segment_number; | ||
uint16 field_end; | ||
} ewf_header_t; | ||
|
||
typedef struct data_descriptor { | ||
char definition[16]; | ||
uint64 next_offset; | ||
uint64 section_size; | ||
char padding[40]; | ||
uint32 adler_32; | ||
} data_descriptor_t; | ||
|
||
typedef struct volume_descriptor { | ||
uint32 unknown; | ||
uint32 chunk_count; | ||
uint32 sectors_per_chunks; | ||
uint32 bytes_per_sectors; | ||
uint32 sectors_count; | ||
} volume_descriptor_t; | ||
|
||
typedef struct table_descriptor { | ||
uint32 number_of_entries; | ||
char padding[16]; | ||
uint32 adler_32; | ||
} table_descriptor_t; | ||
|
||
typedef struct table_entry { | ||
char offset[3]; | ||
char compression_type[1]; | ||
} table_entry_t; | ||
typedef struct hash_descriptor { | ||
char md5_hash[16]; | ||
char unknown[16]; | ||
uint32 adler_32; | ||
} hash_descriptor_t; | ||
""" | ||
|
||
# Byte length of ewf_header_t:
# magic (8) + field_start (1) + segment_number (2) + field_end (2).
EWF_HEADER_LEN = 8 + 1 + 2 + 2
# Byte length of data_descriptor_t:
# definition (16) + next_offset (8) + section_size (8) + padding (40) + adler_32 (4).
DESCRIPTOR_LEN = 16 + 8 + 8 + 40 + 4
|
||
|
||
class Definition(Enum):
    """Section names matched against ``data_descriptor_t.definition``.

    Each value is the ASCII section name padded with NUL bytes to the
    16-byte width of the ``definition`` field.
    """

    DONE = b"done" + bytes(12)
    TABLE = b"table" + bytes(11)
    SECTORS = b"sectors" + bytes(9)
    VOLUME = b"volume" + bytes(10)
|
||
|
||
class ZlibMagic(Enum):
    """Two-byte prefixes used to decide whether chunk data is zlib-compressed."""

    LOW = bytes((0x78, 0x01))
    DEFAULT = bytes((0x78, 0x9C))
    BEST = bytes((0x78, 0xDA))
    COMPRESSION = bytes((0x78, 0x5E))
|
||
|
||
def find_chunk_size(header: Instance) -> int:
    """Return the size in bytes of one data chunk.

    The size is ``header.sectors_per_chunks`` multiplied by 512 (2**9), the
    sector size this module assumes.

    The computation is done with integers only. The previous
    ``2 ** (log2(n) + 9)`` formulation went through floating point and
    truncated the result, which can be off by one when ``n`` is not an exact
    power of two.

    :param header: parsed ``volume_descriptor_t``; only ``sectors_per_chunks``
        is read.
    :raises ValueError: if ``sectors_per_chunks`` is not positive (the
        logarithm-based implementation raised the same exception type).
    """
    sectors_per_chunk = header.sectors_per_chunks
    if sectors_per_chunk <= 0:
        raise ValueError("sectors_per_chunks must be a positive integer")
    # NOTE(review): the header also carries bytes_per_sectors; the 512-byte
    # sector size is kept hard-coded here to preserve existing behaviour.
    return sectors_per_chunk * 512
|
||
|
||
def is_valid_header(header: Instance) -> bool:
    """Tell whether a parsed ``ewf_header_t`` has the expected marker fields.

    The header is accepted only when ``field_start`` equals 0x01 and
    ``field_end`` equals 0x0.
    """
    return header.field_start == 0x01 and header.field_end == 0x0
nyuware marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class EWFExtractor(Extractor):
    """Extract the chunk data referenced by the table sections of an EWF file.

    The extractor walks the chain of section descriptors, remembers the chunk
    size announced by the volume section and the position of the sectors
    section, and writes the chunks listed in every table section to
    ``ewf.decrypted`` inside the output directory.
    """

    def __init__(self, header_struct: str):
        """Store the header struct name and build the struct parser.

        :param header_struct: name of the file header struct; it is stored on
            the instance but not otherwise used by this class.
        """
        self.header_struct = header_struct
        self._struct_parser = StructParser(C_DEFINITIONS)

    def table_descriptor(
        self, file: File, position: int, outdir: Path, sectors_per_chunk: int
    ) -> None:
        """Parse a table section and write the chunks it references.

        ``file`` must be positioned right after the table section's
        ``data_descriptor_t``.

        :param file: the EWF file being extracted.
        :param position: file position recorded right after the descriptor of
            the sectors section.
        :param outdir: directory receiving ``ewf.decrypted``.
        :param sectors_per_chunk: number of bytes to read for each chunk.
            Despite its name, ``extract`` passes the byte count returned by
            ``find_chunk_size``.
        """
        fs = FileSystem(outdir)
        header = self._struct_parser.parse("table_descriptor_t", file, Endian.LITTLE)
        entry_path = Path("ewf.decrypted")

        # Read the whole table first: writing the chunks moves the file cursor.
        chunk_offsets = [
            int.from_bytes(
                self._struct_parser.parse(
                    "table_entry_t", file, Endian.LITTLE
                ).offset,
                byteorder="little",
            )
            for _ in range(header.number_of_entries)
        ]
        zlib_magics = tuple(magic.value for magic in ZlibMagic)

        with fs.open(entry_path) as output_file:
            for chunk_offset in chunk_offsets:
                chunk_start = position + chunk_offset - DESCRIPTOR_LEN
                # Assemble the complete chunk before looking at it, so that a
                # chunk larger than iterate_file's buffer is decompressed as
                # one stream instead of piece by piece.
                data = b"".join(iterate_file(file, chunk_start, sectors_per_chunk))
                if data.startswith(zlib_magics):
                    # Write only the decompressed payload. Previously the raw
                    # compressed bytes were appended right after it as well.
                    data = zlib.decompress(data)
                output_file.write(data)

    def extract(self, inpath: Path, outdir: Path):
        """Walk the sections of ``inpath`` and extract every table section.

        :param inpath: path of the EWF file.
        :param outdir: directory receiving the extracted data.
        :raises InvalidInputFormat: if the section chain does not advance, or
            if a table section appears before both a volume and a sectors
            section have been seen.
        """
        # NOTE(review): a reviewer flagged a logic / naming issue here: the
        # data is actually written by table_descriptor(), whose name suggests
        # that it only parses a header.
        position = None
        sectors_per_chunk = None

        with File.from_path(inpath) as file:
            descriptor_offset = EWF_HEADER_LEN
            file.seek(descriptor_offset)  # we skip the initial header
            data_descriptor = self._struct_parser.parse(
                "data_descriptor_t", file, Endian.LITTLE
            )
            logger.debug("data_descriptor_t", header=data_descriptor, _verbosity=3)

            # the file is made of sections, we loop over all the sections
            while data_descriptor.definition != Definition.DONE.value:
                # A section that points at itself or backwards would make
                # this loop spin forever on malformed input.
                if data_descriptor.next_offset <= descriptor_offset:
                    raise InvalidInputFormat("EWF section chain does not advance")
                descriptor_offset = data_descriptor.next_offset
                file.seek(descriptor_offset, io.SEEK_SET)
                data_descriptor = self._struct_parser.parse(
                    "data_descriptor_t", file, Endian.LITTLE
                )
                logger.debug("data_descriptor_t", header=data_descriptor, _verbosity=3)

                definition = data_descriptor.definition
                if definition == Definition.VOLUME.value:
                    volume_descriptor = self._struct_parser.parse(
                        "volume_descriptor_t", file, Endian.LITTLE
                    )
                    sectors_per_chunk = find_chunk_size(volume_descriptor)
                elif definition == Definition.SECTORS.value:
                    position = file.tell()
                elif definition == Definition.TABLE.value:
                    if position is None or sectors_per_chunk is None:
                        raise InvalidInputFormat(
                            "EWF table section found before volume and sectors sections"
                        )
                    self.table_descriptor(file, position, outdir, sectors_per_chunk)
nyuware marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you explain how EWF is structured? Can we observe VOLUME, SECTORS, and TABLE in random order, or do they follow a strict ordering? Is it possible to have multiple VOLUMEs? What about the others? |
||
|
||
|
||
class EFWHandlerBase(StructHandler):
    """Shared chunk calculation for the EWF handlers.

    Subclasses provide ``NAME``, ``PATTERNS``, ``C_DEFINITIONS`` and
    ``EXTRACTOR``.
    """

    HEADER_STRUCT = "ewf_header_t"

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk:
        """Follow the section chain to find where the EWF data ends.

        :param file: file positioned at the start of the EWF header.
        :param start_offset: offset of the EWF header inside ``file``.
        :returns: chunk running from ``start_offset`` to the end of the
            ``done`` section descriptor.
        :raises InvalidInputFormat: if the header markers are wrong or the
            section chain does not advance.
        """
        header = self.parse_header(file, endian=Endian.LITTLE)

        if not is_valid_header(header):
            raise InvalidInputFormat("Invalid EWF header")

        # The first section descriptor directly follows the file header.
        descriptor_offset = EWF_HEADER_LEN
        data_descriptor = self._struct_parser.parse(
            "data_descriptor_t", file, Endian.LITTLE
        )
        while data_descriptor.definition != Definition.DONE.value:
            # A section that points at itself or backwards would make this
            # loop spin forever on malformed input.
            if data_descriptor.next_offset <= descriptor_offset:
                raise InvalidInputFormat("EWF section chain does not advance")
            descriptor_offset = data_descriptor.next_offset
            # The extractor, working on the carved file, seeks to next_offset
            # directly, i.e. the offsets are relative to the start of the EWF
            # data. Inside a larger file they must be shifted by start_offset.
            file.seek(start_offset + descriptor_offset, io.SEEK_SET)
            data_descriptor = self._struct_parser.parse(
                "data_descriptor_t", file, Endian.LITTLE
            )

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())
|
||
|
||
class EWFEHandler(EFWHandlerBase):
    """Handler for EWF data whose signature starts with ASCII ``EVF``."""

    NAME = "ewfe"

    # 8-byte signature: "EVF" followed by 09 0d 0a ff 00.
    PATTERNS = [HexString("45 56 46 09 0d 0a ff 00")]

    C_DEFINITIONS = C_DEFINITIONS
    HEADER_STRUCT = "ewf_header_t"

    # The extractor is given the same header struct name as the handler.
    EXTRACTOR = EWFExtractor(HEADER_STRUCT)
nyuware marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class EWFLHandler(EFWHandlerBase):
    """Handler for EWF data whose signature starts with ASCII ``LVF``."""

    NAME = "ewfl"

    # 8-byte signature: "LVF" followed by 09 0d 0a ff 00.
    PATTERNS = [HexString("4C 56 46 09 0d 0a ff 00")]

    C_DEFINITIONS = C_DEFINITIONS
    HEADER_STRUCT = "ewf_header_t"

    # The extractor is given the same header struct name as the handler.
    EXTRACTOR = EWFExtractor(HEADER_STRUCT)
nyuware marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your integration tests are not well structured. You've placed them in
`tests/integration/archive/ewf/ewf`,
but your handlers are named `ewfl`
and `ewfe`.
You must have these two directories:
`tests/integration/archive/ewf/ewfe`
`tests/integration/archive/ewf/ewfl`
Each of these directories must contain the integration test files for its handler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created two samples for each format: one in clear text and one zlib-compressed.