diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java index d36a4b86..3d831c43 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java @@ -32,6 +32,7 @@ public class BeanNames { public static final String ANNOTATION_PARSER_PROCESSOR = "annotation-parser-processor"; public static final String ANNOTATION_COMPOSITE_PROCESSOR = "annotation-composite-processor"; public static final String VARIANT_STATS_PROCESSOR = "variant-stats-processor"; + public static final String COMPOSITE_VARIANT_PROCESSOR = "composite-variant-processor"; public static final String GENE_WRITER = "gene-writer"; public static final String ANNOTATION_WRITER = "annotation-writer"; diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/LoadVariantsStepConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/LoadVariantsStepConfiguration.java index 4fdc277c..2f3771a1 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/LoadVariantsStepConfiguration.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/LoadVariantsStepConfiguration.java @@ -20,6 +20,7 @@ import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemStreamReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; @@ -33,14 +34,15 @@ import uk.ac.ebi.eva.pipeline.configuration.ChunkSizeCompletionPolicyConfiguration; import uk.ac.ebi.eva.pipeline.configuration.io.readers.VcfReaderConfiguration; import 
uk.ac.ebi.eva.pipeline.configuration.io.writers.VariantWriterConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors.VariantProcessorConfiguration; import uk.ac.ebi.eva.pipeline.configuration.policies.InvalidVariantSkipPolicyConfiguration; -import uk.ac.ebi.eva.pipeline.jobs.steps.processors.VariantNoAlternateFilterProcessor; import uk.ac.ebi.eva.pipeline.listeners.SkippedItemListener; import uk.ac.ebi.eva.pipeline.listeners.StepProgressListener; import uk.ac.ebi.eva.pipeline.listeners.VariantLoaderStepStatisticsListener; import uk.ac.ebi.eva.pipeline.parameters.JobOptions; import uk.ac.ebi.eva.pipeline.policies.InvalidVariantSkipPolicy; +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.COMPOSITE_VARIANT_PROCESSOR; import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.LOAD_VARIANTS_STEP; import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_READER; import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_WRITER; @@ -53,8 +55,8 @@ */ @Configuration @EnableBatchProcessing -@Import({VcfReaderConfiguration.class, VariantWriterConfiguration.class, ChunkSizeCompletionPolicyConfiguration.class - , InvalidVariantSkipPolicyConfiguration.class}) +@Import({VcfReaderConfiguration.class, VariantProcessorConfiguration.class, VariantWriterConfiguration.class, + ChunkSizeCompletionPolicyConfiguration.class, InvalidVariantSkipPolicyConfiguration.class}) public class LoadVariantsStepConfiguration { private static final Logger logger = LoggerFactory.getLogger(LoadVariantsStepConfiguration.class); @@ -67,6 +69,10 @@ public class LoadVariantsStepConfiguration { @Qualifier(VARIANT_WRITER) private ItemWriter variantWriter; + @Autowired + @Qualifier(COMPOSITE_VARIANT_PROCESSOR) + private ItemProcessor variantProcessor; + @Autowired private InvalidVariantSkipPolicy invalidVariantSkipPolicy; @@ -78,7 +84,7 @@ public Step loadVariantsStep(StepBuilderFactory stepBuilderFactory, JobOptions j return 
stepBuilderFactory.get(LOAD_VARIANTS_STEP) .chunk(chunkSizeCompletionPolicy) .reader(reader) - .processor(new VariantNoAlternateFilterProcessor()) + .processor(variantProcessor) .writer(variantWriter) .faultTolerant() .skipPolicy(invalidVariantSkipPolicy) diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantProcessorConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantProcessorConfiguration.java new file mode 100644 index 00000000..80e73588 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantProcessorConfiguration.java @@ -0,0 +1,59 @@ +/* + * Copyright 2024 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors;
+
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.item.ItemProcessor;
+import org.springframework.batch.item.support.CompositeItemProcessor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import uk.ac.ebi.eva.commons.models.data.Variant;
+import uk.ac.ebi.eva.pipeline.jobs.steps.processors.ExcludeStructuralVariantsProcessor;
+import uk.ac.ebi.eva.pipeline.jobs.steps.processors.VariantNoAlternateFilterProcessor;
+
+import java.util.Arrays;
+
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.COMPOSITE_VARIANT_PROCESSOR;
+
+/**
+ * Configuration that registers the variant processors as beans, together with a step-scoped
+ * composite processor (named {@code COMPOSITE_VARIANT_PROCESSOR}) that delegates to both of them.
+ */
+@Configuration
+public class VariantProcessorConfiguration {
+    /** Builds the composite: the no-alternate filter is the first delegate, the structural-variant filter the second. */
+    @Bean(COMPOSITE_VARIANT_PROCESSOR)
+    @StepScope
+    public ItemProcessor<Variant, Variant> compositeVariantProcessor(
+            VariantNoAlternateFilterProcessor variantNoAlternateFilterProcessor,
+            ExcludeStructuralVariantsProcessor excludeStructuralVariantsProcessor) {
+        CompositeItemProcessor<Variant, Variant> compositeProcessor = new CompositeItemProcessor<>();
+        compositeProcessor.setDelegates(Arrays.asList(variantNoAlternateFilterProcessor,
+                excludeStructuralVariantsProcessor));
+        return compositeProcessor;
+    }
+
+    @Bean
+    public ExcludeStructuralVariantsProcessor excludeStructuralVariantsProcessor() {
+        return new ExcludeStructuralVariantsProcessor();
+    }
+
+    @Bean
+    public VariantNoAlternateFilterProcessor variantNoAlternateFilterProcessor() {
+        return new VariantNoAlternateFilterProcessor();
+    }
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/processors/ExcludeStructuralVariantsProcessor.java b/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/processors/ExcludeStructuralVariantsProcessor.java
new file mode 100644
index 00000000..20a9ba31
--- /dev/null
+++ 
b/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/processors/ExcludeStructuralVariantsProcessor.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package uk.ac.ebi.eva.pipeline.jobs.steps.processors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.item.ItemProcessor;
+import uk.ac.ebi.eva.commons.models.data.Variant;
+
+import java.text.MessageFormat;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Item processor that filters out variants whose alternate allele uses one of the symbolic or
+ * structural notations of the VCF specification grammars (symbolic indels, break-ends, the gVCF
+ * {@code <*>} allele and other {@code <ID>} alleles); every other variant passes through unchanged.
+ */
+public class ExcludeStructuralVariantsProcessor implements ItemProcessor<Variant, Variant> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExcludeStructuralVariantsProcessor.class);
+
+    private static final String meta_contig_char = "(\\p{Alnum}|[\\p{Punct}&&[^:<>\\[\\]*=,]])";
+
+    private static final String chromBasicRegEx = MessageFormat.format("([{0}&&[^#]][{0}]*)", meta_contig_char);
+
+    private static final String chromContigRegEx = String.format("(<%s>)", chromBasicRegEx);
+
+    private static final String chromosomeRegEx = String.format("(%s|%s)", chromBasicRegEx, chromContigRegEx);
+
+    private static final String positionRegEx = "([\\p{Digit}]+)";
+
+    private static final String basesRegEx = "([ACTGNactgn]+)";
+
+    private static final String altIDRegEx_positive_match = "([\\p{Alnum}|[\\p{Punct}&&[^,<>]]]+)";
+
+    private static final String altIDRegEx_negative_match = "([\\p{Punct}]+)";
+
+    private static final String altIDRegEx = String.format("((?!%s)%s)", altIDRegEx_negative_match,
+            altIDRegEx_positive_match);
+
+    // Standard symbolic alleles; {0} is replaced by the pattern for the alphanumeric name after "ME:".
+    private static final String stdPrefixRegEx = MessageFormat.format(
+            "<DEL>|<INS>|<DUP>|<INV>|<CNV>|<DUP:TANDEM>|<DEL:ME:{0}>|<INS:ME:{0}>", "(\\p{Alnum})+");
+
+    private static final String altIndelRegEx = String.format("(%s|\\*)", stdPrefixRegEx);
+
+    // Any other <ID> allele; the lookahead rejects the standard symbolic alleles matched above.
+    private static final String altOtherRegEx = String.format("((?!%s)%s)", stdPrefixRegEx,
+            String.format("<%s>", altIDRegEx));
+
+    /**
+     * Break-end notations (bracketed mate positions and leading/trailing dot). See VCF specification grammar
+     */
+    private static final String altSVRegEx = String.join("|",
+            String.format("(\\]%s:%s\\]%s)", chromosomeRegEx, positionRegEx, basesRegEx),
+            String.format("(\\[%s:%s\\[%s)", chromosomeRegEx, positionRegEx, basesRegEx),
+            String.format("(%s\\]%s:%s\\])", basesRegEx, chromosomeRegEx, positionRegEx),
+            String.format("(%s\\[%s:%s\\[)", basesRegEx, chromosomeRegEx, positionRegEx),
+            String.format("(\\.%s)", basesRegEx),
+            String.format("(%s\\.)", basesRegEx));
+
+    private static final String altGVCFRegEx = "(<\\*>)";
+
+    /**
+     * Anchored union of all the alternate-allele forms above. See VCF specification grammar
+     */
+    private static final String STRUCTURAL_VARIANT_REGEX = String.format("^(%s|%s|%s|%s)$", altIndelRegEx, altSVRegEx,
+            altGVCFRegEx, altOtherRegEx);
+
+    private static final Pattern STRUCTURAL_VARIANT_PATTERN = Pattern.compile(STRUCTURAL_VARIANT_REGEX);
+
+    /**
+     * Drops the variant when its whole alternate allele matches the structural-variant pattern.
+     *
+     * @param variant variant whose alternate allele is checked
+     * @return {@code null} (which filters the item out of the chunk) on a match, otherwise the given variant
+     */
+    @Override
+    public Variant process(Variant variant) {
+        Matcher matcher = STRUCTURAL_VARIANT_PATTERN.matcher(variant.getAlternate());
+        if (matcher.matches()) {
+            logger.info("Skipped processing structural variant {}", variant);
+            return null;
+        }
+        return variant;
+    }
+}
diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/GenotypedVcfTestSkipStructuralVariant.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/GenotypedVcfTestSkipStructuralVariant.java
new file mode 100644
index 00000000..826e1f9a
--- /dev/null
+++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/GenotypedVcfTestSkipStructuralVariant.java
@@ -0,0 +1,111 @@
+package uk.ac.ebi.eva.pipeline.configuration.jobs.steps;
+
+import com.mongodb.client.model.Filters;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.test.JobLauncherTestUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+import uk.ac.ebi.eva.pipeline.Application;
+import uk.ac.ebi.eva.pipeline.configuration.BeanNames;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.GenotypedVcfJobConfiguration;
+import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration;
+import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration;
+import uk.ac.ebi.eva.test.rules.TemporaryMongoRule;
+import uk.ac.ebi.eva.utils.EvaJobParameterBuilder;
+
+import 
static org.junit.Assert.assertEquals;
+import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted;
+import static uk.ac.ebi.eva.utils.FileUtils.getResource;
+
+/** Runs the load-variants step on small VCF files and checks that structural variants are not stored. */
+@RunWith(SpringRunner.class)
+@ActiveProfiles({Application.VARIANT_WRITER_MONGO_PROFILE, Application.VARIANT_ANNOTATION_MONGO_PROFILE})
+@TestPropertySource({"classpath:common-configuration.properties", "classpath:test-mongo.properties"})
+@ContextConfiguration(classes = {GenotypedVcfJobConfiguration.class, BatchTestConfiguration.class, TemporaryRuleConfiguration.class})
+public class GenotypedVcfTestSkipStructuralVariant {
+    private static final String SMALL_STRUCTURAL_VARIANTS_VCF_FILE = "/input-files/vcf/small_structural_variant.vcf.gz";
+
+    private static final String SMALL_STRUCTURAL_VARIANTS_VCF_FILE_REF_ALT_STARTS_WITH_SAME_ALLELE = "/input-files/vcf/small_invalid_variant.vcf.gz";
+
+    private static final String COLLECTION_VARIANTS_NAME = "variants";
+
+    private static final String DATABASE_NAME = "test_invalid_variant_db";
+
+    @Autowired
+    @Rule
+    public TemporaryMongoRule mongoRule;
+
+    @Autowired
+    private JobLauncherTestUtils jobLauncherTestUtils;
+
+    @Before
+    public void setUp() throws Exception {
+        mongoRule.getTemporaryDatabase(DATABASE_NAME).drop();
+    }
+
+    @Test
+    public void loaderStepShouldSkipStructuralVariants() throws Exception {
+        // When the execute method in variantsLoad is executed
+        JobParameters jobParameters = new EvaJobParameterBuilder()
+                .collectionVariantsName(COLLECTION_VARIANTS_NAME)
+                .databaseName(DATABASE_NAME)
+                .inputStudyId("1")
+                .inputVcf(getResource(SMALL_STRUCTURAL_VARIANTS_VCF_FILE).getAbsolutePath())
+                .inputVcfAggregation("NONE")
+                .inputVcfId("1")
+                .toJobParameters();
+
+        JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.LOAD_VARIANTS_STEP, jobParameters);
+
+        // Then variantsLoad step should complete correctly
+        assertCompleted(jobExecution);
+
+        // And the number of documents in the DB should be 1 as all other variants are invalid
+        assertEquals(1, mongoRule.getCollection(DATABASE_NAME, COLLECTION_VARIANTS_NAME).countDocuments());
+        assertEquals(1, mongoRule.getCollection(DATABASE_NAME, COLLECTION_VARIANTS_NAME).countDocuments(Filters.eq("_id", "1_152739_A_G")));
+    }
+
+    /*
+     * This test case covers a special case of structural variant that should be skipped but, due to a bug,
+     * makes its way into the DB.
+     *
+     * The variant has ref "G" and alt "G[2:421681[", so it fits the definition of a structural variant and
+     * should be skipped by the variant processor that filters out structural variants.
+     *
+     * What currently happens instead is that, after the variant is read, the normalization process in the variant
+     * reader removes the shared prefix G, reducing the variant to ref "" and alt "[2:421681[". That alt is not
+     * matched by the regex in ExcludeStructuralVariantsProcessor, so the variant makes its way into the DB.
+     *
+     * The test therefore fails at the moment and is ignored. It should start passing once the normalization
+     * problem in the variant reader has been fixed.
+     */
+    @Ignore("Fails until the normalization in the variant reader is fixed, see the comment above")
+    @Test
+    public void loaderStepShouldSkipStructuralVariantsWhereRefAndAltStartsWithSameAllele() throws Exception {
+        // When the execute method in variantsLoad is executed
+        JobParameters jobParameters = new EvaJobParameterBuilder()
+                .collectionVariantsName(COLLECTION_VARIANTS_NAME)
+                .databaseName(DATABASE_NAME)
+                .inputStudyId("1")
+                .inputVcf(getResource(SMALL_STRUCTURAL_VARIANTS_VCF_FILE_REF_ALT_STARTS_WITH_SAME_ALLELE).getAbsolutePath())
+                .inputVcfAggregation("NONE")
+                .inputVcfId("1")
+                .toJobParameters();
+
+        JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.LOAD_VARIANTS_STEP, jobParameters);
+
+        // Then variantsLoad step should complete correctly
+        assertCompleted(jobExecution);
+
+        assertEquals(0, mongoRule.getCollection(DATABASE_NAME, COLLECTION_VARIANTS_NAME).countDocuments());
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/jobs/steps/processors/ExcludeStructuralVariantsProcessorTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/jobs/steps/processors/ExcludeStructuralVariantsProcessorTest.java
new file mode 100644
index 00000000..c5a5f7f4
--- /dev/null
+++ b/src/test/java/uk/ac/ebi/eva/pipeline/jobs/steps/processors/ExcludeStructuralVariantsProcessorTest.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package uk.ac.ebi.eva.pipeline.jobs.steps.processors;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import uk.ac.ebi.eva.commons.models.data.Variant;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for {@link ExcludeStructuralVariantsProcessor}: which alternate alleles are kept and which dropped. */
+public class ExcludeStructuralVariantsProcessorTest {
+
+    private static final String ALT_WITH_SINGLE_BASE = "A";
+
+    private static final String ALT_WITH_MULTI_BASE = "AT";
+
+    private static final String ALT_WITH_EMPTY_ALLELE = "";
+
+    // Symbolic indel alleles; each one is expected to be filtered out by the processor.
+    private static final String[] ALTS_WITH_SYMBOLIC_INDEL = {"<DEL>", "<INS>", "<DUP>", "<INV>", "<CNV>",
+            "<DUP:TANDEM>", "<DEL:ME:ALU>", "<INS:ME:ALU>", "<INS:ME:L1>"};
+
+    private static final String ALT_WITH_SYMBOLIC_ALLELE = "<ID>";
+
+    /** Alternate alleles in break-end notation, with the mate contig given either plainly or as {@code <name>}. */
+    private static final String[] ALTS_WITH_BREAK_END_NOTATION = {
+            "G]2:421681]", "]2:421681]G", "]<ctg1>:1234]ATG",
+            "G[2:421681[", "[2:421681[G", "[<ctg1>:1234[ATG",
+            ".AGT", "AGT."};
+
+    private static final String ALT_GVCF_NOTATION_FOR_REF_ONLY_RECORDS = "<*>";
+
+    private static ExcludeStructuralVariantsProcessor processor;
+
+    @BeforeClass
+    public static void setUp() {
+        processor = new ExcludeStructuralVariantsProcessor();
+    }
+
+    @Test
+    public void altWithSingleBaseAllele() {
+        Variant variant = newVariant(ALT_WITH_SINGLE_BASE);
+        assertEquals(variant, processor.process(variant));
+    }
+
+    private Variant newVariant(String alternate) {
+        return new Variant("contig", 1000, 1001, "A", alternate);
+    }
+
+    @Test
+    public void altWithMultiBaseAllele() {
+        Variant variant = newVariant(ALT_WITH_MULTI_BASE);
+        assertEquals(variant, processor.process(variant));
+    }
+
+    @Test
+    public void altWithEmptyAllele() {
+        Variant variant = newVariant(ALT_WITH_EMPTY_ALLELE);
+        assertEquals(variant, processor.process(variant));
+    }
+
+    @Test
+    public void altsWithSymbolicIndel() {
+        assertTrue(Arrays.stream(ALTS_WITH_SYMBOLIC_INDEL).map(alt -> processor.process(newVariant(alt)))
+                .allMatch(Objects::isNull));
+    }
+
+    @Test
+    public void altWithSymbolicAllele() {
+        Variant variant = newVariant(ALT_WITH_SYMBOLIC_ALLELE);
+        assertNull(processor.process(variant));
+    }
+
+    @Test
+    public void altsWithBreakEndNotationAllele() {
+        assertTrue(Arrays.stream(ALTS_WITH_BREAK_END_NOTATION).map(alt -> processor.process(newVariant(alt)))
+                .allMatch(Objects::isNull));
+    }
+
+    @Test
+    public void altWithGVCFAsteriskNotation() {
+        Variant variant = newVariant(ALT_GVCF_NOTATION_FOR_REF_ONLY_RECORDS);
+        assertNull(processor.process(variant));
+    }
+}
diff --git a/src/test/resources/input-files/vcf/small_invalid_variant.vcf.gz b/src/test/resources/input-files/vcf/small_invalid_variant.vcf.gz
new file mode 100644
index 00000000..be56840c
Binary files /dev/null and b/src/test/resources/input-files/vcf/small_invalid_variant.vcf.gz differ
diff --git a/src/test/resources/input-files/vcf/small_structural_variant.vcf.gz b/src/test/resources/input-files/vcf/small_structural_variant.vcf.gz
new file mode 100644
index 00000000..10f05323
Binary files /dev/null and b/src/test/resources/input-files/vcf/small_structural_variant.vcf.gz differ