EVA-3427 - Normalisation before ingestion (#188)
* add normalisation
* Add version migration script
* Add test for config upgrade
Co-authored-by: April Shen <april.tuesday@gmail.com>
tcezard authored Dec 1, 2023
1 parent 212a598 commit 923822b
Showing 15 changed files with 368 additions and 57 deletions.
15 changes: 15 additions & 0 deletions eva_submission/config_migration.py
@@ -9,6 +9,21 @@
logger = log_cfg.get_logger(__name__)


def upgrade_version_1_14_to_1_15(eload_cfg):
"""
Upgrades a version 1.14 config to version 1.15, changing where the ingestion nextflow directories are stored in the config
"""
accession_nextflow_dir = eload_cfg.query('ingestion', 'accession', 'nextflow_dir')
variant_load_nextflow_dir = eload_cfg.query('ingestion', 'variant_load', 'nextflow_dir')
if accession_nextflow_dir:
eload_cfg.set('ingestion', 'accession_and_load', 'nextflow_dir', 'accession', value=accession_nextflow_dir)
if variant_load_nextflow_dir:
eload_cfg.set('ingestion', 'accession_and_load', 'nextflow_dir', 'variant_load', value=variant_load_nextflow_dir)

# Set version once we've successfully upgraded
eload_cfg.set('version', value=__version__)

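As a rough illustration of what this migration does (an EloadConfig is backed by a nested YAML mapping; all paths below are invented):

# Hypothetical pre-1.15 fragment of an eload config:
before = {
    'ingestion': {
        'accession': {'nextflow_dir': '/nextflow/accession_work'},
        'variant_load': {'nextflow_dir': '/nextflow/variant_load_work'},
    }
}

# After upgrade_version_1_14_to_1_15, the directories are also recorded under the
# combined 'accession_and_load' workflow, keyed by task (the old keys are not removed):
after = {
    'ingestion': {
        'accession': {'nextflow_dir': '/nextflow/accession_work'},
        'variant_load': {'nextflow_dir': '/nextflow/variant_load_work'},
        'accession_and_load': {
            'nextflow_dir': {
                'accession': '/nextflow/accession_work',
                'variant_load': '/nextflow/variant_load_work',
            }
        },
    },
    'version': '1.15',  # set to the package __version__ once the upgrade succeeds
}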

def upgrade_version_0_1(eload_cfg, analysis_alias=None):
"""
Upgrades a version 0.x config to version 1, using the provided analysis alias for all files.
59 changes: 37 additions & 22 deletions eva_submission/eload_ingestion.py
@@ -8,7 +8,6 @@
import yaml
from cached_property import cached_property
from ebi_eva_common_pyutils import command_utils
from ebi_eva_common_pyutils.assembly.assembly import get_supported_asm_from_ensembl
from ebi_eva_common_pyutils.config import cfg
from ebi_eva_common_pyutils.ena_utils import get_assembly_name_and_taxonomy_id
from ebi_eva_common_pyutils.ncbi_utils import get_species_name_from_ncbi
@@ -282,8 +281,8 @@ def _generate_csv_mappings_to_ingest(self):
vcf_files_to_ingest = os.path.join(self.eload_dir, 'vcf_files_to_ingest.csv')
with open(vcf_files_to_ingest, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(['vcf_file', 'assembly_accession', 'fasta', 'report', 'analysis_accession', 'db_name',
'vep_version', 'vep_cache_version', 'vep_species', 'aggregation'])
writer.writerow(['vcf_file', 'csi_file', 'assembly_accession', 'fasta', 'report', 'analysis_accession',
'db_name', 'vep_version', 'vep_cache_version', 'vep_species', 'aggregation'])
analyses = self.eload_cfg.query('brokering', 'analyses')
for analysis_alias, analysis_data in analyses.items():
assembly_accession = analysis_data['assembly_accession']
@@ -302,7 +301,8 @@ def _generate_csv_mappings_to_ingest(self):
aggregation = self.eload_cfg.query(self.config_section, 'aggregation', analysis_accession)
if analysis_data['vcf_files']:
for vcf_file in analysis_data['vcf_files']:
writer.writerow([vcf_file, assembly_accession, fasta, report, analysis_accession, db_name,
csi_file = analysis_data['vcf_files'][vcf_file]['csi']
writer.writerow([vcf_file, csi_file, assembly_accession, fasta, report, analysis_accession, db_name,
vep_version, vep_cache_version, vep_species, aggregation])
else:
self.warning(f"VCF files for analysis {analysis_alias} not found")
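For orientation, a minimal consumer-side sketch of the resulting CSV (column names taken from the writerow call above; the paths are invented):

import csv

with open('vcf_files_to_ingest.csv') as open_file:
    for row in csv.DictReader(open_file):
        # e.g. row['vcf_file'] == 'vcfs/test1.vcf.gz'
        # and  row['csi_file'] == 'vcfs/test1.vcf.gz.csi'
        print(row['vcf_file'], row['csi_file'], row['assembly_accession'])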
@@ -335,8 +335,8 @@ def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only,
'accession_job_props': accession_properties_file,
'load_job_props': variant_load_properties_file,
'acc_import_job_props': accession_import_properties_file,
'tasks': tasks
}
tasks = [task for task in tasks if task in ['accession', 'variant_load', 'annotation']]
self.run_nextflow('accession_and_load', accession_config, resume, tasks)
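The new filter means task names that do not belong to this workflow are dropped before Nextflow is invoked; a minimal illustration:

tasks = ['accession', 'optional_remap_and_cluster']
tasks = [task for task in tasks if task in ['accession', 'variant_load', 'annotation']]
# tasks == ['accession']; 'optional_remap_and_cluster' is handled by the remap_and_cluster workflow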

def run_remap_and_cluster_workflow(self, target_assembly, resume):
@@ -375,7 +375,7 @@ def run_remap_and_cluster_workflow(self, target_assembly, resume):
}
for part in ['executable', 'nextflow', 'jar']:
remap_cluster_config[part] = cfg[part]
self.run_nextflow('remap_and_cluster', remap_cluster_config, resume)
self.run_nextflow('remap_and_cluster', remap_cluster_config, resume, tasks=['optional_remap_and_cluster'])

def _get_supported_assembly_from_evapro(self, tax_id: int = None):
tax_id = tax_id or self.taxonomy
@@ -630,31 +630,45 @@ def refresh_study_browser(self):
def valid_vcf_filenames(self):
return list(self.project_dir.joinpath(project_dirs['valid']).glob('*.vcf.gz'))

def run_nextflow(self, workflow_name, params, resume, tasks=None):
def run_nextflow(self, workflow_name, params, resume, tasks=all_tasks):
"""
Runs a Nextflow workflow using the provided parameters.
This will create a Nextflow work directory and delete it if the process completes successfully.
If the process fails, the work directory is preserved and the process can be resumed.
"""
work_dir = None
if tasks and tasks is not self.all_tasks:
# The subsection name combines the workflow and the tasks, ensuring resumability only applies to a workflow
# and its tasks
subsection_name = workflow_name + '_' + '_'.join(sorted(tasks))
else:
subsection_name = workflow_name
if resume:
work_dir = self.eload_cfg.query(self.config_section, subsection_name, 'nextflow_dir')
if work_dir == self.nextflow_complete_value:
self.info(f'Nextflow {workflow_name} pipeline already completed, skipping.')
return
if not work_dir or not os.path.exists(work_dir):
completed_tasks = [
task for task in tasks
if self.eload_cfg.query(self.config_section, workflow_name, 'nextflow_dir', task) == self.nextflow_complete_value
]
if completed_tasks:
self.info(f'Task(s) {", ".join(completed_tasks)} already completed, skipping.')
# Remove completed tasks
for task in completed_tasks:
tasks.remove(task)
# Retrieve the work directory for the remaining tasks
work_dirs = [
self.eload_cfg.query(self.config_section, workflow_name, 'nextflow_dir', task)
for task in tasks
]
# Remove None and non-existent directories, as those tasks have not started
work_dirs = set([w for w in work_dirs if w and os.path.exists(w)])
if len(work_dirs) == 1:
work_dir = work_dirs.pop()
else:
self.warning(f'Work directory for {workflow_name} not found, will start from scratch.')
work_dir = None
if not resume or not work_dir:
work_dir = self.create_nextflow_temp_output_directory(base=self.project_dir)
self.eload_cfg.set(self.config_section, subsection_name, 'nextflow_dir', value=work_dir)

for task in tasks:
self.eload_cfg.set(self.config_section, workflow_name, 'nextflow_dir', task, value=work_dir)
# Set the tasks for nextflow to perform
if tasks:
params['tasks'] = tasks
else:
# No tasks to perform, skip running nextflow altogether
return
params_file = os.path.join(self.project_dir, f'{workflow_name}_params.yaml')
with open(params_file, 'w') as open_file:
yaml.safe_dump(params, open_file)
@@ -672,8 +686,9 @@ def run_nextflow(self, workflow_name, params, resume, tasks=None):
))
)
shutil.rmtree(work_dir)
self.eload_cfg.set(self.config_section, str(subsection_name), 'nextflow_dir',
value=self.nextflow_complete_value)
for task in tasks:
self.eload_cfg.set(self.config_section, workflow_name, 'nextflow_dir', task,
value=self.nextflow_complete_value)
except subprocess.CalledProcessError as e:
error_msg = f'Nextflow {workflow_name} pipeline failed: results might not be complete.'
error_msg += (f"See Nextflow logs in {self.eload_dir}/.nextflow.log or pipeline logs "
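For orientation, a sketch of the per-task bookkeeping this produces in the eload config, assuming config_section is 'ingestion' and workflow_name is 'accession_and_load' (paths and the completion marker are invented stand-ins):

# Mid-run state, after 'accession' finished but 'variant_load' did not:
{
    'ingestion': {
        'accession_and_load': {
            'nextflow_dir': {
                'accession': '<nextflow_complete_value>',
                'variant_load': '/eloads/ELOAD_1/nextflow_tmp',  # hypothetical work dir
            }
        }
    }
}
# On resume, completed tasks are filtered out and the single surviving work
# directory is reused, so Nextflow can pick up 'variant_load' where it stopped.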
14 changes: 11 additions & 3 deletions eva_submission/eload_submission.py
@@ -8,9 +8,10 @@
from ebi_eva_common_pyutils.logger import AppLogger
from ebi_eva_common_pyutils.logger import logging_config as log_cfg
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
from packaging import version

from eva_submission import __version__
from eva_submission.config_migration import upgrade_version_0_1
from eva_submission.config_migration import upgrade_version_0_1, upgrade_version_1_14_to_1_15
from eva_submission.eload_utils import get_hold_date_from_ena
from eva_submission.submission_config import EloadConfig
from eva_submission.xlsx.xlsx_parser_eva import EvaXlsxReader, EvaXlsxWriter
@@ -92,13 +93,20 @@ def create_log_file(self):

def upgrade_config_if_needed(self, analysis_alias=None):
"""
Upgrades configs to the current version, making a backup first and using the provided analysis alias for all
vcf files. Currently doesn't perform any other version upgrades.
Upgrades configs to the current version. It supports:
- making a backup
- using the provided analysis alias for all vcf files (pre version 1)
- reformatting nextflow directories in the config (pre version 1.15)
"""
if 'version' not in self.eload_cfg:
self.debug(f'No version found in config, upgrading to version {__version__}.')
self.eload_cfg.backup()
upgrade_version_0_1(self.eload_cfg, analysis_alias)
upgrade_version_1_14_to_1_15(self.eload_cfg)
elif version.parse(self.eload_cfg.query('version')) < version.parse("1.15"):
self.debug(f"Config is pre-1.15, upgrading from version {self.eload_cfg.query('version')} to {__version__}.")
self.eload_cfg.backup()
upgrade_version_1_14_to_1_15(self.eload_cfg)
else:
self.debug(f"Config is version {self.eload_cfg.query('version')}, not upgrading.")

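The new packaging dependency is what makes the version check above safe: plain string comparison misorders multi-digit components, as this minimal check shows:

from packaging import version

version.parse('1.9') < version.parse('1.15')   # True, compared numerically
'1.9' < '1.15'                                 # False, compared character by character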
81 changes: 69 additions & 12 deletions eva_submission/nextflow/accession_and_load.nf
@@ -94,6 +94,21 @@ human study:
- Initialize csi_vcfs with values enabling them to start the process "csi_index_vcf".
*/
workflow {

fasta_channel = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.fasta), file(row.report), row.assembly_accession, file(row.vcf_file))}
.groupTuple(by: [0, 1, 2])

prepare_genome(fasta_channel)

assembly_and_vcf_channel = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(row.assembly_accession, file(row.vcf_file), file(row.csi_file))}
.combine(prepare_genome.out.custom_fasta, by: 0) // Join based on the assembly
.map{tuple(it[1].name, it[3], it[1], it[2])} // vcf_filename, fasta_file, vcf_file, csi_file

normalise_vcf(assembly_and_vcf_channel)
all_accession_complete = null
is_human_study = (params.taxonomy == 9606)
if ("accession" in params.ingestion_tasks) {
@@ -103,10 +118,12 @@ workflow {
.map{row -> tuple(file(row.vcf_file))}
accessioned_files_to_rm = Channel.empty()
} else {
valid_vcfs = Channel.fromPath(params.valid_vcfs)
normalised_vcfs_ch = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file), row.assembly_accession, row.aggregation, file(row.fasta), file(row.report))}
accession_vcf(valid_vcfs)
.map{row -> tuple(file(row.vcf_file).name, file(row.vcf_file), row.assembly_accession, row.aggregation, file(row.fasta), file(row.report))}
.combine(normalise_vcf.out.vcf_tuples, by:0) // Join based on the vcf_filename
.map {tuple(it[0], it[7], it[2], it[3], it[4], it[5])} // vcf_filename, normalised vcf, assembly_accession, aggregation, fasta, report
accession_vcf(normalised_vcfs_ch)
sort_and_compress_vcf(accession_vcf.out.accession_done)
csi_vcfs = sort_and_compress_vcf.out.compressed_vcf
accessioned_files_to_rm = accession_vcf.out.accessioned_filenames
@@ -116,10 +133,12 @@ workflow {
copy_to_ftp(csi_index_vcf.out.csi_indexed_vcf.toList(), accessioned_files_to_rm.toList())
}
if ("variant_load" in params.ingestion_tasks || "annotation" in params.ingestion_tasks) {
annotated_vcfs = Channel.fromPath(params.valid_vcfs)
normalised_vcfs_ch = Channel.fromPath(params.valid_vcfs)
.splitCsv(header:true)
.map{row -> tuple(file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)}
load_vcf(annotated_vcfs)
.map{row -> tuple(file(row.vcf_file).name, file(row.vcf_file), file(row.fasta), row.analysis_accession, row.db_name, row.vep_version, row.vep_cache_version, row.vep_species, row.aggregation)}
.combine(normalise_vcf.out.vcf_tuples, by:0)
.map{tuple(it[0], it[9], it[2], it[3], it[4], it[5], it[6], it[7], it[8])} // vcf_filename, normalised vcf, fasta, analysis_accession, db_name, vep_version, vep_cache_version, vep_species, aggregation
load_vcf(normalised_vcfs_ch)

if (!is_human_study) {
vcf_files_dbname = Channel.fromPath(params.valid_vcfs)
@@ -135,6 +154,46 @@ workflow {
}

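To make the channel plumbing above concrete, here is the shape of the tuples for one hypothetical CSV row, written as plain Python literals rather than workflow code:

# fasta_channel after groupTuple(by: [0, 1, 2]): one element per
# (fasta, report, assembly), with that assembly's VCFs gathered into a list
('fasta.fa', 'report.txt', 'GCA_000001000.1', ['test1.vcf.gz', 'test2.vcf.gz'])

# assembly_and_vcf_channel after combine(..., by: 0) and the final map,
# keyed by vcf_filename so later joins can line up per file
('test1.vcf.gz', 'fasta_custom.fa', 'test1.vcf.gz', 'test1.vcf.gz.csi')

# normalise_vcf.out.vcf_tuples, combined back into the accession and load
# channels on that same vcf_filename key
('test1.vcf.gz', 'normalised_vcfs/test1.vcf.gz', 'normalised_vcfs/test1.vcf.gz.csi')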

/*
* Convert the genome to the same naming convention as the VCF
*/
process prepare_genome {

input:
tuple path(fasta), path(report), val(assembly_accession), path(vcf_files)

output:
tuple val(assembly_accession), path("${fasta.getSimpleName()}_custom.fa"), emit: custom_fasta

script:
"""
export PYTHONPATH="$params.executable.python.script_path"
$params.executable.python.interpreter -m eva_submission.steps.rename_contigs_from_insdc_in_assembly \
--assembly_accession $assembly_accession --assembly_fasta $fasta --custom_fasta ${fasta.getSimpleName()}_custom.fa \
--assembly_report $report --vcf_files $vcf_files
"""
}


/*
* Normalise the VCF files
*/
process normalise_vcf {
input:
tuple val(vcf_filename), path(fasta), path(vcf_file), path(csi_file)

output:
tuple val(vcf_filename), path("normalised_vcfs/*.gz"), path("normalised_vcfs/*.csi"), emit: vcf_tuples

script:
"""
mkdir normalised_vcfs
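# norm -cw checks variants against the fasta and warns (rather than fails) on REF mismatches;
# -O z writes bgzipped output and --no-version keeps the bcftools version line out of the header.
# 'index -c' below then builds the CSI index for the normalised file.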
$params.executable.bcftools norm --no-version -cw -f $fasta -O z -o normalised_vcfs/$vcf_file $vcf_file 2> normalised_vcfs/${vcf_file.getBaseName()}_bcftools_norm.log
$params.executable.bcftools index -c normalised_vcfs/$vcf_file
"""
}


/*
* Accession VCFs
*/
@@ -146,7 +205,7 @@ process accession_vcf {
memory '6.7 GB'

input:
tuple val(vcf_file), val(assembly_accession), val(aggregation), val(fasta), val(report)
tuple val(vcf_filename), val(vcf_file), val(assembly_accession), val(aggregation), val(fasta), val(report)

output:
val accessioned_filename, emit: accessioned_filenames
@@ -160,7 +219,6 @@ process accession_vcf {
pipeline_parameters += " --parameters.assemblyReportUrl=file:" + report.toString()
pipeline_parameters += " --parameters.vcf=" + vcf_file.toString()

vcf_filename = vcf_file.getFileName().toString()
accessioned_filename = vcf_filename.take(vcf_filename.indexOf(".vcf")) + ".accessioned.vcf"
log_filename = "accessioning.${vcf_filename}"

@@ -250,13 +308,12 @@ process csi_index_vcf {
*/
process load_vcf {
clusterOptions {
log_filename = vcf_file.getFileName().toString()
return "-o $params.logs_dir/pipeline.${log_filename}.log \
-e $params.logs_dir/pipeline.${log_filename}.err"
return "-o $params.logs_dir/pipeline.${vcf_filename}.log \
-e $params.logs_dir/pipeline.${vcf_filename}.err"
}

input:
tuple val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation)
tuple val(vcf_filename), val(vcf_file), val(fasta), val(analysis_accession), val(db_name), val(vep_version), val(vep_cache_version), val(vep_species), val(aggregation)

output:
val true, emit: variant_load_complete
1 change: 1 addition & 0 deletions requirements.txt
@@ -11,3 +11,4 @@ pysam
pyyaml
requests
retry
packaging
1 change: 1 addition & 0 deletions tests/nextflow-tests/bin/fake_bcftools.sh
@@ -13,6 +13,7 @@ elif [[ $1 == "sort" ]]; then
touch $filename
elif [[ $1 == "norm" ]]; then
filename=${*: -2}
printf $filename
touch $filename
else
filename=$3
3 changes: 3 additions & 0 deletions tests/nextflow-tests/test_ingestion_config.yaml
@@ -28,6 +28,9 @@ executable:
genome_downloader: ../../../bin/fake_genome_downloader.py
custom_assembly: ../../../bin/fake_custom_assembly.py
python_activate: ../../../bin/venv_activate
python:
script_path: ../../../../../
interpreter: '`which python`'

nextflow:
remapping: ../../../bin/fake_remapping_pipeline.nf
4 changes: 4 additions & 0 deletions tests/nextflow-tests/test_ingestion_config_human.yaml
@@ -17,6 +17,10 @@ executable:
bcftools: ../../../bin/fake_bcftools.sh
bgzip: ../../../bin/fake_bgzip.sh
tabix: ../../../bin/fake_tabix.sh
python:
script_path: ../../../../../
interpreter: '`which python`'

jar:
accession_pipeline: ../../../java/accession.jar
eva_pipeline: ../../../java/variant-load.jar
8 changes: 4 additions & 4 deletions tests/nextflow-tests/vcf_files_to_ingest.csv
@@ -1,4 +1,4 @@
vcf_file,assembly_accession,fasta,report,analysis_accession,db_name,vep_version,vep_cache_version,vep_species,aggregation
vcfs/test1.vcf.gz,GCA_000001000.1,/path/to/fasta.fa,/path/to/assembly_report.txt,ERZ12345,db1,100,100,homo_sapiens,none
vcfs/test2.vcf.gz,GCA_000001000.1,/path/to/fasta.fa,/path/to/assembly_report.txt,ERZ12345,db1,100,100,homo_sapiens,none
vcfs/test3.vcf.gz,GCA_000001000.1,/path/to/fasta.fa,/path/to/assembly_report.txt,ERZ12346,db2,,,,none
vcf_file,csi_file,assembly_accession,fasta,report,analysis_accession,db_name,vep_version,vep_cache_version,vep_species,aggregation
vcfs/test1.vcf.gz,vcfs/test1.vcf.gz.csi,GCA_000001000.1,GCA_000002945.2/GCA_000002945.2.fa,GCA_000002945.2/GCA_000002945.2_assembly_report.txt,ERZ12345,db1,100,100,homo_sapiens,none
vcfs/test2.vcf.gz,vcfs/test2.vcf.gz.csi,GCA_000001000.1,GCA_000002945.2/GCA_000002945.2.fa,GCA_000002945.2/GCA_000002945.2_assembly_report.txt,ERZ12345,db1,100,100,homo_sapiens,none
vcfs/test3.vcf.gz,vcfs/test3.vcf.gz.csi,GCA_000001000.1,GCA_000002945.2/GCA_000002945.2.fa,GCA_000002945.2/GCA_000002945.2_assembly_report.txt,ERZ12346,db2,,,,none
