grouping events based on types
v-sabiraj committed Dec 9, 2024
1 parent 0bbbb65 commit 8e4c04d
Showing 2 changed files with 25 additions and 21 deletions.
@@ -164,7 +164,7 @@ def process_events(client: oci.streaming.StreamClient, stream_id, initial_cursor
         if event["data"]["stateChange"] is not None and "current" in event["data"]["stateChange"]:
             event["data"]["stateChange"]["current"] = json.dumps(
                 event["data"]["stateChange"]["current"])
-        sentinel.send(event)
+        sentinel.send(event, log_type)
 
     sentinel.flush()
     if check_if_script_runs_too_long(start_ts):
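With this change, log_type travels with each event instead of being fixed on the connector at construction time. As a rough caller-side sketch (the loop and the get_log_type helper here are hypothetical; this hunk does not show how log_type is derived):

    # Hypothetical consumer loop; get_log_type is an assumed helper that
    # maps each OCI event to its destination Log Analytics table.
    for event in fetched_events:
        log_type = get_log_type(event)  # e.g. keyed off event["eventType"]
        sentinel.send(event, log_type)  # buffers an (event, log_type) pair
    sentinel.flush()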
@@ -9,42 +9,47 @@
 
 
 class AzureSentinelConnector:
-    def __init__(self, log_analytics_uri, workspace_id, shared_key, log_type, queue_size=200, queue_size_bytes=25 * (2**20)):
+    def __init__(self, log_analytics_uri, workspace_id, shared_key, queue_size=200, queue_size_bytes=25 * (2**20)):
         self.log_analytics_uri = log_analytics_uri
         self.workspace_id = workspace_id
         self.shared_key = shared_key
-        self.log_type = log_type
         self.queue_size = queue_size
         self.bulks_number = 1
         self.queue_size_bytes = queue_size_bytes
         self._queue = []
         self._bulks_list = []
         self.successfull_sent_events_number = 0
         self.failed_sent_events_number = 0
 
-    def send(self, event):
-        self._queue.append(event)
+    def send(self, event, log_type):
+        self._queue.append((event, log_type))
         if len(self._queue) >= self.queue_size:
             self.flush(force=False)
 
     def flush(self, force=True):
-        self._bulks_list.append(self._queue)
-        if force:
+        if force or len(self._queue) >= self.queue_size:
+            self._bulks_list.append(self._queue)
             self._flush_bulks()
-        else:
-            if len(self._bulks_list) >= self.bulks_number:
-                self._flush_bulks()
-
-        self._queue = []
+            self._queue = []
 
     def _flush_bulks(self):
         for queue in self._bulks_list:
             if queue:
-                queue_list = self._split_big_request(queue)
-                for q in queue_list:
-                    self._post_data(self.workspace_id, self.shared_key, q, self.log_type)
-
+                grouped_events = self._group_events_by_log_type(queue)
+                for log_type, events in grouped_events.items():
+                    queue_list = self._split_big_request(events)
+                    for q in queue_list:
+                        self._post_data(self.workspace_id,
+                                        self.shared_key, q, log_type)
         self._bulks_list = []
 
+    def _group_events_by_log_type(self, queue):
+        grouped_events = {}
+        for event, log_type in queue:
+            if log_type not in grouped_events:
+                grouped_events[log_type] = []
+            grouped_events[log_type].append(event)
+        return grouped_events
+
     def is_empty(self):
         return not self._queue and not self._bulks_list
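For reference, the new _group_events_by_log_type helper partitions the buffered (event, log_type) pairs into one list per log type. A minimal standalone sketch with made-up table names:

    queue = [({"id": 1}, "OCI_Audit"),
             ({"id": 2}, "OCI_Network"),
             ({"id": 3}, "OCI_Audit")]
    grouped = connector._group_events_by_log_type(queue)
    # grouped == {"OCI_Audit": [{"id": 1}, {"id": 3}],
    #             "OCI_Network": [{"id": 2}]}

The same result could be had with collections.defaultdict(list); the explicit membership check keeps the change self-contained.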

@@ -91,19 +96,18 @@ def _post_data(self, workspace_id, shared_key, body, log_type):
                 time.sleep(try_number)
                 try_number += 1
             else:
-                logging.error(str(err))
+                logging.error(f"Failed after retries. Error: {err}")
                 self.failed_sent_events_number += events_number
                 raise err
         else:
-            logging.info('{} Log type value before posting the data'.format(log_type))
-            logging.info('{} events have been successfully sent to Azure Sentinel'.format(events_number))
+            logging.info(f"{events_number} events successfully sent with log type: {log_type}")
             self.successfull_sent_events_number += events_number
             break
 
     def _make_request(self, uri, body, headers):
         response = requests.post(uri, data=body, headers=headers)
         if not (200 <= response.status_code <= 299):
-            raise Exception("Error during sending events to Azure Sentinel. Response code: {}".format(response.status))
+            raise Exception(f"Error sending events to Azure Sentinel. Response code: {response.status_code}")
 
     def _check_size(self, queue):
         data_bytes_len = len(json.dumps(queue).encode())
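Taken together: the constructor no longer accepts log_type, send() buffers (event, log_type) pairs, and flush() posts one batch per log type. A minimal usage sketch with placeholder credentials and made-up table names:

    # Placeholder values; the workspace id, shared key, and URI are not real.
    sentinel = AzureSentinelConnector(
        log_analytics_uri="https://<workspace-id>.ods.opinsights.azure.com",
        workspace_id="<workspace-id>",
        shared_key="<shared-key>",
        queue_size=200)
    sentinel.send({"eventType": "com.oraclecloud.audit.read"}, "OCI_Audit")
    sentinel.send({"eventType": "com.oraclecloud.vcn.update"}, "OCI_Network")
    sentinel.flush()  # groups by log type: one _post_data call per table
                      # (more if a batch exceeds the request size limit)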
