forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
censys_enrich.py
276 lines (238 loc) · 11.5 KB
/
censys_enrich.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# encoding: utf-8
import json
import configparser
import base64
import codecs
import censys.common.config
from dateutil.parser import isoparse
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
try:
#needed in order to overwrite the censys module intent of creating config files in the home folder of the proccess owner
#--
def get_config_over() -> configparser.ConfigParser:
config = configparser.ConfigParser()
config[censys.common.config.DEFAULT] = censys.common.config.default_config
return config
censys.common.config.get_config = get_config_over
#--
from censys.search import CensysHosts
from censys.search import CensysCertificates
from censys.common.base import *
except ImportError:
print("Censys module not installed. Try 'pip install censys'")
misperrors = {'error': 'Error'}
moduleconfig = ['api_id', 'api_secret']
mispattributes = {'input': ['ip-src', 'ip-dst', 'domain', 'hostname', 'hostname|port', 'domain|ip', 'ip-dst|port', 'ip-src|port',
'x509-fingerprint-md5', 'x509-fingerprint-sha1', 'x509-fingerprint-sha256'], 'format': 'misp_standard'}
moduleinfo = {'version': '0.1', 'author': 'Loïc Fortemps',
'description': 'Censys.io expansion module', 'module-type': ['expansion', 'hover']}
api_id = None
api_secret = None
def handler(q=False):
global api_id, api_secret
if q is False:
return False
request = json.loads(q)
if request.get('config'):
if (request['config'].get('api_id') is None) or (request['config'].get('api_secret') is None):
misperrors['error'] = "Censys API credentials are missing"
return misperrors
else:
misperrors['error'] = "Please provide config options"
return misperrors
api_id = request['config']['api_id']
api_secret = request['config']['api_secret']
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if not any(input_type == attribute['type'] for input_type in mispattributes['input']):
return {'error': 'Unsupported attribute type.'}
attribute = MISPAttribute()
attribute.from_dict(**request['attribute'])
# Lists to accomodate multi-types attribute
types = list()
values = list()
results = list()
if "|" in attribute.type:
t_1, t_2 = attribute.type.split('|')
v_1, v_2 = attribute.value.split('|')
# We cannot use the port information
if t_2 == "port":
types.append(t_1)
values.append(v_1)
else:
types = [t_1, t_2]
values = [v_1, v_2]
else:
types.append(attribute.type)
values.append(attribute.value)
found = False
for t in types:
try:
value = values.pop(0)
# ip, ip-src or ip-dst
if t[:2] == "ip":
r = CensysHosts(api_id, api_secret).view(value)
results.append(parse_response(r, attribute))
found = True
elif t == 'domain' or t == "hostname":
# get ips
endpoint = CensysHosts(api_id, api_secret)
for r_list in endpoint.search(query=value, per_page=5, pages=1):
for r in r_list:
results.append(parse_response(r, attribute))
found = True
elif 'x509-fingerprint-sha256' in t:
# use api_v1 as Certificates endpoint in api_v2 doesn't yet provide all the details
r = CensysCertificates(api_id, api_secret).view(value)
results.append(parse_response(r, attribute))
found = True
except CensysException as e:
misperrors['error'] = "ERROR: param {} / response: {}".format(value, e)
return misperrors
if not found:
misperrors['error'] = "Nothing could be found on Censys"
return misperrors
return {'results': remove_duplicates(results)}
def parse_response(censys_output, attribute):
misp_event = MISPEvent()
misp_event.add_attribute(**attribute)
# Generic fields (for IP/Websites)
if censys_output.get('autonomous_system'):
cen_as = censys_output.get('autonomous_system')
asn_object = MISPObject('asn')
asn_object.add_attribute('asn', value=cen_as.get("asn"))
asn_object.add_attribute('description', value=cen_as.get('name'))
asn_object.add_attribute('subnet-announced', value=cen_as.get('routed_prefix'))
asn_object.add_attribute('country', value=cen_as.get('country_code'))
asn_object.add_reference(attribute.uuid, 'associated-to')
misp_event.add_object(**asn_object)
if censys_output.get('ip') and len(censys_output.get('services')): #"ports" in censys_output
ip_object = MISPObject('ip-port')
ip_object.add_attribute('ip', value=censys_output.get('ip'))
for serv in censys_output.get('services'):
if serv.get('port'):
ip_object.add_attribute('dst-port', value=serv.get('port'))
ip_object.add_reference(attribute.uuid, 'associated-to')
misp_event.add_object(**ip_object)
# We explore all ports to find https or ssh services
for serv in censys_output.get('services', []):
if not isinstance(serv, dict):
continue
if serv.get('service_name').lower() == 'http' and serv.get('certificate', None):
try:
cert = serv.get('certificate', None)
if cert:
# TODO switch to api_v2 once available
# use api_v1 as Certificates endpoint in api_v2 doesn't yet provide all the details
cert_details = CensysCertificates(api_id, api_secret).view(cert)
cert_obj = get_certificate_object(cert_details, attribute)
misp_event.add_object(**cert_obj)
except KeyError:
print("Error !")
if serv.get('ssh') and serv.get('service_name').lower() == 'ssh':
try:
cert = serv.get('ssh').get('server_host_key').get('fingerprint_sha256')
# TODO enable once the type is merged
# misp_event.add_attribute(type='hasshserver-sha256', value=cert['fingerprint_sha256'])
except KeyError:
pass
# Info from certificate query
if "parsed" in censys_output:
cert_obj = get_certificate_object(censys_output, attribute)
misp_event.add_object(**cert_obj)
# Location can be present for IP/Websites results
if "location" in censys_output:
loc_obj = MISPObject('geolocation')
loc = censys_output['location']
loc_obj.add_attribute('latitude', value=loc.get('coordinates', {}).get('latitude', None))
loc_obj.add_attribute('longitude', value=loc.get('coordinates', {}).get('longitude', None))
if 'city' in loc:
loc_obj.add_attribute('city', value=loc.get('city'))
loc_obj.add_attribute('country', value=loc.get('country'))
if 'postal_code' in loc:
loc_obj.add_attribute('zipcode', value=loc.get('postal_code'))
if 'province' in loc:
loc_obj.add_attribute('region', value=loc.get('province'))
loc_obj.add_reference(attribute.uuid, 'associated-to')
misp_event.add_object(**loc_obj)
event = json.loads(misp_event.to_json())
return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])}
# In case of multiple enrichment (ip and domain), we need to filter out similar objects
# TODO: make it more granular
def remove_duplicates(results):
# Only one enrichment was performed so no duplicate
if len(results) == 1:
return results[0]
else:
final_result = results[0]
for i,result in enumerate(results[1:]):
obj_l = results[i+1].get('Object', [])
for o2 in obj_l:
if o2['name'] == "asn":
key = "asn"
elif o2['name'] == "ip-port":
key = "ip"
elif o2['name'] == "x509":
key = "x509-fingerprint-sha256"
elif o2['name'] == "geolocation":
key = "latitude"
if not check_if_present(o2, key, final_result.get('Object', [])):
final_result['Object'].append(o2)
return final_result
def check_if_present(object, attribute_name, list_objects):
"""
Assert if a given object is present in the list.
This function check if object (json format) is present in list_objects
using attribute_name for the matching
"""
for o in list_objects:
# We first look for a match on the name
if o['name'] == object['name']:
for attr in object['Attribute']:
# Within the attributes, we look for the one to compare
if attr['type'] == attribute_name:
# Then we check the attributes of the other object and look for a match
for attr2 in o['Attribute']:
if attr2['type'] == attribute_name and attr2['value'] == attr['value']:
return True
return False
def get_certificate_object(cert, attribute):
parsed = cert['parsed']
cert_object = MISPObject('x509')
cert_object.add_attribute('x509-fingerprint-sha256', value=parsed['fingerprint_sha256'])
cert_object.add_attribute('x509-fingerprint-sha1', value=parsed['fingerprint_sha1'])
cert_object.add_attribute('x509-fingerprint-md5', value=parsed['fingerprint_md5'])
cert_object.add_attribute('serial-number', value=parsed['serial_number'])
cert_object.add_attribute('version', value=parsed['version'])
cert_object.add_attribute('subject', value=parsed['subject_dn'])
cert_object.add_attribute('issuer', value=parsed['issuer_dn'])
cert_object.add_attribute('validity-not-before', value=isoparse(parsed['validity']['start']))
cert_object.add_attribute('validity-not-after', value=isoparse(parsed['validity']['end']))
cert_object.add_attribute('self_signed', value=parsed['signature']['self_signed'])
cert_object.add_attribute('signature_algorithm', value=parsed['signature']['signature_algorithm']['name'])
cert_object.add_attribute('pubkey-info-algorithm', value=parsed['subject_key_info']['key_algorithm']['name'])
if 'rsa_public_key' in parsed['subject_key_info']:
pub_key = parsed['subject_key_info']['rsa_public_key']
cert_object.add_attribute('pubkey-info-size', value=pub_key['length'])
cert_object.add_attribute('pubkey-info-exponent', value=pub_key['exponent'])
hex_mod = codecs.encode(base64.b64decode(pub_key['modulus']), 'hex').decode()
cert_object.add_attribute('pubkey-info-modulus', value=hex_mod)
if "extensions" in parsed and "subject_alt_name" in parsed["extensions"]:
san = parsed["extensions"]["subject_alt_name"]
if "dns_names" in san:
for dns in san['dns_names']:
cert_object.add_attribute('dns_names', value=dns)
if "ip_addresses" in san:
for ip in san['ip_addresses']:
cert_object.add_attribute('ip', value=ip)
if "raw" in cert:
cert_object.add_attribute('raw-base64', value=cert['raw'])
cert_object.add_reference(attribute.uuid, 'associated-to')
return cert_object
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo