Slice CSV into smaller chunks.
asllop committed Dec 14, 2023
1 parent 341f68a commit 0586cc1
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions src/newrelic_logging/salesforce.py
@@ -27,6 +27,7 @@ class SalesforceApiException(Exception):
     "SELECT+Id,+EventType,+CreatedDate,+LogDate,+Interval,+LogFile,+Sequence+From+EventLogFile+Where+LogDate>={" \
     "from_timestamp}+AND+LogDate<{to_timestamp}+AND+Interval='{log_interval_type}' "
 
+CSV_SLICE_SIZE = 1000
 
 def base64_url_encode(json_obj):
     json_str = json.dumps(json_obj)
@@ -296,7 +297,7 @@ def fetch_logs_from_single_req(self, session, query):
             if 'LogFile' in record:
                 log = self.build_log_from_logfile(session, record)
                 if log is not None:
-                    logs.append(log)
+                    logs.extend(log)
         else:
             logs = self.build_log_from_event(records)
 
@@ -308,10 +309,7 @@ def is_logfile_response(self, records):
         else:
             return True
 
-    # TODO: buffer logs and then send them in batches
-
-    # TODO: set timestamps taken from event/logfile data.
-    # TODO: Use "actualTimestamp" attribute, to avoid API limits (48 hours old)
+    # TODO: Use alternative timestamp attribute to avoid API limits (48 hours old)
 
     def build_log_from_event(self, records):
         log_entries = []
@@ -351,13 +349,28 @@ def build_log_from_logfile(self, session, record):
 
         csv_rows = self.parse_csv(download_response, record_id, record_event_type, cached_messages)
 
-        log_entries = []
+        print("CSV ROWS = ", len(csv_rows))
 
-        #TODO: split logs in groups of maximum 500 logs to avoid hitting API limits
+        # Split CSV rows into smaller chunks to avoid hitting API payload limits
+        logs = []
+        row_offset = 0
+        while True:
+            part_rows = self.extract_csv_slice(csv_rows)
+            part_rows_len = len(part_rows)
+            if part_rows_len > 0:
+                logs.append(self.pack_csv_into_log(record, row_offset, part_rows))
+                row_offset += part_rows_len
+            else:
+                break
+
+        return logs
 
-        print("CSV ROWS = ", len(csv_rows))
+    def pack_csv_into_log(self, record, row_offset, csv_rows):
+        record_id = str(record['Id'])
+        record_event_type = record['EventType']
 
-        for i, row in enumerate(csv_rows):
+        log_entries = []
+        for row_index, row in enumerate(csv_rows):
             message = {}
             if record_event_type in self.event_type_fields_mapping:
                 for field in self.event_type_fields_mapping[record_event_type]:
@@ -376,7 +389,7 @@ def build_log_from_logfile(self, session, record):
             message['timestamp'] = int(timestamp)
 
             log_entries.append({
-                'message': "LogFile " + record_id + " row " + str(i),
+                'message': "LogFile " + record_id + " row " + str(row_index + row_offset),
                 'attributes': message,
                 'timestamp': int(timestamp)
             })
@@ -388,3 +401,14 @@ def build_log_from_logfile(self, session, record):
             'LogDate': record['LogDate'],
             'log_entries': log_entries
         }
+
+    # Slice CSV into smaller groups
+    def extract_csv_slice(self, csv_rows):
+        part_rows = []
+        i = 0
+        while len(csv_rows) > 0:
+            part_rows.append(csv_rows.pop())
+            i += 1
+            if i >= CSV_SLICE_SIZE:
+                break
+        return part_rows
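
For reference, a minimal standalone sketch of the chunking flow this commit introduces, using hypothetical stand-in rows rather than parsed Salesforce CSV data (CSV_SLICE_SIZE and the extract_csv_slice logic come from the commit; the sample data and print calls are illustrative only). Note that the helper pops rows off the end of the list, so slices are consumed back-to-front:

# Hypothetical sample data: stand-ins for parsed CSV rows.
CSV_SLICE_SIZE = 1000
csv_rows = [{'ROW_NUMBER': i} for i in range(2500)]

def extract_csv_slice(csv_rows):
    # Mirrors the committed helper: pop up to CSV_SLICE_SIZE rows off the end of the list.
    part_rows = []
    i = 0
    while len(csv_rows) > 0:
        part_rows.append(csv_rows.pop())
        i += 1
        if i >= CSV_SLICE_SIZE:
            break
    return part_rows

row_offset = 0
while True:
    part_rows = extract_csv_slice(csv_rows)
    if len(part_rows) == 0:
        break
    # In the commit, each slice is packed into its own log payload by pack_csv_into_log().
    print("slice of", len(part_rows), "rows at offset", row_offset)
    row_offset += len(part_rows)

# Expected output: three slices of 1000, 1000 and 500 rows.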
