# cve_advanced.py
# Forked from MISP/misp-modules.
import json
import requests
from . import check_input_attribute, standard_error_message
from collections import defaultdict
from pymisp import MISPAttribute, MISPEvent, MISPObject
# Default error payload; the handler fills in the 'error' message before returning it.
misperrors = {'error': 'Error'}

# Accepted input attribute types and the output format of this module.
mispattributes = {
    'input': ['vulnerability'],
    'format': 'misp_standard',
}

# Descriptive metadata returned (together with the config names) by version().
moduleinfo = {
    'version': '2',
    'author': 'Christian Studer',
    'description': 'An expansion module to enrich a CVE attribute with the vulnerability information.',
    'module-type': ['expansion', 'hover'],
}

# Names of the configuration settings read from the request's 'config' section.
moduleconfig = ["custom_API"]

# API base URL used when no 'custom_API' setting is provided.
cveapi_url = 'https://cvepremium.circl.lu/api/'
class VulnerabilityParser():
    """Turn the JSON description of a CVE into MISP objects.

    The parser wraps the queried attribute in a MISP event, adds a
    'vulnerability' object built from the API answer, and, when the answer
    carries them, 'weakness' (CWE) and 'attack-pattern' (CAPEC) objects that
    the vulnerability object references.
    """

    # Seconds to wait for the CWE lookup; without a timeout `requests` may
    # block forever on an unresponsive API.
    _REQUEST_TIMEOUT = 30

    def __init__(self, attribute, api_url):
        """Store the queried attribute and the API base URL.

        :param attribute: dict describing the MISP attribute being enriched.
        :param api_url: base URL of the API, expected to end with '/'
            (it is concatenated directly with 'cwe/<id>').
        """
        misp_attribute = MISPAttribute()
        misp_attribute.from_dict(**attribute)
        misp_event = MISPEvent()
        misp_event.add_attribute(**misp_attribute)
        self.__misp_attribute = misp_attribute
        self.__misp_event = misp_event
        self.__api_url = api_url
        # uuid of a referencing object -> list of add_reference() keyword dicts;
        # applied in one pass by get_result().
        self.references = defaultdict(list)
        self.__capec_features = ('id', 'name', 'summary', 'prerequisites', 'solutions')
        # API field -> object relation, for single-valued vulnerability fields.
        self.__vulnerability_mapping = {
            'id': 'id', 'summary': 'summary',
            'Modified': 'modified', 'cvss3': 'cvss-score',
            'cvss3-vector': 'cvss-string'
        }
        # API field -> object relation, for list-valued vulnerability fields.
        self.__vulnerability_multiple_mapping = {
            'vulnerable_configuration': 'vulnerable-configuration',
            'vulnerable_configuration_cpe_2_2': 'vulnerable-configuration',
            'references': 'references'
        }
        # API field -> object relation, for the CWE description.
        self.__weakness_mapping = {
            'name': 'name', 'description_summary': 'description',
            'status': 'status', 'weaknessabs': 'weakness-abs'
        }

    @property
    def api_url(self) -> str:
        """Base URL of the API used for the CWE lookups."""
        return self.__api_url

    @property
    def capec_features(self) -> tuple:
        """CAPEC fields copied as-is onto each 'attack-pattern' object."""
        return self.__capec_features

    @property
    def misp_attribute(self) -> MISPAttribute:
        """The attribute being enriched."""
        return self.__misp_attribute

    @property
    def misp_event(self) -> MISPEvent:
        """Event collecting the attribute and every object created."""
        return self.__misp_event

    @property
    def vulnerability_mapping(self) -> dict:
        """Mapping of single-valued API fields to object relations."""
        return self.__vulnerability_mapping

    @property
    def vulnerability_multiple_mapping(self) -> dict:
        """Mapping of list-valued API fields to object relations."""
        return self.__vulnerability_multiple_mapping

    @property
    def weakness_mapping(self) -> dict:
        """Mapping of CWE API fields to 'weakness' object relations."""
        return self.__weakness_mapping

    def get_result(self):
        """Return the parsed data as ``{'results': {...}}``.

        Pending object references are attached first. Only the non-empty
        'Attribute' and 'Object' sections of the event are kept.
        """
        if self.references:
            self.__build_references()
        event = json.loads(self.misp_event.to_json())
        results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
        return {'results': results}

    def parse_vulnerability_information(self, vulnerability):
        """Create the 'vulnerability' object and its related objects.

        :param vulnerability: dict decoded from the API answer for one CVE.
        """
        vulnerability_object = MISPObject('vulnerability')
        for feature, relation in self.vulnerability_mapping.items():
            if vulnerability.get(feature):
                vulnerability_object.add_attribute(relation, vulnerability[feature])
        if 'Published' in vulnerability:
            vulnerability_object.add_attribute('published', vulnerability['Published'])
            vulnerability_object.add_attribute('state', 'Published')
        for feature, relation in self.vulnerability_multiple_mapping.items():
            # `or ()` also covers a field that is present but null.
            for value in vulnerability.get(feature) or ():
                if isinstance(value, dict):
                    # Dict entries carry the usable value under 'title';
                    # entries without it are skipped instead of raising.
                    value = value.get('title')
                    if value is None:
                        continue
                vulnerability_object.add_attribute(relation, value)
        vulnerability_object.add_reference(self.misp_attribute.uuid, 'related-to')
        self.misp_event.add_object(vulnerability_object)
        cwe = vulnerability.get('cwe')
        # Placeholder values mean no CWE is known, so nothing can be looked up.
        if cwe and cwe not in ('Unknown', 'NVD-CWE-noinfo'):
            self.__parse_weakness(cwe, vulnerability_object.uuid)
        capec = vulnerability.get('capec')
        if capec:
            self.__parse_capec(capec, vulnerability_object.uuid)

    def __build_references(self):
        """Attach every pending reference to the object that owns it."""
        # Index the objects once rather than rescanning them per uuid;
        # setdefault keeps the first object should a uuid ever repeat.
        objects_by_uuid = {}
        for misp_object in self.misp_event.objects:
            objects_by_uuid.setdefault(misp_object.uuid, misp_object)
        for object_uuid, references in self.references.items():
            misp_object = objects_by_uuid.get(object_uuid)
            if misp_object is None:
                continue
            for reference in references:
                misp_object.add_reference(**reference)

    def __parse_capec(self, capec_values, vulnerability_uuid):
        """Add one 'attack-pattern' object per CAPEC entry.

        Each object is recorded as 'targeted-by' reference of the
        vulnerability object identified by *vulnerability_uuid*.
        """
        for capec in capec_values:
            capec_object = MISPObject('attack-pattern')
            for feature in self.capec_features:
                # Tolerate partial entries instead of failing on a missing key.
                if feature in capec:
                    capec_object.add_attribute(feature, capec[feature])
            for related_weakness in capec.get('related_weakness') or ():
                capec_object.add_attribute('related-weakness', f"CWE-{related_weakness}")
            self.misp_event.add_object(capec_object)
            self.references[vulnerability_uuid].append(
                {
                    'referenced_uuid': capec_object.uuid,
                    'relationship_type': 'targeted-by'
                }
            )

    def __parse_weakness(self, cwe_value, vulnerability_uuid):
        """Look the CWE up on the API and add a 'weakness' object.

        The weakness is an optional enrichment: when the identifier is
        malformed or the lookup fails (network error, non-200 status,
        undecodable or non-dict body) nothing is added and parsing goes on.
        """
        parts = cwe_value.split('-')
        if len(parts) < 2:
            # No '<prefix>-<id>' shape, hence no id to query.
            return
        cwe_string, cwe_id = parts[:2]
        try:
            response = requests.get(f'{self.api_url}cwe/{cwe_id}', timeout=self._REQUEST_TIMEOUT)
        except requests.exceptions.RequestException:
            return
        if response.status_code != 200:
            return
        try:
            cwe = response.json()
        except ValueError:
            return
        if not isinstance(cwe, dict):
            return
        weakness_object = MISPObject('weakness')
        weakness_object.add_attribute('id', f'{cwe_string}-{cwe_id}')
        for feature, relation in self.weakness_mapping.items():
            if cwe.get(feature):
                weakness_object.add_attribute(relation, cwe[feature])
        self.misp_event.add_object(weakness_object)
        self.references[vulnerability_uuid].append(
            {
                'referenced_uuid': weakness_object.uuid,
                'relationship_type': 'weakened-by'
            }
        )
def check_url(url):
    """Return *url* with a '/' appended unless it already ends with one."""
    if url.endswith('/'):
        return url
    return f"{url}/"
def handler(q=False):
    """Enrich a 'vulnerability' attribute with the data known for its CVE.

    :param q: JSON string of the module request, or False when there is no
        request. The request must hold an 'attribute' dict and may hold a
        'config' dict with a 'custom_API' base URL.
    :return: False when *q* is False; ``{'error': message}`` when the request
        is invalid, the API cannot be used or the CVE is unknown; otherwise
        the ``{'results': ...}`` dict built by the parser.
    """
    if q is False:
        return False
    request = json.loads(q)
    if not request.get('attribute') or not check_input_attribute(request['attribute']):
        return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
    attribute = request['attribute']
    # Every error is returned as a fresh dict: the shared module-level error
    # dict is no longer mutated, and all error paths have the same shape
    # (two of them used to return a bare string).
    if attribute.get('type') != 'vulnerability':
        return {'error': 'Vulnerability id missing.'}
    # `or {}` also covers a 'config' key that is present but null.
    custom_api = (request.get('config') or {}).get('custom_API')
    api_url = check_url(custom_api) if custom_api else cveapi_url
    try:
        # The timeout (seconds) keeps an unresponsive API from blocking forever.
        response = requests.get(f"{api_url}cve/{attribute['value']}", timeout=30)
    except requests.exceptions.RequestException:
        return {'error': 'API not accessible'}
    if response.status_code != 200:
        return {'error': 'API not accessible'}
    try:
        vulnerability = response.json()
    except ValueError:
        # An undecodable body is treated like an unusable API.
        return {'error': 'API not accessible'}
    if not vulnerability:
        return {'error': 'Non existing CVE'}
    parser = VulnerabilityParser(attribute, api_url)
    parser.parse_vulnerability_information(vulnerability)
    return parser.get_result()
def introspection():
    """Return the module-level `mispattributes` dict (input types and format)."""
    declared_attributes = mispattributes
    return declared_attributes
def version():
    """Return `moduleinfo` after storing `moduleconfig` under its 'config' key."""
    moduleinfo.update(config=moduleconfig)
    return moduleinfo