From 0586cc1b8867f08cb13bcb87465482c120e5e7df Mon Sep 17 00:00:00 2001
From: Andreu
Date: Thu, 14 Dec 2023 10:52:41 +0100
Subject: [PATCH] Slice CSV into smaller chunks.

---
 src/newrelic_logging/salesforce.py | 44 +++++++++++++++++++++++-------
 1 file changed, 34 insertions(+), 10 deletions(-)

diff --git a/src/newrelic_logging/salesforce.py b/src/newrelic_logging/salesforce.py
index aac0c94..378ffab 100644
--- a/src/newrelic_logging/salesforce.py
+++ b/src/newrelic_logging/salesforce.py
@@ -27,6 +27,7 @@ class SalesforceApiException(Exception):
     "SELECT+Id,+EventType,+CreatedDate,+LogDate,+Interval,+LogFile,+Sequence+From+EventLogFile+Where+LogDate>={" \
     "from_timestamp}+AND+LogDate<{to_timestamp}+AND+Interval='{log_interval_type}' "
 
+CSV_SLICE_SIZE = 1000
 
 def base64_url_encode(json_obj):
     json_str = json.dumps(json_obj)
@@ -296,7 +297,7 @@ def fetch_logs_from_single_req(self, session, query):
                 if 'LogFile' in record:
                     log = self.build_log_from_logfile(session, record)
                     if log is not None:
-                        logs.append(log)
+                        logs.extend(log)
         else:
             logs = self.build_log_from_event(records)
 
@@ -308,10 +309,7 @@ def is_logfile_response(self, records):
         else:
             return True
 
-    # TODO: buffer logs and then send them in batches
-
-    # TODO: set timestamps taken from event/logfile data.
-    # TODO: Use "actualTimestamp" attribute, to avoid API limits (48 hours old)
+    # TODO: Use alternative timestamp attribute to avoid API limits (48 hours old)
 
     def build_log_from_event(self, records):
         log_entries = []
@@ -351,13 +349,28 @@ def build_log_from_logfile(self, session, record):
 
         csv_rows = self.parse_csv(download_response, record_id, record_event_type, cached_messages)
 
-        log_entries = []
+        print("CSV ROWS = ", len(csv_rows))
 
-        #TODO: split logs in groups of maximum 500 logs to avoid hitting API limits
+        # Split CSV rows into smaller chunks to avoid hitting API payload limits
+        logs = []
+        row_offset = 0
+        while True:
+            part_rows = self.extract_csv_slice(csv_rows)
+            part_rows_len = len(part_rows)
+            if part_rows_len > 0:
+                logs.append(self.pack_csv_into_log(record, row_offset, part_rows))
+                row_offset += part_rows_len
+            else:
+                break
+
+        return logs
 
-        print("CSV ROWS = ", len(csv_rows))
+    def pack_csv_into_log(self, record, row_offset, csv_rows):
+        record_id = str(record['Id'])
+        record_event_type = record['EventType']
 
-        for i, row in enumerate(csv_rows):
+        log_entries = []
+        for row_index, row in enumerate(csv_rows):
             message = {}
             if record_event_type in self.event_type_fields_mapping:
                 for field in self.event_type_fields_mapping[record_event_type]:
@@ -376,7 +389,7 @@ def build_log_from_logfile(self, session, record):
             message['timestamp'] = int(timestamp)
 
             log_entries.append({
-                'message': "LogFile " + record_id + " row " + str(i),
+                'message': "LogFile " + record_id + " row " + str(row_index + row_offset),
                 'attributes': message,
                 'timestamp': int(timestamp)
             })
@@ -388,3 +401,14 @@ def build_log_from_logfile(self, session, record):
             'LogDate': record['LogDate'],
             'log_entries': log_entries
         }
+
+    # Slice CSV rows into smaller groups, preserving their original order
+    def extract_csv_slice(self, csv_rows):
+        part_rows = []
+        i = 0
+        while len(csv_rows) > 0:
+            part_rows.append(csv_rows.pop(0))
+            i += 1
+            if i >= CSV_SLICE_SIZE:
+                break
+        return part_rows
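
Note: the snippet below is not part of the patch. It is a minimal standalone sketch of how the chunking introduced above behaves, with CSV_SLICE_SIZE shrunk so the example is easy to trace. The SalesForce class is replaced by plain functions, and names such as slice_csv, pack_rows and the REQUEST_ID row shape are hypothetical stand-ins for pack_csv_into_log and the parsed CSV rows.

# chunking_sketch.py -- illustrative only, not part of the exporter.

CSV_SLICE_SIZE = 3  # the patch uses 1000


def extract_csv_slice(csv_rows):
    # Same logic as the method added in the patch: take up to
    # CSV_SLICE_SIZE rows from the front of the list, mutating it in place.
    part_rows = []
    i = 0
    while len(csv_rows) > 0:
        part_rows.append(csv_rows.pop(0))
        i += 1
        if i >= CSV_SLICE_SIZE:
            break
    return part_rows


def pack_rows(row_offset, part_rows):
    # Hypothetical stand-in for pack_csv_into_log: label each row with its
    # absolute position in the original CSV via row_offset.
    return [
        {'message': 'row ' + str(row_offset + i), 'attributes': row}
        for i, row in enumerate(part_rows)
    ]


def slice_csv(csv_rows):
    # Mirrors the while-loop added to build_log_from_logfile.
    logs = []
    row_offset = 0
    while True:
        part_rows = extract_csv_slice(csv_rows)
        if len(part_rows) == 0:
            break
        logs.append(pack_rows(row_offset, part_rows))
        row_offset += len(part_rows)
    return logs


if __name__ == '__main__':
    rows = [{'REQUEST_ID': str(n)} for n in range(7)]
    for chunk in slice_csv(rows):
        print([entry['message'] for entry in chunk])
    # Prints three chunks of 3, 3 and 1 rows, numbered 'row 0' .. 'row 6'.

Taking rows from the front of the list keeps each "row N" label aligned with the row's position in the original CSV file across chunks, which is why the slicing is done with pop(0) rather than pop().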