diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java index d56a4fd10..8e7c7e843 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java @@ -65,6 +65,7 @@ public class BeanNames { public static final String ANNOTATE_VARIANTS_JOB = "annotate-variants-job"; public static final String INIT_DATABASE_JOB = "init-database-job"; public static final String GENOTYPED_VCF_JOB = "genotyped-vcf-job"; + public static final String LOAD_VCF_JOB = "load-vcf-job"; public static final String CALCULATE_STATISTICS_JOB = "calculate-statistics-job"; public static final String DROP_STUDY_JOB = "drop-study-job"; public static final String ACCESSION_IMPORT_JOB = "accession-import-job"; diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobConfiguration.java new file mode 100644 index 000000000..1cabcf3a3 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobConfiguration.java @@ -0,0 +1,81 @@ +/* + * Copyright 2015-2017 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.ac.ebi.eva.pipeline.configuration.jobs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.job.builder.FlowJobBuilder; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Scope; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadFileStepConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadVariantsStepConfiguration; +import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer; +import uk.ac.ebi.eva.pipeline.parameters.validation.job.LoadVcfJobParametersValidator; + +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.*; + +/** + * Variant load pipeline workflow: + *

+ * | + * transform ---> load -+ + * | + *

+ * + */ +@Configuration +@EnableBatchProcessing +@Import({LoadVariantsStepConfiguration.class, LoadFileStepConfiguration.class}) +public class LoadVcfJobConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(LoadVcfJobConfiguration.class); + + @Autowired + @Qualifier(LOAD_VARIANTS_STEP) + private Step variantLoaderStep; + + @Autowired + @Qualifier(LOAD_FILE_STEP) + private Step loadFileStep; + + @Bean(LOAD_VCF_JOB) + @Scope("prototype") + public Job genotypedVcfJob(JobBuilderFactory jobBuilderFactory) { + logger.debug("Building '" + LOAD_VCF_JOB + "'"); + + JobBuilder jobBuilder = jobBuilderFactory + .get(LOAD_VCF_JOB) + .incrementer(new NewJobIncrementer()) + .validator(new LoadVcfJobParametersValidator()); + FlowJobBuilder builder = jobBuilder + .flow(variantLoaderStep) + .next(loadFileStep) + .end(); + + return builder.build(); + } + +} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/LoadVcfJobParametersValidator.java b/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/LoadVcfJobParametersValidator.java new file mode 100644 index 000000000..d82a75048 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/parameters/validation/job/LoadVcfJobParametersValidator.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.parameters.validation.job; + +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.JobParametersValidator; +import org.springframework.batch.core.job.CompositeJobParametersValidator; +import org.springframework.batch.core.job.DefaultJobParametersValidator; +import uk.ac.ebi.eva.pipeline.configuration.jobs.LoadVcfJobConfiguration; +import uk.ac.ebi.eva.pipeline.parameters.validation.step.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Validates the job parameters necessary to execute an {@link LoadVcfJobConfiguration} + */ +public class LoadVcfJobParametersValidator extends DefaultJobParametersValidator { + + @Override + public void validate(JobParameters parameters) throws JobParametersInvalidException { + compositeJobParametersValidator(parameters).validate(parameters); + } + + private CompositeJobParametersValidator compositeJobParametersValidator(JobParameters jobParameters) { + List jobParametersValidators = new ArrayList<>(); + + jobParametersValidators.add(new LoadVariantsStepParametersValidator()); + jobParametersValidators.add(new LoadFileStepParametersValidator()); + + CompositeJobParametersValidator compositeJobParametersValidator = new CompositeJobParametersValidator(); + compositeJobParametersValidator.setValidators(jobParametersValidators); + return compositeJobParametersValidator; + } + +} diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobTest.java new file mode 100644 index 000000000..7246f343c --- /dev/null +++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/LoadVcfJobTest.java @@ -0,0 +1,164 @@ +/* + * Copyright 2015-2017 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.jobs; + +import org.junit.*; +import org.junit.runner.RunWith; +import org.opencb.biodata.models.variant.Variant; +import org.opencb.datastore.core.QueryOptions; +import org.opencb.opencga.lib.common.Config; +import org.opencb.opencga.storage.core.StorageManagerFactory; +import org.opencb.opencga.storage.core.variant.VariantStorageManager; +import org.opencb.opencga.storage.core.variant.adaptors.VariantDBAdaptor; +import org.opencb.opencga.storage.core.variant.adaptors.VariantDBIterator; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import uk.ac.ebi.eva.pipeline.Application; +import uk.ac.ebi.eva.pipeline.configuration.BeanNames; +import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration; +import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration; +import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule; +import uk.ac.ebi.eva.test.rules.TemporaryMongoRule; +import uk.ac.ebi.eva.test.utils.GenotypedVcfJobTestUtils; +import uk.ac.ebi.eva.test.utils.JobTestUtils; +import uk.ac.ebi.eva.utils.EvaJobParameterBuilder; + +import java.io.File; +import java.io.FileInputStream; +import java.util.*; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static uk.ac.ebi.eva.test.utils.GenotypedVcfJobTestUtils.COLLECTION_ANNOTATIONS_NAME; +import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted; +import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertFailed; +import static uk.ac.ebi.eva.utils.FileUtils.getResource; + +/** + * Test for {@link LoadVcfJobConfiguration} + */ + +@RunWith(SpringRunner.class) +@ActiveProfiles({Application.VARIANT_WRITER_MONGO_PROFILE, Application.VARIANT_ANNOTATION_MONGO_PROFILE}) +@TestPropertySource({"classpath:variant-aggregated.properties", "classpath:test-mongo.properties"}) +@ContextConfiguration(classes = {LoadVcfJobConfiguration.class, BatchTestConfiguration.class, TemporaryRuleConfiguration.class}) +public class LoadVcfJobTest { + public static final String INPUT = "/input-files/vcf/aggregated.vcf.gz"; + + private static final String COLLECTION_VARIANTS_NAME = "variants"; + + private static final String COLLECTION_FILES_NAME = "files"; + + @Autowired + @Rule + public TemporaryMongoRule mongoRule; + + @Rule + public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule(); + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + public static final Set EXPECTED_REQUIRED_STEP_NAMES = new TreeSet<>( + Arrays.asList(BeanNames.LOAD_VARIANTS_STEP, BeanNames.LOAD_FILE_STEP)); + + @Before + public void setUp() throws Exception { + Config.setOpenCGAHome(GenotypedVcfJobTestUtils.getDefaultOpencgaHome()); + } + + @Test + public void aggregatedLoadVcf() throws Exception { + String dbName = mongoRule.getRandomTemporaryDatabaseName(); + + JobParameters jobParameters = new EvaJobParameterBuilder() + .collectionFilesName(COLLECTION_FILES_NAME) + .collectionVariantsName(COLLECTION_VARIANTS_NAME) + .databaseName(dbName) + .inputStudyId("aggregated-job") + .inputStudyName("inputStudyName") + .inputStudyType("COLLECTION") + .inputVcf(getResource(INPUT).getAbsolutePath()) + .inputVcfAggregation("BASIC") + .inputVcfId("1") + .timestamp() + .toJobParameters(); + JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters); + + // check execution flow + assertCompleted(jobExecution); + + Collection stepExecutions = jobExecution.getStepExecutions(); + Set names = stepExecutions.stream().map(StepExecution::getStepName) + .collect(Collectors.toSet()); + + assertEquals(EXPECTED_REQUIRED_STEP_NAMES, names); + + StepExecution lastRequiredStep = new ArrayList<>(stepExecutions).get(EXPECTED_REQUIRED_STEP_NAMES.size() - 1); + assertEquals(BeanNames.LOAD_FILE_STEP, lastRequiredStep.getStepName()); + + // check ((documents in DB) == (lines in file)) + VariantStorageManager variantStorageManager = StorageManagerFactory.getVariantStorageManager(); + VariantDBAdaptor variantDBAdaptor = variantStorageManager.getDBAdaptor(dbName, null); + VariantDBIterator iterator = variantDBAdaptor.iterator(new QueryOptions()); + + File file = getResource(INPUT); + long lines = JobTestUtils.getLines(new GZIPInputStream(new FileInputStream(file))); + Assert.assertEquals(lines, JobTestUtils.count(iterator)); + + // check that stats are loaded properly + Variant variant = variantDBAdaptor.iterator(new QueryOptions()).next(); + assertFalse(variant.getSourceEntries().values().iterator().next().getCohortStats().isEmpty()); + } + + @Test + public void GenotypedLoadVcfJob() throws Exception { + File inputFile = GenotypedVcfJobTestUtils.getInputFile(); + String databaseName = mongoRule.getRandomTemporaryDatabaseName(); + File fasta = temporaryFolderRule.newFile(); + + // Run the Job + JobParameters jobParameters = new EvaJobParameterBuilder() + .collectionFilesName(GenotypedVcfJobTestUtils.COLLECTION_FILES_NAME) + .collectionVariantsName(GenotypedVcfJobTestUtils.COLLECTION_VARIANTS_NAME) + .databaseName(databaseName) + .inputFasta(fasta.getAbsolutePath()) + .inputStudyId(GenotypedVcfJobTestUtils.INPUT_STUDY_ID) + .inputStudyName("inputStudyName") + .inputStudyType("COLLECTION") + .inputVcf(inputFile.getAbsolutePath()) + .inputVcfAggregation("NONE") + .inputVcfId(GenotypedVcfJobTestUtils.INPUT_VCF_ID) + .toJobParameters(); + + JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters); + + assertCompleted(jobExecution); + + GenotypedVcfJobTestUtils.checkLoadStep(mongoRule, databaseName); + + } + +}