Update Influenza ref seqs DB to use all Orthomyxoviridae viruses from NCBI FTP site
peterk87 committed Jul 7, 2023
1 parent 5bc18fa commit dfab8f5
Showing 10 changed files with 473 additions and 406 deletions.
46 changes: 34 additions & 12 deletions .github/workflows/ci.yml
@@ -10,6 +10,10 @@ on:
 
 env:
   NXF_ANSI_LOG: false
+  # URLs to Influenza ref data should be updated in step with nextflow.config
+  # default ncbi_influenza_fasta and ncbi_influenza_metadata params
+  FASTA_ZST_URL: https://api.figshare.com/v2/file/download/41415330
+  CSV_ZST_URL: https://api.figshare.com/v2/file/download/41415333
 
 concurrency:
   group: "${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}"
@@ -58,23 +62,32 @@ jobs:
           make -j2
           make install
           which seqtk
-      - name: Cache subsampled influenza.fna.gz
+      - name: Cache subsampled influenza.fna
         uses: actions/cache@v3
         id: cache-influenza-fna
         with:
-          path: influenza-10k.fna.gz
+          path: influenza-10k.fna.zst
           key: influenza-fna
       - name: Subsample NCBI influenza.fna
         if: steps.cache-influenza-fna.outputs.cache-hit != 'true'
         run: |
-          curl --silent -SLk https://ftp.ncbi.nih.gov/genomes/INFLUENZA/influenza.fna.gz > influenza.fna.gz
-          echo "Subsample 10k seqs from influenza.fna.gz with seqtk"
-          seqtk sample -s 789 influenza.fna.gz 10000 | gzip -ck > influenza-10k.fna.gz
+          curl --silent -SLk ${FASTA_ZST_URL} | zstdcat | seqtk sample -s 789 - 10000 | zstd -ck > influenza-10k.fna.zst
+      - name: Cache influenza.csv
+        uses: actions/cache@v3
+        id: cache-influenza-csv
+        with:
+          path: influenza.csv.zst
+          key: influenza-csv
+      - name: Download influenza.csv
+        if: steps.cache-influenza-csv.outputs.cache-hit != 'true'
+        run: |
+          curl --silent -SLk ${CSV_ZST_URL} > influenza.csv.zst
       - name: Run pipeline with test data
         run: |
           nextflow run ${GITHUB_WORKSPACE} \
             -profile test_illumina,docker \
-            --ncbi_influenza_fasta influenza-10k.fna.gz
+            --ncbi_influenza_fasta influenza-10k.fna.zst \
+            --ncbi_influenza_metadata influenza.csv.zst
       - name: Upload Artifact
         if: success()
         uses: actions/upload-artifact@v1.0.0
@@ -155,25 +168,34 @@ jobs:
           echo "ERR6359501-10k,$(realpath reads/ERR6359501-10k.fastq)" | tee -a samplesheet.csv
           echo "ERR6359501,$(realpath run1)" | tee -a samplesheet.csv
           echo "ERR6359501,$(realpath run2)" | tee -a samplesheet.csv
-      - name: Cache subsampled influenza.fna.gz
+      - name: Cache subsampled influenza.fna
         uses: actions/cache@v3
         id: cache-influenza-fna
         with:
-          path: influenza-10k.fna.gz
+          path: influenza-10k.fna.zst
           key: influenza-fna
       - name: Subsample NCBI influenza.fna
         if: steps.cache-influenza-fna.outputs.cache-hit != 'true'
         run: |
-          curl --silent -SLk https://ftp.ncbi.nih.gov/genomes/INFLUENZA/influenza.fna.gz > influenza.fna.gz
-          echo "Subsample 10k seqs from influenza.fna.gz with seqtk"
-          seqtk sample -s 789 influenza.fna.gz 10000 | gzip -ck > influenza-10k.fna.gz
+          curl --silent -SLk ${FASTA_ZST_URL} | zstdcat | seqtk sample -s 789 - 10000 | zstd -ck > influenza-10k.fna.zst
+      - name: Cache influenza.csv
+        uses: actions/cache@v3
+        id: cache-influenza-csv
+        with:
+          path: influenza.csv.zst
+          key: influenza-csv
+      - name: Download influenza.csv
+        if: steps.cache-influenza-csv.outputs.cache-hit != 'true'
+        run: |
+          curl --silent -SLk ${CSV_ZST_URL} > influenza.csv.zst
       - name: Run pipeline with test data
         run: |
           nextflow run ${GITHUB_WORKSPACE} \
             -profile test_nanopore,docker \
             --platform nanopore \
             --input samplesheet.csv \
-            --ncbi_influenza_fasta influenza-10k.fna.gz
+            --ncbi_influenza_fasta influenza-10k.fna.zst \
+            --ncbi_influenza_metadata influenza.csv.zst
       - name: Upload pipeline_info/
         if: success()
         uses: actions/upload-artifact@v1.0.0
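Note on the new ref-data flow: both CI jobs now stream the zstd-compressed FASTA from Figshare and subsample it with seqtk, instead of downloading influenza.fna.gz from the NCBI FTP site. The sketch below is a rough Python equivalent of that subsampling step for local experimentation, not the workflow's own code: it assumes the third-party zstandard package, and it uses plain reservoir sampling in place of seqtk sample, so even with the same seed (789) it will not select the exact records seqtk would.

```python
import io
import random
import urllib.request

import zstandard  # assumed third-party dependency: pip install zstandard

# Same Figshare URL as the workflow's FASTA_ZST_URL env var.
FASTA_ZST_URL = "https://api.figshare.com/v2/file/download/41415330"


def sample_fasta(url: str, k: int = 10_000, seed: int = 789) -> list[str]:
    """Reservoir-sample k FASTA records from a zstd-compressed stream."""
    rng = random.Random(seed)
    reservoir: list[str] = []
    n_seen = 0

    def consider(record: str) -> None:
        # Algorithm R: record i survives with probability k / i.
        nonlocal n_seen
        n_seen += 1
        if len(reservoir) < k:
            reservoir.append(record)
        elif (j := rng.randrange(n_seen)) < k:
            reservoir[j] = record

    with urllib.request.urlopen(url) as resp:
        # Decompress the HTTP body on the fly and iterate it line by line.
        text = io.TextIOWrapper(
            zstandard.ZstdDecompressor().stream_reader(resp),
            encoding="utf-8",
            errors="replace",
        )
        record_lines: list[str] = []
        for line in text:
            if line.startswith(">") and record_lines:
                consider("".join(record_lines))
                record_lines = []
            record_lines.append(line)
        if record_lines:  # flush the final record
            consider("".join(record_lines))
    return reservoir


if __name__ == "__main__":
    print(f"kept {len(sample_fasta(FASTA_ZST_URL))} records")
```

In CI itself, the one-liner `curl … | zstdcat | seqtk sample -s 789 - 10000 | zstd -ck` remains the authoritative version of this step.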
103 changes: 54 additions & 49 deletions bin/parse_influenza_blast_results.py
@@ -59,8 +59,8 @@
 blast_results_report_columns = [
     ("sample", "Sample"),
     ("sample_segment", "Sample Genome Segment Number"),
-    ("accession", "Reference NCBI Accession"),
-    ("subtype", "Reference Subtype"),
+    ("#Accession", "Reference NCBI Accession"),
+    ("Genotype", "Reference Subtype"),
     ("pident", "BLASTN Percent Identity"),
     ("length", "BLASTN Alignment Length"),
     ("mismatch", "BLASTN Mismatches"),
@@ -75,19 +75,17 @@
     ("slen", "Reference Sequence Length"),
     ("qcovs", "Sample Sequence Coverage of Reference Sequence"),
     ("stitle", "Reference Sequence ID"),
-    ("segment", "Reference Genome Segment Number"),
-    ("virus_name", "Reference Virus Name"),
-    ("host", "Reference Host"),
-    ("country", "Reference Country"),
-    ("date", "Reference Collection Date"),
-    ("age", "Reference Patient Age"),
-    ("gender", "Reference Patient Gender"),
-    ("group_id", "Reference Group ID"),
+    ("Segment", "Reference Genome Segment Number"),
+    ("GenBank_Title", "Reference Virus Name"),
+    ("Host", "Reference Host"),
+    ("Geo_Location", "Reference Geo Location"),
+    ("Collection_Date", "Reference Collection Date"),
+    ("Release_Date", "Reference Release Date"),
 ]
 
 subtype_results_summary_columns = [
     "sample",
-    "subtype",
+    "Genotype",
     "H_top_accession",
     "H_type",
     "H_virus_name",
@@ -100,7 +98,7 @@
 
 columns_H_summary_results = [
     "sample",
-    "subtype",
+    "Genotype",
     "H_top_accession",
     "H_NCBI_Influenza_DB_proportion_matches",
     "H_NCBI_Influenza_DB_subtype_matches",
@@ -121,7 +119,7 @@
 
 columns_N_summary_results = [
     "sample",
-    "subtype",
+    "Genotype",
     "N_top_accession",
     "N_NCBI_Influenza_DB_proportion_matches",
     "N_NCBI_Influenza_DB_subtype_matches",
@@ -142,7 +140,7 @@
 
 subtype_results_summary_final_names = {
     "sample": "Sample",
-    "subtype": "Subtype Prediction",
+    "Genotype": "Subtype Prediction",
     "N_type": "N: type prediction",
     "N_top_accession": "N: top match accession",
     "N_virus_name": "N: top match virus name",
@@ -209,22 +207,22 @@ def parse_blast_result(
         f"and Min Alignment length > {min_aln_length}"
     )
     df_filtered = df_filtered.with_columns([
-        pl.col('saccver').str.strip().alias("accession"),
+        pl.col('saccver').str.strip().alias("#Accession"),
         pl.lit(sample_name, dtype=pl.Categorical).alias("sample"),
         pl.col('qaccver').str.extract(r".+_(\d)$").cast(pl.Categorical).alias("sample_segment"),
         pl.col("stitle").str.extract(regex_subtype_pattern).alias("subtype_from_match_title").cast(pl.Categorical)
     ])
     logging.info(
         f"{sample_name} | Merging NCBI Influenza DB genome metadata with BLAST results on accession."
     )
-    df_merge = df_filtered.join(df_metadata, on="accession", how="left")
+    df_merge = df_filtered.join(df_metadata, on="#Accession", how="left")
     del df_filtered
     del df_metadata
     df_merge = df_merge.with_columns(
-        pl.when(pl.col("subtype").is_null())
+        pl.when(pl.col("Genotype").is_null())
         .then(pl.col("subtype_from_match_title"))
-        .otherwise(pl.col("subtype"))
-        .alias("subtype")
+        .otherwise(pl.col("Genotype"))
+        .alias("Genotype")
     )
     df_merge = df_merge.sort(
         by=["sample_segment", "bitscore"], descending=[False, True]
@@ -240,7 +238,7 @@ def parse_blast_result(
     subtype_results_summary = {"sample": sample_name}
     if not get_top_ref:
         is_iav = True
-        if df_top_seg_matches.select(pl.col("subtype").is_null().all())[0, 0]:
+        if df_top_seg_matches.select(pl.col("Genotype").is_null().all())[0, 0]:
             is_iav = False
         H_results = None
         N_results = None
@@ -250,7 +248,7 @@ def parse_blast_result(
         if "6" in segments:
             N_results = find_h_or_n_type(df_merge, "6", is_iav)
             subtype_results_summary.update(N_results)
-    subtype_results_summary["subtype"] = get_subtype_value(H_results, N_results, is_iav)
+    subtype_results_summary["Genotype"] = get_subtype_value(H_results, N_results, is_iav)
 
     return df_top_seg_matches, subtype_results_summary
 
@@ -296,9 +294,9 @@ def find_h_or_n_type(df_merge, seg, is_iav):
         reg_h_or_n_type = "[Nn]"
     df_segment = df_merge.filter(pl.col("sample_segment") == seg)
     if is_iav:
-        type_counts = df_segment["subtype"].value_counts(sort=True)
-        type_counts = type_counts.filter(~pl.col("subtype").is_null())
-        df_type_counts = type_counts.with_columns(pl.lit(type_counts["subtype"].str.extract(reg_h_or_n_type + r"(\d+)").
+        type_counts = df_segment["Genotype"].value_counts(sort=True)
+        type_counts = type_counts.filter(~pl.col("Genotype").is_null())
+        df_type_counts = type_counts.with_columns(pl.lit(type_counts["Genotype"].str.extract(reg_h_or_n_type + r"(\d+)").
                                                          alias(type_name)))
         df_type_counts = df_type_counts.filter(~pl.col(type_name).is_null())
         logging.debug(f"{df_type_counts}")
@@ -313,7 +311,7 @@ def find_h_or_n_type(df_merge, seg, is_iav):
         f"{h_or_n}{top_type} n={top_type_count}/{total_count} ({top_type_count / total_count:.1%})"
     )
     df_segment = df_segment.with_columns(
-        pl.lit(df_segment["subtype"].str.contains(r".*" + reg_h_or_n_type + top_type + r".*")
+        pl.lit(df_segment["Genotype"].str.contains(r".*" + reg_h_or_n_type + top_type + r".*")
                .fill_null(False)
                .alias("type_mask")))
     df_seg_top_type = df_segment.filter(pl.col("type_mask") == True).drop("type_mask")
@@ -332,12 +330,12 @@ def find_h_or_n_type(df_merge, seg, is_iav):
         f"{h_or_n}_top_gaps": top_result["gapopen"],
         f"{h_or_n}_top_bitscore": top_result["bitscore"],
         f"{h_or_n}_top_align_length": top_result["length"],
-        f"{h_or_n}_top_accession": top_result["accession"],
-        f"{h_or_n}_top_host": top_result["host"],
-        f"{h_or_n}_top_country": top_result["country"],
-        f"{h_or_n}_top_date": top_result["date"],
+        f"{h_or_n}_top_accession": top_result["#Accession"],
+        f"{h_or_n}_top_host": top_result["Host"],
+        f"{h_or_n}_top_country": top_result["Geo_Location"],
+        f"{h_or_n}_top_date": top_result["Collection_Date"],
         f"{h_or_n}_top_seq_length": top_result["slen"],
-        f"{h_or_n}_virus_name": top_result["virus_name"],
+        f"{h_or_n}_virus_name": top_result["GenBank_Title"],
         f"{h_or_n}_NCBI_Influenza_DB_subtype_matches": top_type_count,
         f"{h_or_n}_NCBI_Influenza_DB_total_matches": total_count,
         f"{h_or_n}_NCBI_Influenza_DB_proportion_matches": top_type_count / total_count if is_iav else "N/A",
@@ -370,33 +368,33 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
     )
 
     logging.info(f'Parsing Influenza metadata file "{flu_metadata}"')
+
     md_cols = [
-        ("accession", str),
-        ("host", pl.Categorical),
-        ("segment", pl.Categorical),
-        ("subtype", str),
-        ("country", pl.Categorical),
-        ("date", pl.Categorical),
-        ("seq_length", pl.UInt16),
-        ("virus_name", pl.Categorical),
-        ("age", pl.Categorical),
-        ("gender", pl.Categorical),
-        ("group_id", pl.Categorical),
+        ("#Accession", str),
+        ("Release_Date", pl.Categorical),
+        ("Genus", pl.Categorical),
+        ("Length", pl.UInt16),
+        ("Genotype", pl.Categorical),
+        ("Segment", pl.Categorical),
+        ("Publications", str),
+        ("Geo_Location", pl.Categorical),
+        ("Host", pl.Categorical),
+        ("Isolation_Source", pl.Categorical),
+        ("Collection_Date", pl.Categorical),
+        ("GenBank_Title", str),
     ]
     df_md = pl.read_csv(
         flu_metadata,
-        has_header=False,
-        separator="\t",
-        new_columns=[name for name, _ in md_cols],
+        has_header=True,
         dtypes={name: t for name, t in md_cols},
     )
 
-    unique_subtypes = df_md.select("subtype").unique()
-    unique_subtypes = unique_subtypes.filter(~pl.col("subtype").is_null())
+    unique_subtypes = df_md.select("Genotype").unique()
+    unique_subtypes = unique_subtypes.filter(~pl.col("Genotype").is_null())
     logging.info(
         f"Parsed Influenza metadata file into DataFrame with n={df_md.shape[0]} rows and n={df_md.shape[1]} columns. There are {len(unique_subtypes)} unique subtypes. "
     )
-    regex_subtype_pattern = r"\((H\d+N\d+|" + "|".join(list(unique_subtypes["subtype"])) + r")\)"
+    regex_subtype_pattern = r"\((H\d+N\d+|" + "|".join(list(unique_subtypes["Genotype"])) + r")\)"
     results = [
         parse_blast_result(blast_result, df_md, regex_subtype_pattern, get_top_ref, top=top,
                            pident_threshold=pident_threshold,
@@ -445,8 +443,15 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold,
     df_blast = df_blast.rename(
         mapping={k: v for k, v in blast_results_report_columns}
     )
-    df_ref_id = df_blast.select(pl.col(['Sample', 'Sample Genome Segment Number',
-                                        'Reference NCBI Accession', 'BLASTN Bitscore', 'Reference Sequence ID']))
+    df_ref_id = df_blast.select(
+        pl.col([
+            'Sample',
+            'Sample Genome Segment Number',
+            'Reference NCBI Accession',
+            'BLASTN Bitscore',
+            'Reference Sequence ID'
+        ])
+    )
     df_ref_id = df_ref_id.with_columns(
         pl.when(pl.col("Reference NCBI Accession").is_null())
         .then(pl.col("Reference Sequence ID"))
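The substance of the changes above: the metadata file is now NCBI's headered CSV (named columns such as #Accession, Genotype, GenBank_Title) rather than the old headerless, tab-separated influenza dump, so BLAST hits join to it on #Accession and fall back to the subtype parsed from the match title wherever Genotype is missing. A toy polars sketch of that join-plus-fallback logic, with made-up accessions (illustration only, not pipeline code):

```python
import polars as pl

# Toy stand-ins for the pipeline's BLAST hits and NCBI metadata tables;
# the accessions here are invented for illustration.
df_hits = pl.DataFrame({
    "#Accession": ["ACC0001", "ACC0002"],
    "subtype_from_match_title": ["H5N1", "H3N2"],  # parsed from stitle
})
df_metadata = pl.DataFrame({
    "#Accession": ["ACC0001", "ACC0002"],
    "Genotype": ["H5N1", None],  # second record lacks a Genotype
})

# Left-join on the new "#Accession" key, then backfill null Genotype
# values from the subtype parsed out of the BLAST match title.
df_merge = df_hits.join(df_metadata, on="#Accession", how="left")
df_merge = df_merge.with_columns(
    pl.when(pl.col("Genotype").is_null())
    .then(pl.col("subtype_from_match_title"))
    .otherwise(pl.col("Genotype"))
    .alias("Genotype")
)
print(df_merge)  # ACC0002's Genotype is backfilled to "H3N2"
```

Running it shows the second row's Genotype filled from the title-derived subtype, which is exactly the null-handling the pl.when/.otherwise chain in the diff implements.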
5 changes: 5 additions & 0 deletions conf/base.config
@@ -16,6 +16,11 @@ process {
     maxErrors = '-1'
 
     // Groupable resource requirements for processes
+    withLabel:process_single {
+        cpus = 1
+        memory = { check_max( 100.MB * task.attempt, 'memory' ) }
+        time = { check_max( 1.h * task.attempt, 'time' ) }
+    }
     withLabel:process_low {
         cpus = { check_max( 2 * task.attempt, 'cpus' ) }
         memory = { check_max( 4.GB * task.attempt, 'memory' ) }
(Diffs for the remaining 7 changed files were not loaded.)