From 3f83dcc0a3c26de9b80e8b24b33c7637ea80dda5 Mon Sep 17 00:00:00 2001
From: Yankun Xia <129240889+onewbiek@users.noreply.github.com>
Date: Fri, 3 May 2024 16:17:46 -0400
Subject: [PATCH] Fix DXT stack trace integration (#15)

* Fix DXT conflict with Recorder

* Fix DXT merge conflict

* Cut redundant code

* Bug Fix in DXT Analysis

---------

Co-authored-by: hammad45
---
 drishti/handlers/handle_darshan.py | 117 ++++++++++++++++++++++++++---
 drishti/includes/module.py         |  88 +++++++++++-----------
 2 files changed, 152 insertions(+), 53 deletions(-)

diff --git a/drishti/handlers/handle_darshan.py b/drishti/handlers/handle_darshan.py
index 6c592ec..d47fbea 100644
--- a/drishti/handlers/handle_darshan.py
+++ b/drishti/handlers/handle_darshan.py
@@ -81,7 +81,10 @@ def handler():
 
     information = darshanll.log_get_job(log)
 
-    log_version = information['metadata']['lib_ver']
+    if 'log_ver' in information:
+        log_version = information['log_ver']
+    else:
+        log_version = information['metadata']['lib_ver']
     library_version = darshanll.get_lib_version()
 
     # Make sure log format is of the same version
@@ -144,6 +147,100 @@ def handler():
         df_mpiio = None
         total_size_mpiio = 0
 
+    dxt_posix = None
+    dxt_posix_read_data = None
+    dxt_posix_write_data = None
+    dxt_mpiio = None
+
+    df_lustre = None
+    if "LUSTRE" in report.records:
+        df_lustre = report.records['LUSTRE'].to_df()
+
+    if args.backtrace:
+        if "DXT_POSIX" in report.records:
+            dxt_posix = report.records["DXT_POSIX"].to_df()
+            dxt_posix = pd.DataFrame(dxt_posix)
+            if "address_line_mapping" not in dxt_posix:
+                args.backtrace = False
+            else:
+                read_id = []
+                read_rank = []
+                read_length = []
+                read_offsets = []
+                read_end_time = []
+                read_start_time = []
+                read_operation = []
+
+                write_id = []
+                write_rank = []
+                write_length = []
+                write_offsets = []
+                write_end_time = []
+                write_start_time = []
+                write_operation = []
+
+                for r in zip(dxt_posix['rank'], dxt_posix['read_segments'], dxt_posix['write_segments'], dxt_posix['id']):
+                    if not r[1].empty:
+                        read_id.append([r[3]] * len((r[1]['length'].to_list())))
+                        read_rank.append([r[0]] * len((r[1]['length'].to_list())))
+                        read_length.append(r[1]['length'].to_list())
+                        read_end_time.append(r[1]['end_time'].to_list())
+                        read_start_time.append(r[1]['start_time'].to_list())
+                        read_operation.append(['read'] * len((r[1]['length'].to_list())))
+                        read_offsets.append(r[1]['offset'].to_list())
+
+                    if not r[2].empty:
+                        write_id.append([r[3]] * len((r[2]['length'].to_list())))
+                        write_rank.append([r[0]] * len((r[2]['length'].to_list())))
+                        write_length.append(r[2]['length'].to_list())
+                        write_end_time.append(r[2]['end_time'].to_list())
+                        write_start_time.append(r[2]['start_time'].to_list())
+                        write_operation.append(['write'] * len((r[2]['length'].to_list())))
+                        write_offsets.append(r[2]['offset'].to_list())
+
+                read_id = [element for nestedlist in read_id for element in nestedlist]
+                read_rank = [element for nestedlist in read_rank for element in nestedlist]
+                read_length = [element for nestedlist in read_length for element in nestedlist]
+                read_offsets = [element for nestedlist in read_offsets for element in nestedlist]
+                read_end_time = [element for nestedlist in read_end_time for element in nestedlist]
+                read_operation = [element for nestedlist in read_operation for element in nestedlist]
+                read_start_time = [element for nestedlist in read_start_time for element in nestedlist]
+
+                write_id = [element for nestedlist in write_id for element in nestedlist]
+                write_rank = [element for nestedlist in write_rank for element in nestedlist]
+                write_length = [element for nestedlist in write_length for element in nestedlist]
+                write_offsets = [element for nestedlist in write_offsets for element in nestedlist]
+                write_end_time = [element for nestedlist in write_end_time for element in nestedlist]
+                write_operation = [element for nestedlist in write_operation for element in nestedlist]
+                write_start_time = [element for nestedlist in write_start_time for element in nestedlist]
+
+                dxt_posix_read_data = pd.DataFrame(
+                    {
+                        'id': read_id,
+                        'rank': read_rank,
+                        'length': read_length,
+                        'end_time': read_end_time,
+                        'start_time': read_start_time,
+                        'operation': read_operation,
+                        'offsets': read_offsets,
+                    })
+
+                dxt_posix_write_data = pd.DataFrame(
+                    {
+                        'id': write_id,
+                        'rank': write_rank,
+                        'length': write_length,
+                        'end_time': write_end_time,
+                        'start_time': write_start_time,
+                        'operation': write_operation,
+                        'offsets': write_offsets,
+                    })
+
+        if "DXT_MPIIO" in report.records:
+            dxt_mpiio = report.records["DXT_MPIIO"].to_df()
+            dxt_mpiio = pd.DataFrame(dxt_mpiio)
+
     # Since POSIX will capture both POSIX-only accesses and those coming from MPI-IO, we can subtract those
     if total_size_posix > 0 and total_size_posix >= total_size_mpiio:
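For reviewers unfamiliar with the DXT layout this hunk unpacks: each DXT_POSIX record carries nested per-rank read_segments/write_segments tables, and the hunk flattens them into one row per I/O operation. A minimal, self-contained sketch of the same flattening follows; the toy data and the assign/concat spelling are illustrative only, while the patch reaches the same result with nested list comprehensions:

    import pandas as pd

    # Toy stand-ins for two DXT_POSIX records: one per-rank segment table each.
    segments_rank0 = pd.DataFrame({'offset': [0, 4096], 'length': [100, 200],
                                   'start_time': [0.10, 0.20], 'end_time': [0.15, 0.25]})
    segments_rank1 = pd.DataFrame({'offset': [8192], 'length': [50],
                                   'start_time': [0.30], 'end_time': [0.35]})
    dxt_posix = pd.DataFrame({'id': [10, 10], 'rank': [0, 1]})
    dxt_posix['read_segments'] = [segments_rank0, segments_rank1]

    # Flatten the nested tables into one row per operation, tagging each row
    # with the file id, the issuing rank, and the operation type.
    rows = []
    for file_id, rank, segments in zip(dxt_posix['id'], dxt_posix['rank'], dxt_posix['read_segments']):
        if not segments.empty:
            rows.append(segments.assign(id=file_id, rank=rank, operation='read'))

    dxt_posix_read_data = pd.concat(rows, ignore_index=True)
    print(dxt_posix_read_data[['id', 'rank', 'offset', 'length']])

The flat frames built this way are what the checks below filter by file id and rank.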
@@ -262,7 +359,7 @@ def handler():
     detected_files.columns = ['id', 'total_reads', 'total_writes']
     detected_files.loc[:, 'id'] = detected_files.loc[:, 'id'].astype(str)
 
-    check_small_operation(total_reads, total_reads_small, total_writes, total_writes_small, detected_files, modules, file_map, df_posix)
+    check_small_operation(total_reads, total_reads_small, total_writes, total_writes_small, detected_files, modules, file_map, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)
 
 #########################################################################################################################################################################
 
@@ -271,7 +368,7 @@ def handler():
     total_mem_not_aligned = df['counters']['POSIX_MEM_NOT_ALIGNED'].sum()
     total_file_not_aligned = df['counters']['POSIX_FILE_NOT_ALIGNED'].sum()
 
-    check_misaligned(total_operations, total_mem_not_aligned, total_file_not_aligned, modules)
+    check_misaligned(total_operations, total_mem_not_aligned, total_file_not_aligned, modules, file_map, df_lustre, dxt_posix, dxt_posix_read_data)
 
 #########################################################################################################################################################################
 
@@ -280,7 +377,7 @@ def handler():
     max_read_offset = df['counters']['POSIX_MAX_BYTE_READ'].max()
     max_write_offset = df['counters']['POSIX_MAX_BYTE_WRITTEN'].max()
 
-    check_traffic(max_read_offset, total_read_size, max_write_offset, total_written_size)
+    check_traffic(max_read_offset, total_read_size, max_write_offset, total_written_size, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)
 
 #########################################################################################################################################################################
 
@@ -305,7 +402,7 @@ def handler():
         write_random = total_writes - write_consecutive - write_sequential
         #print('WRITE Random: {} ({:.2f}%)'.format(write_random, write_random / total_writes * 100))
 
-    check_random_operation(read_consecutive, read_sequential, read_random, total_reads, write_consecutive, write_sequential, write_random, total_writes)
+    check_random_operation(read_consecutive, read_sequential, read_random, total_reads, write_consecutive, write_sequential, write_random, total_writes, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)
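These call sites thread the optional DXT frames into each check. The checks declare them as keyword parameters defaulting to None (see module.py below), so handlers that never build DXT data, and runs without --backtrace, keep calling them unchanged. A minimal sketch of that pattern with a hypothetical check:

    # Hypothetical check illustrating the None-defaulting pattern.
    def check_example(total_operations, dxt_posix=None, dxt_posix_read_data=None):
        if dxt_posix is None or dxt_posix_read_data is None:
            return '{} operations (no DXT trace to drill into)'.format(total_operations)
        # With a trace available, drill down to the ranks behind the hot spots.
        ranks = sorted(dxt_posix_read_data['rank'].unique())
        return '{} operations, traced ranks: {}'.format(total_operations, ranks)

    print(check_example(42))  # still valid when no trace data was collected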
 
 #########################################################################################################################################################################
 
@@ -385,7 +482,7 @@ def handler():
         column_names = ['id', 'data_imbalance']
         detected_files = pd.DataFrame(detected_files, columns=column_names)
 
-        check_shared_data_imblance(stragglers_count, detected_files, file_map)
+        check_shared_data_imblance(stragglers_count, detected_files, file_map, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)
 
         # POSIX_F_FASTEST_RANK_TIME
         # POSIX_F_SLOWEST_RANK_TIME
@@ -442,7 +539,7 @@ def handler():
         column_names = ['id', 'write_imbalance']
         detected_files = pd.DataFrame(detected_files, columns=column_names)
 
-        check_individual_write_imbalance(imbalance_count, detected_files, file_map)
+        check_individual_write_imbalance(imbalance_count, detected_files, file_map, dxt_posix, dxt_posix_write_data)
 
     imbalance_count = 0
 
@@ -458,7 +555,7 @@ def handler():
         column_names = ['id', 'read_imbalance']
         detected_files = pd.DataFrame(detected_files, columns=column_names)
 
-        check_individual_read_imbalance(imbalance_count, detected_files, file_map)
+        check_individual_read_imbalance(imbalance_count, detected_files, file_map, dxt_posix, dxt_posix_read_data)
 
 #########################################################################################################################################################################
 
@@ -493,7 +590,7 @@ def handler():
         column_names = ['id', 'absolute_indep_reads', 'percent_indep_reads']
         detected_files = pd.DataFrame(detected_files, columns=column_names)
 
-        check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations, detected_files, file_map)
+        check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations, detected_files, file_map, dxt_mpiio)
 
         df_mpiio_collective_writes = df_mpiio['counters']  #.loc[(df_mpiio['counters']['MPIIO_COLL_WRITES'] > 0)]
 
@@ -518,7 +615,7 @@ def handler():
         column_names = ['id', 'absolute_indep_writes', 'percent_indep_writes']
         detected_files = pd.DataFrame(detected_files, columns=column_names)
 
-        check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations, detected_files, file_map)
+        check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations, detected_files, file_map, dxt_mpiio)
 
 #########################################################################################################################################################################
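The two MPI-IO collective checks now also receive dxt_mpiio. The patch does not show how they consume it, but a plausible use is spotting ranks whose operation counts diverge during a collective phase. A hedged sketch with invented data; the real frame comes from report.records['DXT_MPIIO'].to_df(), and the write_count column plus the median heuristic are assumptions for illustration:

    import pandas as pd

    dxt_mpiio = pd.DataFrame({
        'id': [7, 7, 7],               # one shared file
        'rank': [0, 1, 2],
        'write_count': [120, 3, 118],  # hypothetical per-rank operation counts
    })

    # Ranks issuing far fewer operations than their peers often mark
    # independent I/O mixed into an otherwise collective phase.
    median_ops = dxt_mpiio['write_count'].median()
    suspects = dxt_mpiio.loc[dxt_mpiio['write_count'] < 0.5 * median_ops, 'rank']
    print('ranks to inspect:', suspects.tolist())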
diff --git a/drishti/includes/module.py b/drishti/includes/module.py
index c5ab9cc..dedaa09 100644
--- a/drishti/includes/module.py
+++ b/drishti/includes/module.py
@@ -3,6 +3,7 @@
 import datetime
 import csv
 import time
+import pandas as pd
 from rich import box
 from rich.syntax import Syntax
 from drishti.includes.config import *
@@ -129,7 +130,7 @@ def check_size_intensive(total_size, total_read_size, total_written_size):
     )
 
 
-def check_small_operation(total_reads, total_reads_small, total_writes, total_writes_small, detected_files, modules, file_map, df_posix=None):
+def check_small_operation(total_reads, total_reads_small, total_writes, total_writes_small, detected_files, modules, file_map, dxt_posix=None, dxt_posix_read_data=None, dxt_posix_write_data=None):
     '''
     Check whether application has performed an excessive number of small operations
 
@@ -154,7 +155,7 @@ def check_small_operation(total_reads, total_reads_small, total_writes, total_wr
 
     detail = []
     recommendation = []
-
+    file_count = 0
     dxt_trigger_time = 0
 
     for index, row in detected_files.iterrows():
@@ -179,17 +180,17 @@ def check_small_operation(total_reads, total_reads_small, total_writes, total_wr
                 if not temp_df.empty:
                     temp_df = temp_df.loc[temp_df['length'] < thresholds['small_requests'][0]]
                     small_read_requests_ranks = temp_df['rank'].unique()
-
-                    if int(small_read_requests_ranks[0]) == 0 and len(small_read_requests_ranks) > 1:
-                        rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[1]))]
-                    else:
-                        rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[0]))]
+                    if len(small_read_requests_ranks) > 0:
+                        if len(small_read_requests_ranks) > 1 and int(small_read_requests_ranks[0]) == 0:
+                            rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[1]))]
+                        else:
+                            rank_df = temp.loc[(temp['rank'] == int(small_read_requests_ranks[0]))]
 
-                    rank_df = rank_df['read_segments'].iloc[0]
-                    rank_addresses = rank_df['stack_memory_addresses'].iloc[0]
-                    address = dxt_posix.iloc[0]['address_line_mapping']['address']
-                    res = set(list(address)) & set(rank_addresses)
-                    backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+                        rank_df = rank_df['read_segments'].iloc[0]
+                        rank_addresses = rank_df['stack_memory_addresses'].iloc[0]
+                        address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(rank_addresses)
+                        backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
 
                 if len(small_read_requests_ranks) > 0:
                     detail.append(
@@ -263,7 +264,7 @@ def check_small_operation(total_reads, total_reads_small, total_writes, total_wr
 
     detail = []
     recommendation = []
-
+    file_count = 0
     for index, row in detected_files.iterrows():
         if row['total_writes'] > (total_writes * thresholds['small_requests'][0] / 2):
             detail.append(
@@ -286,19 +287,19 @@ def check_small_operation(total_reads, total_reads_small, total_writes, total_wr
                 if not temp_df.empty:
                     temp_df = temp_df.loc[temp_df['length'] < thresholds['small_requests'][0]]
                     small_write_requests_ranks = temp_df['rank'].unique()
-
-                    if int(small_write_requests_ranks[0]) == 0 and len(small_write_requests_ranks) > 1:
-                        rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[1]))]
-                    else:
-                        rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[0]))]
+                    if len(small_write_requests_ranks) > 0:
+                        if int(small_write_requests_ranks[0]) == 0 and len(small_write_requests_ranks) > 1:
+                            rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[1]))]
+                        else:
+                            rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[0]))]
 
-                    rank_df = temp.loc[(temp['rank'] == int(small_write_requests_ranks[0]))]
-                    rank_df = rank_df['write_segments'].iloc[0]
-                    rank_addresses = rank_df['stack_memory_addresses'].iloc[0]
-                    address = dxt_posix.iloc[0]['address_line_mapping']['address']
-                    res = set(list(address)) & set(rank_addresses)
-                    backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
+                        rank_df = rank_df['write_segments'].iloc[0]
+                        rank_addresses = rank_df['stack_memory_addresses'].iloc[0]
+                        address = dxt_posix.iloc[0]['address_line_mapping']['address']
+                        res = set(list(address)) & set(rank_addresses)
+                        backtrace = dxt_posix.iloc[0]['address_line_mapping'].loc[dxt_posix.iloc[0]['address_line_mapping']['address'].isin(res)]
 
                 if len(small_write_requests_ranks) > 0:
                     detail.append(
                         {
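The backtrace resolution in both branches above follows one recipe: take the stack addresses recorded for the offending rank, intersect them with the log's address_line_mapping table, and keep the matching rows. A self-contained sketch; the addresses and the function_name column are invented, while the real mapping lives at dxt_posix.iloc[0]['address_line_mapping']:

    import pandas as pd

    # Toy stand-in for the address-to-source mapping shipped in the log.
    address_line_mapping = pd.DataFrame({
        'address': [0x4005d0, 0x4006a0, 0x400710],
        'function_name': ['write_block', 'flush_cache', 'main'],
    })

    # Stack addresses captured for the rank singled out above.
    rank_addresses = [0x4006a0, 0x400710, 0xdeadbeef]

    # Intersect, then map the surviving addresses back to source locations.
    res = set(address_line_mapping['address']) & set(rank_addresses)
    backtrace = address_line_mapping.loc[address_line_mapping['address'].isin(res)]
    print(backtrace)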
@@ -363,7 +364,7 @@ def check_small_operation(total_reads, total_reads_small, total_writes, total_wr
     )
 
 
-def check_misaligned(total_operations, total_mem_not_aligned, total_file_not_aligned, modules):
+def check_misaligned(total_operations, total_mem_not_aligned, total_file_not_aligned, modules, file_map=None, df_lustre=None, dxt_posix=None, dxt_posix_read_data=None):
     '''
     Check whether application has excessive misaligned operations
 
@@ -412,7 +413,6 @@ def check_misaligned(total_operations, total_mem_not_aligned, total_file_not_ali
     # DXT Analysis
     if args.backtrace:
         start = time.time()
-        df_lustre = report.records['LUSTRE'].to_df()
 
         if not df_lustre['counters']['LUSTRE_STRIPE_SIZE'].empty:
             stripe_size = df_lustre['counters']['LUSTRE_STRIPE_SIZE'].iloc[0]
@@ -491,7 +491,7 @@ def check_misaligned(total_operations, total_mem_not_aligned, total_file_not_ali
     )
 
 
-def check_traffic(max_read_offset, total_read_size, max_write_offset, total_written_size):
+def check_traffic(max_read_offset, total_read_size, max_write_offset, total_written_size, dxt_posix=None, dxt_posix_read_data=None, dxt_posix_write_data=None):
     '''
     Check whether application has redundant read or write traffic
 
@@ -516,7 +516,7 @@ def check_traffic(max_read_offset, total_read_size, max_write_offset, total_writ
         if file_count < thresholds['backtrace'][0]:
             temp = dxt_posix.loc[dxt_posix['id'] == id]
 
-            redundant_ranks_ind = -1
+            random_ranks_ind = -1
             temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == id]
 
             updated_offsets = (temp_df["offsets"].to_numpy()).tolist()
@@ -528,6 +528,7 @@ def check_traffic(max_read_offset, total_read_size, max_write_offset, total_writ
             if random_ranks_ind != -1:
                 random_rank = temp_df.iloc[random_ranks_ind]['rank']
                 random_offsets = temp_df.iloc[random_ranks_ind]['offsets']
+                random_start_time = temp_df.iloc[random_ranks_ind]['start_time']
 
                 temp_random_rank = temp.loc[(temp['rank'] == int(random_rank))]
                 temp_random_rank = temp_random_rank['read_segments'].iloc[0]
@@ -585,7 +586,7 @@ def check_traffic(max_read_offset, total_read_size, max_write_offset, total_writ
         if file_count < thresholds['backtrace'][0]:
             temp = dxt_posix.loc[dxt_posix['id'] == id]
 
-            redundant_ranks_ind = -1
+            random_ranks_ind = -1
             temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == id]
             updated_offsets = (temp_df["offsets"].to_numpy()).tolist()
             for i in range(len(updated_offsets)):
@@ -596,6 +597,7 @@ def check_traffic(max_read_offset, total_read_size, max_write_offset, total_writ
             if random_ranks_ind != -1:
                 random_rank = temp_df.iloc[random_ranks_ind]['rank']
                 random_offsets = temp_df.iloc[random_ranks_ind]['offsets']
+                random_start_time = temp_df.iloc[random_ranks_ind]['start_time']
 
                 temp_random_rank = temp.loc[(temp['rank'] == int(random_rank))]
                 temp_random_rank = temp_random_rank['write_segments'].iloc[0]
@@ -643,7 +645,7 @@ def check_traffic(max_read_offset, total_read_size, max_write_offset, total_writ
     )
 
 
-def check_random_operation(read_consecutive, read_sequential, read_random, total_reads, write_consecutive, write_sequential, write_random, total_writes):
+def check_random_operation(read_consecutive, read_sequential, read_random, total_reads, write_consecutive, write_sequential, write_random, total_writes, dxt_posix=None, dxt_posix_read_data=None, dxt_posix_write_data=None):
     '''
    Check whether application has performed excessive random operations
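In check_traffic, random_ranks_ind marks the first rank whose offset sequence is out of order. The hunks above only show the setup lines, so the comparison below is a guess at the elided loop body, with toy data and invented column values:

    import pandas as pd

    # Toy per-rank offset lists, shaped like dxt_posix_read_data rows.
    temp_df = pd.DataFrame({
        'rank': [0, 1],
        'offsets': [[0, 4096, 8192], [8192, 0, 4096]],  # rank 1 seeks backwards
    })

    # The first rank whose offsets are not already ascending is flagged as a
    # random-access candidate; -1 means every rank streamed in order.
    random_ranks_ind = -1
    for i, offsets in enumerate(temp_df['offsets']):
        if offsets != sorted(offsets):
            random_ranks_ind = i
            break

    if random_ranks_ind != -1:
        print('random access suspected on rank', temp_df.iloc[random_ranks_ind]['rank'])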
 
@@ -941,7 +943,7 @@ def check_long_metadata(count_long_metadata, modules):
     )
 
 
-def check_shared_data_imblance(stragglers_count, detected_files, file_map):
+def check_shared_data_imblance(stragglers_count, detected_files, file_map, dxt_posix=None, dxt_posix_read_data=None, dxt_posix_write_data=None):
     '''
     Check how many shared files contain data transfer imbalance
 
@@ -977,9 +979,9 @@ def check_shared_data_imblance(stragglers_count, detected_files, file_map):
         if args.backtrace:
             start = time.time()
             if file_count < thresholds['backtrace'][0]:
-                temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
-                temp_df_1 = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(file[0])]
-                temp_df_2 = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(file[0])]
+                temp = dxt_posix.loc[dxt_posix['id'] == int(row['id'])]
+                temp_df_1 = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(row['id'])]
+                temp_df_2 = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(row['id'])]
 
                 df_merged = pd.concat([temp_df_1, temp_df_2], ignore_index=True, sort=False)
                 df_merged['duration'] = df_merged['end_time'] - df_merged['start_time']
@@ -1156,7 +1158,7 @@ def check_shared_time_imbalance_split(slowest_rank_time, fastest_rank_time, tota
     )
 
 
-def check_individual_write_imbalance(imbalance_count, detected_files, file_map):
+def check_individual_write_imbalance(imbalance_count, detected_files, file_map, dxt_posix=None, dxt_posix_write_data=None):
     '''
     Check how many write imbalances occur when accessing individual files
 
@@ -1191,8 +1193,8 @@ def check_individual_write_imbalance(imbalance_count, detected_files, file_map):
         if args.backtrace:
             start = time.time()
             if file_count < thresholds['backtrace'][0]:
-                temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
-                temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(file[0])]
+                temp = dxt_posix.loc[dxt_posix['id'] == int(row['id'])]
+                temp_df = dxt_posix_write_data.loc[dxt_posix_write_data['id'] == int(row['id'])]
 
                 maxClm = temp_df['length'].max()
                 temp_df = temp_df.loc[(temp_df['length'] == maxClm)]
@@ -1297,7 +1299,7 @@ def check_individual_write_imbalance_split(max_bytes_written, min_bytes_written)
     )
 
 
-def check_individual_read_imbalance(imbalance_count, detected_files, file_map):
+def check_individual_read_imbalance(imbalance_count, detected_files, file_map, dxt_posix=None, dxt_posix_read_data=None):
     '''
     Check how many read imbalances occur when accessing individual files
 
@@ -1332,8 +1334,8 @@ def check_individual_read_imbalance(imbalance_count, detected_files, file_map):
         if args.backtrace:
             start = time.time()
             if file_count < thresholds['backtrace'][0]:
-                temp = dxt_posix.loc[dxt_posix['id'] == int(file[0])]
-                temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(file[0])]
+                temp = dxt_posix.loc[dxt_posix['id'] == int(row['id'])]
+                temp_df = dxt_posix_read_data.loc[dxt_posix_read_data['id'] == int(row['id'])]
 
                 maxClm = temp_df['length'].max()
                 temp_df = temp_df.loc[(temp_df['length'] == maxClm)]
@@ -1440,7 +1442,7 @@ def check_individual_read_imbalance_split(max_bytes_read, min_bytes_read):
 
 # MPIIO level check
 
-def check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations, detected_files, file_map):
+def check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations, detected_files, file_map, dxt_mpiio=None):
     '''
     Check whether application uses collective MPI read calls
 
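The shared-file imbalance check merges the read and write frames for each flagged file and derives per-operation durations, as the context lines above show. A runnable sketch of those steps; the groupby/idxmax at the end is an assumption about the follow-on straggler selection, which the hunk elides:

    import pandas as pd

    # Toy flattened DXT rows for one shared file (id 42).
    reads = pd.DataFrame({'id': [42, 42], 'rank': [0, 1],
                          'start_time': [0.0, 0.0], 'end_time': [0.2, 1.5]})
    writes = pd.DataFrame({'id': [42], 'rank': [0],
                           'start_time': [0.3], 'end_time': [0.4]})

    # Merge both directions, then compute how long each operation took.
    df_merged = pd.concat([reads, writes], ignore_index=True, sort=False)
    df_merged['duration'] = df_merged['end_time'] - df_merged['start_time']

    # Sum per rank to expose the straggler dominating the shared file.
    per_rank = df_merged.groupby('rank')['duration'].sum()
    print('slowest rank:', per_rank.idxmax())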
@@ -1533,7 +1535,7 @@ def check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, tot
     )
 
 
-def check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations, detected_files, file_map):
+def check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations, detected_files, file_map, dxt_mpiio=None):
     '''
     Check whether application uses collective MPI write calls