From dfab8f55bdb4495d8473f85eb197c85af85cec13 Mon Sep 17 00:00:00 2001 From: Peter Kruczkiewicz Date: Fri, 7 Jul 2023 12:31:21 -0500 Subject: [PATCH] Update Influenza ref seqs DB to use all Orthomyxoviridae viruses from NCBI FTP site --- .github/workflows/ci.yml | 46 ++- bin/parse_influenza_blast_results.py | 103 +++--- conf/base.config | 5 + conf/modules_illumina.config | 120 ++++--- conf/modules_nanopore.config | 500 ++++++++++++++------------- modules/local/misc.nf | 34 -- modules/local/zstd_decompress.nf | 30 ++ nextflow.config | 4 +- workflows/illumina.nf | 21 +- workflows/nanopore.nf | 16 +- 10 files changed, 473 insertions(+), 406 deletions(-) create mode 100644 modules/local/zstd_decompress.nf diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 10d33ae..b06d54d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,10 @@ on: env: NXF_ANSI_LOG: false + # URLs to Influenza ref data should be updated in step with nextflow.config + # default ncbi_influenza_fasta and ncbi_influenza_metadata params + FASTA_ZST_URL: https://api.figshare.com/v2/file/download/41415330 + CSV_ZST_URL: https://api.figshare.com/v2/file/download/41415333 concurrency: group: "${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}" @@ -58,23 +62,32 @@ jobs: make -j2 make install which seqtk - - name: Cache subsampled influenza.fna.gz + - name: Cache subsampled influenza.fna uses: actions/cache@v3 id: cache-influenza-fna with: - path: influenza-10k.fna.gz + path: influenza-10k.fna.zst key: influenza-fna - name: Subsample NCBI influenza.fna if: steps.cache-influenza-fna.outputs.cache-hit != 'true' run: | - curl --silent -SLk https://ftp.ncbi.nih.gov/genomes/INFLUENZA/influenza.fna.gz > influenza.fna.gz - echo "Subsample 10k seqs from influenza.fna.gz with seqtk" - seqtk sample -s 789 influenza.fna.gz 10000 | gzip -ck > influenza-10k.fna.gz + curl --silent -SLk ${FASTA_ZST_URL} | zstdcat | seqtk sample -s 789 - 10000 | zstd -ck > influenza-10k.fna.zst + - name: Cache influenza.csv + uses: actions/cache@v3 + id: cache-influenza-csv + with: + path: influenza.csv.zst + key: influenza-csv + - name: Download influenza.csv + if: steps.cache-influenza-csv.outputs.cache-hit != 'true' + run: | + curl --silent -SLk ${CSV_ZST_URL} > influenza.csv.zst - name: Run pipeline with test data run: | nextflow run ${GITHUB_WORKSPACE} \ -profile test_illumina,docker \ - --ncbi_influenza_fasta influenza-10k.fna.gz + --ncbi_influenza_fasta influenza-10k.fna.zst \ + --ncbi_influenza_metadata influenza.csv.zst - name: Upload Artifact if: success() uses: actions/upload-artifact@v1.0.0 @@ -155,25 +168,34 @@ jobs: echo "ERR6359501-10k,$(realpath reads/ERR6359501-10k.fastq)" | tee -a samplesheet.csv echo "ERR6359501,$(realpath run1)" | tee -a samplesheet.csv echo "ERR6359501,$(realpath run2)" | tee -a samplesheet.csv - - name: Cache subsampled influenza.fna.gz + - name: Cache subsampled influenza.fna uses: actions/cache@v3 id: cache-influenza-fna with: - path: influenza-10k.fna.gz + path: influenza-10k.fna.zst key: influenza-fna - name: Subsample NCBI influenza.fna if: steps.cache-influenza-fna.outputs.cache-hit != 'true' run: | - curl --silent -SLk https://ftp.ncbi.nih.gov/genomes/INFLUENZA/influenza.fna.gz > influenza.fna.gz - echo "Subsample 10k seqs from influenza.fna.gz with seqtk" - seqtk sample -s 789 influenza.fna.gz 10000 | gzip -ck > influenza-10k.fna.gz + curl --silent -SLk ${FASTA_ZST_URL} | zstdcat | seqtk sample -s 789 - 10000 | zstd -ck > influenza-10k.fna.zst + - name: Cache influenza.csv + uses: actions/cache@v3 + id: cache-influenza-csv + with: + path: influenza.csv.zst + key: influenza-csv + - name: Download influenza.csv + if: steps.cache-influenza-csv.outputs.cache-hit != 'true' + run: | + curl --silent -SLk ${CSV_ZST_URL} > influenza.csv.zst - name: Run pipeline with test data run: | nextflow run ${GITHUB_WORKSPACE} \ -profile test_nanopore,docker \ --platform nanopore \ --input samplesheet.csv \ - --ncbi_influenza_fasta influenza-10k.fna.gz + --ncbi_influenza_fasta influenza-10k.fna.zst \ + --ncbi_influenza_metadata influenza.csv.zst - name: Upload pipeline_info/ if: success() uses: actions/upload-artifact@v1.0.0 diff --git a/bin/parse_influenza_blast_results.py b/bin/parse_influenza_blast_results.py index 90881b7..c2bdc6c 100755 --- a/bin/parse_influenza_blast_results.py +++ b/bin/parse_influenza_blast_results.py @@ -59,8 +59,8 @@ blast_results_report_columns = [ ("sample", "Sample"), ("sample_segment", "Sample Genome Segment Number"), - ("accession", "Reference NCBI Accession"), - ("subtype", "Reference Subtype"), + ("#Accession", "Reference NCBI Accession"), + ("Genotype", "Reference Subtype"), ("pident", "BLASTN Percent Identity"), ("length", "BLASTN Alignment Length"), ("mismatch", "BLASTN Mismatches"), @@ -75,19 +75,17 @@ ("slen", "Reference Sequence Length"), ("qcovs", "Sample Sequence Coverage of Reference Sequence"), ("stitle", "Reference Sequence ID"), - ("segment", "Reference Genome Segment Number"), - ("virus_name", "Reference Virus Name"), - ("host", "Reference Host"), - ("country", "Reference Country"), - ("date", "Reference Collection Date"), - ("age", "Reference Patient Age"), - ("gender", "Reference Patient Gender"), - ("group_id", "Reference Group ID"), + ("Segment", "Reference Genome Segment Number"), + ("GenBank_Title", "Reference Virus Name"), + ("Host", "Reference Host"), + ("Geo_Location", "Reference Geo Location"), + ("Collection_Date", "Reference Collection Date"), + ("Release_Date", "Reference Release Date"), ] subtype_results_summary_columns = [ "sample", - "subtype", + "Genotype", "H_top_accession", "H_type", "H_virus_name", @@ -100,7 +98,7 @@ columns_H_summary_results = [ "sample", - "subtype", + "Genotype", "H_top_accession", "H_NCBI_Influenza_DB_proportion_matches", "H_NCBI_Influenza_DB_subtype_matches", @@ -121,7 +119,7 @@ columns_N_summary_results = [ "sample", - "subtype", + "Genotype", "N_top_accession", "N_NCBI_Influenza_DB_proportion_matches", "N_NCBI_Influenza_DB_subtype_matches", @@ -142,7 +140,7 @@ subtype_results_summary_final_names = { "sample": "Sample", - "subtype": "Subtype Prediction", + "Genotype": "Subtype Prediction", "N_type": "N: type prediction", "N_top_accession": "N: top match accession", "N_virus_name": "N: top match virus name", @@ -209,7 +207,7 @@ def parse_blast_result( f"and Min Alignment length > {min_aln_length}" ) df_filtered = df_filtered.with_columns([ - pl.col('saccver').str.strip().alias("accession"), + pl.col('saccver').str.strip().alias("#Accession"), pl.lit(sample_name, dtype=pl.Categorical).alias("sample"), pl.col('qaccver').str.extract(r".+_(\d)$").cast(pl.Categorical).alias("sample_segment"), pl.col("stitle").str.extract(regex_subtype_pattern).alias("subtype_from_match_title").cast(pl.Categorical) @@ -217,14 +215,14 @@ def parse_blast_result( logging.info( f"{sample_name} | Merging NCBI Influenza DB genome metadata with BLAST results on accession." ) - df_merge = df_filtered.join(df_metadata, on="accession", how="left") + df_merge = df_filtered.join(df_metadata, on="#Accession", how="left") del df_filtered del df_metadata df_merge = df_merge.with_columns( - pl.when(pl.col("subtype").is_null()) + pl.when(pl.col("Genotype").is_null()) .then(pl.col("subtype_from_match_title")) - .otherwise(pl.col("subtype")) - .alias("subtype") + .otherwise(pl.col("Genotype")) + .alias("Genotype") ) df_merge = df_merge.sort( by=["sample_segment", "bitscore"], descending=[False, True] @@ -240,7 +238,7 @@ def parse_blast_result( subtype_results_summary = {"sample": sample_name} if not get_top_ref: is_iav = True - if df_top_seg_matches.select(pl.col("subtype").is_null().all())[0, 0]: + if df_top_seg_matches.select(pl.col("Genotype").is_null().all())[0, 0]: is_iav = False H_results = None N_results = None @@ -250,7 +248,7 @@ def parse_blast_result( if "6" in segments: N_results = find_h_or_n_type(df_merge, "6", is_iav) subtype_results_summary.update(N_results) - subtype_results_summary["subtype"] = get_subtype_value(H_results, N_results, is_iav) + subtype_results_summary["Genotype"] = get_subtype_value(H_results, N_results, is_iav) return df_top_seg_matches, subtype_results_summary @@ -296,9 +294,9 @@ def find_h_or_n_type(df_merge, seg, is_iav): reg_h_or_n_type = "[Nn]" df_segment = df_merge.filter(pl.col("sample_segment") == seg) if is_iav: - type_counts = df_segment["subtype"].value_counts(sort=True) - type_counts = type_counts.filter(~pl.col("subtype").is_null()) - df_type_counts = type_counts.with_columns(pl.lit(type_counts["subtype"].str.extract(reg_h_or_n_type + r"(\d+)"). + type_counts = df_segment["Genotype"].value_counts(sort=True) + type_counts = type_counts.filter(~pl.col("Genotype").is_null()) + df_type_counts = type_counts.with_columns(pl.lit(type_counts["Genotype"].str.extract(reg_h_or_n_type + r"(\d+)"). alias(type_name))) df_type_counts = df_type_counts.filter(~pl.col(type_name).is_null()) logging.debug(f"{df_type_counts}") @@ -313,7 +311,7 @@ def find_h_or_n_type(df_merge, seg, is_iav): f"{h_or_n}{top_type} n={top_type_count}/{total_count} ({top_type_count / total_count:.1%})" ) df_segment = df_segment.with_columns( - pl.lit(df_segment["subtype"].str.contains(r".*" + reg_h_or_n_type + top_type + r".*") + pl.lit(df_segment["Genotype"].str.contains(r".*" + reg_h_or_n_type + top_type + r".*") .fill_null(False) .alias("type_mask"))) df_seg_top_type = df_segment.filter(pl.col("type_mask") == True).drop("type_mask") @@ -332,12 +330,12 @@ def find_h_or_n_type(df_merge, seg, is_iav): f"{h_or_n}_top_gaps": top_result["gapopen"], f"{h_or_n}_top_bitscore": top_result["bitscore"], f"{h_or_n}_top_align_length": top_result["length"], - f"{h_or_n}_top_accession": top_result["accession"], - f"{h_or_n}_top_host": top_result["host"], - f"{h_or_n}_top_country": top_result["country"], - f"{h_or_n}_top_date": top_result["date"], + f"{h_or_n}_top_accession": top_result["#Accession"], + f"{h_or_n}_top_host": top_result["Host"], + f"{h_or_n}_top_country": top_result["Geo_Location"], + f"{h_or_n}_top_date": top_result["Collection_Date"], f"{h_or_n}_top_seq_length": top_result["slen"], - f"{h_or_n}_virus_name": top_result["virus_name"], + f"{h_or_n}_virus_name": top_result["GenBank_Title"], f"{h_or_n}_NCBI_Influenza_DB_subtype_matches": top_type_count, f"{h_or_n}_NCBI_Influenza_DB_total_matches": total_count, f"{h_or_n}_NCBI_Influenza_DB_proportion_matches": top_type_count / total_count if is_iav else "N/A", @@ -370,33 +368,33 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold, ) logging.info(f'Parsing Influenza metadata file "{flu_metadata}"') + md_cols = [ - ("accession", str), - ("host", pl.Categorical), - ("segment", pl.Categorical), - ("subtype", str), - ("country", pl.Categorical), - ("date", pl.Categorical), - ("seq_length", pl.UInt16), - ("virus_name", pl.Categorical), - ("age", pl.Categorical), - ("gender", pl.Categorical), - ("group_id", pl.Categorical), + ("#Accession", str), + ("Release_Date", pl.Categorical), + ("Genus", pl.Categorical), + ("Length", pl.UInt16), + ("Genotype", pl.Categorical), + ("Segment", pl.Categorical), + ("Publications", str), + ("Geo_Location", pl.Categorical), + ("Host", pl.Categorical), + ("Isolation_Source", pl.Categorical), + ("Collection_Date", pl.Categorical), + ("GenBank_Title", str), ] df_md = pl.read_csv( flu_metadata, - has_header=False, - separator="\t", - new_columns=[name for name, _ in md_cols], + has_header=True, dtypes={name: t for name, t in md_cols}, ) - unique_subtypes = df_md.select("subtype").unique() - unique_subtypes = unique_subtypes.filter(~pl.col("subtype").is_null()) + unique_subtypes = df_md.select("Genotype").unique() + unique_subtypes = unique_subtypes.filter(~pl.col("Genotype").is_null()) logging.info( f"Parsed Influenza metadata file into DataFrame with n={df_md.shape[0]} rows and n={df_md.shape[1]} columns. There are {len(unique_subtypes)} unique subtypes. " ) - regex_subtype_pattern = r"\((H\d+N\d+|" + "|".join(list(unique_subtypes["subtype"])) + r")\)" + regex_subtype_pattern = r"\((H\d+N\d+|" + "|".join(list(unique_subtypes["Genotype"])) + r")\)" results = [ parse_blast_result(blast_result, df_md, regex_subtype_pattern, get_top_ref, top=top, pident_threshold=pident_threshold, @@ -445,8 +443,15 @@ def report(flu_metadata, blast_results, excel_report, top, pident_threshold, df_blast = df_blast.rename( mapping={k: v for k, v in blast_results_report_columns} ) - df_ref_id = df_blast.select(pl.col(['Sample', 'Sample Genome Segment Number', - 'Reference NCBI Accession', 'BLASTN Bitscore', 'Reference Sequence ID'])) + df_ref_id = df_blast.select( + pl.col([ + 'Sample', + 'Sample Genome Segment Number', + 'Reference NCBI Accession', + 'BLASTN Bitscore', + 'Reference Sequence ID' + ]) + ) df_ref_id = df_ref_id.with_columns( pl.when(pl.col("Reference NCBI Accession").is_null()) .then(pl.col("Reference Sequence ID")) diff --git a/conf/base.config b/conf/base.config index 8e75895..e7e0f06 100644 --- a/conf/base.config +++ b/conf/base.config @@ -16,6 +16,11 @@ process { maxErrors = '-1' // Groupable resource requirements for processes + withLabel:process_single { + cpus = 1 + memory = { check_max( 100.MB * task.attempt, 'memory' ) } + time = { check_max( 1.h * task.attempt, 'time' ) } + } withLabel:process_low { cpus = { check_max( 2 * task.attempt, 'cpus' ) } memory = { check_max( 4.GB * task.attempt, 'memory' ) } diff --git a/conf/modules_illumina.config b/conf/modules_illumina.config index d4f35d7..d6c7c65 100644 --- a/conf/modules_illumina.config +++ b/conf/modules_illumina.config @@ -1,63 +1,69 @@ - +// Illumina subworkflow process configuration process { - withName: 'IRMA' { - publishDir = [ - [ - path: { "${params.outdir}/irma"}, - mode: params.publish_dir_mode - ], - [ - path: { "${params.outdir}/consensus/irma/" }, - pattern: "*.consensus.fasta", - mode: params.publish_dir_mode - ] - ] - } + withName: 'IRMA' { + publishDir = [ + [ + path: { "${params.outdir}/irma"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ], + [ + path: { "${params.outdir}/consensus/irma/" }, + pattern: "*.consensus.fasta", + mode: params.publish_dir_mode + ] + ] + } - withName: 'BLAST_MAKEBLASTDB' { - ext.args = '-dbtype nucl' - publishDir = [ - [ - path: { "${params.outdir}/blast"}, - mode: params.publish_dir_mode - ] - ] - } + withName: 'BLAST_MAKEBLASTDB' { + ext.args = '-dbtype nucl' + publishDir = [ + [ + path: { "${params.outdir}/blast/db"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'BLAST_BLASTN' { - ext.args = '-outfmt "6 qaccver saccver pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen slen qcovs stitle" -num_alignments 1000000 -evalue 1e-6' - publishDir = [ - [ - path: { "${params.outdir}/blast"}, - mode: params.publish_dir_mode - ] - ] - } + withName: 'BLAST_BLASTN' { + ext.args = '-outfmt "6 qaccver saccver pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen slen qcovs stitle" -num_alignments 1000000 -evalue 1e-6' + publishDir = [ + [ + path: { "${params.outdir}/blast"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } + withName: 'SUBTYPING_REPORT' { + publishDir = [ + [ + path: { "${params.outdir}/"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'SUBTYPING_REPORT' { - publishDir = [ - [ - path: { "${params.outdir}/"}, - mode: params.publish_dir_mode - ] - ] - } + withName: 'ZSTD_DECOMPRESS_.*' { + publishDir = [ + [ + path: { "${params.outdir}/ncbi-influenza-db"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'GUNZIP_NCBI_FLU_FASTA' { - publishDir = [ - [ - path: { "${params.outdir}/flu_fasta"}, - mode: params.publish_dir_mode - ] - ] - } - withName: 'CAT_ILLUMINA_FASTQ' { - publishDir = [ - [ - path: { "${params.outdir}/fastq"}, - mode: params.publish_dir_mode - ] - ] - } -} \ No newline at end of file + withName: 'CAT_ILLUMINA_FASTQ' { + publishDir = [ + [ + path: { "${params.outdir}/fastq"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } +} diff --git a/conf/modules_nanopore.config b/conf/modules_nanopore.config index 4ae1c19..df0b951 100644 --- a/conf/modules_nanopore.config +++ b/conf/modules_nanopore.config @@ -1,266 +1,284 @@ - +// Nanopore subworkflow process configuration process { - withName: 'IRMA' { - publishDir = [ - [ - path: { "${params.outdir}/irma"}, - mode: params.publish_dir_mode - ], - [ - path: { "${params.outdir}/consensus/irma/" }, - pattern: "*.irma.consensus.fasta", - mode: params.publish_dir_mode - ] - ] - } + withName: 'IRMA' { + publishDir = [ + [ + path: { "${params.outdir}/irma"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ], + [ + path: { "${params.outdir}/consensus/irma/" }, + pattern: "*.irma.consensus.fasta", + mode: params.publish_dir_mode + ] + ] + } + + withName: 'BLAST_MAKEBLASTDB_NCBI' { + ext.args = '-dbtype nucl' + publishDir = [ + [ + path: { "${params.outdir}/blast/db/ncbi" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'BLAST_MAKEBLASTDB_NCBI' { - ext.args = '-dbtype nucl' - publishDir = [ - [ - path: { "${params.outdir}/blast/db/ncbi" }, - mode: params.publish_dir_mode - ] - ] - } - withName: 'BLAST_MAKEBLASTDB_REFDB' { - ext.args = '-dbtype nucl' - publishDir = [ - [ - path: { "${params.outdir}/blast/db/ref_db" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'BLAST_MAKEBLASTDB_REFDB' { + ext.args = '-dbtype nucl' + publishDir = [ + [ + path: { "${params.outdir}/blast/db/ref_db" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'BLAST_BLASTN_IRMA' { - ext.args = '-outfmt "6 qaccver saccver pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen slen qcovs stitle" -num_alignments 1000000 -evalue 1e-6' - publishDir = [ - [ - path: { "${params.outdir}/blast/blastn/irma" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'BLAST_BLASTN_IRMA' { + ext.args = '-outfmt "6 qaccver saccver pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen slen qcovs stitle" -num_alignments 1000000 -evalue 1e-6' + publishDir = [ + [ + path: { "${params.outdir}/blast/blastn/irma" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'BLAST_BLASTN_CONSENSUS' { - ext.args = '-outfmt "6 qaccver saccver pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen slen qcovs stitle" -num_alignments 1000000 -evalue 1e-6' - publishDir = [ - [ - path: { "${params.outdir}/blast/blastn/consensus" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'BLAST_BLASTN_CONSENSUS' { + ext.args = '-outfmt "6 qaccver saccver pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen slen qcovs stitle" -num_alignments 1000000 -evalue 1e-6' + publishDir = [ + [ + path: { "${params.outdir}/blast/blastn/consensus" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'BLAST_BLASTN_CONSENSUS_REF_DB' { - ext.args = '-outfmt "6 qaccver saccver pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen slen qcovs stitle" -num_alignments 1000000 -evalue 1e-6' - publishDir = [ - [ - path: { "${params.outdir}/blast/blastn/against_ref_db" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'BLAST_BLASTN_CONSENSUS_REF_DB' { + ext.args = '-outfmt "6 qaccver saccver pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen slen qcovs stitle" -num_alignments 1000000 -evalue 1e-6' + publishDir = [ + [ + path: { "${params.outdir}/blast/blastn/against_ref_db" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'BCF_CONSENSUS' { - publishDir = [ - [ - path: { "${params.outdir}/consensus/bcftools/${sample}" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'BCF_CONSENSUS' { + publishDir = [ + [ + path: { "${params.outdir}/consensus/bcftools/${sample}" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'BCFTOOLS_STATS' { - publishDir = [ - [ - path: { "${params.outdir}/variants/${sample}" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'BCFTOOLS_STATS' { + publishDir = [ + [ + path: { "${params.outdir}/variants/${sample}" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'CAT_CONSENSUS' { - publishDir = [ - [ - path: { "${params.outdir}/consensus/bcftools/"}, - pattern: "*.consensus.fasta", - mode: params.publish_dir_mode - ] - ] - } + withName: 'CAT_CONSENSUS' { + publishDir = [ + [ + path: { "${params.outdir}/consensus/bcftools/"}, + pattern: "*.consensus.fasta", + mode: params.publish_dir_mode + ] + ] + } - withName: 'COVERAGE_PLOT' { - publishDir = [ - [ - path: { "${params.outdir}/coverage_plots/${sample}" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'COVERAGE_PLOT' { + publishDir = [ + [ + path: { "${params.outdir}/coverage_plots/${sample}" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'MEDAKA' { - publishDir = [ - [ - path: { "${params.outdir}/variants/${sample}" }, - pattern: "*.{vcf,log}", - mode: params.publish_dir_mode - ], - [ - path: { "${params.outdir}/variants/${sample}/medaka"}, - mode: params.publish_dir_mode, - enable: true - ] - ] - } + withName: 'MEDAKA' { + publishDir = [ + [ + path: { "${params.outdir}/variants/${sample}" }, + pattern: "*.{vcf,log}", + mode: params.publish_dir_mode + ], + [ + path: { "${params.outdir}/variants/${sample}/medaka"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode, + enable: true + ] + ] + } - withName: 'CLAIR3' { - publishDir = [ - [ - path: { "${params.outdir}/variants/${sample}"}, - pattern: "*.{vcf.gz,log}", - mode: params.publish_dir_mode - ], - [ - path: { "${params.outdir}/variants/${sample}/clair3"}, - mode: params.publish_dir_mode, - enable: true - ] - ] - } + withName: 'CLAIR3' { + publishDir = [ + [ + path: { "${params.outdir}/variants/${sample}"}, + pattern: "*.{vcf.gz,log}", + mode: params.publish_dir_mode + ], + [ + path: { "${params.outdir}/variants/${sample}/clair3"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode, + enable: true + ] + ] + } - withName: 'MINIMAP2' { - publishDir = [ - [ - path: { "${params.outdir}/mapping/${sample}"}, - mode: params.publish_dir_mode - ] - ] - } + withName: 'MINIMAP2' { + publishDir = [ + [ + path: { "${params.outdir}/mapping/${sample}"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'MOSDEPTH_GENOME' { - publishDir = [ - [ - path: { "${params.outdir}/mosdepth/${sample}"}, - mode: params.publish_dir_mode - ] - ] - } + withName: 'MOSDEPTH_GENOME' { + publishDir = [ + [ + path: { "${params.outdir}/mosdepth/${sample}"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'PULL_TOP_REF_ID' { - publishDir = [ - [ - path: { "${params.outdir}/reference_sequences/${meta.id}"}, - pattern: "*.csv", - mode: params.publish_dir_mode - ] - ] - } + withName: 'PULL_TOP_REF_ID' { + publishDir = [ + [ + path: { "${params.outdir}/reference_sequences/${meta.id}"}, + pattern: "*.csv", + mode: params.publish_dir_mode + ] + ] + } - withName: 'CHECK_REF_FASTA' { - publishDir = [ - [ - path: { "${params.outdir}/reference_sequences/"}, - pattern: "*.fasta", - mode: params.publish_dir_mode - ] - ] - } + withName: 'CHECK_REF_FASTA' { + publishDir = [ + [ + path: { "${params.outdir}/reference_sequences/"}, + pattern: "*.fasta", + mode: params.publish_dir_mode + ] + ] + } - withName: 'SEQTK_SEQ' { - publishDir = [ - [ - path: { "${params.outdir}/reference_sequences/${sample}"}, - pattern: "*.fasta", - mode: params.publish_dir_mode - ] - ] - } + withName: 'SEQTK_SEQ' { + publishDir = [ + [ + path: { "${params.outdir}/reference_sequences/${sample}"}, + pattern: "*.fasta", + mode: params.publish_dir_mode + ] + ] + } - withName: 'SUBTYPING_REPORT_BCF_CONSENSUS' { - publishDir = [ - [ - path: { "${params.outdir}/"}, - pattern: "*.{xlsx,log}", - mode: params.publish_dir_mode - ] - ] - } + withName: 'SUBTYPING_REPORT_BCF_CONSENSUS' { + publishDir = [ + [ + path: { "${params.outdir}/"}, + pattern: "*.{xlsx,log}", + mode: params.publish_dir_mode + ] + ] + } - withName: 'BLASTN_REPORT' { - publishDir = [ - [ - path: { "${params.outdir}/mismatch_report"}, - pattern: "*.{xlsx}", - mode: params.publish_dir_mode - ] - ] - } + withName: 'BLASTN_REPORT' { + publishDir = [ + [ + path: { "${params.outdir}/mismatch_report"}, + pattern: "*.{xlsx}", + mode: params.publish_dir_mode + ] + ] + } - withName: 'SUBTYPING_REPORT_IRMA_CONSENSUS' { - publishDir = [ - [ - path: { "${params.outdir}/irma"}, - pattern: "*.{xlsx,log}", - mode: params.publish_dir_mode - ] - ] - } + withName: 'SUBTYPING_REPORT_IRMA_CONSENSUS' { + publishDir = [ + [ + path: { "${params.outdir}/irma"}, + pattern: "*.{xlsx,log}", + mode: params.publish_dir_mode + ] + ] + } - withName: 'VCF_FILTER_FRAMESHIFT' { - publishDir = [ - [ - path: { "${params.outdir}/variants/${sample}" }, - pattern: "*.vcf", - mode: params.publish_dir_mode - ] - ] - } + withName: 'VCF_FILTER_FRAMESHIFT' { + publishDir = [ + [ + path: { "${params.outdir}/variants/${sample}" }, + pattern: "*.vcf", + mode: params.publish_dir_mode + ] + ] + } - withName: 'GUNZIP_NCBI_FLU_FASTA' { - publishDir = [ - [ - path: { "${params.outdir}/flu_fasta" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'ZSTD_DECOMPRESS_.*' { + publishDir = [ + [ + path: { "${params.outdir}/ncbi-influenza-db"}, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'READ_COUNT_FAIL_TSV' { - publishDir = [ - [ - path: { "${params.outdir}/read_count" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'READ_COUNT_FAIL_TSV' { + publishDir = [ + [ + path: { "${params.outdir}/read_count" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'READ_COUNT_PASS_TSV' { - publishDir = [ - [ - path: { "${params.outdir}/read_count" }, - mode: params.publish_dir_mode - ] - ] - } + withName: 'READ_COUNT_PASS_TSV' { + publishDir = [ + [ + path: { "${params.outdir}/read_count" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } - withName: 'SOFTWARE_VERSIONS' { - publishDir = [ - [ - path: { "${params.outdir}/pipeline_info" }, - pattern: "software_versions.yml", - mode: params.publish_dir_mode - ] - ] - } + withName: 'SOFTWARE_VERSIONS' { + publishDir = [ + [ + path: { "${params.outdir}/pipeline_info" }, + pattern: "software_versions.yml", + mode: params.publish_dir_mode + ] + ] + } - withName: 'MULTIQC' { - publishDir = [ - [ - path: { "${params.outdir}/MultiQC" }, - mode: params.publish_dir_mode - ] - ] - } -} \ No newline at end of file + withName: 'MULTIQC' { + publishDir = [ + [ + path: { "${params.outdir}/MultiQC" }, + saveAs: { filename -> filename.equals('versions.yml') ? null : filename }, + mode: params.publish_dir_mode + ] + ] + } +} diff --git a/modules/local/misc.nf b/modules/local/misc.nf index 61c3a68..16a1090 100644 --- a/modules/local/misc.nf +++ b/modules/local/misc.nf @@ -58,40 +58,6 @@ process CAT_DB { """ } -process GUNZIP_NCBI_FLU_FASTA { - tag "$archive" - label 'process_low' - - conda (params.enable_conda ? "conda-forge::sed=4.7" : null) - if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) { - container "https://containers.biocontainers.pro/s3/SingImgsRepo/biocontainers/v1.2.0_cv1/biocontainers_v1.2.0_cv1.img" - } else { - container "biocontainers/biocontainers:v1.2.0_cv1" - } - - input: - path archive - - output: - path "*.fna", emit: fna - path "versions.yml" , emit: versions - - script: - def software = getSoftwareName(task.process) - // replace FASTA headers - // >gi|{gi}|gb|{accession}|{description} - // with - // >{accession} {description} - // for easier parsing and processing - """ - zcat $archive | sed -E 's/^>gi\\|[0-9]+\\|gb\\|(\\w+)\\|(.*)/>\\1 \\2/' > influenza.fna - cat <<-END_VERSIONS > versions.yml - "${task.process}": - zcat: \$(echo \$(zcat --version 2>&1) | sed 's/^.*(gzip) //; s/ Copyright.*\$//') - END_VERSIONS - """ -} - process CAT_CONSENSUS { tag "$sample" conda (params.enable_conda ? 'bioconda::shiptv=0.4.0' : null) diff --git a/modules/local/zstd_decompress.nf b/modules/local/zstd_decompress.nf new file mode 100644 index 0000000..aecbe87 --- /dev/null +++ b/modules/local/zstd_decompress.nf @@ -0,0 +1,30 @@ +process ZSTD_DECOMPRESS { + + conda 'conda-forge::zstd=1.5.2' + // TODO: using clair3 container here for zstd and since it might be used if running the Nanopore workflow, but should move to multi-package-container with just zstd and maybe curl to combine data fetch functionality + if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) { + container 'https://depot.galaxyproject.org/singularity/clair3:1.0.3--py39h8492097_0' + } else { + container 'quay.io/biocontainers/clair3:1.0.3--py39h8492097_0' + } + + input: + path(zstd_file, stageAs: "input*/*") + val(filename) + + output: + path(decompressed_file), emit: file + path('versions.yml'), emit: versions + + script: + def basename = file(zstd_file).getBaseName() + decompressed_file = filename ? "${basename}-${filename}" : basename + """ + zstdcat $zstd_file > $decompressed_file + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + zstd: \$(echo \$(zstd --version 2>&1) | sed 's/^.* v//; s/,.*//') + END_VERSIONS + """ +} diff --git a/nextflow.config b/nextflow.config index b3bd9bf..fccf72f 100644 --- a/nextflow.config +++ b/nextflow.config @@ -23,8 +23,8 @@ params { min_aln_length = 700 max_top_blastn = 3 // reference data - ncbi_influenza_fasta = 'https://ftp.ncbi.nih.gov/genomes/INFLUENZA/influenza.fna.gz' - ncbi_influenza_metadata = 'https://ftp.ncbi.nih.gov/genomes/INFLUENZA/genomeset.dat.gz' + ncbi_influenza_fasta = 'https://api.figshare.com/v2/file/download/41415330' + ncbi_influenza_metadata = 'https://api.figshare.com/v2/file/download/41415333' // Slurm scheduler options slurm_queue = '' slurm_queue_size = 100 diff --git a/workflows/illumina.nf b/workflows/illumina.nf index 427f554..34f51f9 100644 --- a/workflows/illumina.nf +++ b/workflows/illumina.nf @@ -20,6 +20,8 @@ include { GUNZIP_NCBI_FLU_FASTA } from '../modules/local/misc' include { BLAST_MAKEBLASTDB } from '../modules/local/blast_makeblastdb' include { BLAST_BLASTN } from '../modules/local/blastn' include { CAT_ILLUMINA_FASTQ } from '../modules/local/cat_illumina_fastq' +include { FETCH_INFLUENZA_REF_DB } from '../modules/local/fetch_influenza_ref_db' +include { ZSTD_DECOMPRESS as ZSTD_DECOMPRESS_FASTA; ZSTD_DECOMPRESS as ZSTD_DECOMPRESS_CSV } from '../modules/local/zstd_decompress' //============================================================================= // Workflow Params Setup @@ -35,9 +37,14 @@ if (params.irma_module) { //============================================================================= workflow ILLUMINA { - - GUNZIP_NCBI_FLU_FASTA(ch_influenza_db_fasta) - BLAST_MAKEBLASTDB(GUNZIP_NCBI_FLU_FASTA.out.fna) + ch_versions = Channel.empty() + // Decompress reference data + ZSTD_DECOMPRESS_FASTA(ch_influenza_db_fasta, "influenza.fasta") + ch_versions = ch_versions.mix(ZSTD_DECOMPRESS_FASTA.out.versions) + ZSTD_DECOMPRESS_CSV(ch_influenza_metadata, "influenza.csv") + ch_versions = ch_versions.mix(ZSTD_DECOMPRESS_CSV.out.versions) + BLAST_MAKEBLASTDB(ZSTD_DECOMPRESS_FASTA.out.file) + ch_versions = ch_versions.mix(BLAST_MAKEBLASTDB.out.versions) CHECK_SAMPLE_SHEET(Channel.fromPath( params.input, checkIfExists: true)) .splitCsv(header: ['sample', 'fastq1', 'fastq2', 'single_end'], sep: ',', skip: 1) @@ -71,11 +78,17 @@ workflow ILLUMINA { // Credit to nf-core/viralrecon. Source: https://github.com/nf-core/viralrecon/blob/a85d5969f9025409e3618d6c280ef15ce417df65/workflows/illumina.nf#L221 // Concatenate FastQ files from same sample if required CAT_ILLUMINA_FASTQ(ch_input) + ch_versions = ch_versions.mix(CAT_ILLUMINA_FASTQ.out.versions) IRMA(CAT_ILLUMINA_FASTQ.out.reads, irma_module) + ch_versions = ch_versions.mix(IRMA.out.versions) BLAST_BLASTN(IRMA.out.consensus, BLAST_MAKEBLASTDB.out.db) + ch_versions = ch_versions.mix(BLAST_BLASTN.out.versions) ch_blast = BLAST_BLASTN.out.txt.collect({ it[1] }) - SUBTYPING_REPORT(ch_influenza_metadata, ch_blast) + SUBTYPING_REPORT(ZSTD_DECOMPRESS_CSV.out, ch_blast) + ch_versions = ch_versions.mix(SUBTYPING_REPORT.out.versions) + + SOFTWARE_VERSIONS(ch_versions.unique().collectFile(name: 'collated_versions.yml')) } diff --git a/workflows/nanopore.nf b/workflows/nanopore.nf index 0d13d9a..2dd21bd 100644 --- a/workflows/nanopore.nf +++ b/workflows/nanopore.nf @@ -19,7 +19,7 @@ include { BCF_CONSENSUS; BCFTOOLS_STATS } from '../modules include { CLAIR3 } from '../modules/local/clair3' include { MOSDEPTH_GENOME } from '../modules/local/mosdepth' include { CAT_NANOPORE_FASTQ } from '../modules/local/misc' -include { GUNZIP_NCBI_FLU_FASTA } from '../modules/local/misc' +include { ZSTD_DECOMPRESS as ZSTD_DECOMPRESS_FASTA; ZSTD_DECOMPRESS as ZSTD_DECOMPRESS_CSV } from '../modules/local/zstd_decompress' include { CAT_DB } from '../modules/local/misc' include { CAT_CONSENSUS } from '../modules/local/misc' include { SEQTK_SEQ } from '../modules/local/seqtk_seq' @@ -131,10 +131,12 @@ workflow NANOPORE { .map { sample, fqgz, fq, count -> [ [id: sample], fqgz, fq ] } .set { ch_reads } - GUNZIP_NCBI_FLU_FASTA(ch_influenza_db_fasta) - ch_versions = ch_versions.mix(GUNZIP_NCBI_FLU_FASTA.out.versions) + ZSTD_DECOMPRESS_FASTA(ch_influenza_db_fasta, "influenza.fasta") + ch_versions = ch_versions.mix(ZSTD_DECOMPRESS_FASTA.out.versions) + ZSTD_DECOMPRESS_CSV(ch_influenza_metadata, "influenza.csv") + ch_versions = ch_versions.mix(ZSTD_DECOMPRESS_CSV.out.versions) - ch_input_ref_db = GUNZIP_NCBI_FLU_FASTA.out.fna + ch_input_ref_db = ZSTD_DECOMPRESS_FASTA.out.file if (params.ref_db){ ch_ref_fasta = file(params.ref_db, type: 'file') @@ -159,11 +161,11 @@ workflow NANOPORE { //Generate suptype prediction report if (!params.skip_irma_subtyping_report){ ch_blast_irma = BLAST_BLASTN_IRMA.out.txt.collect({ it[1] }) - SUBTYPING_REPORT_IRMA_CONSENSUS(ch_influenza_metadata, ch_blast_irma) + SUBTYPING_REPORT_IRMA_CONSENSUS(ZSTD_DECOMPRESS_CSV.out.file, ch_blast_irma) } // Prepare top ncbi accession id for each segment of each sample sample (id which has top bitscore) - PULL_TOP_REF_ID(BLAST_BLASTN_IRMA.out.txt, ch_influenza_metadata) + PULL_TOP_REF_ID(BLAST_BLASTN_IRMA.out.txt, ZSTD_DECOMPRESS_CSV.out.file) ch_versions = ch_versions.mix(PULL_TOP_REF_ID.out.versions) PULL_TOP_REF_ID.out.accession_id @@ -242,7 +244,7 @@ workflow NANOPORE { ch_versions = ch_versions.mix(BLAST_BLASTN_CONSENSUS.out.versions) ch_blastn_consensus = BLAST_BLASTN_CONSENSUS.out.txt.collect({ it[1] }) - SUBTYPING_REPORT_BCF_CONSENSUS(ch_influenza_metadata, ch_blastn_consensus) + SUBTYPING_REPORT_BCF_CONSENSUS(ZSTD_DECOMPRESS_CSV.out.file, ch_blastn_consensus) ch_versions = ch_versions.mix(SUBTYPING_REPORT_BCF_CONSENSUS.out.versions) if (params.ref_db){