From 1daad991ce15bb4a8f305c5ab65cdc6ef26cbd75 Mon Sep 17 00:00:00 2001 From: tcezard Date: Mon, 8 Jan 2024 23:40:35 +0000 Subject: [PATCH 1/4] New jobs to run only the variant load --- .../eva/pipeline/configuration/BeanNames.java | 2 + ...regatedVariantLoadVcfJobConfiguration.java | 85 ++++++++++++++++++ ...notypedVariantLoadVcfJobConfiguration.java | 87 +++++++++++++++++++ 3 files changed, 174 insertions(+) create mode 100644 src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java create mode 100644 src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java index d56a4fd1..f0afe4eb 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java @@ -62,9 +62,11 @@ public class BeanNames { public static final String ACCESSION_IMPORT_STEP = "accession-import-step"; public static final String AGGREGATED_VCF_JOB = "aggregated-vcf-job"; + public static final String AGGREGATED_VARIANT_LOAD_VCF_JOB = "aggregated-variant-load-vcf-job"; public static final String ANNOTATE_VARIANTS_JOB = "annotate-variants-job"; public static final String INIT_DATABASE_JOB = "init-database-job"; public static final String GENOTYPED_VCF_JOB = "genotyped-vcf-job"; + public static final String GENOTYPED_VARIANT_LOAD_VCF_JOB = "genotyped-variant-load-vcf-job"; public static final String CALCULATE_STATISTICS_JOB = "calculate-statistics-job"; public static final String DROP_STUDY_JOB = "drop-study-job"; public static final String ACCESSION_IMPORT_JOB = "accession-import-job"; diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java new file mode 100644 index 00000000..c17affad --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java @@ -0,0 +1,85 @@ +/* + * Copyright 2015-2017 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.ac.ebi.eva.pipeline.configuration.jobs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.job.builder.FlowJobBuilder; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.flow.Flow; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Scope; +import uk.ac.ebi.eva.pipeline.configuration.jobs.flows.AnnotationFlowOptionalConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadFileStepConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadVariantsStepConfiguration; +import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer; +import uk.ac.ebi.eva.pipeline.parameters.validation.job.AggregatedVcfJobParametersValidator; + +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.*; + +/** + * Complete pipeline workflow for aggregated VCF. Aggregated statistics are provided in the VCF instead of the + * genotypes. + *

+ * load --> (optionalAnnotationFlow: variantsAnnotGenerateInput --> (annotationCreate --> annotationLoad)) + *

+ * Steps in () are optional + */ +@Configuration +@EnableBatchProcessing +@Import({LoadVariantsStepConfiguration.class, LoadFileStepConfiguration.class, AnnotationFlowOptionalConfiguration.class}) +public class AggregatedVariantLoadVcfJobConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(AggregatedVariantLoadVcfJobConfiguration.class); + + @Autowired + @Qualifier(VEP_ANNOTATION_OPTIONAL_FLOW) + private Flow annotationFlowOptional; + + @Autowired + @Qualifier(LOAD_VARIANTS_STEP) + private Step variantLoaderStep; + + @Autowired + @Qualifier(LOAD_FILE_STEP) + private Step loadFileStep; + + @Bean(AGGREGATED_VARIANT_LOAD_VCF_JOB) + @Scope("prototype") + public Job aggregatedVcfJob(JobBuilderFactory jobBuilderFactory) { + logger.debug("Building '" + AGGREGATED_VARIANT_LOAD_VCF_JOB + "'"); + + JobBuilder jobBuilder = jobBuilderFactory + .get(AGGREGATED_VARIANT_LOAD_VCF_JOB) + .incrementer(new NewJobIncrementer()) + .validator(new AggregatedVcfJobParametersValidator()); + FlowJobBuilder builder = jobBuilder + .flow(variantLoaderStep) + .next(loadFileStep) + .end(); + + return builder.build(); + } +} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java new file mode 100644 index 00000000..dee69705 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java @@ -0,0 +1,87 @@ +/* + * Copyright 2015-2017 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.ac.ebi.eva.pipeline.configuration.jobs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.job.builder.FlowJobBuilder; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.job.flow.Flow; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Scope; +import uk.ac.ebi.eva.pipeline.configuration.jobs.flows.ParallelStatisticsAndAnnotationFlowConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadFileStepConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadVariantsStepConfiguration; +import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer; +import uk.ac.ebi.eva.pipeline.parameters.validation.job.GenotypedVcfJobParametersValidator; + +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.*; + +/** + * Variant load pipeline workflow: + *

+ * |--> (optionalStatisticsFlow: statsCreate --> statsLoad) + * transform ---> load -+ + * |--> (optionalAnnotationFlow: variantsAnnotGenerateInput --> (annotationCreate --> annotationLoad)) + *

+ * Steps in () are optional + */ +@Configuration +@EnableBatchProcessing +@Import({LoadVariantsStepConfiguration.class, LoadFileStepConfiguration.class, ParallelStatisticsAndAnnotationFlowConfiguration.class}) +public class GenotypedVariantLoadVcfJobConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(GenotypedVariantLoadVcfJobConfiguration.class); + + @Autowired + @Qualifier(PARALLEL_STATISTICS_AND_ANNOTATION) + private Flow parallelStatisticsAndAnnotation; + + @Autowired + @Qualifier(LOAD_VARIANTS_STEP) + private Step variantLoaderStep; + + @Autowired + @Qualifier(LOAD_FILE_STEP) + private Step loadFileStep; + + @Bean(GENOTYPED_VARIANT_LOAD_VCF_JOB) + @Scope("prototype") + public Job genotypedVcfJob(JobBuilderFactory jobBuilderFactory) { + logger.debug("Building '" + GENOTYPED_VARIANT_LOAD_VCF_JOB + "'"); + + JobBuilder jobBuilder = jobBuilderFactory + .get(GENOTYPED_VARIANT_LOAD_VCF_JOB) + .incrementer(new NewJobIncrementer()) + .validator(new GenotypedVcfJobParametersValidator()); + FlowJobBuilder builder = jobBuilder + .flow(variantLoaderStep) + .next(loadFileStep) + .end(); + + return builder.build(); + } + +} From bf0662ac1f1d79a0b57fe418cde3edac5371747b Mon Sep 17 00:00:00 2001 From: tcezard Date: Tue, 9 Jan 2024 11:24:33 +0000 Subject: [PATCH 2/4] Add validators --- ...regatedVariantLoadVcfJobConfiguration.java | 4 +- ...notypedVariantLoadVcfJobConfiguration.java | 3 +- .../VariantLoadVcfJobParametersValidator.java | 52 +++++++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/VariantLoadVcfJobParametersValidator.java diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java index c17affad..b62953be 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java @@ -35,7 +35,7 @@ import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadFileStepConfiguration; import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadVariantsStepConfiguration; import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer; -import uk.ac.ebi.eva.pipeline.parameters.validation.job.AggregatedVcfJobParametersValidator; +import uk.ac.ebi.eva.pipeline.parameters.validation.job.VariantLoadVcfJobParametersValidator; import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.*; @@ -74,7 +74,7 @@ public Job aggregatedVcfJob(JobBuilderFactory jobBuilderFactory) { JobBuilder jobBuilder = jobBuilderFactory .get(AGGREGATED_VARIANT_LOAD_VCF_JOB) .incrementer(new NewJobIncrementer()) - .validator(new AggregatedVcfJobParametersValidator()); + .validator(new VariantLoadVcfJobParametersValidator()); FlowJobBuilder builder = jobBuilder .flow(variantLoaderStep) .next(loadFileStep) diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java index dee69705..e772fec5 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java @@ -36,6 +36,7 @@ import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadVariantsStepConfiguration; import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer; import uk.ac.ebi.eva.pipeline.parameters.validation.job.GenotypedVcfJobParametersValidator; +import uk.ac.ebi.eva.pipeline.parameters.validation.job.VariantLoadVcfJobParametersValidator; import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.*; @@ -75,7 +76,7 @@ public Job genotypedVcfJob(JobBuilderFactory jobBuilderFactory) { JobBuilder jobBuilder = jobBuilderFactory .get(GENOTYPED_VARIANT_LOAD_VCF_JOB) .incrementer(new NewJobIncrementer()) - .validator(new GenotypedVcfJobParametersValidator()); + .validator(new VariantLoadVcfJobParametersValidator()); FlowJobBuilder builder = jobBuilder .flow(variantLoaderStep) .next(loadFileStep) diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/VariantLoadVcfJobParametersValidator.java b/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/VariantLoadVcfJobParametersValidator.java new file mode 100644 index 00000000..3e72f3c9 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/VariantLoadVcfJobParametersValidator.java @@ -0,0 +1,52 @@ +/* + * Copyright 2017 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.parameters.validation.job; + +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.JobParametersValidator; +import org.springframework.batch.core.job.CompositeJobParametersValidator; +import org.springframework.batch.core.job.DefaultJobParametersValidator; +import uk.ac.ebi.eva.pipeline.configuration.jobs.GenotypedVariantLoadVcfJobConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.AggregatedVcfJobConfiguration; +import uk.ac.ebi.eva.pipeline.parameters.validation.step.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Validates the job parameters necessary to execute an {@link GenotypedVariantLoadVcfJobConfiguration} + * or an {@link AggregatedVcfJobConfiguration} + */ +public class VariantLoadVcfJobParametersValidator extends DefaultJobParametersValidator { + + @Override + public void validate(JobParameters parameters) throws JobParametersInvalidException { + compositeJobParametersValidator(parameters).validate(parameters); + } + + private CompositeJobParametersValidator compositeJobParametersValidator(JobParameters jobParameters) { + List jobParametersValidators = new ArrayList<>(); + + jobParametersValidators.add(new LoadVariantsStepParametersValidator()); + jobParametersValidators.add(new LoadFileStepParametersValidator()); + + CompositeJobParametersValidator compositeJobParametersValidator = new CompositeJobParametersValidator(); + compositeJobParametersValidator.setValidators(jobParametersValidators); + return compositeJobParametersValidator; + } + +} From bf9e16d1570f185955c07f61062e89e605075e9b Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Thu, 11 Jan 2024 23:29:34 +0000 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: April Shen --- .../uk/ac/ebi/eva/pipeline/configuration/BeanNames.java | 2 +- .../jobs/AggregatedVariantLoadVcfJobConfiguration.java | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java index f0afe4eb..8e5a4448 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java @@ -62,7 +62,7 @@ public class BeanNames { public static final String ACCESSION_IMPORT_STEP = "accession-import-step"; public static final String AGGREGATED_VCF_JOB = "aggregated-vcf-job"; - public static final String AGGREGATED_VARIANT_LOAD_VCF_JOB = "aggregated-variant-load-vcf-job"; + public static final String AGGREGATED_VCF_LOAD_JOB = "aggregated-vcf-load-job"; public static final String ANNOTATE_VARIANTS_JOB = "annotate-variants-job"; public static final String INIT_DATABASE_JOB = "init-database-job"; public static final String GENOTYPED_VCF_JOB = "genotyped-vcf-job"; diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java index b62953be..7f2bc9e3 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java @@ -49,15 +49,11 @@ */ @Configuration @EnableBatchProcessing -@Import({LoadVariantsStepConfiguration.class, LoadFileStepConfiguration.class, AnnotationFlowOptionalConfiguration.class}) +@Import({LoadVariantsStepConfiguration.class, LoadFileStepConfiguration.class}) public class AggregatedVariantLoadVcfJobConfiguration { private static final Logger logger = LoggerFactory.getLogger(AggregatedVariantLoadVcfJobConfiguration.class); - @Autowired - @Qualifier(VEP_ANNOTATION_OPTIONAL_FLOW) - private Flow annotationFlowOptional; - @Autowired @Qualifier(LOAD_VARIANTS_STEP) private Step variantLoaderStep; From b4478b79f6dd865b8847c870ea02c976083dec89 Mon Sep 17 00:00:00 2001 From: tcezard Date: Mon, 15 Jan 2024 10:17:53 +0000 Subject: [PATCH 4/4] Simplify and rename the variant load job and add test --- .../eva/pipeline/configuration/BeanNames.java | 3 +- ...notypedVariantLoadVcfJobConfiguration.java | 88 ---------- ...tion.java => LoadVcfJobConfiguration.java} | 28 +-- ...ava => LoadVcfJobParametersValidator.java} | 8 +- .../configuration/jobs/LoadVcfJobTest.java | 164 ++++++++++++++++++ 5 files changed, 182 insertions(+), 109 deletions(-) delete mode 100644 src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java rename src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/{AggregatedVariantLoadVcfJobConfiguration.java => LoadVcfJobConfiguration.java} (72%) rename src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/{VariantLoadVcfJobParametersValidator.java => LoadVcfJobParametersValidator.java} (82%) create mode 100644 src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobTest.java diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java index 8e5a4448..8e7c7e84 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java @@ -62,11 +62,10 @@ public class BeanNames { public static final String ACCESSION_IMPORT_STEP = "accession-import-step"; public static final String AGGREGATED_VCF_JOB = "aggregated-vcf-job"; - public static final String AGGREGATED_VCF_LOAD_JOB = "aggregated-vcf-load-job"; public static final String ANNOTATE_VARIANTS_JOB = "annotate-variants-job"; public static final String INIT_DATABASE_JOB = "init-database-job"; public static final String GENOTYPED_VCF_JOB = "genotyped-vcf-job"; - public static final String GENOTYPED_VARIANT_LOAD_VCF_JOB = "genotyped-variant-load-vcf-job"; + public static final String LOAD_VCF_JOB = "load-vcf-job"; public static final String CALCULATE_STATISTICS_JOB = "calculate-statistics-job"; public static final String DROP_STUDY_JOB = "drop-study-job"; public static final String ACCESSION_IMPORT_JOB = "accession-import-job"; diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java deleted file mode 100644 index e772fec5..00000000 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/GenotypedVariantLoadVcfJobConfiguration.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2015-2017 EMBL - European Bioinformatics Institute - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package uk.ac.ebi.eva.pipeline.configuration.jobs; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.Step; -import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; -import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; -import org.springframework.batch.core.job.builder.FlowJobBuilder; -import org.springframework.batch.core.job.builder.JobBuilder; -import org.springframework.batch.core.job.flow.Flow; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; -import org.springframework.context.annotation.Scope; -import uk.ac.ebi.eva.pipeline.configuration.jobs.flows.ParallelStatisticsAndAnnotationFlowConfiguration; -import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadFileStepConfiguration; -import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadVariantsStepConfiguration; -import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer; -import uk.ac.ebi.eva.pipeline.parameters.validation.job.GenotypedVcfJobParametersValidator; -import uk.ac.ebi.eva.pipeline.parameters.validation.job.VariantLoadVcfJobParametersValidator; - -import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.*; - -/** - * Variant load pipeline workflow: - *

- * |--> (optionalStatisticsFlow: statsCreate --> statsLoad) - * transform ---> load -+ - * |--> (optionalAnnotationFlow: variantsAnnotGenerateInput --> (annotationCreate --> annotationLoad)) - *

- * Steps in () are optional - */ -@Configuration -@EnableBatchProcessing -@Import({LoadVariantsStepConfiguration.class, LoadFileStepConfiguration.class, ParallelStatisticsAndAnnotationFlowConfiguration.class}) -public class GenotypedVariantLoadVcfJobConfiguration { - - private static final Logger logger = LoggerFactory.getLogger(GenotypedVariantLoadVcfJobConfiguration.class); - - @Autowired - @Qualifier(PARALLEL_STATISTICS_AND_ANNOTATION) - private Flow parallelStatisticsAndAnnotation; - - @Autowired - @Qualifier(LOAD_VARIANTS_STEP) - private Step variantLoaderStep; - - @Autowired - @Qualifier(LOAD_FILE_STEP) - private Step loadFileStep; - - @Bean(GENOTYPED_VARIANT_LOAD_VCF_JOB) - @Scope("prototype") - public Job genotypedVcfJob(JobBuilderFactory jobBuilderFactory) { - logger.debug("Building '" + GENOTYPED_VARIANT_LOAD_VCF_JOB + "'"); - - JobBuilder jobBuilder = jobBuilderFactory - .get(GENOTYPED_VARIANT_LOAD_VCF_JOB) - .incrementer(new NewJobIncrementer()) - .validator(new VariantLoadVcfJobParametersValidator()); - FlowJobBuilder builder = jobBuilder - .flow(variantLoaderStep) - .next(loadFileStep) - .end(); - - return builder.build(); - } - -} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobConfiguration.java similarity index 72% rename from src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java rename to src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobConfiguration.java index 7f2bc9e3..1cabcf3a 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/AggregatedVariantLoadVcfJobConfiguration.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobConfiguration.java @@ -24,35 +24,34 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.job.builder.FlowJobBuilder; import org.springframework.batch.core.job.builder.JobBuilder; -import org.springframework.batch.core.job.flow.Flow; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Scope; -import uk.ac.ebi.eva.pipeline.configuration.jobs.flows.AnnotationFlowOptionalConfiguration; import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadFileStepConfiguration; import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadVariantsStepConfiguration; import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer; -import uk.ac.ebi.eva.pipeline.parameters.validation.job.VariantLoadVcfJobParametersValidator; +import uk.ac.ebi.eva.pipeline.parameters.validation.job.LoadVcfJobParametersValidator; import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.*; /** - * Complete pipeline workflow for aggregated VCF. Aggregated statistics are provided in the VCF instead of the - * genotypes. + * Variant load pipeline workflow: *

- * load --> (optionalAnnotationFlow: variantsAnnotGenerateInput --> (annotationCreate --> annotationLoad)) + * | + * transform ---> load -+ + * | *

- * Steps in () are optional + * */ @Configuration @EnableBatchProcessing @Import({LoadVariantsStepConfiguration.class, LoadFileStepConfiguration.class}) -public class AggregatedVariantLoadVcfJobConfiguration { +public class LoadVcfJobConfiguration { - private static final Logger logger = LoggerFactory.getLogger(AggregatedVariantLoadVcfJobConfiguration.class); + private static final Logger logger = LoggerFactory.getLogger(LoadVcfJobConfiguration.class); @Autowired @Qualifier(LOAD_VARIANTS_STEP) @@ -62,15 +61,15 @@ public class AggregatedVariantLoadVcfJobConfiguration { @Qualifier(LOAD_FILE_STEP) private Step loadFileStep; - @Bean(AGGREGATED_VARIANT_LOAD_VCF_JOB) + @Bean(LOAD_VCF_JOB) @Scope("prototype") - public Job aggregatedVcfJob(JobBuilderFactory jobBuilderFactory) { - logger.debug("Building '" + AGGREGATED_VARIANT_LOAD_VCF_JOB + "'"); + public Job genotypedVcfJob(JobBuilderFactory jobBuilderFactory) { + logger.debug("Building '" + LOAD_VCF_JOB + "'"); JobBuilder jobBuilder = jobBuilderFactory - .get(AGGREGATED_VARIANT_LOAD_VCF_JOB) + .get(LOAD_VCF_JOB) .incrementer(new NewJobIncrementer()) - .validator(new VariantLoadVcfJobParametersValidator()); + .validator(new LoadVcfJobParametersValidator()); FlowJobBuilder builder = jobBuilder .flow(variantLoaderStep) .next(loadFileStep) @@ -78,4 +77,5 @@ public Job aggregatedVcfJob(JobBuilderFactory jobBuilderFactory) { return builder.build(); } + } diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/VariantLoadVcfJobParametersValidator.java b/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/LoadVcfJobParametersValidator.java similarity index 82% rename from src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/VariantLoadVcfJobParametersValidator.java rename to src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/LoadVcfJobParametersValidator.java index 3e72f3c9..d82a7504 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/VariantLoadVcfJobParametersValidator.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/LoadVcfJobParametersValidator.java @@ -20,18 +20,16 @@ import org.springframework.batch.core.JobParametersValidator; import org.springframework.batch.core.job.CompositeJobParametersValidator; import org.springframework.batch.core.job.DefaultJobParametersValidator; -import uk.ac.ebi.eva.pipeline.configuration.jobs.GenotypedVariantLoadVcfJobConfiguration; -import uk.ac.ebi.eva.pipeline.configuration.jobs.AggregatedVcfJobConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.LoadVcfJobConfiguration; import uk.ac.ebi.eva.pipeline.parameters.validation.step.*; import java.util.ArrayList; import java.util.List; /** - * Validates the job parameters necessary to execute an {@link GenotypedVariantLoadVcfJobConfiguration} - * or an {@link AggregatedVcfJobConfiguration} + * Validates the job parameters necessary to execute an {@link LoadVcfJobConfiguration} */ -public class VariantLoadVcfJobParametersValidator extends DefaultJobParametersValidator { +public class LoadVcfJobParametersValidator extends DefaultJobParametersValidator { @Override public void validate(JobParameters parameters) throws JobParametersInvalidException { diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobTest.java new file mode 100644 index 00000000..7246f343 --- /dev/null +++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobTest.java @@ -0,0 +1,164 @@ +/* + * Copyright 2015-2017 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.jobs; + +import org.junit.*; +import org.junit.runner.RunWith; +import org.opencb.biodata.models.variant.Variant; +import org.opencb.datastore.core.QueryOptions; +import org.opencb.opencga.lib.common.Config; +import org.opencb.opencga.storage.core.StorageManagerFactory; +import org.opencb.opencga.storage.core.variant.VariantStorageManager; +import org.opencb.opencga.storage.core.variant.adaptors.VariantDBAdaptor; +import org.opencb.opencga.storage.core.variant.adaptors.VariantDBIterator; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import uk.ac.ebi.eva.pipeline.Application; +import uk.ac.ebi.eva.pipeline.configuration.BeanNames; +import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration; +import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration; +import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule; +import uk.ac.ebi.eva.test.rules.TemporaryMongoRule; +import uk.ac.ebi.eva.test.utils.GenotypedVcfJobTestUtils; +import uk.ac.ebi.eva.test.utils.JobTestUtils; +import uk.ac.ebi.eva.utils.EvaJobParameterBuilder; + +import java.io.File; +import java.io.FileInputStream; +import java.util.*; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static uk.ac.ebi.eva.test.utils.GenotypedVcfJobTestUtils.COLLECTION_ANNOTATIONS_NAME; +import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted; +import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertFailed; +import static uk.ac.ebi.eva.utils.FileUtils.getResource; + +/** + * Test for {@link LoadVcfJobConfiguration} + */ + +@RunWith(SpringRunner.class) +@ActiveProfiles({Application.VARIANT_WRITER_MONGO_PROFILE, Application.VARIANT_ANNOTATION_MONGO_PROFILE}) +@TestPropertySource({"classpath:variant-aggregated.properties", "classpath:test-mongo.properties"}) +@ContextConfiguration(classes = {LoadVcfJobConfiguration.class, BatchTestConfiguration.class, TemporaryRuleConfiguration.class}) +public class LoadVcfJobTest { + public static final String INPUT = "/input-files/vcf/aggregated.vcf.gz"; + + private static final String COLLECTION_VARIANTS_NAME = "variants"; + + private static final String COLLECTION_FILES_NAME = "files"; + + @Autowired + @Rule + public TemporaryMongoRule mongoRule; + + @Rule + public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule(); + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + public static final Set EXPECTED_REQUIRED_STEP_NAMES = new TreeSet<>( + Arrays.asList(BeanNames.LOAD_VARIANTS_STEP, BeanNames.LOAD_FILE_STEP)); + + @Before + public void setUp() throws Exception { + Config.setOpenCGAHome(GenotypedVcfJobTestUtils.getDefaultOpencgaHome()); + } + + @Test + public void aggregatedLoadVcf() throws Exception { + String dbName = mongoRule.getRandomTemporaryDatabaseName(); + + JobParameters jobParameters = new EvaJobParameterBuilder() + .collectionFilesName(COLLECTION_FILES_NAME) + .collectionVariantsName(COLLECTION_VARIANTS_NAME) + .databaseName(dbName) + .inputStudyId("aggregated-job") + .inputStudyName("inputStudyName") + .inputStudyType("COLLECTION") + .inputVcf(getResource(INPUT).getAbsolutePath()) + .inputVcfAggregation("BASIC") + .inputVcfId("1") + .timestamp() + .toJobParameters(); + JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters); + + // check execution flow + assertCompleted(jobExecution); + + Collection stepExecutions = jobExecution.getStepExecutions(); + Set names = stepExecutions.stream().map(StepExecution::getStepName) + .collect(Collectors.toSet()); + + assertEquals(EXPECTED_REQUIRED_STEP_NAMES, names); + + StepExecution lastRequiredStep = new ArrayList<>(stepExecutions).get(EXPECTED_REQUIRED_STEP_NAMES.size() - 1); + assertEquals(BeanNames.LOAD_FILE_STEP, lastRequiredStep.getStepName()); + + // check ((documents in DB) == (lines in file)) + VariantStorageManager variantStorageManager = StorageManagerFactory.getVariantStorageManager(); + VariantDBAdaptor variantDBAdaptor = variantStorageManager.getDBAdaptor(dbName, null); + VariantDBIterator iterator = variantDBAdaptor.iterator(new QueryOptions()); + + File file = getResource(INPUT); + long lines = JobTestUtils.getLines(new GZIPInputStream(new FileInputStream(file))); + Assert.assertEquals(lines, JobTestUtils.count(iterator)); + + // check that stats are loaded properly + Variant variant = variantDBAdaptor.iterator(new QueryOptions()).next(); + assertFalse(variant.getSourceEntries().values().iterator().next().getCohortStats().isEmpty()); + } + + @Test + public void GenotypedLoadVcfJob() throws Exception { + File inputFile = GenotypedVcfJobTestUtils.getInputFile(); + String databaseName = mongoRule.getRandomTemporaryDatabaseName(); + File fasta = temporaryFolderRule.newFile(); + + // Run the Job + JobParameters jobParameters = new EvaJobParameterBuilder() + .collectionFilesName(GenotypedVcfJobTestUtils.COLLECTION_FILES_NAME) + .collectionVariantsName(GenotypedVcfJobTestUtils.COLLECTION_VARIANTS_NAME) + .databaseName(databaseName) + .inputFasta(fasta.getAbsolutePath()) + .inputStudyId(GenotypedVcfJobTestUtils.INPUT_STUDY_ID) + .inputStudyName("inputStudyName") + .inputStudyType("COLLECTION") + .inputVcf(inputFile.getAbsolutePath()) + .inputVcfAggregation("NONE") + .inputVcfId(GenotypedVcfJobTestUtils.INPUT_VCF_ID) + .toJobParameters(); + + JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters); + + assertCompleted(jobExecution); + + GenotypedVcfJobTestUtils.checkLoadStep(mongoRule, databaseName); + + } + +}