
Commit efb0a23

Fix #27 by catching NoDataError when trying to parse empty BLAST results files in parse_influenza_blast_results.py

peterk87 committed Jul 7, 2023 · 1 parent 5bc18fa
Showing 3 changed files with 54 additions and 45 deletions.
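In short: the parser's lazy CSV scan is now wrapped in try/except so that an empty BLAST output file makes parse_blast_result() return None instead of crashing. A minimal standalone sketch of the pattern, assuming polars is installed (the file name, column names, and threshold below are illustrative, not the pipeline's actual values):

import logging
from typing import Optional

import polars as pl


def load_blast_hits(path: str, pident_threshold: float = 0.85) -> Optional[pl.DataFrame]:
    """Scan a (possibly empty) BLAST outfmt 6 TSV; return None if it has no rows."""
    try:
        return (
            pl.scan_csv(
                path,
                has_header=False,
                separator="\t",
                new_columns=["qaccver", "saccver", "pident", "length"],  # hypothetical subset
            )
            .filter(pl.col("pident") >= pident_threshold * 100)
            .collect(streaming=True)
        )
    except pl.exceptions.NoDataError:
        # BLAST produced an empty results file: treat as "no hits", not an error.
        logging.warning(f"No BLAST results found in {path}")
        return None

Callers then skip None results rather than assuming every sample has hits.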
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,12 @@
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [[3.2.1](https://github.com/CFIA-NCFAD/nf-flu/releases/tag/3.2.1)] - 2023-07-07
+
+### Fixes
+
+* Empty BLAST results file parsing `NoDataError` (#27) (Thanks @MatFish for reporting this issue!)
+
 ## [[3.2.0](https://github.com/CFIA-NCFAD/nf-flu/releases/tag/3.2.0)] - 2023-06-22
 
 ### Added
91 changes: 47 additions & 44 deletions bin/parse_influenza_blast_results.py
@@ -189,19 +189,27 @@ def parse_blast_result(
     top: int = 3,
     pident_threshold: float = 0.85,
     min_aln_length: int = 50,
-) -> Optional[
-    Tuple[pl.DataFrame, Optional[pl.DataFrame], Optional[pl.DataFrame], Dict]
-]:
+) -> Optional[Tuple[pl.DataFrame, Dict]]:
     logging.info(f"Parsing BLAST results from {blast_result}")
 
-    df_filtered = pl.scan_csv(
-        blast_result,
-        has_header=False,
-        separator="\t",
-        new_columns=[name for name, coltype in blast_cols],
-        dtypes={name: coltype for name, coltype in blast_cols},
-    ).filter((pl.col("pident") >= (pident_threshold * 100)) &
-             (pl.col("length") >= min_aln_length)).collect(streaming=True)
+    try:
+        df_filtered = (
+            pl.scan_csv(
+                blast_result,
+                has_header=False,
+                separator="\t",
+                new_columns=[name for name, coltype in blast_cols],
+                dtypes=dict(blast_cols),
+            )
+            .filter(
+                (pl.col("pident") >= (pident_threshold * 100))
+                & (pl.col("length") >= min_aln_length)
+            )
+            .collect(streaming=True)
+        )
+    except pl.exceptions.NoDataError:
+        logging.warning(f"No BLAST results found in {blast_result}")
+        return None
 
     sample_name: str = re.sub(r"^(.+)_\d$", r"\1", df_filtered["qaccver"][0])
     logging.info(
@@ -231,22 +239,21 @@ def parse_blast_result(
     )
 
     segments = df_merge["sample_segment"].unique().sort()
-    dfs = []
-    for seg in segments:
-        dfs.append(df_merge.filter(pl.col("sample_segment") == seg).head(top))
+    dfs = [
+        df_merge.filter(pl.col("sample_segment") == seg).head(top)
+        for seg in segments
+    ]
     df_top_seg_matches = pl.concat(dfs, how="vertical")
     cols = pl.Series([x for x, _ in blast_results_report_columns])
     df_top_seg_matches = df_top_seg_matches.select(pl.col(cols))
     subtype_results_summary = {"sample": sample_name}
     if not get_top_ref:
-        is_iav = True
-        if df_top_seg_matches.select(pl.col("subtype").is_null().all())[0, 0]:
-            is_iav = False
+        is_iav = not df_top_seg_matches.select(pl.col("subtype").is_null().all())[0, 0]
        H_results = None
        N_results = None
        if "4" in segments:
            H_results = find_h_or_n_type(df_merge, "4", is_iav)
-            subtype_results_summary.update(H_results)
+            subtype_results_summary |= H_results
        if "6" in segments:
            N_results = find_h_or_n_type(df_merge, "6", is_iav)
            subtype_results_summary.update(N_results)
@@ -258,27 +265,26 @@
 def get_subtype_value(H_results: Optional[Dict], N_results: Optional[Dict], is_iav: bool) -> str:
     subtype = ""
     if not is_iav:
-        subtype = "N/A"
-        return subtype
+        return "N/A"
     if H_results is None and N_results is None:
         subtype = "-"
     elif H_results is not None and N_results is None:
         H: str = H_results.get("H_type", "")
         subtype = f"H{H}" if H != "" else "-"
-    elif H_results is None and N_results is not None:
+    elif H_results is None:
         N: str = N_results.get("N_type", "")
         subtype = f"N{N}" if N != "" else "-"
     else:
         H: str = H_results.get("H_type", "")
         N: str = N_results.get("N_type", "")
-        if H == "" and N == "":
-            subtype = "-"
-        else:
+        if H or N:
             if H != "":
                 H = f"H{H}"
             if N != "":
                 N = f"N{N}"
             subtype = f"{H}{N}"
+        else:
+            subtype = "-"
     return subtype
 
 
@@ -289,40 +295,40 @@ def find_h_or_n_type(df_merge, seg, is_iav):
     ], "Can only determine H or N type from segments 4 or 6, respectively!"
     type_name = "H_type" if seg == "4" else "N_type"
     h_or_n = type_name[0]
-    reg_h_or_n_type = ""
-    if h_or_n == "H":
-        reg_h_or_n_type = "[Hh]"
-    else:
-        reg_h_or_n_type = "[Nn]"
     df_segment = df_merge.filter(pl.col("sample_segment") == seg)
     if is_iav:
         type_counts = df_segment["subtype"].value_counts(sort=True)
         type_counts = type_counts.filter(~pl.col("subtype").is_null())
+        reg_h_or_n_type = "[Hh]" if h_or_n == "H" else "[Nn]"
         df_type_counts = type_counts.with_columns(pl.lit(type_counts["subtype"].str.extract(reg_h_or_n_type + r"(\d+)").
                                                          alias(type_name)))
         df_type_counts = df_type_counts.filter(~pl.col(type_name).is_null())
         logging.debug(f"{df_type_counts}")
         type_to_count = defaultdict(int)
         for x in df_type_counts.iter_rows(named=True):
             type_to_count[x[type_name]] += x["counts"]
-        type_to_count = [(h, c) for h, c in type_to_count.items()]
+        type_to_count = list(type_to_count.items())
         type_to_count.sort(key=lambda x: x[1], reverse=True)
         top_type, top_type_count = type_to_count[0]
         total_count = type_counts["counts"].sum()
         logging.info(
             f"{h_or_n}{top_type} n={top_type_count}/{total_count} ({top_type_count / total_count:.1%})"
         )
         df_segment = df_segment.with_columns(
-            pl.lit(df_segment["subtype"].str.contains(r".*" + reg_h_or_n_type + top_type + r".*")
-                   .fill_null(False)
-                   .alias("type_mask")))
+            pl.lit(
+                df_segment["subtype"]
+                .str.contains(f".*{reg_h_or_n_type}" + top_type + r".*")
+                .fill_null(False)
+                .alias("type_mask")
+            )
+        )
         df_seg_top_type = df_segment.filter(pl.col("type_mask") == True).drop("type_mask")
-        top_result: pl.Series = [r for r in df_seg_top_type.head(1).iter_rows(named=True)][0]
+        top_result: pl.Series = list(df_seg_top_type.head(1).iter_rows(named=True))[0]
     else:
         top_type = "N/A"
         top_type_count = "N/A"
         total_count = "N/A"
-        top_result: pl.Series = [r for r in df_segment.head(1).iter_rows(named=True)][0]
+        top_result: pl.Series = list(df_segment.head(1).iter_rows(named=True))[0]
 
     results_summary = {
         f"{h_or_n}_type": top_type if is_iav else "N/A",
@@ -388,7 +394,7 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
         has_header=False,
         separator="\t",
         new_columns=[name for name, _ in md_cols],
-        dtypes={name: t for name, t in md_cols},
+        dtypes=dict(md_cols),
     )
 
     unique_subtypes = df_md.select("subtype").unique()
@@ -414,7 +420,7 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
             sample = subtype_results_summary["sample"]
             all_subtype_results[sample] = subtype_results_summary
         df_all_blast = pd.concat(dfs_blast).rename(
-            columns={k: v for k, v in blast_results_report_columns}
+            columns=dict(blast_results_report_columns)
         )
         df_subtype_results = pd.DataFrame(all_subtype_results).transpose()
         cols = pd.Series(subtype_results_summary_columns)
@@ -442,9 +448,7 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
         )
     else:
         df_blast, subtype_results_summary = results[0]
-        df_blast = df_blast.rename(
-            mapping={k: v for k, v in blast_results_report_columns}
-        )
+        df_blast = df_blast.rename(mapping=dict(blast_results_report_columns))
         df_ref_id = df_blast.select(pl.col(['Sample', 'Sample Genome Segment Number',
                                             'Reference NCBI Accession', 'BLASTN Bitscore', 'Reference Sequence ID']))
         df_ref_id = df_ref_id.with_columns(
@@ -463,10 +467,9 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
 def get_col_widths(df, index=False):
     """Calculate column widths based on column headers and contents"""
     if index:
-        idx_max = max(
+        yield max(
             [len(str(s)) for s in df.index.values] + [len(str(df.index.name))]
         )
-        yield idx_max
     for c in df.columns:
         # get max length of column contents and length of column header
         yield np.max([df[c].astype(str).str.len().max() + 1, len(c) + 1])
@@ -493,7 +496,7 @@ def write_excel(
         # fixed max number of characters in sheet name due to compatibility
         if sheet_name_index:
             max_chars = 28
-            fixed_sheetname = "{}_{}".format(idx, fixed_sheetname[:max_chars])
+            fixed_sheetname = f"{idx}_{fixed_sheetname[:max_chars]}"
         else:
             max_chars = 31
             fixed_sheetname = fixed_sheetname[:max_chars]
@@ -506,7 +509,7 @@
             len(fixed_sheetname),
         )
 
-        logging.info('Writing table to Excel sheet "{}"'.format(fixed_sheetname))
+        logging.info(f'Writing table to Excel sheet "{fixed_sheetname}"')
         df.to_excel(
             writer, sheet_name=fixed_sheetname, index=False, freeze_panes=(1, 1)
         )
2 changes: 1 addition & 1 deletion nextflow.config
@@ -153,7 +153,7 @@ manifest {
   description = 'Influenza A virus genome assembly pipeline'
   homePage = 'https://github.com/CFIA-NCFAD/nf-flu'
   author = 'Peter Kruczkiewicz, Hai Nguyen'
-  version = '3.2.0'
+  version = '3.2.1'
   nextflowVersion = '>=21.10'
   mainScript = 'main.nf'
   doi = '10.5281/zenodo.7011213'
