Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3430 - Add bcftools norm before ingesting #25

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 56 additions & 17 deletions covid19dp_submission/nextflow/submission_workflow.nf
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
nextflow.enable.dsl=2

params.NORMALISED_VCF_DIR = "${params.submission.download_target_dir}/normalised_vcfs"
params.REFSEQ_FASTA = "${params.submission.download_target_dir}/refseq_fasta.fa"

// This is needed because "bcftools norm" step requires a FASTA
// but the VCFs we get from Covid19 data team only have RefSeq contigs
process create_refseq_fasta {
output:
val true, emit: create_refseq_fasta_success

script:
// TODO: Doing this seems like a bad idea but this seems to work well for now
"""
sed s/MN908947.3/NC_045512.2/g $params.submission.assembly_fasta > $params.REFSEQ_FASTA
"""
}

process validate_vcfs {

input:
path vcf_files

output:
val true, emit: validate_vcfs_success

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
Expand Down Expand Up @@ -48,10 +64,10 @@ process bgzip_and_index {
val flag1
val flag2
path vcf_files

output:
val true, emit: bgzip_and_index_success

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
Expand All @@ -67,16 +83,16 @@ process bgzip_and_index {
process vertical_concat {
input:
val flag

output:
val true, emit: vertical_concat_success

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
($params.executable.python.interpreter \
-m steps.vcf_vertical_concat.run_vcf_vertical_concat_pipeline \
--toplevel-vcf-dir $params.submission.download_target_dir \
--toplevel-vcf-dir $params.NORMALISED_VCF_DIR \
--concat-processing-dir $params.submission.concat_processing_dir \
--concat-chunk-size $params.submission.concat_chunk_size \
--bcftools-binary $params.executable.bcftools \
Expand All @@ -86,22 +102,45 @@ process vertical_concat {
"""
}

process normalise_concat_vcf {

input:
val flag1

output:
val true, emit: normalise_concat_vcf_success

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
($params.executable.python.interpreter \
-m steps.normalise_vcfs \
--vcf-files $params.submission.concat_result_file \
--input-dir `dirname ${params.submission.concat_result_file}` \
--output-dir $params.NORMALISED_VCF_DIR \
--bcftools-binary $params.executable.bcftools \
--refseq-fasta-file $params.REFSEQ_FASTA \
) >> $params.submission.log_dir/normalise_concat_vcf.log 2>&1
"""
}

process accession_vcf {
clusterOptions "-g /accession/$params.submission.accessioning_instance"

input:
val flag

output:
val true, emit: accession_vcf_success

script:
//Accessioning properties file passed via command line should already be populated with project and assembly accessions
"""
export PYTHONPATH="$params.executable.python.script_path"
export NORMALISED_CONCAT_VCF=("${params.NORMALISED_VCF_DIR}/"`basename ${params.submission.concat_result_file}`)
($params.executable.python.interpreter \
-m steps.accession_vcf \
--vcf-file $params.submission.concat_result_file \
--vcf-file \$NORMALISED_CONCAT_VCF \
--accessioning-jar-file $params.jar.accession_pipeline \
--accessioning-properties-file $params.submission.accessioning_properties_file \
--accessioning-instance $params.submission.accessioning_instance \
Expand All @@ -116,10 +155,10 @@ process sync_accessions_to_public_ftp {

input:
val flag

output:
val true, emit: sync_accessions_to_public_ftp_success

script:
"""
mkdir -p $params.submission.public_ftp_dir
Expand All @@ -133,10 +172,10 @@ process cluster_assembly {

input:
val flag

output:
val true, emit: cluster_assembly_success

script:
//Clustering properties file passed via command line should already be populated with project and assembly accessions
"""
Expand Down Expand Up @@ -173,6 +212,7 @@ process incremental_release {

workflow {
main:
create_refseq_fasta()
Channel.fromPath("$params.submission.download_file_list")
.splitCsv(header:false)
.map(row -> row[0])
Expand All @@ -181,7 +221,6 @@ workflow {
validate_vcfs(vcf_files_list)
asm_check_vcfs(vcf_files_list)
bgzip_and_index(validate_vcfs.out.validate_vcfs_success, asm_check_vcfs.out.asm_check_vcfs_success, vcf_files_list)

vertical_concat(bgzip_and_index.out.bgzip_and_index_success.collect()) | \
accession_vcf | sync_accessions_to_public_ftp | cluster_assembly | incremental_release
normalise_concat_vcf | accession_vcf | sync_accessions_to_public_ftp | cluster_assembly | incremental_release
}
71 changes: 71 additions & 0 deletions covid19dp_submission/steps/normalise_vcfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2023 EMBL - European Bioinformatics Institute
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os

from ebi_eva_common_pyutils.command_utils import run_command_with_output
from ebi_eva_common_pyutils.logger import logging_config
from covid19dp_submission.steps.bgzip_and_index_vcf import _get_vcf_filename_without_extension

logger = logging_config.get_logger(__name__)


def normalise(input_dir: str, vcf_file: str, output_file: str, bcftools_binary: str, refseq_fasta_file: str) -> str:
vcf_file_name_no_ext = _get_vcf_filename_without_extension(vcf_file)
vcf_file_name_no_ext_and_path = os.path.basename(vcf_file_name_no_ext)
# See here: https://github.com/EBIvariation/eva-submission/blob/bb85922fffb4f29fdce501af036ea79ec8712121/eva_submission/nextflow/prepare_brokering.nf#L141
commands = [f'cd {input_dir}',
f'{bcftools_binary} norm --check-ref w --fasta-ref {refseq_fasta_file} --output-type z --output '
f'{output_file} {vcf_file} ',
f'{bcftools_binary} index --force --csi {output_file}'
]
normalise_command = ' && '.join(commands)
run_command_with_output(f"Normalising {vcf_file_name_no_ext_and_path}...", normalise_command)
return f"{vcf_file_name_no_ext}.vcf.gz"


def normalise_all(input_dir:str, vcf_files: list, output_dir: str, bcftools_binary: str,
refseq_fasta_file: str) -> list:
output_vcf_files = []
os.makedirs(name=output_dir, exist_ok=True)
for vcf_file in vcf_files:
vcf_file_name_no_ext = _get_vcf_filename_without_extension(vcf_file)
output_file = f"{output_dir}/{os.path.basename(vcf_file_name_no_ext)}.vcf.gz"
normalise(input_dir, vcf_file, output_file, bcftools_binary, refseq_fasta_file)
output_vcf_files.append(output_file)
return output_vcf_files


def main():
parser = argparse.ArgumentParser(description='Normalise VCF file',
formatter_class=argparse.RawTextHelpFormatter, add_help=False)
parser.add_argument("--vcf-files", help="Path to the VCF file (ex: /path/to/file.vcf)",
nargs='+', required=True)
parser.add_argument("--input-dir", help="Path to the directory that will contain the input files",
required=True)
parser.add_argument("--output-dir", help="Path to the directory that will contain the output files",
required=True)
parser.add_argument("--bcftools-binary", help="Full path to the bcftools binary (ex: /path/to/bcftools)",
default="bcftools", required=False)
parser.add_argument("--refseq-fasta-file", help="Path to the RefSeq FASTA file (ex: /path/to/refseq_fasta.fa)",
required=True)
args = parser.parse_args()
logging_config.add_stdout_handler()

normalise_all(args.input_dir, args.vcf_files, args.output_dir, args.bcftools_binary, args.refseq_fasta_file)


if __name__ == "__main__":
main()
Loading
Loading