-
Notifications
You must be signed in to change notification settings - Fork 8
/
utils.py
67 lines (48 loc) · 1.78 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import json
import sys
import os
import logging
from dataclasses import is_dataclass, asdict
from datetime import datetime, date
from solnlib.log import Logs
from mappers import datetime_to_str
from urllib.parse import urlparse
app_name = "bitwarden_event_logs_beta"
def read_session_token() -> str:
session_token = sys.stdin.readline(5000).strip()
if session_token is None or session_token == '':
raise Exception('Session token not found')
return session_token
def is_splunk_environment():
return os.environ.get("SPLUNK_HOME") is not None
def set_logging_level(logging_level: str):
get_logger().setLevel(logging_level)
def get_logger() -> logging.Logger:
if not is_splunk_environment():
logger = logging.Logger(app_name)
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)
else:
Logs.set_context(log_level=logging.INFO)
logger = Logs().get_logger(app_name)
return logger
def obj_to_json(obj):
def json_serial(obj2):
if isinstance(obj2, (datetime, date)):
return datetime_to_str(obj2)
raise TypeError("Type %s not serializable" % type(obj2))
if is_dataclass(obj):
obj_dict = asdict(obj)
elif isinstance(obj, dict):
obj_dict = obj
else:
raise Exception("Object of type %s is not json serializable", type(obj))
return json.dumps(obj_dict,
default=json_serial,
separators=(",", ":"))
def secure_url(url: str):
result = urlparse(url, scheme='https')
if result.scheme == 'http':
raise Exception("URLs starting with 'http://' is considered insecure and not allowed in Splunk. "
"Please use 'https://' instead.")
return result.geturl()