forked from ThreatConnect-Inc/tcex-requests-tc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tc_session.py
126 lines (102 loc) · 4.33 KB
/
tc_session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""TcEx Framework Module"""
# standard library
import logging
# third-party
import urllib3
from requests import Response # TYPE-CHECKING
from requests import Session, adapters
from urllib3.util.retry import Retry
from ..util.requests_to_curl import RequestsToCurl # type: ignore # pylint: disable=import-error
from ..util.util import Util # type: ignore # pylint: disable=import-error
from .auth.hmac_auth import HmacAuth
from .auth.tc_auth import TcAuth
from .auth.token_auth import TokenAuth
# Use the logger named after the first dotted component of this module's name
# (i.e. the top-level package), so every module in the package shares one logger.
_logger = logging.getLogger(__name__.split('.', maxsplit=1)[0])
# Silence urllib3's InsecureRequestWarning at import time. The session below accepts
# a caller-supplied `verify` value, so certificate verification may be turned off.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # type: ignore
class TcSession(Session):
    """ThreatConnect REST API Requests Session.

    A ``requests.Session`` subclass that:

    * resolves URL paths against a configured ``base_url``,
    * mounts a retrying HTTP adapter for ``https://`` URLs,
    * re-sends a request once when the first attempt returns HTTP 401, and
    * optionally logs each request as an equivalent curl command.
    """

    def __init__(
        self,
        auth: HmacAuth | TokenAuth | TcAuth,
        base_url: str | None = None,
        log_curl: bool | None = False,
        proxies: dict[str, str] | None = None,
        proxies_enabled: bool | None = False,
        user_agent: dict | None = None,
        verify: bool | str | None = True,
    ):
        """Initialize the Class properties.

        Args:
            auth: The auth handler assigned to the session.
            base_url: Optional base URL that URL paths are appended to. Leading and
                trailing slashes are stripped.
            log_curl: When truthy, every request is logged as a curl command;
                otherwise only requests whose response is not ``ok`` are logged.
            proxies: Proxy mapping; only applied when ``proxies_enabled`` is truthy.
            proxies_enabled: Gate for applying ``proxies`` to the session.
            user_agent: Optional mapping merged into the session headers.
            verify: Value assigned to the session's ``verify`` attribute.
        """
        super().__init__()
        self.base_url = base_url.strip('/') if base_url is not None else base_url
        self.log = _logger
        self.log_curl = log_curl

        # properties
        self.requests_to_curl = RequestsToCurl()
        self.util = Util()

        # configure auth
        self.auth = auth

        # configure optional headers
        if user_agent:
            self.headers.update(user_agent)

        # configure proxy (both a mapping and the enable flag are required)
        if proxies and proxies_enabled:
            self.proxies = proxies

        # configure verify
        self.verify = verify

        # Add Retry
        self.retry()

    def _log_curl(self, response: Response):
        """Log the curl equivalent command for the request behind ``response``.

        Nothing is logged for requests to the app logging endpoint
        (``/v2/logs/app``) or for requests without a URL. Otherwise the command is
        logged when the response is not ``ok`` or when ``log_curl`` is enabled.

        Args:
            response: The response whose originating request should be logged.
        """
        request_url = response.request.url
        # don't show curl message for logging commands
        if request_url is None or '/v2/logs/app' in request_url:
            return

        # APP-79 - adding logging of request as curl commands
        if response.ok and not self.log_curl:
            return

        try:
            self.log.debug(
                self.requests_to_curl.convert(
                    response.request, proxies=self.proxies, verify=self.verify
                )
            )
        except Exception as ex:
            # logging the curl command is best effort: never let a conversion
            # failure break the request, but leave a breadcrumb for debugging.
            self.log.debug(
                f'feature=tc-session, event=curl-command-conversion-failed, error="{ex}"'
            )

    def request(self, method, url, **kwargs):  # pylint: disable=arguments-differ
        """Send a request, resolving ``url`` against the base URL when needed.

        If the first attempt returns HTTP 401 the request is sent exactly one more
        time with the same arguments, to cover a race with the token renewal
        monitor. The final response is logged (and optionally logged as curl).

        Args:
            method: The HTTP method.
            url: A full URL or a URL path (see :meth:`url`).
            **kwargs: Passed through unchanged to ``requests.Session.request``.

        Returns:
            The response of the last attempt.
        """
        full_url = self.url(url)
        response = super().request(method, full_url, **kwargs)

        # retry request in case we encountered a race condition with token renewal monitor
        if response.status_code == 401:
            self.log.debug(
                f'Unexpected response received while attempting to send a request using internal '
                f'session object. Retrying request. feature=tc-session, '
                f'request-url={response.request.url}, status-code={response.status_code}'
            )
            response = super().request(method, full_url, **kwargs)

        # optionally log the curl command
        self._log_curl(response)

        # log request and response data
        self.log.debug(
            f'feature=tc-session, method={method}, request-url={response.request.url}, '
            f'status-code={response.status_code}, elapsed={response.elapsed}'
        )
        return response

    def retry(self, retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504)):
        """Add retry to Requests Session.

        https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.retry.Retry

        Args:
            retries: Count used for the total, read and connect retry limits.
            backoff_factor: Backoff factor passed to ``Retry``.
            status_forcelist: HTTP status codes passed to ``Retry`` as the
                ``status_forcelist``.
        """
        # a distinct name keeps the integer ``retries`` argument from being shadowed
        retry_policy = Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,  # type: ignore
            status_forcelist=status_forcelist,
        )
        # mount all https requests
        self.mount('https://', adapters.HTTPAdapter(max_retries=retry_policy))

    def url(self, url: str) -> str:
        """Return appropriate URL string.

        The method allows the session to accept the URL Path or the full URL.

        Args:
            url: A full ``http(s)://`` URL, or a path relative to ``base_url``.

        Returns:
            ``url`` unchanged when it is already a full URL or when no ``base_url``
            is configured; otherwise ``base_url`` joined with ``url`` using exactly
            one ``/`` between them.
        """
        # a full URL is used as-is (scheme match is case-insensitive)
        if url.lower().startswith(('https://', 'http://')):
            return url

        # without a base URL there is nothing to prepend; returning the path
        # avoids building a bogus 'None/...' URL
        if self.base_url is None:
            return url

        # base_url has no trailing slash, so supply the separator when the path
        # does not bring its own (query/fragment-only values are appended directly)
        if url and not url.startswith(('/', '?', '#')):
            return f'{self.base_url}/{url}'
        return f'{self.base_url}{url}'