Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
title: "OpenVAS Parser V2"
toc_hide: true
---
This is version 2 of the OpenVAS / Greenbone parser.
You can upload your scans in either CSV or XML format. For the parser to recognize the format, the file name has to end with .csv or .xml.

### V2 Changes
Version 2 comes with multiple improvements:
- Uses `unique_id_from_tool` for deduplication
- Increased parsing consistency between the XML and CSV parsers
- Combined findings where the only differences are in fields that can't be rehashed due to inconsistent values between scans, e.g. fields with timestamps or packet IDs.
- The parser now combines multiple identical findings with different endpoints into one finding with multiple endpoints (instead of multiple findings with one endpoint each)

### Sample Scan Data
Sample OpenVAS scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/openvas).
3 changes: 3 additions & 0 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,7 @@ def saml2_attrib_map_format(din):
"Qualys Hacker Guardian Scan": ["title", "severity", "description"],
"Cyberwatch scan (Galeax)": ["title", "description", "severity"],
"Cycognito Scan": ["title", "severity"],
"OpenVAS Parser v2": ["title", "unique_id_from_tool", "vuln_id_from_tool"],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unique_id_from_tool should not be in here, you should set DEDUPLICATION_ALGORITHM_PER_PARSER to DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE for this v2 parser.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my reply to your comment.

}

# Override the hardcoded settings here via the env var
Expand Down Expand Up @@ -1419,6 +1420,7 @@ def saml2_attrib_map_format(din):
"HCL AppScan on Cloud SAST XML": True,
"AWS Inspector2 Scan": True,
"Cyberwatch scan (Galeax)": True,
"OpenVAS Parser v2": True,
}

# List of fields that are known to be usable in hash_code computation)
Expand Down Expand Up @@ -1605,6 +1607,7 @@ def saml2_attrib_map_format(din):
"Red Hat Satellite": DEDUPE_ALGO_HASH_CODE,
"Qualys Hacker Guardian Scan": DEDUPE_ALGO_HASH_CODE,
"Cyberwatch scan (Galeax)": DEDUPE_ALGO_HASH_CODE,
"OpenVAS Parser v2": DEDUPE_ALGO_HASH_CODE,
}

# Override the hardcoded settings here via the env var
Expand Down
1 change: 1 addition & 0 deletions dojo/tools/openvas_v2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__author__ = "manuel-sommer"
97 changes: 97 additions & 0 deletions dojo/tools/openvas_v2/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import hashlib
from dataclasses import dataclass

from dojo.models import Finding


@dataclass
class OpenVASFindingAuxData:

    """Dataclass to contain all information added later to fields"""

    # human-readable summary; appended to the finding description by update_finding()
    summary: str = ""
    # Quality of Detection value reported by OpenVAS; appended to the description
    qod: str = ""
    # raw scanner result output; stored in steps_to_reproduce by update_finding()
    openvas_result: str = ""


def is_valid_severity(severity):
    """Return True when *severity* is one of the severity levels DefectDojo accepts."""
    return severity in {"Info", "Low", "Medium", "High", "Critical"}


def cleanup_openvas_text(text: str):
    """Collapse OpenVAS hard line wraps (newline followed by a space) into a single space."""
    return " ".join(text.split("\n "))


def update_finding(finding: Finding, aux_info: OpenVASFindingAuxData):
    """Copy the auxiliary data gathered during parsing onto the finding."""
    # raw scanner output goes into steps_to_reproduce, not the description
    if aux_info.openvas_result:
        finding.steps_to_reproduce = aux_info.openvas_result

    # collect the optional description extras, then append them in one go
    extras = []
    if aux_info.summary:
        extras.append(f"\n**Summary**: {cleanup_openvas_text(aux_info.summary)}")
    if aux_info.qod:
        extras.append(f"\n**QoD**: {aux_info.qod}")
    if extras:
        finding.description += "".join(extras)


def deduplicate(dupes: dict[str, Finding], finding: Finding):
    """
    Combine multiple OpenVAS findings into one DefectDojo finding with multiple endpoints.

    ``dupes`` maps the report-local dedup hash to the finding that is kept;
    ``finding`` is merged into an existing entry when its hash is already present.
    """
    finding_hash = dedup_finding_hash(finding)
    # deliberately misuse unique_id_from_tool to save some original values
    finding.unique_id_from_tool = id_from_tool_finding_hash(finding)

    if finding_hash not in dupes:
        dupes[finding_hash] = finding
        return

    # OpenVAS does not combine multiple findings into one:
    # e.g. if 2 vulnerable java runtimes are present on the host this is reported as 2 findings.
    # The only way to differentiate these findings when they are based on the same vulnerability
    # is the data mapped to steps_to_reproduce.
    # However we cannot hash this field as it can contain data that changes between scans
    # (e.g. timestamps or packet ids). We therefore combine them into one DefectDojo
    # finding, because duplicates during reimport cause
    # https://github.com/DefectDojo/django-DefectDojo/issues/3958
    org = dupes[finding_hash]
    new_steps = finding.steps_to_reproduce
    # guard: original code crashed on None ("in None" / "+= None") and, on the very
    # first merge, silently dropped the incoming finding's result block
    if new_steps and org.steps_to_reproduce != new_steps:
        if not org.steps_to_reproduce or "Endpoint" not in org.steps_to_reproduce:
            # first merge: label the already-stored result block with its own endpoint
            org.steps_to_reproduce = (
                f"**Endpoint**: {org.unsaved_endpoints[0].host}\n"
                + (org.steps_to_reproduce or "")
            )
        # append the incoming finding's result block under its endpoint header
        org.steps_to_reproduce += "\n---------------------------------------\n"
        org.steps_to_reproduce += f"**Endpoint**: {finding.unsaved_endpoints[0].host}\n"
        org.steps_to_reproduce += new_steps

    # combine identical findings on different hosts into one with multiple hosts
    endpoint = finding.unsaved_endpoints[0]
    if endpoint not in org.unsaved_endpoints:
        org.unsaved_endpoints += finding.unsaved_endpoints


def id_from_tool_finding_hash(finding: Finding):
    """Generate a hash that complements final hash generating outside of this parser"""
    ep = finding.unsaved_endpoints[0]
    endpoint_parts = (
        ep.protocol,
        ep.userinfo,
        ep.port,  # keep findings on different ports separate as they may be different applications
        ep.path,
        ep.fragment,
    )
    # severity is included so the severity of a finding can be changed after import
    material = [str(part) for part in endpoint_parts]
    material.append(finding.severity)
    return hashlib.sha256("|".join(material).encode("utf-8")).hexdigest()


def dedup_finding_hash(finding: Finding):
    """Generate a hash for a finding that is used for deduplication of findings inside the current report"""
    ep = finding.unsaved_endpoints[0]
    endpoint_parts = (ep.protocol, ep.userinfo, ep.port, ep.path, ep.fragment)
    material = [str(part) for part in endpoint_parts]
    material += [finding.title, finding.vuln_id_from_tool, finding.severity]
    return hashlib.sha256("|".join(material).encode("utf-8")).hexdigest()
113 changes: 113 additions & 0 deletions dojo/tools/openvas_v2/csv_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import csv
import io

from dateutil.parser import parse

from dojo.models import Endpoint, Finding
from dojo.tools.openvas_v2.common import (
OpenVASFindingAuxData,
cleanup_openvas_text,
deduplicate,
is_valid_severity,
update_finding,
)


def evaluate_bool_value(column_value):
    """Map 'true'/'false' (case-insensitive) to a bool; any other value to None."""
    return {"true": True, "false": False}.get(column_value.lower())


class OpenVASCSVParserV2:

    """Parses Greenbone/OpenVAS CSV exports into DefectDojo findings."""

    def get_findings(self, filename, test):
        """
        Read the uploaded CSV file and return a list of deduplicated findings.

        Raises ValueError when the header row has no "nvt name" column,
        i.e. the file does not look like an OpenVAS CSV export.
        """
        dupes = {}
        content = filename.read()
        # uploads may arrive as raw bytes; normalize to text first
        if isinstance(content, bytes):
            content = content.decode("utf-8")

        csv_reader = csv.reader(io.StringIO(content), delimiter=",", quotechar='"')
        # the header row drives the column-to-field mapping below; empty names are dropped
        column_names = [column_name.lower() for column_name in next(csv_reader) if column_name]

        if "nvt name" not in column_names:
            msg = "Invalid OpenVAS csv file"
            raise ValueError(msg)

        for row in csv_reader:
            finding = Finding(test=test, dynamic_finding=True, static_finding=False, severity="Info")
            finding.unsaved_vulnerability_ids = []
            finding.unsaved_endpoints = [Endpoint()]
            aux_info = OpenVASFindingAuxData()

            # strict=False: rows shorter/longer than the header are zipped without error
            for value, name in zip(row, column_names, strict=False):
                self.process_column_element(value, name, finding, aux_info)

            update_finding(finding, aux_info)
            # merge into dupes; identical findings on other hosts become extra endpoints
            deduplicate(dupes, finding)

        return list(dupes.values())

    def process_column_element(
        self,
        column_value: str,
        column_name: str,
        finding: Finding,
        aux_info: OpenVASFindingAuxData,
    ):
        """Map a single CSV cell onto the finding / aux data based on its column name."""
        # skip columns with empty values
        if not column_value:
            return

        # process column names
        if column_name == "nvt name":
            finding.title = column_value
        elif column_name == "cweid":
            # non-numeric CWE ids are silently ignored
            if column_value.isdigit():
                finding.cwe = int(column_value)
        elif column_name == "cves":
            for cve in column_value.split(","):
                finding.unsaved_vulnerability_ids.append(cve)
        elif column_name == "nvt oid":
            finding.vuln_id_from_tool = column_value
        elif column_name == "hostname":
            # strip due to https://github.com/greenbone/gvmd/issues/2378
            finding.unsaved_endpoints[0].host = column_value.strip()
        elif column_name == "ip":
            # fall back to the ip if no hostname is available
            if not finding.unsaved_endpoints[0].host:
                # strip due to https://github.com/greenbone/gvmd/issues/2378
                finding.unsaved_endpoints[0].host = column_value.strip()
        elif column_name == "port":
            # non-numeric port values (e.g. service names) are skipped
            if column_value.isdigit():
                finding.unsaved_endpoints[0].port = int(column_value)
        elif column_name == "port protocol":
            finding.unsaved_endpoints[0].protocol = column_value
        elif column_name == "severity":
            # values outside DefectDojo's severity vocabulary are ignored
            if is_valid_severity(column_value):
                finding.severity = column_value
        elif column_name == "cvss":
            # NOTE(review): a malformed number raises ValueError and aborts the
            # whole import — confirm this is intended (port/cwe are guarded above)
            finding.cvssv3_score = float(column_value)
        elif column_name == "summary":
            aux_info.summary = column_value
        elif column_name == "solution":
            finding.mitigation = cleanup_openvas_text(column_value)
        elif column_name == "vulnerability insight":
            finding.impact = cleanup_openvas_text(column_value)
        elif column_name == "specific result":
            aux_info.openvas_result = column_value
        elif column_name == "qod":
            aux_info.qod = column_value
        # columns not part of default openvas csv export
        elif column_name == "active":
            finding.active = evaluate_bool_value(column_value)
        elif column_name == "verified":
            finding.verified = evaluate_bool_value(column_value)
        elif column_name == "falsepositive":
            finding.false_p = evaluate_bool_value(column_value)
        elif column_name == "duplicate":
            finding.duplicate = evaluate_bool_value(column_value)
        elif column_name == "timestamp":
            finding.date = parse(column_value).date()
20 changes: 20 additions & 0 deletions dojo/tools/openvas_v2/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dojo.tools.openvas_v2.csv_parser import OpenVASCSVParserV2
from dojo.tools.openvas_v2.xml_parser import OpenVASXMLParserV2


class OpenVASV2Parser:

    """Entry point for version 2 of the OpenVAS/Greenbone report parser."""

    def get_scan_types(self):
        """Return the scan type names offered by this parser."""
        return ["OpenVAS Parser v2"]

    def get_label_for_scan_types(self, scan_type):
        """The UI label is identical to the scan type name."""
        return scan_type

    def get_description_for_scan_types(self, scan_type):
        """Return the short description shown in the import form."""
        return "Import CSV or XML output of Greenbone OpenVAS report."

    def get_findings(self, filename, test):
        """Dispatch to the CSV or XML sub-parser based on the file extension."""
        upload_name = str(filename.name)
        if upload_name.endswith(".csv"):
            return OpenVASCSVParserV2().get_findings(filename, test)
        if upload_name.endswith(".xml"):
            return OpenVASXMLParserV2().get_findings(filename, test)
        # unrecognized extension: nothing to parse
        return None
117 changes: 117 additions & 0 deletions dojo/tools/openvas_v2/xml_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import contextlib
from xml.dom import NamespaceErr

from defusedxml import ElementTree

from dojo.models import Endpoint, Finding
from dojo.tools.openvas_v2.common import (
OpenVASFindingAuxData,
cleanup_openvas_text,
deduplicate,
is_valid_severity,
update_finding,
)


class OpenVASXMLParserV2:

    """Parses Greenbone/OpenVAS XML reports into DefectDojo findings."""

    def get_findings(self, filename, test):
        """
        Parse the uploaded XML report and return a list of deduplicated findings.

        Raises NamespaceErr when the root element is not a <report>.
        """
        dupes = {}
        tree = ElementTree.parse(filename)
        root = tree.getroot()

        if "report" not in root.tag:
            msg = "This doesn't seem to be a valid Greenbone/ OpenVAS XML file."
            raise NamespaceErr(msg)

        # the payload is nested inside an inner <report> element
        report = root.find("report")
        results = report.find("results")

        for result in results:
            finding = Finding(
                test=test,
                dynamic_finding=True,
                static_finding=False,
                severity="Info",
            )
            aux_info = OpenVASFindingAuxData()

            finding.unsaved_vulnerability_ids = []
            finding.unsaved_endpoints = [Endpoint()]

            for field in result:
                self.process_field_element(field, finding, aux_info)

            update_finding(finding, aux_info)
            # merge into dupes; identical findings on other hosts become extra endpoints
            deduplicate(dupes, finding)

        return list(dupes.values())

    def parse_nvt_tags(self, text):
        """
        Split an NVT <tags> string of the form "k1=v1|k2=v2|..." into a dict.

        Parts without an "=" separator or with an empty value are skipped.
        """
        parts = text.strip().split("|")
        tags = {}

        for part in parts:
            idx = part.find("=")
            # no separator, or nothing after it -> skip this part
            if idx == -1 or (len(part) < idx + 2):
                continue

            key = part[0:idx]
            val = part[idx + 1 :]
            tags[key] = val
        return tags

    def process_field_element(self, field, finding: Finding, aux_info: OpenVASFindingAuxData):
        """Map one child element of a <result> onto the finding / aux data."""
        if field.tag == "nvt":
            # parse general field
            finding.vuln_id_from_tool = field.get("oid")
            nvt_name = field.find("name").text
            if nvt_name:
                finding.title = nvt_name

            # parse tags field
            # NOTE(review): assumes <tags> exists with non-None text — confirm against the GVM schema
            tag_field = field.find("tags")
            tags = self.parse_nvt_tags(tag_field.text)
            summary = tags.get("summary", None)
            if summary:
                aux_info.summary = summary

            impact = tags.get("impact", None)
            if impact:
                finding.impact = cleanup_openvas_text(impact)

            # parse cves
            refs_node = field.find("refs")
            if refs_node is not None:
                refs = refs_node.findall(".//ref[@type='cve']")
                finding.unsaved_vulnerability_ids = [ref.get("id") for ref in refs]
        elif field.tag == "qod":
            aux_info.qod = field.find("value").text

        # remaining branches all need the element's own text
        if not field.text:
            return

        if field.tag == "name":
            finding.title = field.text
        elif field.tag == "host":
            hostname_field = field.find("hostname")
            # default to hostname else ip
            if hostname_field is not None and hostname_field.text:
                # strip due to https://github.com/greenbone/gvmd/issues/2378
                finding.unsaved_endpoints[0].host = hostname_field.text.strip()
            else:
                # strip due to https://github.com/greenbone/gvmd/issues/2378
                finding.unsaved_endpoints[0].host = field.text.strip()
        elif field.tag == "port":
            # text has the form "<port>/<protocol>"; non-numeric port values are skipped
            port_str, protocol = field.text.split("/")
            finding.unsaved_endpoints[0].protocol = protocol
            with contextlib.suppress(ValueError):
                finding.unsaved_endpoints[0].port = int(port_str)
        elif field.tag == "severity":
            # <severity> carries the CVSS score; the DefectDojo severity comes from <threat>
            finding.cvssv3_score = float(field.text)
        elif field.tag == "threat":
            if is_valid_severity(field.text):
                finding.severity = field.text
        elif field.tag == "description":
            aux_info.openvas_result = field.text.strip()
        elif field.tag == "solution":
            finding.mitigation = cleanup_openvas_text(field.text)
Loading
Loading