Skip to content

Commit

Permalink
Merge pull request #44 from CFIA-NCFAD/dev
Browse files Browse the repository at this point in the history
Release 3.3.3
  • Loading branch information
peterk87 committed Aug 17, 2023
2 parents 39b6908 + 3b85079 commit aaeb177
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 97 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [[3.3.3](https://github.com/CFIA-NCFAD/nf-flu/releases/tag/3.3.3)] - 2023-08-16

This release fixes issues with subtype report generation script (`parse_influenza_blast_results.py`), primarily subtype predictions being `N/A` for samples where the top BLAST hits are user-specified sequences for the HA and NA segments.

### Fixes

* subtype prediction based off majority H/N prediction of all BLAST hits instead of just the top X matches (#40)
* the top hit for H/N can also be a user-specified sequence without subtype information
* top segment matches are now sorted by sample name, segment name and BLAST bitscore
* output concatenated Nanopore FASTQ to `${outdir}/fastq` by default (#43)
* Handle ambiguous bases in reference sequences by having Clair3 not convert those positions to N and Bcftools produce a warning instead of an error (#42)

### Changes

* subtyping report results are now ordered in the same order as the input `samplesheet.csv`, that is the order of the samples in the report is the same as the order of the samples in the `samplesheet.csv` file

## [[3.3.2](https://github.com/CFIA-NCFAD/nf-flu/releases/tag/3.3.2)] - 2023-08-03

This patch release fixes an IBV subtype/genotype parsing issue when generating subtyping report using the new metadata format introduced in 3.3.0 ([#32](https://github.com/CFIA-NCFAD/nf-flu/issues/32)).
Expand Down
218 changes: 130 additions & 88 deletions bin/parse_influenza_blast_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,23 @@

import logging
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import click
import numpy as np
import pandas as pd
import polars as pl
from rich.logging import RichHandler
from typing import Dict, List, Optional, Tuple

LOG_FORMAT = "%(asctime)s %(levelname)s: %(message)s [in %(filename)s:%(lineno)d]"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

pl.enable_string_cache(True)

influenza_segment = {
SEGMENT_NAMES = {
1: "1_PB2",
2: "2_PB1",
3: "3_PA",
Expand All @@ -35,27 +37,42 @@
8: "8_NS",
}

METADATA_COLUMNS = [
("#Accession", str),
("Release_Date", pl.Categorical),
("Genus", pl.Categorical),
("Length", pl.UInt16),
("Genotype", str),
("Segment", pl.Categorical),
("Publications", str),
("Geo_Location", pl.Categorical),
("Host", pl.Categorical),
("Isolation_Source", pl.Categorical),
("Collection_Date", pl.Categorical),
("GenBank_Title", str),
]

# Column names/types/final report names
blast_cols = [
BLASTN_COLUMNS = [
("qaccver", str),
("saccver", str),
("pident", float),
("length", pl.UInt16),
("mismatch", pl.UInt16),
("gapopen", pl.UInt16),
("qstart", pl.UInt16),
("qend", pl.UInt16),
("sstart", pl.UInt16),
("send", pl.UInt16),
("length", pl.UInt32),
("mismatch", pl.UInt32),
("gapopen", pl.UInt32),
("qstart", pl.UInt32),
("qend", pl.UInt32),
("sstart", pl.UInt32),
("send", pl.UInt32),
("evalue", pl.Float32),
("bitscore", pl.Float32),
("qlen", pl.UInt16),
("slen", pl.UInt16),
("qlen", pl.UInt32),
("slen", pl.UInt32),
("qcovs", pl.Float32),
("stitle", str),
]

blast_results_report_columns = [
BLAST_RESULTS_REPORT_COLUMNS = [
("sample", "Sample"),
("sample_segment", "Sample Genome Segment Number"),
("#Accession", "Reference NCBI Accession"),
Expand Down Expand Up @@ -83,7 +100,7 @@
("Release_Date", "Reference Release Date"),
]

subtype_results_summary_columns = [
SUBTYPE_RESULTS_SUMMARY_COLUMNS = [
"sample",
"Genotype",
"H_top_accession",
Expand All @@ -96,7 +113,7 @@
"N_NCBI_Influenza_DB_proportion_matches",
]

columns_H_summary_results = [
H_COLUMNS = [
"sample",
"Genotype",
"H_top_accession",
Expand All @@ -117,7 +134,7 @@
"H_virus_name",
]

columns_N_summary_results = [
N_COLUMNS = [
"sample",
"Genotype",
"N_top_accession",
Expand All @@ -138,7 +155,7 @@
"N_virus_name",
]

subtype_results_summary_final_names = {
SUBTYPE_RESULTS_SUMMARY_FINAL_NAMES = {
"sample": "Sample",
"Genotype": "Subtype Prediction",
"N_type": "N: type prediction",
Expand Down Expand Up @@ -196,8 +213,8 @@ def parse_blast_result(
blast_result,
has_header=False,
separator="\t",
new_columns=[name for name, coltype in blast_cols],
dtypes=dict(blast_cols),
new_columns=[name for name, coltype in BLASTN_COLUMNS],
dtypes=dict(BLASTN_COLUMNS),
)
.filter(
(pl.col("pident") >= (pident_threshold * 100))
Expand Down Expand Up @@ -242,11 +259,11 @@ def parse_blast_result(
for seg in segments
]
df_top_seg_matches = pl.concat(dfs, how="vertical")
cols = pl.Series([x for x, _ in blast_results_report_columns])
cols = pl.Series([x for x, _ in BLAST_RESULTS_REPORT_COLUMNS])
df_top_seg_matches = df_top_seg_matches.select(pl.col(cols))
subtype_results_summary = {"sample": sample_name}
if not get_top_ref:
df_genotype_genus = df_top_seg_matches.select(pl.col(["Genotype", "Genus"]))
df_genotype_genus = df_merge.select(pl.col(["Genotype", "Genus"]))
# where the genus is not IAV, set the genotype to "Not IAV"
df_genotype_genus = df_genotype_genus.with_columns(
pl.when(pl.col("Genus") == "Alphainfluenzavirus")
Expand Down Expand Up @@ -322,22 +339,12 @@ def find_h_or_n_type(df_merge, seg, is_iav):
logging.info(
f"{h_or_n}{top_type} n={top_type_count}/{total_count} ({top_type_count / total_count:.1%})"
)
df_segment = df_segment.with_columns(
pl.lit(
df_segment["Genotype"]
.str.contains(f".*{reg_h_or_n_type}" + top_type + r".*")
.fill_null(False)
.alias("type_mask")
)
)
df_seg_top_type = df_segment.filter(pl.col("type_mask") == True).drop("type_mask")
top_result: pl.Series = list(df_seg_top_type.head(1).iter_rows(named=True))[0]
else:
top_type = "N/A"
top_type_count = "N/A"
total_count = "N/A"
top_result: pl.Series = list(df_segment.head(1).iter_rows(named=True))[0]

top_result: pl.Series = list(df_segment.head(1).iter_rows(named=True))[0]
results_summary = {
f"{h_or_n}_type": top_type if is_iav else "N/A",
f"{h_or_n}_sample_segment_length": top_result["qlen"],
Expand Down Expand Up @@ -370,6 +377,7 @@ def find_h_or_n_type(df_merge, seg, is_iav):
@click.option('--min-aln-length', default=50, help="Min BLAST alignment length threshold")
@click.option("--get-top-ref", default=False, help="Get top ref accession id from ncbi database.")
@click.option("--sample-name", default="", help="Sample Name.")
@click.option("--samplesheet", default="", help="samplesheet.csv to get order of samples.")
@click.argument("blast_results", nargs=-1)
def report(
flu_metadata,
Expand All @@ -379,9 +387,20 @@ def report(
pident_threshold,
min_aln_length,
get_top_ref,
sample_name
sample_name,
samplesheet,
):
init_logging()
if not blast_results:
logging.error("No BLAST results files specified!")
sys.exit(1)

ordered_samples: Optional[List[str]] = None
if samplesheet:
samplesheet_path = Path(samplesheet)
if samplesheet_path.resolve().exists():
ordered_samples = pl.read_csv(samplesheet_path)['sample'].to_list()
logging.info(f"Using samplesheet to order samples: {ordered_samples}")

logging.info(f'Parsing Influenza metadata file "{flu_metadata}"')

Expand All @@ -407,88 +426,111 @@ def report(
for blast_result in blast_results
]

if not get_top_ref:
if get_top_ref:
df_top_seg_matches, subtype_results_summary = results[0]
write_top_segment_matches(df_top_seg_matches, subtype_results_summary, sample_name)
else:
dfs_blast = []
all_subtype_results = {}
for parsed_result in results:
if parsed_result is None:
continue
df_blast, subtype_results_summary = parsed_result
if df_blast is not None:
dfs_blast.append(df_blast.to_pandas())
dfs_blast.append(df_blast)
sample = subtype_results_summary["sample"]
all_subtype_results[sample] = subtype_results_summary
df_all_blast = pd.concat(dfs_blast).rename(
columns=dict(blast_results_report_columns)
)
df_all_blast = pl.concat(dfs_blast, how='vertical')
df_subtype_results = pd.DataFrame(all_subtype_results).transpose()
cols = pd.Series(subtype_results_summary_columns)
ordered_sample_to_idx = {sample: idx for idx, sample in enumerate(ordered_samples)} if ordered_samples else None

cols = pd.Series(SUBTYPE_RESULTS_SUMMARY_COLUMNS)
cols = cols[cols.isin(df_subtype_results.columns)]
df_subtype_predictions = df_subtype_results[cols].rename(
columns=subtype_results_summary_final_names
)
cols = pd.Series(columns_H_summary_results)
df_subtype_results = df_subtype_results[cols]

if ordered_samples and ordered_sample_to_idx:
df_subtype_results = df_subtype_results.sort_values(
'sample',
ascending=True,
key=lambda x: x.map(ordered_sample_to_idx)
)
else:
df_subtype_results = df_subtype_results.sort_values('sample', ascending=True)
cols = pd.Series(H_COLUMNS)
cols = cols[cols.isin(df_subtype_results.columns)]
df_H = df_subtype_results[cols].rename(columns=subtype_results_summary_final_names)
cols = pd.Series(columns_N_summary_results)
df_H = df_subtype_results[cols]
cols = pd.Series(N_COLUMNS)
cols = cols[cols.isin(df_subtype_results.columns)]
df_N = df_subtype_results[cols].rename(columns=subtype_results_summary_final_names)
# Add segment name for more informative
df_all_blast["Sample Genome Segment Number"] = df_all_blast["Sample Genome Segment Number"]. \
apply(lambda x: influenza_segment[int(x)])
df_N = df_subtype_results[cols]

# cast all categorical columns to string so that they can be sorted/ordered in a sensible way
df_all_blast = df_all_blast.with_columns(
pl.col(pl.Categorical).cast(pl.Utf8),
)
# apply custom sort order to sample column if samplesheet is provided
sample_col = pl.col('sample').map_dict(
ordered_sample_to_idx) if ordered_samples and ordered_sample_to_idx else pl.col('sample')
# Sort by sample names, segment numbers and bitscore
df_all_blast = df_all_blast.sort(
sample_col,
pl.col('sample_segment'),
pl.col('bitscore'),
descending=[False, False, True]
)
df_all_blast_pandas: pd.DataFrame = df_all_blast.to_pandas()
# Convert segment number to segment name (1 -> "1_PB2")
df_all_blast_pandas["sample_segment"] = df_all_blast_pandas["sample_segment"]. \
apply(lambda x: SEGMENT_NAMES[int(x)])
# Rename columns to more human-readable names
df_all_blast_pandas.rename(
columns=dict(BLAST_RESULTS_REPORT_COLUMNS)
)
df_subtype_predictions = df_subtype_results[cols].rename(
columns=SUBTYPE_RESULTS_SUMMARY_FINAL_NAMES
)
df_H = df_H.rename(columns=SUBTYPE_RESULTS_SUMMARY_FINAL_NAMES)
df_N = df_N.rename(columns=SUBTYPE_RESULTS_SUMMARY_FINAL_NAMES)
# Write each dataframe to a separate sheet in the excel report
write_excel(
[
("Subtype Predictions", df_subtype_predictions),
("Top Segment Matches", df_all_blast),
("Top Segment Matches", df_all_blast_pandas),
("H Segment Results", df_H),
("N Segment Results", df_N),
],
output_dest=excel_report,
)
else:
df_blast, subtype_results_summary = results[0]
df_blast = df_blast.rename(mapping=dict(blast_results_report_columns))
df_ref_id = df_blast.select(
pl.col([
'Sample',
'Sample Genome Segment Number',
'Reference NCBI Accession',
'BLASTN Bitscore',
'Reference Sequence ID'
])
)
df_ref_id = df_ref_id.with_columns(
pl.when(pl.col("Reference NCBI Accession").is_null())
.then(pl.col("Reference Sequence ID"))
.otherwise(pl.col("Reference NCBI Accession"))
.str.strip()
.alias('Reference NCBI Accession')
)
df_ref_id = df_ref_id.with_columns(
pl.col("Sample Genome Segment Number").apply(lambda x: influenza_segment[int(x)])
.alias("Sample Genome Segment Number"))
df_ref_id.write_csv(sample_name + ".topsegments.csv", separator=",", has_header=True)


def write_top_segment_matches(df_top_seg_matches, subtype_results_summary, sample_name):
df_blast = df_top_seg_matches.rename(mapping=dict(BLAST_RESULTS_REPORT_COLUMNS))
df_ref_id = df_blast.select(
pl.col([
'Sample',
'Sample Genome Segment Number',
'Reference NCBI Accession',
'BLASTN Bitscore',
'Reference Sequence ID'
])
)
df_ref_id = df_ref_id.with_columns(
pl.when(pl.col("Reference NCBI Accession").is_null())
.then(pl.col("Reference Sequence ID"))
.otherwise(pl.col("Reference NCBI Accession"))
.str.strip()
.alias('Reference NCBI Accession')
)
df_ref_id = df_ref_id.with_columns(
pl.col("Sample Genome Segment Number").apply(lambda x: SEGMENT_NAMES[int(x)])
.alias("Sample Genome Segment Number"))
df_ref_id.write_csv(sample_name + ".topsegments.csv", separator=",", has_header=True)


def read_refseq_metadata(flu_metadata):
md_cols = [
("#Accession", str),
("Release_Date", pl.Categorical),
("Genus", pl.Categorical),
("Length", pl.UInt16),
("Genotype", str),
("Segment", pl.Categorical),
("Publications", str),
("Geo_Location", pl.Categorical),
("Host", pl.Categorical),
("Isolation_Source", pl.Categorical),
("Collection_Date", pl.Categorical),
("GenBank_Title", str),
]
return pl.read_csv(
flu_metadata,
has_header=True,
dtypes=dict(md_cols),
dtypes=dict(METADATA_COLUMNS),
)


Expand Down
Loading

0 comments on commit aaeb177

Please sign in to comment.