-
Notifications
You must be signed in to change notification settings - Fork 0
/
endpoint_query.py
232 lines (200 loc) · 10.3 KB
/
endpoint_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# -*- coding: utf-8 -*-
import hashlib
import hmac
import email
import time
import json
from typing import Optional
import requests
import logging
from datetime import datetime, timedelta, timezone
from pprint import pprint
from stix.core import STIXPackage
from io import BytesIO, StringIO
from cabby import create_client
class APIRequestHandler(object):
    """Standard class to call FireEye REST API

    Every request is signed just before it is sent: :meth:`prepare_headers`
    computes an HMAC-SHA256 over the endpoint, API version, accept type and
    a timestamp, and stores the resulting headers on ``session``. Requests
    must therefore be sent through ``session`` to be authenticated.

    :var str URL: initial value: 'https://api.isightpartners.com'
    :var str public_key: initial value: ''
    :var str private_key: initial value: ''
    :var str accept_version: initial value: '2.5'
    :var logging.Logger logger: initial value: ``logging.Logger``
    :var requests.Session session: initial value: ``requests.Session()``
    """

    def __init__(self):
        """Create a handler with default URL, empty keys and a fresh session."""
        self.URL: str = 'https://api.isightpartners.com'
        self.public_key: str = ''
        self.private_key: str = ''
        self.accept_version: str = '2.5'
        self.session: requests.Session = requests.Session()

        log_level: int = logging.INFO
        logger: logging.Logger = logging.getLogger(__name__)
        logger.setLevel(log_level)

        handler = logging.StreamHandler()
        handler.setLevel(log_level)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )

        # The module logger is shared by every instance; drop handlers left
        # by earlier instances so each record is emitted only once.
        if logger.handlers:
            logger.handlers = []
        logger.addHandler(handler)
        self.logger = logger

    def init(
        self,
        URL: str = "https://api.isightpartners.com",
        public_key: Optional[str] = None,
        private_key: Optional[str] = None,
        accept_version: str = '2.5',
        logger: Optional[logging.Logger] = None,
        session: Optional["requests.Session"] = None,
    ):
        """Initializes class with optional attributes

        :param str URL: URL for FireEye iSight. Defaults to 'https://api.isightpartners.com'
        :param str public_key: Public API Key. Defaults to ''
        :param str private_key: Private API Key. Defaults to ''
        :param str accept_version: FireEye iSight version. Defaults to '2.5'
        :param ``logging.Logger`` logger: Logger object. When omitted the
            logger already attached to the instance is kept.
        :param ``requests.Session`` session: Requests session. When omitted a
            new ``requests.Session()`` is created for this instance.
        """
        self.URL = URL if URL is not None else 'https://api.isightpartners.com'
        self.public_key = public_key if public_key is not None else ''
        self.private_key = private_key if private_key is not None else ''
        self.accept_version = accept_version
        if logger is not None:
            self.logger = logger
        # A session created in the signature would be built once and shared
        # by every instance (together with its auth headers); build it here.
        self.session = session if session is not None else requests.Session()

    def prepare_headers(self, endpoint: str, accept: str):
        """Attaches headers to handler session

        The ``X-Auth-Hash`` header is the hex HMAC-SHA256, keyed with
        ``private_key``, of ``endpoint + accept_version + accept + timestamp``.

        :param str endpoint: endpoint to hit for hashing
        :param str accept: content type to get for acceptance
        """
        # ``import email`` alone does not load the ``email.utils`` submodule,
        # so import the function explicitly instead of relying on some other
        # module having loaded it first.
        from email.utils import formatdate

        # create timestamp for headers
        time_stamp = formatdate(localtime=True)
        signed_text = endpoint + self.accept_version + accept + time_stamp
        self.logger.debug(
            'message="Header info", endpoint="%s", accept="%s", timestamp="%s", accept_version="%s"',
            endpoint, accept, time_stamp, self.accept_version,
        )

        # create hash for headers; the key is one byte per character
        # (code points 0-255), other characters raise UnicodeEncodeError
        key = self.private_key.encode('latin-1')
        digest = hmac.new(key, signed_text.encode('utf-8'), hashlib.sha256).hexdigest()
        self.logger.debug('message="Hash header for request", hash="%s"', digest)

        # set header info
        self.session.headers.update({
            'Accept': accept,
            'Accept-Version': self.accept_version,
            'X-Auth': self.public_key,
            'X-Auth-Hash': digest,
            'Date': time_stamp,
        })

    def returnHelper(self, response: "requests.Response"):
        """Turn a response into parsed JSON when possible

        :param response: response returned by the session
        :return: the decoded JSON body for a 200 response whose body parses;
            otherwise the body is logged as an error and ``response`` itself
            is returned
        :rtype: dict OR :class:`request.Response`
        """
        if response.status_code != 200:
            self.logger.error('message="%s"', response.content)
            return response
        try:
            return response.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses; anything else
            # (e.g. KeyboardInterrupt) must not be swallowed here.
            self.logger.error('message="%s"', response.content)
            return response

    @staticmethod
    def _epoch_seconds(moment: datetime) -> int:
        """Return whole seconds since 1970-01-01 UTC; naive values are taken as UTC."""
        if moment.tzinfo is None or moment.utcoffset() is None:
            moment = moment.replace(tzinfo=timezone.utc)
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        return int((moment - epoch).total_seconds())

    def getIocs(
        self,
        startDate: Optional[datetime] = None,
        endDate: Optional[datetime] = None,
        accept_header: str = 'application/json',
    ):
        """Gets IoCs in a timerange from FireEye iSight API

        :param startDate datetime: Start time. Defaults to seven days before
            the moment of the call. Naive values are interpreted as UTC.
        :param endDate datetime: End time. Defaults to the moment of the
            call. Naive values are interpreted as UTC.
        :param accept_header str: Mimetype return format. Defaults to 'application/json'.
        :return: Depending on the response, returns either dictionary json data or request response
        :rtype: dict OR :class:`request.Response`
        """
        # Defaults are resolved per call; computing them in the signature
        # would freeze the window at import time.
        now = datetime.now(timezone.utc)
        if startDate is None:
            startDate = now - timedelta(days=7)
        if endDate is None:
            endDate = now

        # format endpoint
        start_epoch = self._epoch_seconds(startDate)
        end_epoch = self._epoch_seconds(endDate)
        ENDPOINT = '/view/iocs?startDate=' + str(start_epoch) + '&endDate=' + str(end_epoch)

        # get header info for future requests
        self.prepare_headers(ENDPOINT, accept_header)
        self.logger.info(
            'message="Executing request", startDate="%s", endDate="%s", endpoint="%s"',
            start_epoch, end_epoch, ENDPOINT,
        )

        # execute request
        r = self.session.get(self.URL + ENDPOINT)
        return self.returnHelper(r)

    def getReport(self, reportId: str, accept_header: str = "application/stix"):
        """Gets report endpoint for IoCs

        :param str reportId: ID of report
        :param str accept_header: header to accept different datatypes returned. Defaults to "application/stix".
        :return: the response text when ``accept_header`` is
            "application/stix" (whatever the status code); otherwise the
            result of :meth:`returnHelper`
        :rtype: str OR dict OR ``requests.Response``
        """
        ENDPOINT = "/report/" + reportId
        self.logger.info('message="Getting endpoint %s"', ENDPOINT)
        self.prepare_headers(ENDPOINT, accept_header)
        r = self.session.get(self.URL + ENDPOINT)
        if accept_header == "application/stix":
            return r.text
        return self.returnHelper(r)

    def uploadFile(self, filename: str, FILE: StringIO):
        """Upload file to search list

        :param str filename: Name of the file you're uploading
        :param StringIO FILE: File-like object
        :return: Parsed JSON on success, otherwise the response
        :rtype: dict OR :class:`request.Response`
        """
        ENDPOINT = '/search/list'
        files = {'file': (filename, FILE)}
        self.prepare_headers(ENDPOINT, 'application/json')
        # Post through the session: the signed auth headers live on
        # ``self.session`` and a module-level ``requests.post`` would omit them.
        r = self.session.post(self.URL + ENDPOINT, files=files)
        return self.returnHelper(r)

    def taxiiPush(self, **kwargs):
        """Pushes taxii data to a taxii server

        The client is built with ``create_client()`` using no arguments.
        Authentication is configured only when ``username`` or ``cert_file``
        is supplied; every authentication argument that was passed is then
        forwarded in a single ``set_auth`` call.

        ``jwt_auth_url`` is required for JWT based authentication. If
        it is not specified but ``username`` and ``password`` are provided,
        client will configure Basic authentication.
        SSL authentication can be combined with JWT and Basic
        authentication.

        :param str ca_cert: a path to CA SSL certificate file
        :param str cert_file: a path to SSL certificate file
        :param str key_file: a path to SSL key file
        :param str username: username, used in basic auth or JWT auth
        :param str password: password, used in basic auth or JWT auth
        :param str key_password: same argument as in
            ``ssl.SSLContext.load_cert_chain`` - may be a function to call
            to get the password for decrypting the private key or
            string/bytes/bytearray. It will only be called if the private
            key is encrypted and a password is necessary.
        :param str jwt_auth_url: URL used to obtain JWT token
        :param bool/str verify_ssl: set to False to skip checking host's SSL
            certificate. Set to True to check certificate against public CAs or
            set to filepath to check against custom CA bundle. Treated as
            True when omitted or None.
        :param str content: content to push
        :param content_binding: content binding for a content
        :type content_binding: string or
            :py:class:`cabby.entities.ContentBinding`
        :param list collection_names:
            destination collection names
        :param datetime timestamp: timestamp label of the content block
            (current UTC time by default)
        :param str uri: URI path to a specific Inbox Service
        :raises ValueError:
            if URI provided is invalid or schema is not supported
        :raises `cabby.exceptions.HTTPError`:
            if HTTP error happened
        :raises `cabby.exceptions.UnsuccessfulStatusError`:
            if Status Message received and status_type is not `SUCCESS`
        :raises `cabby.exceptions.ServiceNotFoundError`:
            if no service found
        :raises `cabby.exceptions.AmbiguousServicesError`:
            more than one service with type specified
        :raises `cabby.exceptions.NoURIProvidedError`:
            no URI provided and client can't discover services
        :return: STIX object from python-stix
        :rtype: ``stix.core.stix_package.STIXPackage``
        """
        client = create_client()
        content = kwargs.get("content")

        uses_basic = 'username' in kwargs
        uses_cert = 'cert_file' in kwargs
        if uses_basic:
            self.logger.debug('message="Using basic auth"')
        if uses_cert:
            self.logger.debug('message="Using cert auth"')
        if uses_basic or uses_cert:
            auth_keys = (
                "ca_cert", "cert_file", "key_file", "key_password",
                "username", "password", "jwt_auth_url", "verify_ssl",
            )
            auth_args = {name: kwargs[name] for name in auth_keys if name in kwargs}
            # Keep certificate verification on unless the caller explicitly
            # opts out; a missing value must not be forwarded as None.
            if auth_args.get("verify_ssl") is None:
                auth_args["verify_ssl"] = True
            client.set_auth(**auth_args)

        # Parse before pushing so malformed STIX fails locally.
        stix = STIXPackage.from_xml(StringIO(content))
        self.logger.info('message="Pushing STIX %s"', stix.id_)
        client.push(
            content=content,
            content_binding=kwargs.get("content_binding"),
            collection_names=kwargs.get("collection_names"),
            timestamp=kwargs.get("timestamp"),
            uri=kwargs.get("uri"),
        )
        return stix