Skip to content

Commit

Permalink
tools: neuvector: improve formatting of findings with assets, support…
Browse files Browse the repository at this point in the history
… all existing types of assets
  • Loading branch information
pna-nca committed Oct 17, 2024
1 parent 6e27d10 commit dc79d96
Showing 1 changed file with 132 additions and 46 deletions.
178 changes: 132 additions & 46 deletions dojo/tools/neuvector/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,89 +122,175 @@ def get_item(vulnerability, test):
return finding


ASSET_FINDING_DESCRIPTION_TEMPLATE = """**Title:** {title}
**Details:**
{description}
**Feed rating:** {feed_rating}
**Published on**: {published_date}
**Reference**: {reference}
**Affected packages:**
{affected_packages}
**Affected systems:**
{affected_systems}
"""


def get_asset_item(vulnerability, test):
severity = (
convert_severity(vulnerability.get("severity"))
if "severity" in vulnerability
else "Info"
)

feed_rating = vulnerability.get("feed_rating", "")
vulnerability_id = vulnerability.get("name")

description = vulnerability.get("description", "").strip()
vuln_description = vulnerability.get("description", "").strip()

if len(feed_rating) > 0:
description += "<p>Rating from vendor: {rating}</p>".format(rating=feed_rating)
published_date = None
published_ts = vulnerability.get("published_timestamp", 0)
if published_ts > 0:
published_date = datetime.fromtimestamp(int(published_ts))

mitigation = ""
reference = vulnerability.get("link", "not provided")

package_names = []
affected_packages = ""

packages = vulnerability.get("packages", {})
if len(packages.values()) > 0:
mitigation += "<p>update the affected packages to the following versions:</p>"
description += "<p>The following packages are affected:</p>"

for package_name, package_versions in packages.items():
package_names.append(package_name.split('/')[0])

mitigation += "<p>{name}:</p>".format(name=package_name)
affected_packages += f"*{package_name}*\n"

description += "<p>{name}:</p>".format(name=package_name)
for versions in package_versions:
mitigation += "<p> {fixed}</p>".format(fixed=versions.get("fixed_version", "unknown"))

description += "<p> installed version: {installed}</p>".format(installed=versions.get("package_version", "unknown"))
description += "<p> fixed version: {fixed}</p>".format(fixed=versions.get("fixed_version", "unknown"))

link = vulnerability.get("link") if "link" in vulnerability else ""

vectors_v3 = vulnerability.get("vectors_v3", "")

score_v3 = vulnerability.get("score_v3", "")
installed=versions.get("package_version", "unknown")
fixed=versions.get("fixed_version", "unknown")
affected_packages += f" installed version: {installed}\n"
affected_packages += f" fixed version: {fixed}\n"

published_date = None
published_ts = vulnerability.get("published_timestamp", 0)
if published_ts > 0:
published_date = datetime.fromtimestamp(int(published_ts))

vulnerability_id = vulnerability.get("name")

# there is nothing like short description, short name or title
package_names_combined = ','.join(sorted(set(package_names), key=str))
if len(package_names_combined) > 32:
package_names_combined = package_names_combined[-32:]
affected_packages += "\n"

title = "{packages}: ({vuln})".format(packages=package_names_combined, vuln=vulnerability.get("name").upper())
nodes = vulnerability.get("nodes", [])
workloads = vulnerability.get("workloads", [])
images = vulnerability.get("images", [])
platforms = vulnerability.get("platforms", [])

# the same information is saved as Endpoint(s), however, DefectDojo
# Endpoint lacks many metadata fields, thus, difficult to read.
affected_systems = ""

for asset in nodes:
display_name = asset.get("display_name", "")
domains = ','.join(asset.get("domains", []))
affected_systems += f"*Node {display_name}*\n"
affected_systems += f" domains: {domains}\n"

for asset in platforms:
display_name = asset.get("display_name", "")
domains = ','.join(asset.get("domains", []))
affected_systems += f"*Platform {display_name}*\n"
affected_systems += f" domains: {domains}\n"

for asset in images:
display_name = asset.get("display_name", "")
domains = ','.join(asset.get("domains", []))
affected_systems += f"*Image {display_name}*\n"
affected_systems += f" domains: {domains}\n"

for asset in workloads:
display_name = asset.get("display_name", "")
domains = ','.join(asset.get("domains", []))
service = asset.get("service", "")
image = asset.get("image", "")
affected_systems += f"*Workload {display_name}*\n"
affected_systems += f" domains: {domains}\n"
affected_systems += f" service: {service}\n"
affected_systems += f" image: {image}\n"

description = ASSET_FINDING_DESCRIPTION_TEMPLATE.format(
title=vulnerability_id,
description=vuln_description,
feed_rating=vulnerability.get("feed_rating", "not provided"),
published_date=published_date,
reference=reference,
affected_packages=affected_packages,
affected_systems=affected_systems,
)

# create the finding object
finding = Finding(
title=title,
title=vulnerability_id,
test=test,
description=description,
severity=severity,
mitigation=mitigation,
impact="",
url=link,
cvssv3=vectors_v3,
cvssv3_score=score_v3,
references=reference,
cvssv3=vulnerability.get("vectors_v3", ""),
cvssv3_score=vulnerability.get("score_v3", ""),
publish_date=published_date,
)

finding.unsaved_vulnerability_ids = [vulnerability_id]

finding.unsaved_endpoints = []

nodes = vulnerability.get("nodes", [])
for node in nodes:
endpoint = Endpoint(
host=node.get("display_name", ""),
)
finding.unsaved_endpoints.append(endpoint)
for asset in nodes:
endpoints = endpoints_from_asset("node", asset)
finding.unsaved_endpoints += endpoints

for asset in workloads:
endpoints = endpoints_from_asset("workload", asset)
finding.unsaved_endpoints += endpoints

for asset in images:
endpoints = endpoints_from_asset("image", asset)
finding.unsaved_endpoints += endpoints

for asset in platforms:
endpoints = endpoints_from_asset("platform", asset)
finding.unsaved_endpoints += endpoints

return finding


def endpoints_from_asset(kind, asset):
endpoints = []

# usually, there is only one namespace (domain, as NeuVector name it)
namespaces = asset.get("domains", [])

name = asset.get("display_name", "")

if kind == "workload":
# only workload assets have 'service' field
service = asset.get("service", "unknown_service")
name += f"/{service}"

# in principle, naming follows the approach chosen for trivy parser
endpoints.append(Endpoint(
# host needs to comply with domain name syntax, we just expect that
# there will be only one namespace
host='-'.join(namespaces),
# we abuse path to have as much details as possible
path=f"{kind}/{name}",
))

# if it is a workload and it has an associated image, add image as a
# separate endpoint
if kind == "workload" and asset.get("image", "") != "":
image = asset.get("image", "unknown_image")
# image value example:
# someregistry.com/bitnami/postgresql:11.21.0-debian-11-r58
artifact_and_tag = image.split("/")[-1]
# extracting only image name, without tag or digest
artifact_name = artifact_and_tag.split("@")[0]
artifact_name = artifact_name.split(":")[0]

endpoints.append(Endpoint(
host=f"{artifact_name}",
path=f"{image}",
))

return endpoints

# see neuvector/share/types.go
def convert_severity(severity):
if severity.lower() == "critical":
Expand Down

0 comments on commit dc79d96

Please sign in to comment.