From 8e4c04d3cf615646aec57ade94a4f8d060d2587a Mon Sep 17 00:00:00 2001 From: v-sabiraj Date: Mon, 9 Dec 2024 14:55:37 +0530 Subject: [PATCH] grouping events based on types --- .../AzureFunctionOCILogs/main.py | 2 +- .../sentinel_connector.py | 44 ++++++++++--------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/main.py b/Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/main.py index 913895278e..a830590dcc 100644 --- a/Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/main.py +++ b/Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/main.py @@ -164,7 +164,7 @@ def process_events(client: oci.streaming.StreamClient, stream_id, initial_cursor if event["data"]["stateChange"] is not None and "current" in event["data"]["stateChange"] : event["data"]["stateChange"]["current"] = json.dumps( event["data"]["stateChange"]["current"]) - sentinel.send(event) + sentinel.send(event, log_type) sentinel.flush() if check_if_script_runs_too_long(start_ts): diff --git a/Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/sentinel_connector.py b/Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/sentinel_connector.py index 744857b00c..060987658c 100644 --- a/Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/sentinel_connector.py +++ b/Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/sentinel_connector.py @@ -9,42 +9,47 @@ class AzureSentinelConnector: - def __init__(self, log_analytics_uri, workspace_id, shared_key, log_type, queue_size=200, queue_size_bytes=25 * (2**20)): + def __init__(self, log_analytics_uri, workspace_id, shared_key, queue_size=200, queue_size_bytes=25 * (2**20)): self.log_analytics_uri = log_analytics_uri self.workspace_id = workspace_id self.shared_key = shared_key - self.log_type = log_type 
self.queue_size = queue_size - self.bulks_number = 1 self.queue_size_bytes = queue_size_bytes self._queue = [] self._bulks_list = [] self.successfull_sent_events_number = 0 + self.failed_sent_events_number = 0 - def send(self, event): - self._queue.append(event) + def send(self, event, log_type): + self._queue.append((event, log_type)) if len(self._queue) >= self.queue_size: self.flush(force=False) def flush(self, force=True): - self._bulks_list.append(self._queue) - if force: + if force or len(self._queue) >= self.queue_size: + self._bulks_list.append(self._queue) self._flush_bulks() - else: - if len(self._bulks_list) >= self.bulks_number: - self._flush_bulks() - - self._queue = [] + self._queue = [] def _flush_bulks(self): for queue in self._bulks_list: if queue: - queue_list = self._split_big_request(queue) - for q in queue_list: - self._post_data(self.workspace_id, self.shared_key, q, self.log_type) - + grouped_events = self._group_events_by_log_type(queue) + for log_type, events in grouped_events.items(): + queue_list = self._split_big_request(events) + for q in queue_list: + self._post_data(self.workspace_id, + self.shared_key, q, log_type) self._bulks_list = [] + def _group_events_by_log_type(self, queue): + grouped_events = {} + for event, log_type in queue: + if log_type not in grouped_events: + grouped_events[log_type] = [] + grouped_events[log_type].append(event) + return grouped_events + def is_empty(self): return not self._queue and not self._bulks_list @@ -91,19 +96,18 @@ def _post_data(self, workspace_id, shared_key, body, log_type): time.sleep(try_number) try_number += 1 else: - logging.error(str(err)) + logging.error(f"Failed after retries. 
Error: {err}") self.failed_sent_events_number += events_number raise err else: - logging.info('{} Log type value before posting the data'.format(log_type)) - logging.info('{} events have been successfully sent to Azure Sentinel'.format(events_number)) + logging.info(f"{events_number} events successfully sent with log type: {log_type}") self.successfull_sent_events_number += events_number break def _make_request(self, uri, body, headers): response = requests.post(uri, data=body, headers=headers) if not (200 <= response.status_code <= 299): - raise Exception("Error during sending events to Azure Sentinel. Response code: {}".format(response.status)) + raise Exception(f"Error sending events to Azure Sentinel. Response code: {response.status_code}") def _check_size(self, queue): data_bytes_len = len(json.dumps(queue).encode())