diff --git a/cve_bin_tool/parsers/__init__.py b/cve_bin_tool/parsers/__init__.py
index 21b223a486..553e5f81cc 100644
--- a/cve_bin_tool/parsers/__init__.py
+++ b/cve_bin_tool/parsers/__init__.py
@@ -15,6 +15,7 @@
     "swift",
     "php",
     "perl",
+    "rpm",
 ]
 
 
diff --git a/cve_bin_tool/parsers/parse.py b/cve_bin_tool/parsers/parse.py
index 032fdcf30a..14d3514527 100644
--- a/cve_bin_tool/parsers/parse.py
+++ b/cve_bin_tool/parsers/parse.py
@@ -8,6 +8,7 @@
 from cve_bin_tool.parsers.php import PhpParser
 from cve_bin_tool.parsers.python import PythonParser, PythonRequirementsParser
 from cve_bin_tool.parsers.r import RParser
+from cve_bin_tool.parsers.rpm import RpmParser
 from cve_bin_tool.parsers.ruby import RubyParser
 from cve_bin_tool.parsers.rust import RustParser
 from cve_bin_tool.parsers.swift import SwiftParser
@@ -25,6 +26,7 @@
     "Package.resolved": SwiftParser,
     "composer.lock": PhpParser,
     "cpanfile": PerlParser,
+    ".rpm:": RpmParser,
 }
 
 
diff --git a/cve_bin_tool/parsers/rpm.py b/cve_bin_tool/parsers/rpm.py
new file mode 100644
index 0000000000..941d1b3cce
--- /dev/null
+++ b/cve_bin_tool/parsers/rpm.py
@@ -0,0 +1,264 @@
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import io
+from enum import IntEnum
+
+from cve_bin_tool.parsers import Parser
+
+
+class RpmParser(Parser):
+    """Extract the package name and version from the header of an RPM file.
+
+    More details about rpm structure can be found here:
+    https://rpm-software-management.github.io/rpm/manual/format.html
+    """
+
+    class Type(IntEnum):
+        """Data types an index entry can declare."""
+
+        NULL = 0
+        CHAR = 1
+        INT8 = 2
+        INT16 = 3
+        INT32 = 4
+        INT64 = 5
+        STRING = 6
+        BIN = 7
+        STRING_ARRAY = 8
+        I18NSTRING_TYPE = 9
+
+    class Tag(IntEnum):
+        """Header tags read by this parser."""
+
+        RPMTAG_NAME = 1000
+        RPMTAG_VERSION = 1001
+
+    TAGS_TO_PARSE = [Tag.RPMTAG_NAME, Tag.RPMTAG_VERSION]
+
+    RPM_LEAD_MAGIC = b"\xed\xab\xee\xdb"
+    RPM_HEADER_MAGIC = b"\x8e\xad\xe8"
+    RPM_LEAD_LEN = 96
+    RPM_LEAD_NAME_OFFSET = 10
+    RPM_LEAD_NAME_LEN = 66
+    RPM_HEADER_LEN = 16
+    RPM_HEADER_INDEX_LEN = 16
+
+    def __init__(self, cve_db, logger, validate=True):
+        """Set up the parser; validate toggles the lead magic check in run_checker."""
+        super().__init__(cve_db, logger)
+        self.validate = validate
+
+    def validate_rpm(self, filename):
+        """Return True when filename starts with the RPM lead magic."""
+        with open(filename, "rb") as rpm:
+            rpm_lead_magic = rpm.read(len(self.RPM_LEAD_MAGIC))
+        return rpm_lead_magic == self.RPM_LEAD_MAGIC
+
+    def get_rpm_entry(self, rpm, rpm_size, base_offset, entry_type, offset, count):
+        """Read one entry from the data store that starts at base_offset.
+
+        Only STRING entries are decoded, any other type yields None. None is
+        also returned when the entry lies outside of the file, is not NUL
+        terminated or contains nonascii characters.
+        """
+        if rpm_size < (base_offset + offset + count):
+            self.logger.error(f"{self.filename} - entry corrupted")
+            return None
+        if entry_type != self.Type.STRING:
+            # unsupported - if more info is needed feel free to add parsing here
+            # at the moment all the data that is extracted is string
+            return None
+        rpm.seek(base_offset + offset)
+        # string can only have count 1
+        data = b""
+        char = rpm.read(1)
+        # an empty read means end of file: stop instead of looping forever
+        while char and char != b"\x00":
+            data += char
+            char = rpm.read(1)
+        if not char:
+            self.logger.error(
+                f"{self.filename} - unterminated string in rpm at offset 0x{base_offset+offset:X}"
+            )
+            return None
+        try:
+            return data.decode("ascii")
+        except UnicodeError:
+            self.logger.error(
+                f"{self.filename} - {data} - invalid string in rpm with nonascii characters at offset 0x{base_offset+offset:X}"
+            )
+        return None
+
+    def extract_info(self):
+        """Parse the lead, signature and header of self.filename.
+
+        Returns a dict mapping each tag of TAGS_TO_PARSE found in the header to
+        its decoded value (None when it could not be decoded), or None when the
+        file is corrupted.
+        """
+        # File structure is as follows:
+        # Lead
+        # Signature
+        # Header
+        # Payload
+
+        with open(self.filename, "rb") as rpm:
+            rpm.seek(0, io.SEEK_END)
+            rpm_size = rpm.tell()
+            rpm.seek(0)
+
+            # Lead
+            rpm_lead = rpm.read(self.RPM_LEAD_LEN)
+            if len(rpm_lead) != self.RPM_LEAD_LEN:
+                # file corrupted
+                self.logger.error(
+                    f"{self.filename} - file is too short, possibly corrupted"
+                )
+                return None
+            # the name field is RPM_LEAD_NAME_LEN bytes long, so the slice has to
+            # end at offset + length and not at the length itself
+            name_end = self.RPM_LEAD_NAME_OFFSET + self.RPM_LEAD_NAME_LEN
+            name_bytes = rpm_lead[self.RPM_LEAD_NAME_OFFSET : name_end]
+            try:
+                # the name is NUL terminated, drop the terminator and what follows
+                self.name = name_bytes.split(b"\x00", 1)[0].decode("ascii")
+            except UnicodeError:
+                self.logger.error(
+                    f"{self.filename} - invalid name in rpm with nonascii characters"
+                )
+                return None
+
+            self.logger.debug(f"{self.filename} - RPM Lead OK")
+            self.logger.debug(f"{self.filename} - {self.name}")
+
+            # Signature / Header
+            # 3 bytes magic
+            # 1 byte version
+            # 4 bytes reserved
+            # 4 bytes number of index entries
+            # 4 bytes data size
+            # n * 16 bytes index entries
+
+            # Signature and header have the same structure
+            header = rpm.read(self.RPM_HEADER_LEN)
+            if len(header) != self.RPM_HEADER_LEN:
+                self.logger.error(
+                    f"{self.filename} - file is too short, possibly corrupted"
+                )
+                return None
+
+            if header[0:3] != self.RPM_HEADER_MAGIC:
+                self.logger.error(f"{self.filename} - corrupted RPM signature header")
+                return None
+
+            entries = int.from_bytes(header[8:12], byteorder="big")
+            data_size = int.from_bytes(header[12:16], byteorder="big")
+            self.logger.debug(f"signature index entries: {entries}")
+
+            # skip signature indexes and data
+            target_offset = rpm.tell() + (
+                entries * self.RPM_HEADER_INDEX_LEN + data_size
+            )
+            # Header is aligned to 8-byte boundary
+            if target_offset % 8:
+                target_offset = target_offset - (target_offset % 8) + 8
+
+            if target_offset > rpm_size:
+                self.logger.error(f"{self.filename} - corrupted RPM")
+                return None
+
+            rpm.seek(target_offset)
+
+            # Header
+            header = rpm.read(self.RPM_HEADER_LEN)
+            if len(header) != self.RPM_HEADER_LEN:
+                self.logger.error(
+                    f"{self.filename} - file is too short, possibly corrupted"
+                )
+                return None
+
+            if header[0:3] != self.RPM_HEADER_MAGIC:
+                self.logger.error(f"{self.filename} - corrupted RPM header - {header}")
+                return None
+
+            entries = int.from_bytes(header[8:12], byteorder="big")
+            data_size = int.from_bytes(header[12:16], byteorder="big")
+            self.logger.debug(f"header index entries: {entries}")
+
+            header_entries_offset = rpm.tell()
+            # Data store starts right after the index entries
+            data_offset = header_entries_offset + (entries * self.RPM_HEADER_INDEX_LEN)
+            # only the signature section is padded to an 8-byte boundary, so the
+            # end of the header must not be rounded up before the bounds check
+            if data_offset + data_size > rpm_size:
+                self.logger.error(f"{self.filename} - corrupted RPM")
+                return None
+
+            # Index Entry
+            # 4 bytes Tag
+            # 4 bytes Type
+            # 4 bytes Offset
+            # 4 bytes Count
+            # Parse through index entries
+            rpm_info = {}
+            entries_tags = self.TAGS_TO_PARSE.copy()
+            for _ in range(entries):
+                entry_raw = rpm.read(self.RPM_HEADER_INDEX_LEN)
+                entry_tag = int.from_bytes(entry_raw[0:4], byteorder="big")
+                if entry_tag not in entries_tags:
+                    continue
+                entries_tags.remove(entry_tag)
+                entry_offset = int.from_bytes(entry_raw[8:12], byteorder="big")
+                entry_count = int.from_bytes(entry_raw[12:16], byteorder="big")
+                try:
+                    entry_type = self.Type(
+                        int.from_bytes(entry_raw[4:8], byteorder="big")
+                    )
+                except ValueError:
+                    # type outside of the known range, the entry can not be read
+                    self.logger.error(
+                        f"{self.filename} - unknown entry type for tag {entry_tag}"
+                    )
+                    rpm_info[entry_tag] = None
+                else:
+                    restore_offset = rpm.tell()
+                    rpm_entry = self.get_rpm_entry(
+                        rpm,
+                        rpm_size,
+                        data_offset,
+                        entry_type,
+                        entry_offset,
+                        entry_count,
+                    )
+                    rpm.seek(restore_offset)
+                    self.logger.debug(
+                        f"{entry_tag} - {entry_type} - {entry_offset} - {entry_count} - data: {rpm_entry}"
+                    )
+                    rpm_info[entry_tag] = rpm_entry
+                if not entries_tags:
+                    # we got all the info we need
+                    break
+
+        self.logger.debug(f"{rpm_info}")
+        return rpm_info
+
+    def run_checker(self, filename):
+        """Process RPM file and extract product"""
+        self.filename = filename
+        continue_processing = True
+        if self.validate:
+            continue_processing = self.validate_rpm(self.filename)
+            self.logger.debug(f"Validation of {filename} - {continue_processing}")
+        if continue_processing:
+            rpm_info = self.extract_info()
+            if rpm_info:
+                name = rpm_info.get(self.Tag.RPMTAG_NAME)
+                version = rpm_info.get(self.Tag.RPMTAG_VERSION)
+                # skip the vendor lookup when name or version was not extracted
+                product_info = None
+                if name and version:
+                    product_info = self.find_vendor(name, version)
+                if product_info is not None:
+                    yield from product_info
+        self.logger.debug(f"Done scanning file: {filename}")