Merge pull request #28 from peterk87/patch/fix-27-nodataerror-blast
Fix #27 NoDataError in parse_influenza_blast_results.py
peterk87 committed Jul 10, 2023
2 parents 7a1b84b + d223e3b commit 080bd84
Showing 3 changed files with 65 additions and 57 deletions.
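
For context on the bug being fixed: `blastn -outfmt 6` writes a zero-byte TSV when a sample has no hits, and polars refuses to build a DataFrame from it. Below is a minimal sketch of the failure and of the guard pattern the patch adopts — the file name is made up, and it assumes a 2023-era polars where the error surfaces inside the scan/collect chain:

```python
import polars as pl

# Simulate the input that triggered issue #27: blastn emits a zero-byte
# outfmt-6 TSV when a sample produces no hits at all.
open("empty_blast.tsv", "w").close()

try:
    # Lazily scan, then collect; with an empty file polars raises
    # NoDataError in this chain rather than returning an empty DataFrame.
    df_filtered = pl.scan_csv(
        "empty_blast.tsv", has_header=False, separator="\t"
    ).collect(streaming=True)
except pl.exceptions.NoDataError:
    # The patched script logs a warning and returns None for this sample
    # instead of taking down the whole Nextflow task.
    print("No BLAST results found in empty_blast.tsv")
    df_filtered = None
```
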
6 changes: 6 additions & 0 deletions CHANGELOG.md
```diff
@@ -3,6 +3,12 @@
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [[3.2.1](https://github.com/CFIA-NCFAD/nf-flu/releases/tag/3.2.1)] - 2023-07-07
+
+### Fixes
+
+* Empty BLAST results file parsing `NoDataError` (#27) (Thanks @MatFish for reporting this issue!)
+
 ## [[3.2.0](https://github.com/CFIA-NCFAD/nf-flu/releases/tag/3.2.0)] - 2023-06-22
 
 ### Added
```
114 changes: 58 additions & 56 deletions bin/parse_influenza_blast_results.py
```diff
@@ -8,15 +8,14 @@
 Segment 6 - neuraminidase (NA) gene
 """
 
-from typing import Dict, List, Optional, Tuple
-import re
 import logging
+import re
 from collections import defaultdict
-import multiprocessing
+from typing import Dict, List, Optional, Tuple
 
 import click
-import pandas as pd
 import numpy as np
+import pandas as pd
 import polars as pl
 from rich.logging import RichHandler
 
```
```diff
@@ -189,19 +188,27 @@ def parse_blast_result(
     top: int = 3,
     pident_threshold: float = 0.85,
     min_aln_length: int = 50,
-) -> Optional[
-    Tuple[pl.DataFrame, Optional[pl.DataFrame], Optional[pl.DataFrame], Dict]
-]:
+) -> Optional[Tuple[pl.DataFrame, Dict]]:
     logging.info(f"Parsing BLAST results from {blast_result}")
 
-    df_filtered = pl.scan_csv(
-        blast_result,
-        has_header=False,
-        separator="\t",
-        new_columns=[name for name, coltype in blast_cols],
-        dtypes={name: coltype for name, coltype in blast_cols},
-    ).filter((pl.col("pident") >= (pident_threshold * 100)) &
-             (pl.col("length") >= min_aln_length)).collect(streaming=True)
+    try:
+        df_filtered = (
+            pl.scan_csv(
+                blast_result,
+                has_header=False,
+                separator="\t",
+                new_columns=[name for name, coltype in blast_cols],
+                dtypes=dict(blast_cols),
+            )
+            .filter(
+                (pl.col("pident") >= (pident_threshold * 100))
+                & (pl.col("length") >= min_aln_length)
+            )
+            .collect(streaming=True)
+        )
+    except pl.exceptions.NoDataError:
+        logging.warning(f"No BLAST results found in {blast_result}")
+        return None
 
     sample_name: str = re.sub(r"^(.+)_\d$", r"\1", df_filtered["qaccver"][0])
     logging.info(
@@ -222,31 +229,30 @@ def parse_blast_result(
     del df_metadata
     df_merge = df_merge.with_columns(
         pl.when(pl.col("subtype").is_null())
-            .then(pl.col("subtype_from_match_title"))
-            .otherwise(pl.col("subtype"))
-            .alias("subtype")
+        .then(pl.col("subtype_from_match_title"))
+        .otherwise(pl.col("subtype"))
+        .alias("subtype")
     )
     df_merge = df_merge.sort(
         by=["sample_segment", "bitscore"], descending=[False, True]
     )
 
     segments = df_merge["sample_segment"].unique().sort()
-    dfs = []
-    for seg in segments:
-        dfs.append(df_merge.filter(pl.col("sample_segment") == seg).head(top))
+    dfs = [
+        df_merge.filter(pl.col("sample_segment") == seg).head(top)
+        for seg in segments
+    ]
     df_top_seg_matches = pl.concat(dfs, how="vertical")
     cols = pl.Series([x for x, _ in blast_results_report_columns])
     df_top_seg_matches = df_top_seg_matches.select(pl.col(cols))
     subtype_results_summary = {"sample": sample_name}
     if not get_top_ref:
-        is_iav = True
-        if df_top_seg_matches.select(pl.col("subtype").is_null().all())[0, 0]:
-            is_iav = False
+        is_iav = not df_top_seg_matches.select(pl.col("subtype").is_null().all())[0, 0]
         H_results = None
         N_results = None
         if "4" in segments:
             H_results = find_h_or_n_type(df_merge, "4", is_iav)
-            subtype_results_summary.update(H_results)
+            subtype_results_summary |= H_results
         if "6" in segments:
             N_results = find_h_or_n_type(df_merge, "6", is_iav)
             subtype_results_summary.update(N_results)
```

```diff
@@ -258,27 +264,26 @@ def parse_blast_result(
 def get_subtype_value(H_results: Optional[Dict], N_results: Optional[Dict], is_iav: bool) -> str:
     subtype = ""
     if not is_iav:
-        subtype = "N/A"
-        return subtype
+        return "N/A"
     if H_results is None and N_results is None:
         subtype = "-"
     elif H_results is not None and N_results is None:
         H: str = H_results.get("H_type", "")
         subtype = f"H{H}" if H != "" else "-"
-    elif H_results is None and N_results is not None:
+    elif H_results is None:
         N: str = N_results.get("N_type", "")
         subtype = f"N{N}" if N != "" else "-"
     else:
         H: str = H_results.get("H_type", "")
         N: str = N_results.get("N_type", "")
-        if H == "" and N == "":
-            subtype = "-"
-        else:
+        if H or N:
             if H != "":
                 H = f"H{H}"
             if N != "":
                 N = f"N{N}"
             subtype = f"{H}{N}"
+        else:
+            subtype = "-"
     return subtype
 
 
```
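The `get_subtype_value` cleanup above is behavior-preserving: early return of `N/A` for non-IAV samples, `-` when neither an H nor an N call survives, and concatenation of whatever fragments exist otherwise. A condensed, self-contained restatement of that branching (not the patched function itself), with the expected outputs as assertions:

```python
from typing import Dict, Optional

def get_subtype_value(H_results: Optional[Dict], N_results: Optional[Dict], is_iav: bool) -> str:
    # Condensed restatement of the patched logic, for illustration only.
    if not is_iav:
        return "N/A"
    H = (H_results or {}).get("H_type", "")
    N = (N_results or {}).get("N_type", "")
    if not (H or N):
        return "-"
    return f"{f'H{H}' if H else ''}{f'N{N}' if N else ''}"

assert get_subtype_value({"H_type": "3"}, {"N_type": "2"}, True) == "H3N2"
assert get_subtype_value({"H_type": "1"}, None, True) == "H1"
assert get_subtype_value(None, {"N_type": "9"}, True) == "N9"
assert get_subtype_value({"H_type": ""}, {"N_type": ""}, True) == "-"
assert get_subtype_value(None, None, False) == "N/A"
```
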
```diff
@@ -289,40 +294,40 @@ def find_h_or_n_type(df_merge, seg, is_iav):
     ], "Can only determine H or N type from segments 4 or 6, respectively!"
     type_name = "H_type" if seg == "4" else "N_type"
     h_or_n = type_name[0]
-    reg_h_or_n_type = ""
-    if h_or_n == "H":
-        reg_h_or_n_type = "[Hh]"
-    else:
-        reg_h_or_n_type = "[Nn]"
     df_segment = df_merge.filter(pl.col("sample_segment") == seg)
     if is_iav:
         type_counts = df_segment["subtype"].value_counts(sort=True)
         type_counts = type_counts.filter(~pl.col("subtype").is_null())
+        reg_h_or_n_type = "[Hh]" if h_or_n == "H" else "[Nn]"
         df_type_counts = type_counts.with_columns(pl.lit(type_counts["subtype"].str.extract(reg_h_or_n_type + r"(\d+)").
                                                          alias(type_name)))
         df_type_counts = df_type_counts.filter(~pl.col(type_name).is_null())
         logging.debug(f"{df_type_counts}")
         type_to_count = defaultdict(int)
         for x in df_type_counts.iter_rows(named=True):
             type_to_count[x[type_name]] += x["counts"]
-        type_to_count = [(h, c) for h, c in type_to_count.items()]
+        type_to_count = list(type_to_count.items())
         type_to_count.sort(key=lambda x: x[1], reverse=True)
         top_type, top_type_count = type_to_count[0]
         total_count = type_counts["counts"].sum()
         logging.info(
             f"{h_or_n}{top_type} n={top_type_count}/{total_count} ({top_type_count / total_count:.1%})"
         )
         df_segment = df_segment.with_columns(
-            pl.lit(df_segment["subtype"].str.contains(r".*" + reg_h_or_n_type + top_type + r".*")
-                   .fill_null(False)
-                   .alias("type_mask")))
+            pl.lit(
+                df_segment["subtype"]
+                .str.contains(f".*{reg_h_or_n_type}" + top_type + r".*")
+                .fill_null(False)
+                .alias("type_mask")
+            )
+        )
         df_seg_top_type = df_segment.filter(pl.col("type_mask") == True).drop("type_mask")
-        top_result: pl.Series = [r for r in df_seg_top_type.head(1).iter_rows(named=True)][0]
+        top_result: pl.Series = list(df_seg_top_type.head(1).iter_rows(named=True))[0]
     else:
         top_type = "N/A"
         top_type_count = "N/A"
         total_count = "N/A"
-        top_result: pl.Series = [r for r in df_segment.head(1).iter_rows(named=True)][0]
+        top_result: pl.Series = list(df_segment.head(1).iter_rows(named=True))[0]
 
     results_summary = {
         f"{h_or_n}_type": top_type if is_iav else "N/A",
```
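
In `find_h_or_n_type`, the patch folds the regex choice into a ternary and swaps a do-nothing list comprehension for `list(...)`; the voting logic itself is unchanged: extract the digit after `H`/`h` (or `N`/`n`) from each subtype string, tally, and take the most frequent. A standalone sketch of that tally, using `re` in place of the polars `.str.extract` expression the script actually uses (the subtype strings are made up):

```python
import re
from collections import defaultdict

# Hypothetical subtype calls for a sample's segment 4 (HA) BLAST hits.
subtypes = ["H3N2", "H3N2", "h3n8", "H1N1", None]

reg_h_or_n_type = "[Hh]"  # would be "[Nn]" when typing segment 6 (NA)
type_to_count = defaultdict(int)
for subtype in subtypes:
    if subtype is None:
        continue  # null subtypes are filtered out before tallying
    m = re.search(reg_h_or_n_type + r"(\d+)", subtype)
    if m:
        type_to_count[m.group(1)] += 1

# Same ordering step as the patched code: most frequent type first.
type_counts = list(type_to_count.items())
type_counts.sort(key=lambda x: x[1], reverse=True)
top_type, top_type_count = type_counts[0]
total = sum(type_to_count.values())
print(f"H{top_type} n={top_type_count}/{total} ({top_type_count / total:.1%})")
# -> H3 n=3/4 (75.0%)
```
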
```diff
@@ -388,7 +393,7 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
         has_header=False,
         separator="\t",
         new_columns=[name for name, _ in md_cols],
-        dtypes={name: t for name, t in md_cols},
+        dtypes=dict(md_cols),
     )
 
     unique_subtypes = df_md.select("subtype").unique()
@@ -414,7 +419,7 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
             sample = subtype_results_summary["sample"]
             all_subtype_results[sample] = subtype_results_summary
         df_all_blast = pd.concat(dfs_blast).rename(
-            columns={k: v for k, v in blast_results_report_columns}
+            columns=dict(blast_results_report_columns)
         )
         df_subtype_results = pd.DataFrame(all_subtype_results).transpose()
         cols = pd.Series(subtype_results_summary_columns)
@@ -442,31 +447,28 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
         )
     else:
         df_blast, subtype_results_summary = results[0]
-        df_blast = df_blast.rename(
-            mapping={k: v for k, v in blast_results_report_columns}
-        )
+        df_blast = df_blast.rename(mapping=dict(blast_results_report_columns))
         df_ref_id = df_blast.select(pl.col(['Sample', 'Sample Genome Segment Number',
                                             'Reference NCBI Accession', 'BLASTN Bitscore', 'Reference Sequence ID']))
         df_ref_id = df_ref_id.with_columns(
             pl.when(pl.col("Reference NCBI Accession").is_null())
-                .then(pl.col("Reference Sequence ID"))
-                .otherwise(pl.col("Reference NCBI Accession"))
-                .str.strip()
-                .alias('Reference NCBI Accession')
+            .then(pl.col("Reference Sequence ID"))
+            .otherwise(pl.col("Reference NCBI Accession"))
+            .str.strip()
+            .alias('Reference NCBI Accession')
         )
         df_ref_id = df_ref_id.with_columns(
             pl.col("Sample Genome Segment Number").apply(lambda x: influenza_segment[int(x)])
-                .alias("Sample Genome Segment Number"))
+            .alias("Sample Genome Segment Number"))
         df_ref_id.write_csv(sample_name + ".topsegments.csv", separator=",", has_header=True)
 
 
 def get_col_widths(df, index=False):
     """Calculate column widths based on column headers and contents"""
     if index:
-        idx_max = max(
+        yield max(
             [len(str(s)) for s in df.index.values] + [len(str(df.index.name))]
         )
-        yield idx_max
     for c in df.columns:
         # get max length of column contents and length of column header
         yield np.max([df[c].astype(str).str.len().max() + 1, len(c) + 1])
```
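
`get_col_widths` is a generator, so the old `idx_max = max(...)` followed by `yield idx_max` was merely verbose; the patch collapses it to a single `yield`. A sketch of how such a generator is typically consumed to autosize spreadsheet columns — the data and file name are hypothetical, and it assumes pandas with the xlsxwriter engine available, which `write_excel` below appears to rely on:

```python
import numpy as np
import pandas as pd

def get_col_widths(df, index=False):
    """Calculate column widths based on column headers and contents (patched version)."""
    if index:
        # Width of the index column: widest index value or the index name.
        yield max([len(str(s)) for s in df.index.values] + [len(str(df.index.name))])
    for c in df.columns:
        # Max of column contents width and header width, plus padding.
        yield np.max([df[c].astype(str).str.len().max() + 1, len(c) + 1])

# Hypothetical usage: size each Excel column to fit its widest cell.
df = pd.DataFrame({"Sample": ["sample-01"], "Subtype": ["H3N2"]})
with pd.ExcelWriter("report.xlsx", engine="xlsxwriter") as writer:
    df.to_excel(writer, sheet_name="subtypes", index=False)
    worksheet = writer.sheets["subtypes"]
    for i, width in enumerate(get_col_widths(df)):
        worksheet.set_column(i, i, width)
```
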
```diff
@@ -493,7 +495,7 @@ def write_excel(
         # fixed max number of characters in sheet name due to compatibility
         if sheet_name_index:
             max_chars = 28
-            fixed_sheetname = "{}_{}".format(idx, fixed_sheetname[:max_chars])
+            fixed_sheetname = f"{idx}_{fixed_sheetname[:max_chars]}"
         else:
             max_chars = 31
             fixed_sheetname = fixed_sheetname[:max_chars]
@@ -506,7 +508,7 @@ def write_excel(
             len(fixed_sheetname),
         )
 
-    logging.info('Writing table to Excel sheet "{}"'.format(fixed_sheetname))
+    logging.info(f'Writing table to Excel sheet "{fixed_sheetname}"')
     df.to_excel(
         writer, sheet_name=fixed_sheetname, index=False, freeze_panes=(1, 1)
     )
```
2 changes: 1 addition & 1 deletion nextflow.config
```diff
@@ -153,7 +153,7 @@ manifest {
     description = 'Influenza A virus genome assembly pipeline'
     homePage = 'https://github.com/CFIA-NCFAD/nf-flu'
     author = 'Peter Kruczkiewicz, Hai Nguyen'
-    version = '3.2.0'
+    version = '3.2.1'
     nextflowVersion = '>=21.10'
     mainScript = 'main.nf'
     doi = '10.5281/zenodo.7011213'
```
