Skip to content

Commit

Permalink
First version of the merging
Browse files Browse the repository at this point in the history
  • Loading branch information
tcezard committed Oct 25, 2023
1 parent defe06c commit 3d03aa4
Show file tree
Hide file tree
Showing 3 changed files with 368 additions and 37 deletions.
63 changes: 26 additions & 37 deletions eva_submission/eload_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,15 @@ def ingest(
do_variant_load = 'variant_load' in tasks
annotation_only = 'annotation' in tasks and not do_variant_load

if instance_id:
self.eload_cfg.set(self.config_section, 'accession', 'instance_id', value=instance_id)
if do_accession:
self.update_config_with_hold_date(self.project_accession)

if do_accession or do_variant_load or annotation_only:
self.fill_vep_versions(vep_cache_assembly_name)
vcf_files_to_ingest = self._generate_csv_mappings_to_ingest()

if do_accession:
self.eload_cfg.set(self.config_section, 'accession', 'instance_id', value=instance_id)
self.update_config_with_hold_date(self.project_accession)
self.run_accession_workflow(vcf_files_to_ingest, resume=resume)
self.insert_browsable_files()
self.update_browsable_files_with_date()
self.update_files_with_ftp_path()
self.refresh_study_browser()
self.run_accession_and_load_workflow(vcf_files_to_ingest, annotation_only, resume=resume)

if 'optional_remap_and_cluster' in tasks:
self.eload_cfg.set(self.config_section, 'clustering', 'instance_id', value=clustering_instance_id)
Expand All @@ -103,9 +100,12 @@ def ingest(
else:
self.warning(f'Could not find any current supported assembly for the submission, skipping clustering')

if do_variant_load or annotation_only:
self.run_variant_load_workflow(vcf_files_to_ingest, annotation_only, resume=resume)
self.update_loaded_assembly_in_browsable_files()
# update metadata
self.update_loaded_assembly_in_browsable_files()
self.insert_browsable_files()
self.update_browsable_files_with_date()
self.update_files_with_ftp_path()
self.refresh_study_browser()

def fill_vep_versions(self, vep_cache_assembly_name=None):
analyses = self.eload_cfg.query('brokering', 'analyses')
Expand Down Expand Up @@ -305,46 +305,35 @@ def _generate_csv_mappings_to_ingest(self):
self.warning(f"VCF files for analysis {analysis_alias} not found")
return vcf_files_to_ingest

def run_accession_workflow(self, vcf_files_to_ingest, resume):
def run_accession_and_load_workflow(self, vcf_files_to_ingest, annotation_only, resume):
instance_id = self.eload_cfg.query(self.config_section, 'accession', 'instance_id')
output_dir = os.path.join(self.project_dir, project_dirs['accessions'])
accession_properties_file = self.create_accession_properties(instance_id=instance_id,
output_file_path=os.path.join(output_dir, 'accession.properties'))
accession_config = {
'valid_vcfs': vcf_files_to_ingest,
'project_accession': self.project_accession,
'instance_id': instance_id,
'accession_job_props': accession_properties_file,
'public_ftp_dir': cfg['public_ftp_dir'],
'accessions_dir': os.path.join(self.project_dir, project_dirs['accessions']),
'public_dir': os.path.join(self.project_dir, project_dirs['public']),
'logs_dir': os.path.join(self.project_dir, project_dirs['logs']),
'executable': cfg['executable'],
'jar': cfg['jar'],
'taxonomy': self.taxonomy
}
self.run_nextflow('accession', accession_config, resume)

def run_variant_load_workflow(self, vcf_files_to_ingest, annotation_only, resume):
accession_properties_file = self.create_accession_properties(
instance_id=instance_id, output_file_path=os.path.join(output_dir, 'accession.properties'))
variant_load_properties_file = self.create_variant_load_properties(
output_file_path=os.path.join(self.project_dir, 'variant_load.properties'))
accession_import_properties_file = self.create_accession_import_properties(
output_file_path=os.path.join(self.project_dir, 'accession_import.properties'))

load_config = {
accession_config = {
'valid_vcfs': vcf_files_to_ingest,
'vep_path': cfg['vep_path'],
'load_job_props': variant_load_properties_file,
'acc_import_job_props': accession_import_properties_file,
'project_accession': self.project_accession,
'instance_id': instance_id,
'vep_path': cfg['vep_path'],
'project_dir': str(self.project_dir),
'public_ftp_dir': cfg['public_ftp_dir'],
'accessions_dir': os.path.join(self.project_dir, project_dirs['accessions']),
'public_dir': os.path.join(self.project_dir, project_dirs['public']),
'logs_dir': os.path.join(self.project_dir, project_dirs['logs']),
'executable': cfg['executable'],
'jar': cfg['jar'],
'taxonomy': self.taxonomy,
'annotation_only': annotation_only,
'taxonomy': self.taxonomy
'accession_job_props': accession_properties_file,
'load_job_props': variant_load_properties_file,
'acc_import_job_props': accession_import_properties_file
}
self.run_nextflow('variant_load', load_config, resume)
self.run_nextflow('accession_and_load', accession_config, resume)

def run_remap_and_cluster_workflow(self, target_assembly, resume):
clustering_instance = self.eload_cfg.query(self.config_section, 'clustering', 'instance_id')
Expand Down
Loading

0 comments on commit 3d03aa4

Please sign in to comment.