forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 3
/
xforceexchange.py
179 lines (154 loc) · 7.91 KB
/
xforceexchange.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import requests
import json
import sys
from . import check_input_attribute, standard_error_message
from collections import defaultdict
from pymisp import MISPAttribute, MISPEvent, MISPObject
from requests.auth import HTTPBasicAuth
sys.path.append('./')
# Base error dictionary; the 'error' key carries the message text.
misperrors = {'error': 'Error'}

# Attribute types this module accepts as input and emits as output, plus the
# result format it declares.
mispattributes = {
    'input': [
        'ip-src', 'ip-dst', 'vulnerability', 'md5', 'sha1', 'sha256',
        'domain', 'hostname', 'url',
    ],
    'output': ['ip-src', 'ip-dst', 'text', 'domain'],
    'format': 'misp_standard',
}

# possible module-types: 'expansion', 'hover' or both
moduleinfo = {
    'version': '2',
    'author': 'Joerg Stephan (@johest)',
    'description': 'IBM X-Force Exchange expansion module',
    'module-type': ['expansion', 'hover'],
}

# config fields that your code expects from the site admin
moduleconfig = ["apikey", "apipassword"]
class XforceExchange():
    """Enrich one MISP attribute with data from the IBM X-Force Exchange API.

    The attribute type selects which endpoints are queried (see ``parse``).
    Answers are converted into MISP objects that are added to an internal
    ``MISPEvent``; ``get_result`` serialises that event, or returns the
    error recorded while querying.
    """

    # Seconds to wait for the API before a request is abandoned. Without a
    # timeout, ``requests`` may block indefinitely on an unresponsive server.
    request_timeout = 30

    def __init__(self, attribute, apikey, apipassword):
        """Store the credentials and load the queried attribute.

        :param attribute: dict describing the MISP attribute to enrich; it is
            loaded into a ``MISPAttribute`` with ``from_dict``.
        :param apikey: user name part of the HTTP basic authentication.
        :param apipassword: password part of the HTTP basic authentication.
        """
        self.base_url = "https://api.xforce.ibmcloud.com"
        self.misp_event = MISPEvent()
        self.attribute = MISPAttribute()
        self.attribute.from_dict(**attribute)
        self._apikey = apikey
        self._apipassword = apipassword
        # Holds an 'error' entry when a query failed.
        self.result = {}
        # Per object name ('url', 'file'): cache keyed by the tuple of
        # "<relation>_<value>" strings, used to avoid duplicate objects.
        self.objects = defaultdict(dict)
        self.status_mapping = {403: "Access denied, please check if your authentication is valid and if you did not reach the limit of queries.",
                               404: "No result found for your query."}

    def parse(self):
        """Dispatch to the parsing method matching the attribute type.

        Raises ``KeyError`` for an attribute type without a mapping.
        """
        mapping = {'url': '_parse_url', 'vulnerability': '_parse_vulnerability'}
        mapping.update(dict.fromkeys(('md5', 'sha1', 'sha256'), '_parse_hash'))
        mapping.update(dict.fromkeys(('domain', 'hostname'), '_parse_dns'))
        mapping.update(dict.fromkeys(('ip-src', 'ip-dst'), '_parse_ip'))
        to_call = mapping[self.attribute.type]
        getattr(self, to_call)(self.attribute.value)

    def get_result(self):
        """Return the module answer.

        When no object was produced, the recorded error is returned (a
        generic "nothing found" message is used if no error was recorded).
        Otherwise the queried attribute is added to the event and the
        non-empty 'Attribute' and 'Object' sections of the serialised event
        are returned under the 'results' key.
        """
        if not self.misp_event.objects:
            if 'error' not in self.result:
                self.result['error'] = "No additional data found on Xforce Exchange."
            return self.result
        self.misp_event.add_attribute(**self.attribute)
        event = json.loads(self.misp_event.to_json())
        result = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
        return {'results': result}

    def _api_call(self, url):
        """GET ``url`` with basic authentication and return the decoded JSON.

        Returns ``None`` and records a message in ``self.result['error']``
        when the request fails or the status code is not 200.
        """
        try:
            response = requests.get(
                url,
                auth=HTTPBasicAuth(self._apikey, self._apipassword),
                timeout=self.request_timeout,
            )
        except Exception as e:
            # Deliberately broad: a failed query must become an error message
            # rather than crash the module. The message is stored as text
            # because an exception instance cannot be JSON-serialised.
            self.result['error'] = str(e)
            return None
        if response.status_code != 200:
            self.result['error'] = self.status_mapping.get(
                response.status_code, 'An error with the API has occurred.')
            return None
        return response.json()

    def _create_file(self, malware, relationship):
        """Build a 'file' object from a malware record.

        The record's 'filepath' and 'md5' fields become the 'filename' and
        'md5' attributes; the object references the queried attribute with
        ``relationship``. Raises ``KeyError`` if either field is missing.
        """
        file_object = MISPObject('file')
        for key, relation in zip(('filepath', 'md5'), ('filename', 'md5')):
            file_object.add_attribute(relation, malware[key])
        file_object.add_reference(self.attribute.uuid, relationship)
        return file_object

    def _create_url(self, malware):
        """Return the uuid of the 'url' object describing a malware record.

        The object is built from the record's 'uri' and 'domain' fields. An
        object with the same attributes is only added to the event once; the
        uuid of the first one is returned for later duplicates.
        """
        url_object = MISPObject('url')
        for key, relation in zip(('uri', 'domain'), ('url', 'domain')):
            url_object.add_attribute(relation, malware[key])
        attributes = tuple(f'{attribute.object_relation}_{attribute.value}' for attribute in url_object.attributes)
        known_urls = self.objects['url']
        if attributes in known_urls:
            return known_urls[attributes]
        url_uuid = url_object.uuid
        self.misp_event.add_object(**url_object)
        known_urls[attributes] = url_uuid
        return url_uuid

    def _fetch_types(self, value):
        """Return ``(queried relation, answer relation, queried value)``.

        For an IP attribute the attribute's own value is used; otherwise the
        given ``value`` is treated as the domain.
        """
        if self.attribute.type in ('ip-src', 'ip-dst'):
            return 'ip', 'domain', self.attribute.value
        return 'domain', 'ip', value

    def _handle_file(self, malware, relationship):
        """Add a 'file' object for a malware record, linked to its 'url' object.

        When a file object with the same attributes was already seen, only a
        new 'dropped-by' reference is added to the cached one.
        """
        file_object = self._create_file(malware, relationship)
        attributes = tuple(f'{attribute.object_relation}_{attribute.value}' for attribute in file_object.attributes)
        url_uuid = self._create_url(malware)
        known_files = self.objects['file']
        if attributes in known_files:
            # NOTE(review): if add_object(**obj) stores a copy of the object,
            # this reference is added to the cached original only and may not
            # reach the event -- confirm against the pymisp version in use.
            known_files[attributes].add_reference(url_uuid, 'dropped-by')
            return
        file_object.add_reference(url_uuid, 'dropped-by')
        known_files[attributes] = file_object
        self.misp_event.add_object(**file_object)

    def _parse_dns(self, value):
        """Query the resolve endpoint and add a 'domain-ip' object.

        Nothing is added when the query failed or has no passive records.
        """
        dns_result = self._api_call(f'{self.base_url}/resolve/{value}')
        if not dns_result:
            # _api_call returns None on failure (error already recorded).
            return
        passive = dns_result.get('Passive')
        if passive and passive.get('records'):
            itype, ftype, value = self._fetch_types(passive['query'])
            misp_object = MISPObject('domain-ip')
            misp_object.add_attribute(itype, value)
            for record in passive['records']:
                misp_object.add_attribute(ftype, record['value'])
            misp_object.add_reference(self.attribute.uuid, 'related-to')
            self.misp_event.add_object(**misp_object)

    def _parse_hash(self, value):
        """Query the malware endpoint for a hash and add its CnC server files."""
        malware_result = self._api_call(f'{self.base_url}/malware/{value}')
        if malware_result and malware_result.get('malware'):
            malware_report = malware_result['malware']
            for malware in malware_report.get('origins', {}).get('CnCServers', {}).get('rows', []):
                self._handle_file(malware, 'related-to')

    def _parse_ip(self, value):
        """Enrich an IP address with DNS and malware information."""
        self._parse_dns(value)
        self._parse_malware(value, 'ipr')

    def _parse_malware(self, value, feature):
        """Query ``<feature>/malware/<value>`` and add each malware record."""
        malware_result = self._api_call(f'{self.base_url}/{feature}/malware/{value}')
        if malware_result and malware_result.get('malware'):
            for malware in malware_result['malware']:
                self._handle_file(malware, 'associated-with')

    def _parse_url(self, value):
        """Enrich a URL with DNS and malware information."""
        self._parse_dns(value)
        self._parse_malware(value, 'url')

    def _parse_vulnerability(self, value):
        """Search vulnerabilities and add one 'vulnerability' object per hit.

        Each hit is expected to provide 'stdcode', 'title', 'description',
        'temporal_score' and 'references' (with 'link_target' entries);
        a missing field raises ``KeyError``.
        """
        vulnerability_result = self._api_call(f'{self.base_url}/vulnerabilities/search/{value}')
        if vulnerability_result:
            for vulnerability in vulnerability_result:
                misp_object = MISPObject('vulnerability')
                for code in vulnerability['stdcode']:
                    misp_object.add_attribute('id', code)
                for feature, relation in zip(('title', 'description', 'temporal_score'),
                                             ('summary', 'description', 'cvss-score')):
                    misp_object.add_attribute(relation, vulnerability[feature])
                for reference in vulnerability['references']:
                    misp_object.add_attribute('references', reference['link_target'])
                misp_object.add_reference(self.attribute.uuid, 'related-to')
                self.misp_event.add_object(**misp_object)
def handler(q=False):
    """Entry point of the module: enrich the attribute described in ``q``.

    :param q: JSON string holding a 'config' section (with 'apikey' and
        'apipassword') and the 'attribute' to enrich, or ``False``.
    :return: ``False`` when called without a query, a dict with an 'error'
        key when the request is incomplete or unsupported, otherwise the
        result produced by ``XforceExchange.get_result``.
    """
    if q is False:
        return False
    request = json.loads(q)
    config = request.get('config') or {}
    key = config.get('apikey')
    password = config.get('apipassword')
    if not (key and password):
        # A fresh dict is returned, like the other error paths below, so the
        # module-level error dictionary is not mutated between requests.
        return {'error': 'An API authentication is required (key and password).'}
    attribute = request.get('attribute')
    if not attribute or not check_input_attribute(attribute):
        return {'error': f'{standard_error_message} which should contain at least a type, a value and an uuid.'}
    if attribute['type'] not in mispattributes['input']:
        return {'error': 'Unsupported attribute type.'}
    parser = XforceExchange(attribute, key, password)
    parser.parse()
    return parser.get_result()
def introspection():
    """Return the module's attribute description (``mispattributes``)."""
    return mispattributes
def version():
    """Return ``moduleinfo`` after attaching the expected config fields.

    The 'config' entry is (re)set to ``moduleconfig`` on every call.
    """
    moduleinfo.update(config=moduleconfig)
    return moduleinfo