Skip to content

Commit

Permalink
feat: initial improved triage process
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersans committed Jul 22, 2024
1 parent ed05458 commit bb8dfda
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 179 deletions.
56 changes: 44 additions & 12 deletions cve_bin_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from cve_bin_tool.util import ProductInfo
from cve_bin_tool.version import VERSION
from cve_bin_tool.version_scanner import VersionScanner
from cve_bin_tool.vex_manager.parse import VEXParse

sys.excepthook = excepthook # Always install excepthook for entrypoint module.

Expand Down Expand Up @@ -380,6 +381,12 @@ def main(argv=None):
default="",
help="Vendor/Supplier of Product",
)
output_group.add_argument(
"--filter-triage",
action="store",
default=True,
help="Filter cves based on triage data from Vex file",
)
parser.add_argument(
"-e",
"--exclude",
Expand Down Expand Up @@ -1021,18 +1028,6 @@ def main(argv=None):
LOGGER.debug(f"{product_info}, {triage_data}")
cve_scanner.get_cves(product_info, triage_data)

if args["triage_input_file"]:
input_engine = InputEngine(
args["triage_input_file"],
logger=LOGGER,
error_mode=error_mode,
filetype="vex",
)
parsed_data = input_engine.parse_input()
for product_info, triage_data in parsed_data.items():
LOGGER.debug(f"{product_info}, {triage_data}")
cve_scanner.get_cves(product_info, triage_data)

if args["input_file"]:
input_engine = InputEngine(
args["input_file"], logger=LOGGER, error_mode=error_mode
Expand Down Expand Up @@ -1092,6 +1087,40 @@ def main(argv=None):
LOGGER.debug(f"{product_info}, {triage_data}")
cve_scanner.get_cves(product_info, triage_data)

if args["vex_file"]:
    # for now use cyclonedx as auto detection is not implemented in latest pypi package of lib4vex
    vexdata = VEXParse(
        filename=args["vex_file"],
        vextype="cyclonedx",
        logger=LOGGER,
    )
    parsed_vex_data = vexdata.parse_vex()
    # parsed_data is used as a mapping below (.items(), "in"); mappings have
    # no is_empty() method, so emptiness is checked through its truth value.
    if not parsed_data:
        # assume the vex file being scanned is a standalone file
        args["filter_triage"] = False
        parsed_data = parsed_vex_data
        for product_info, triage_data in parsed_data.items():
            LOGGER.debug(f"{product_info}, {triage_data}")
            cve_scanner.get_cves(product_info, triage_data)
    else:
        LOGGER.info(
            f"VEX file {args['vex_file']} is not a standalone file and will be used as a triage file"
        )
        # need to do validation on the sbom part
        # need to implement is_linked() function which will check the linkage.
        if args["sbom_file"]:
            LOGGER.warning(
                f"SBOM file: {args['sbom_file']} is not linked to VEX file: {args['vex_file']}."
            )
        for product_info, triage_data in parsed_vex_data.items():
            LOGGER.debug(f"{product_info}, {triage_data}")
            # Only apply triage to products that were actually found earlier.
            if product_info in parsed_data:
                cve_scanner.get_cves(product_info, triage_data)
            else:
                LOGGER.info(
                    f"Product: {product_info.product} with Version: {product_info.version} not found in Parsed Data, is valid vex file being used?"
                )

LOGGER.info("Overall CVE summary: ")
LOGGER.info(
f"There are {cve_scanner.products_with_cve} products with known CVEs detected"
Expand All @@ -1118,6 +1147,9 @@ def main(argv=None):
"Please provide --product, --release and --vendor for VEX generation"
)
return ERROR_CODES[InsufficientArgs]

if args["vex_file"] and args["filter_triage"]:
cve_scanner.filter_triage_data()
# Creates an Object for OutputEngine
output = OutputEngine(
all_cve_data=cve_scanner.all_cve_data,
Expand Down
36 changes: 36 additions & 0 deletions cve_bin_tool/cve_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,42 @@ def get_cves(self, product_info: ProductInfo, triage_data: TriageData):
if product_info not in self.all_product_data:
self.all_product_data[product_info] = len(cves)

def filter_triage_data(self):
    """
    Drop CVEs that triage has already dismissed.

    Every CVE whose ``remarks`` is ``Remarks.NotAffected`` or
    ``Remarks.FalsePositive`` is removed from ``self.all_cve_data``. A product
    that is left with no CVEs is removed from ``self.all_cve_data`` entirely.

    The CVEs are partitioned in one ordered pass instead of through a set
    difference, so entries do not have to be hashable, duplicates are each
    reported, and the log output follows the stored order.
    """
    to_delete: List[ProductInfo] = []

    for product_info, cve_data in self.all_cve_data.items():
        kept_cves = []
        for cve in cve_data["cves"]:
            if cve.remarks in (Remarks.NotAffected, Remarks.FalsePositive):
                self.logger.info(
                    f"Filtered CVE: {cve.cve_number} for Product: {product_info.product}"
                )
            else:
                kept_cves.append(cve)

        if kept_cves:
            cve_data["cves"] = kept_cves
        else:
            # The dict is being iterated, so the removal is deferred.
            to_delete.append(product_info)

        self.logger.debug(
            f"Filtered triage data for {product_info.product}: {[cve.cve_number for cve in kept_cves]}"
        )

    for product_info in to_delete:
        del self.all_cve_data[product_info]
        self.logger.debug(
            f"Removed product info for {product_info.product} due to no relevant CVEs"
        )

def affected(self):
"""Returns list of vendor.product and version tuples identified from
scan"""
Expand Down
162 changes: 2 additions & 160 deletions cve_bin_tool/input_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import csv
import json
import re
from collections import defaultdict
from logging import Logger
from pathlib import Path
Expand All @@ -35,7 +34,7 @@ class InputEngine:
"""
Class: InputEngine
This class is responsible for parsing various input file formats (CSV, VEX, JSON) in the CVE Bin Tool.
This class is responsible for parsing various input file formats (CSV, JSON) in the CVE Bin Tool.
Attributes:
- parsed_data (DefaultDict[ProductInfo, TriageData]): Dictionary containing parsed input data.
Expand All @@ -45,20 +44,14 @@ class InputEngine:
Initializes the InputEngine with the specified filename, logger, error mode, and filetype.
- parse_input(self) -> DefaultDict[ProductInfo, TriageData]:
Parses the input file based on its type (CSV, VEX, JSON) and returns the parsed data.
Parses the input file based on its type (CSV, JSON) and returns the parsed data.
- input_csv(self) -> None:
Parses input data from a CSV file.
- input_json(self) -> None:
Parses input data from a JSON file.
- input_vex(self) -> None:
Parses input data from a CycloneDX VEX file.
- validate_product(self, product: str) -> bool:
Validates if a product name conforms to the CPE 2.3 standard.
- parse_data(self, fields: Set[str], data: Iterable) -> None:
Parses common data structure for CSV and JSON input formats.
Expand Down Expand Up @@ -106,8 +99,6 @@ def parse_input(self) -> DefaultDict[ProductInfo, TriageData]:
raise FileNotFoundError(self.filename)
if self.filename.endswith(".csv"):
self.input_csv()
elif self.filename.endswith(".vex") or self.filetype == "vex":
self.input_vex()
elif self.filename.endswith(".json"):
self.input_json()
return self.parsed_data
Expand Down Expand Up @@ -144,155 +135,6 @@ def input_json(self) -> None:

self.parse_data(set(json_data[0].keys()), json_data)

def validate_product(self, product: str) -> bool:
    """
    Validates if a product name conforms to the CPE 2.3 standard.

    See https://csrc.nist.gov/schema/cpe/2.3/cpe-naming_2.3.xsd for the
    naming specification.

    Args:
    - product (str): Product name.

    Returns:
    - bool: True if the product name is valid, False otherwise.
    """
    # Letters, digits, ".", "_", "-", "~", space and "%" only. At least one
    # character is required, so an empty name is rejected.
    return re.fullmatch(r"[A-Za-z0-9\._\-~ %]+", product) is not None

def input_vex(self) -> None:
    """
    Parses input data from a VEX file.

    The file named by ``self.filename`` is loaded as JSON. Only the CycloneDX
    VEX format is handled: when the top-level ``bomFormat`` equals
    "CycloneDX" the document is handed to ``input_vex_cyclone_dx``; any other
    document is ignored.
    """
    with open(self.filename, encoding="utf-8") as json_file:
        json_data = json.load(json_file)

    # A document without "bomFormat", or whose top level is not an object, is
    # not CycloneDX; it is skipped rather than raising KeyError/TypeError.
    if isinstance(json_data, dict) and json_data.get("bomFormat") == "CycloneDX":
        self.input_vex_cyclone_dx(json_data)

def input_vex_cyclone_dx(self, json_data):
    """
    Parses input data from a CycloneDX VEX file.

    Not all data from the BOM needs to be read because it will be updated
    from the CVE DB. The analysis fields may have been updated in the VEX and
    are the ones collected here. For every vulnerability and every component
    it affects whose BOM reference can be decoded, an entry keyed by the
    vulnerability id (or "default" when the id is blank) is stored in
    ``self.parsed_data``, together with an empty "paths" entry.

    Optional fields (``analysis`` and its ``state``/``detail``, ``ratings``
    and their ``severity``, ``affects``) may be missing; a missing or unknown
    analysis state maps to ``Remarks.Unexplored``.

    Args:
    - json_data (dict): Decoded CycloneDX JSON document.
    """
    # Remark names that may prefix the detail text, stripped in this order.
    remark_prefixes = tuple(
        re.compile("^" + remark.name + "(: )?")
        for remark in (
            Remarks.NewFound,
            Remarks.Unexplored,
            Remarks.Confirmed,
            Remarks.Mitigated,
            Remarks.FalsePositive,
            Remarks.NotAffected,
        )
    )

    def strip_remark(detail) -> str:
        for prefix in remark_prefixes:
            detail = prefix.sub("", detail)
        return detail

    # Map CycloneDX v1.4 analysis state to the Remarks enumeration.
    remarks_lookup = {
        "resolved": Remarks.Mitigated,
        "resolved_with_pedigree": Remarks.Mitigated,
        "exploitable": Remarks.Confirmed,
        "in_triage": Remarks.Unexplored,
        "false_positive": Remarks.FalsePositive,
        "not_affected": Remarks.NotAffected,
    }

    for vulnerability in json_data.get("vulnerabilities") or []:
        cve_key = (vulnerability.get("id") or "").strip() or "default"
        analysis = vulnerability.get("analysis") or {}
        analysis_state = (analysis.get("state") or "").lower()
        remarks = remarks_lookup.get(analysis_state, Remarks.Unexplored)
        justification = analysis.get("justification")
        response = analysis.get("response")
        comments = strip_remark(analysis.get("detail") or "").strip()

        severity = None
        for rating in vulnerability.get("ratings") or []:
            # The last rating that carries a severity wins.
            if rating.get("severity"):
                severity = rating["severity"].upper()

        for affect in vulnerability.get("affects") or []:
            product_info = self.decode_bom_ref(affect["ref"])
            if product_info is None:
                # Reference could not be decoded into a product; skip it.
                continue

            triage = {
                "remarks": remarks,
                "comments": comments,
                "response": response,
            }
            if justification:
                triage["justification"] = justification.strip()
            if severity:
                triage["severity"] = severity.strip()

            self.parsed_data[product_info][cve_key] = triage
            self.parsed_data[product_info]["paths"] = {}

def decode_bom_ref(self, ref) -> "ProductInfo | None":
    """
    Decodes the BOM reference for each component.

    Three reference layouts are recognised, tried in this order:

    - urn:cbt:{bom_version}/{vendor}#{product}:{version}
      (added to support CPEs that have dashes in their version field)
    - urn:cbt:{bom_version}/{vendor}#{product}-{version}
    - urn:cdx:serialNumber/version#bom-ref
      (https://cyclonedx.org/capabilities/bomlink/); the bom-ref is split on
      its last dash into product and version and the vendor is "UNKNOWN"

    Args:
    - ref (str): BOM reference string

    Returns:
    - ProductInfo | None: vendor, product and version of the component with a
      placeholder location, or None when the reference matches no layout, a
      CycloneDX bom-ref contains no dash, or the product name is rejected by
      validate_product.
    """
    location = "location/to/product"
    cbt_layouts = (
        r"urn:cbt:(?P<bom_version>.*?)\/(?P<vendor>.*?)#(?P<product>.*?):(?P<version>.*)",
        r"urn:cbt:(?P<bom_version>.*?)\/(?P<vendor>.*?)#(?P<product>.*?)-(?P<version>.*)",
    )
    cdx_layout = (
        r"urn:cdx:(?P<bomSerialNumber>.*?)\/(?P<bom_version>.*?)#(?P<bom_ref>.*)"
    )

    vendor = product = version = None
    for layout in cbt_layouts:
        found = re.match(layout, ref)
        if found:
            vendor, product, version = found.group("vendor", "product", "version")
            break
    else:
        found = re.match(cdx_layout, ref)
        if found:
            # The CDX bom-ref can be any unique identifier (possibly a Package
            # URL); only a trailing "-version" can be decoded here.
            name, dash, release = found.group("bom_ref").rpartition("-")
            if dash:
                vendor, product, version = "UNKNOWN", name, release

    if product is None or not self.validate_product(product):
        return None
    return ProductInfo(vendor.strip(), product.strip(), version.strip(), location)

def parse_data(self, fields: Set[str], data: Iterable) -> None:
"""
Parses common data structure for CSV and JSON input formats.
Expand Down
15 changes: 8 additions & 7 deletions cve_bin_tool/vex_manager/generate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later
import os
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import Dict, List, Optional
Expand Down Expand Up @@ -75,12 +74,14 @@ def generate_vex(self) -> None:
if self.sbom:
kwargs["sbom"] = self.sbom
vexgen.set_product(**kwargs)
if Path(self.filename).is_file():
self.logger.warning(
f"Failed to write '{self.filename}'. File already exists"
if not self.filename:
self.logger.info(
"No filename defined, Generating a new filename with Default Naming Convention"
)
self.logger.info("Generating a new filename with Default Naming Convention")
self.filename = self.generate_vex_filename()
if Path(self.filename).is_file():
self.logger.info(f"Updating the vex file: {self.filename}")

vexgen.generate(
project_name=self.product,
vex_data=self.get_vulnerabilities(),
Expand All @@ -95,10 +96,10 @@ def generate_vex_filename(self) -> str:
Returns:
str: The generated VEX filename.
"""
now = datetime.now().strftime("%Y-%m-%d.%H-%M-%S")
filename = os.path.abspath(
os.path.join(
os.getcwd(), f"{self.product}_{self.release}_{self.vextype}.{now}.json"
os.getcwd(),
f"{self.product}_{self.release}_{self.vendor}_{self.vextype}.json",
)
)
return filename
Expand Down

0 comments on commit bb8dfda

Please sign in to comment.