Skip to content

Commit

Permalink
Fix DXT stack trace integration (#15)
Browse files Browse the repository at this point in the history
* Fix DXT conflict with Recorder
* Fix DXT merge conflict
* Cut redundant code
* Bug Fix in DXT Analysis

---------
Co-authored-by: hammad45 <hather45@gmail.com>
  • Loading branch information
onewbiek authored May 3, 2024
1 parent 0ab1ded commit 3f83dcc
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 53 deletions.
117 changes: 107 additions & 10 deletions drishti/handlers/handle_darshan.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ def handler():

information = darshanll.log_get_job(log)

log_version = information['metadata']['lib_ver']
if 'log_ver' in information:
log_version = information['log_ver']
else:
log_version = information['metadata']['lib_ver']
library_version = darshanll.get_lib_version()

# Make sure log format is of the same version
Expand Down Expand Up @@ -144,6 +147,100 @@ def handler():
df_mpiio = None

total_size_mpiio = 0

dxt_posix = None
dxt_posix_read_data = None
dxt_posix_write_data = None
dxt_mpiio = None

df_lustre = None
if "LUSTRE" in report.records:
df_lustre = report.records['LUSTRE'].to_df()

if args.backtrace:
if "DXT_POSIX" in report.records:
dxt_posix = report.records["DXT_POSIX"].to_df()
dxt_posix = pd.DataFrame(dxt_posix)
if "address_line_mapping" not in dxt_posix:
args.backtrace = False
else:
read_id = []
read_rank = []
read_length = []
read_offsets = []
read_end_time = []
read_start_time = []
read_operation = []

write_id = []
write_rank = []
write_length = []
write_offsets = []
write_end_time = []
write_start_time = []
write_operation = []

for r in zip(dxt_posix['rank'], dxt_posix['read_segments'], dxt_posix['write_segments'], dxt_posix['id']):
if not r[1].empty:
read_id.append([r[3]] * len((r[1]['length'].to_list())))
read_rank.append([r[0]] * len((r[1]['length'].to_list())))
read_length.append(r[1]['length'].to_list())
read_end_time.append(r[1]['end_time'].to_list())
read_start_time.append(r[1]['start_time'].to_list())
read_operation.append(['read'] * len((r[1]['length'].to_list())))
read_offsets.append(r[1]['offset'].to_list())

if not r[2].empty:
write_id.append([r[3]] * len((r[2]['length'].to_list())))
write_rank.append([r[0]] * len((r[2]['length'].to_list())))
write_length.append(r[2]['length'].to_list())
write_end_time.append(r[2]['end_time'].to_list())
write_start_time.append(r[2]['start_time'].to_list())
write_operation.append(['write'] * len((r[2]['length'].to_list())))
write_offsets.append(r[2]['offset'].to_list())

read_id = [element for nestedlist in read_id for element in nestedlist]
read_rank = [element for nestedlist in read_rank for element in nestedlist]
read_length = [element for nestedlist in read_length for element in nestedlist]
read_offsets = [element for nestedlist in read_offsets for element in nestedlist]
read_end_time = [element for nestedlist in read_end_time for element in nestedlist]
read_operation = [element for nestedlist in read_operation for element in nestedlist]
read_start_time = [element for nestedlist in read_start_time for element in nestedlist]

write_id = [element for nestedlist in write_id for element in nestedlist]
write_rank = [element for nestedlist in write_rank for element in nestedlist]
write_length = [element for nestedlist in write_length for element in nestedlist]
write_offsets = [element for nestedlist in write_offsets for element in nestedlist]
write_end_time = [element for nestedlist in write_end_time for element in nestedlist]
write_operation = [element for nestedlist in write_operation for element in nestedlist]
write_start_time = [element for nestedlist in write_start_time for element in nestedlist]

dxt_posix_read_data = pd.DataFrame(
{
'id': read_id,
'rank': read_rank,
'length': read_length,
'end_time': read_end_time,
'start_time': read_start_time,
'operation': read_operation,
'offsets': read_offsets,
})

dxt_posix_write_data = pd.DataFrame(
{
'id': write_id,
'rank': write_rank,
'length': write_length,
'end_time': write_end_time,
'start_time': write_start_time,
'operation': write_operation,
'offsets': write_offsets,
})

if "DXT_MPIIO" in report.records:
dxt_mpiio = report.records["DXT_MPIIO"].to_df()
dxt_mpiio = pd.DataFrame(dxt_mpiio)


    # Since POSIX will capture both POSIX-only accesses and those coming from MPI-IO, we can subtract those
if total_size_posix > 0 and total_size_posix >= total_size_mpiio:
Expand Down Expand Up @@ -262,7 +359,7 @@ def handler():
detected_files.columns = ['id', 'total_reads', 'total_writes']
detected_files.loc[:, 'id'] = detected_files.loc[:, 'id'].astype(str)

check_small_operation(total_reads, total_reads_small, total_writes, total_writes_small, detected_files, modules, file_map, df_posix)
check_small_operation(total_reads, total_reads_small, total_writes, total_writes_small, detected_files, modules, file_map, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)

#########################################################################################################################################################################

Expand All @@ -271,7 +368,7 @@ def handler():
total_mem_not_aligned = df['counters']['POSIX_MEM_NOT_ALIGNED'].sum()
total_file_not_aligned = df['counters']['POSIX_FILE_NOT_ALIGNED'].sum()

check_misaligned(total_operations, total_mem_not_aligned, total_file_not_aligned, modules)
check_misaligned(total_operations, total_mem_not_aligned, total_file_not_aligned, modules, file_map, df_lustre, dxt_posix, dxt_posix_read_data)

#########################################################################################################################################################################

Expand All @@ -280,7 +377,7 @@ def handler():
max_read_offset = df['counters']['POSIX_MAX_BYTE_READ'].max()
max_write_offset = df['counters']['POSIX_MAX_BYTE_WRITTEN'].max()

check_traffic(max_read_offset, total_read_size, max_write_offset, total_written_size)
check_traffic(max_read_offset, total_read_size, max_write_offset, total_written_size, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)

#########################################################################################################################################################################

Expand All @@ -305,7 +402,7 @@ def handler():
write_random = total_writes - write_consecutive - write_sequential
#print('WRITE Random: {} ({:.2f}%)'.format(write_random, write_random / total_writes * 100))

check_random_operation(read_consecutive, read_sequential, read_random, total_reads, write_consecutive, write_sequential, write_random, total_writes)
check_random_operation(read_consecutive, read_sequential, read_random, total_reads, write_consecutive, write_sequential, write_random, total_writes, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)

#########################################################################################################################################################################

Expand Down Expand Up @@ -385,7 +482,7 @@ def handler():

column_names = ['id', 'data_imbalance']
detected_files = pd.DataFrame(detected_files, columns=column_names)
check_shared_data_imblance(stragglers_count, detected_files, file_map)
check_shared_data_imblance(stragglers_count, detected_files, file_map, dxt_posix, dxt_posix_read_data, dxt_posix_write_data)

# POSIX_F_FASTEST_RANK_TIME
# POSIX_F_SLOWEST_RANK_TIME
Expand Down Expand Up @@ -442,7 +539,7 @@ def handler():

column_names = ['id', 'write_imbalance']
detected_files = pd.DataFrame(detected_files, columns=column_names)
check_individual_write_imbalance(imbalance_count, detected_files, file_map)
check_individual_write_imbalance(imbalance_count, detected_files, file_map, dxt_posix, dxt_posix_write_data)

imbalance_count = 0

Expand All @@ -458,7 +555,7 @@ def handler():

column_names = ['id', 'read_imbalance']
detected_files = pd.DataFrame(detected_files, columns=column_names)
check_individual_read_imbalance(imbalance_count, detected_files, file_map)
check_individual_read_imbalance(imbalance_count, detected_files, file_map, dxt_posix, dxt_posix_read_data)

#########################################################################################################################################################################

Expand Down Expand Up @@ -493,7 +590,7 @@ def handler():
column_names = ['id', 'absolute_indep_reads', 'percent_indep_reads']
detected_files = pd.DataFrame(detected_files, columns=column_names)

check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations, detected_files, file_map)
check_mpi_collective_read_operation(mpiio_coll_reads, mpiio_indep_reads, total_mpiio_read_operations, detected_files, file_map, dxt_mpiio)

df_mpiio_collective_writes = df_mpiio['counters'] #.loc[(df_mpiio['counters']['MPIIO_COLL_WRITES'] > 0)]

Expand All @@ -518,7 +615,7 @@ def handler():
column_names = ['id', 'absolute_indep_writes', 'percent_indep_writes']
detected_files = pd.DataFrame(detected_files, columns=column_names)

check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations, detected_files, file_map)
check_mpi_collective_write_operation(mpiio_coll_writes, mpiio_indep_writes, total_mpiio_write_operations, detected_files, file_map, dxt_mpiio)

#########################################################################################################################################################################

Expand Down
Loading

0 comments on commit 3f83dcc

Please sign in to comment.