From 2cf0c11444cb16fb721ef54f9b2a17db79d6e0eb Mon Sep 17 00:00:00 2001
From: nitin-ebi <79518737+nitin-ebi@users.noreply.github.com>
Date: Thu, 29 Aug 2024 16:54:08 +0100
Subject: [PATCH] EVA-3639 File stats job (#197)
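Add a file stats job that calculates per-file statistics (nSamp, nVar, nSnp, nIndel, nPass, nTi, nTv) for a study and stores them in the "st" field of the files collection; rename the CalculateAndLoadStatistics job and step to VariantStats.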
---
.../subdocuments/VariantSourceEntryMongo.java | 4 +
.../eva/pipeline/configuration/BeanNames.java | 6 +-
...on.java => FileStatsJobConfiguration.java} | 31 ++-
.../jobs/VariantStatsJobConfiguration.java | 59 +++++
.../steps/FileStatsStepConfiguration.java | 65 ++++++
...ava => VariantStatsStepConfiguration.java} | 10 +-
.../io/readers/VariantStatsReader.java | 8 +-
.../jobs/steps/tasklets/FileStatsTasklet.java | 206 ++++++++++++++++++
.../jobs/steps/FileStatsStepTest.java | 118 ++++++++++
...tepTest.java => VariantStatsStepTest.java} | 14 +-
10 files changed, 487 insertions(+), 34 deletions(-)
rename src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/{CalculateAndLoadStatisticsJobConfiguration.java => FileStatsJobConfiguration.java} (61%)
create mode 100644 src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/VariantStatsJobConfiguration.java
create mode 100644 src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepConfiguration.java
rename src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/{CalculateAndLoadStatisticsStepConfiguration.java => VariantStatsStepConfiguration.java} (90%)
create mode 100644 src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java
create mode 100644 src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepTest.java
rename src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/{CalculateAndLoadStatisticsStepTest.java => VariantStatsStepTest.java} (89%)
diff --git a/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java b/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java
index 4ddb8a23..a75b653d 100644
--- a/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java
+++ b/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java
@@ -189,4 +189,8 @@ public String getFileId() {
public String[] getAlternates() {
return alternates;
}
+
+ public BasicDBObject getAttrs() {
+ return attrs; // hands out the attrs object itself; no copy is made here
+ }
}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java
index dcacd048..d36a4b86 100644
--- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java
@@ -63,7 +63,8 @@ public class BeanNames {
public static final String DROP_FILES_BY_STUDY_STEP = "drop-files-by-study-step";
public static final String LOAD_ANNOTATION_METADATA_STEP = "annotation-metadata-step";
public static final String ACCESSION_IMPORT_STEP = "accession-import-step";
- public static final String CALCULATE_AND_LOAD_STATISTICS_STEP = "calculate-load-statistics-step";
+ public static final String VARIANT_STATS_STEP = "variant-stats-step";
+ public static final String FILE_STATS_STEP = "file-stats-step";
public static final String AGGREGATED_VCF_JOB = "aggregated-vcf-job";
public static final String ANNOTATE_VARIANTS_JOB = "annotate-variants-job";
@@ -73,5 +74,6 @@ public class BeanNames {
public static final String CALCULATE_STATISTICS_JOB = "calculate-statistics-job";
public static final String DROP_STUDY_JOB = "drop-study-job";
public static final String ACCESSION_IMPORT_JOB = "accession-import-job";
- public static final String CALCULATE_AND_LOAD_STATISTICS_JOB = "calculate-load-statistics-job";
+ public static final String VARIANT_STATS_JOB = "variant-stats-job";
+ public static final String FILE_STATS_JOB = "file-stats-job";
}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/FileStatsJobConfiguration.java
similarity index 61%
rename from src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java
rename to src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/FileStatsJobConfiguration.java
index ea85cefc..da511736 100644
--- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/FileStatsJobConfiguration.java
@@ -27,37 +27,32 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Scope;
-import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.CalculateAndLoadStatisticsStepConfiguration;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.FileStatsStepConfiguration;
import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer;
-import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_JOB;
-import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.FILE_STATS_JOB;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.FILE_STATS_STEP;
-/**
- * Configuration to run a full Statistics job: variantStatsFlow: statsCreate --> statsLoad
- *
- * TODO add a new PopulationStatisticsJobParametersValidator
- */
@Configuration
@EnableBatchProcessing
-@Import({CalculateAndLoadStatisticsStepConfiguration.class})
-public class CalculateAndLoadStatisticsJobConfiguration {
+@Import({FileStatsStepConfiguration.class})
+public class FileStatsJobConfiguration {
- private static final Logger logger = LoggerFactory.getLogger(CalculateAndLoadStatisticsJobConfiguration.class);
+ private static final Logger logger = LoggerFactory.getLogger(FileStatsJobConfiguration.class);
@Autowired
- @Qualifier(CALCULATE_AND_LOAD_STATISTICS_STEP)
- private Step calculateAndLoadStatisticsStep;
+ @Qualifier(FILE_STATS_STEP)
+ private Step fileStatsStep;
- @Bean(CALCULATE_AND_LOAD_STATISTICS_JOB)
+ @Bean(FILE_STATS_JOB)
@Scope("prototype")
- public Job calculateAndLoadStatisticsJob(JobBuilderFactory jobBuilderFactory) {
- logger.debug("Building '" + CALCULATE_AND_LOAD_STATISTICS_JOB + "'");
+ public Job fileStatsJob(JobBuilderFactory jobBuilderFactory) {
+ logger.debug("Building '" + FILE_STATS_JOB + "'");
return jobBuilderFactory
- .get(CALCULATE_AND_LOAD_STATISTICS_JOB)
+ .get(FILE_STATS_JOB)
.incrementer(new NewJobIncrementer())
- .start(calculateAndLoadStatisticsStep)
+ .start(fileStatsStep)
.build();
}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/VariantStatsJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/VariantStatsJobConfiguration.java
new file mode 100644
index 00000000..398679ac
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/VariantStatsJobConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Scope;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.VariantStatsStepConfiguration;
+import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer;
+
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_JOB;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_STEP;
+
+/** Spring Batch configuration declaring the job whose only step is the variant statistics step. */
+@Configuration
+@EnableBatchProcessing
+@Import({VariantStatsStepConfiguration.class})
+public class VariantStatsJobConfiguration {
+
+    private static final Logger logger = LoggerFactory.getLogger(VariantStatsJobConfiguration.class);
+
+    @Autowired
+    @Qualifier(VARIANT_STATS_STEP)
+    private Step variantStatsStep;
+
+    /** Builds the prototype-scoped job: a fresh incrementer, then the injected variant statistics step. */
+    @Bean(VARIANT_STATS_JOB)
+    @Scope("prototype")
+    public Job variantStatsJob(JobBuilderFactory jobBuilderFactory) {
+        logger.debug("Building '" + VARIANT_STATS_JOB + "'");
+        NewJobIncrementer incrementer = new NewJobIncrementer();
+
+        Job job = jobBuilderFactory.get(VARIANT_STATS_JOB)
+                .incrementer(incrementer).start(variantStatsStep).build();
+        return job;
+    }
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepConfiguration.java
new file mode 100644
index 00000000..1ba547f3
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepConfiguration.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs.steps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.core.step.tasklet.TaskletStep;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import uk.ac.ebi.eva.pipeline.jobs.steps.tasklets.FileStatsTasklet;
+import uk.ac.ebi.eva.pipeline.parameters.ChunkSizeParameters;
+import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters;
+import uk.ac.ebi.eva.pipeline.parameters.InputParameters;
+import uk.ac.ebi.eva.pipeline.parameters.JobOptions;
+import uk.ac.ebi.eva.utils.TaskletUtils;
+
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.FILE_STATS_STEP;
+
+
+/** Spring Batch configuration declaring the file statistics tasklet and the step that runs it. */
+@Configuration
+@EnableBatchProcessing
+public class FileStatsStepConfiguration {
+    private static final Logger logger = LoggerFactory.getLogger(FileStatsStepConfiguration.class);
+
+    /** Step-scoped tasklet created with the study id of the input parameters and the chunk size. */
+    @Bean
+    @StepScope
+    public FileStatsTasklet fileStatsTasklet(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate,
+                                             InputParameters inputParameters,
+                                             ChunkSizeParameters chunkSizeParameters) {
+        String studyId = inputParameters.getStudyId();
+        int chunkSize = chunkSizeParameters.getChunkSize();
+        return new FileStatsTasklet(databaseParameters, mongoTemplate, studyId, chunkSize);
+    }
+
+    /** Step running the tasklet, generated with the allow-start-if-complete flag of the job options. */
+    @Bean(FILE_STATS_STEP)
+    public TaskletStep fileStatsStep(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate,
+                                     InputParameters inputParameters, ChunkSizeParameters chunkSizeParameters,
+                                     StepBuilderFactory stepBuilderFactory, JobOptions jobOptions) {
+        logger.debug("Building '" + FILE_STATS_STEP + "'");
+        FileStatsTasklet tasklet = fileStatsTasklet(databaseParameters, mongoTemplate, inputParameters,
+                                                    chunkSizeParameters);
+        return TaskletUtils.generateStep(stepBuilderFactory, FILE_STATS_STEP, tasklet,
+                                         jobOptions.isAllowStartIfComplete());
+    }
+}
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/VariantStatsStepConfiguration.java
similarity index 90%
rename from src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java
rename to src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/VariantStatsStepConfiguration.java
index 3aaa813e..6a0a0125 100644
--- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/VariantStatsStepConfiguration.java
@@ -33,9 +33,9 @@
import uk.ac.ebi.eva.pipeline.configuration.io.writers.VariantStatsWriterConfiguration;
import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors.VariantStatsProcessorConfiguration;
-import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP;
import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_PROCESSOR;
import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_READER;
+import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_STEP;
import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_WRITER;
@@ -43,16 +43,16 @@
@EnableBatchProcessing
@Import({VariantStatsReaderConfiguration.class, VariantStatsWriterConfiguration.class,
VariantStatsProcessorConfiguration.class, ChunkSizeCompletionPolicyConfiguration.class})
-public class CalculateAndLoadStatisticsStepConfiguration {
+public class VariantStatsStepConfiguration {
- @Bean(CALCULATE_AND_LOAD_STATISTICS_STEP)
- public Step calculateAndLoadStatisticsStep(
+ @Bean(VARIANT_STATS_STEP)
+ public Step variantStatsStep(
@Qualifier(VARIANT_STATS_READER) ItemStreamReader variantStatsReader,
@Qualifier(VARIANT_STATS_PROCESSOR) ItemProcessor variantStatsProcessor,
@Qualifier(VARIANT_STATS_WRITER) ItemWriter variantStatsWriter,
StepBuilderFactory stepBuilderFactory,
SimpleCompletionPolicy chunkSizeCompletionPolicy) {
- TaskletStep step = stepBuilderFactory.get(CALCULATE_AND_LOAD_STATISTICS_STEP)
+ TaskletStep step = stepBuilderFactory.get(VARIANT_STATS_STEP)
.chunk(chunkSizeCompletionPolicy)
.reader(variantStatsReader)
.processor(variantStatsProcessor)
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java
index d845d0bd..eda7561a 100644
--- a/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java
@@ -22,6 +22,8 @@
import java.util.Map;
import java.util.stream.Collectors;
+import static com.mongodb.client.model.Accumulators.sum;
+import static com.mongodb.client.model.Aggregates.group;
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Aggregates.project;
import static com.mongodb.client.model.Projections.computed;
@@ -92,11 +94,13 @@ private void populateFilesIdAndNumberOfSamplesMap() {
computed("fid", "$fid"),
computed("numOfSamples", new Document("$size", new Document("$objectToArray", "$samp")))
));
+ Bson groupStage = group("$fid", sum("totalNumOfSamples", "$numOfSamples"));
+
filesIdNumberOfSamplesMap = mongoTemplate.getCollection(databaseParameters.getCollectionFilesName())
- .aggregate(asList(matchStage, projectStage))
+ .aggregate(asList(matchStage, projectStage, groupStage))
.into(new ArrayList<>())
.stream()
- .collect(Collectors.toMap(doc -> doc.getString("fid"), doc -> doc.getInteger("numOfSamples")));
+ .collect(Collectors.toMap(doc -> doc.getString("_id"), doc -> doc.getInteger("totalNumOfSamples")));
}
public static Map getFilesIdAndNumberOfSamplesMap() {
diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java b/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java
new file mode 100644
index 00000000..8a93ebe9
--- /dev/null
+++ b/src/main/java/uk/ac/ebi/eva/pipeline/jobs/steps/tasklets/FileStatsTasklet.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.jobs.steps.tasklets;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Filters;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.data.mongodb.core.BulkOperations;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.convert.MongoConverter;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+import uk.ac.ebi.eva.commons.models.data.Variant;
+import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument;
+import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantSourceEntryMongo;
+import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.mongodb.client.model.Accumulators.sum;
+import static com.mongodb.client.model.Aggregates.group;
+import static com.mongodb.client.model.Aggregates.match;
+import static com.mongodb.client.model.Aggregates.project;
+import static com.mongodb.client.model.Projections.computed;
+import static com.mongodb.client.model.Projections.fields;
+import static java.util.Arrays.asList;
+
+/**
+ * Tasklet that calculates summary statistics for every file of a study and stores them in the files
+ * collection.
+ * <p>
+ * The number of samples of each file is read from the files collection; the variants of the study are then
+ * scanned once to count, per file, the variants, SNVs, INDELs, transitions, transversions and variants whose
+ * FILTER attribute is "PASS". The resulting counters are written to the "st" field of the matching files
+ * documents.
+ */
+public class FileStatsTasklet implements Tasklet {
+    private static final Logger logger = LoggerFactory.getLogger(FileStatsTasklet.class);
+
+    private static final String KEY_NO_OF_SAMPLES = "nSamp";
+    private static final String KEY_NO_OF_VARIANTS = "nVar";
+    private static final String KEY_NO_OF_SNP = "nSnp";
+    private static final String KEY_NO_OF_INDEL = "nIndel";
+    private static final String KEY_NO_OF_PASS = "nPass";
+    private static final String KEY_NO_OF_TRANSITION = "nTi";
+    private static final String KEY_NO_OF_TRANSVERSION = "nTv";
+
+    // Reference + alternate pairs counted as transitions; any other SNV is counted as a transversion
+    private static final Set<String> transitions = new HashSet<>(Arrays.asList("AG", "GA", "CT", "TC"));
+
+    private final DatabaseParameters databaseParameters;
+    private final MongoTemplate mongoTemplate;
+    private final MongoConverter converter;
+    private final int chunkSize;
+    private final String studyId;
+
+    // The state below is kept per instance (not static) and rebuilt on every execution, so counters from a
+    // previous run or from another study can never leak into the statistics being written.
+
+    // Map of file id to number of samples, read from the files collection
+    private Map<String, Integer> fileIdNumberOfSamplesMap = new HashMap<>();
+    // Map of file id to its statistics counters (statistic key to count)
+    private Map<String, Map<String, Integer>> fileIdCountsMap = new HashMap<>();
+    // File ids seen in variants but missing from the files collection; used to warn only once per file id
+    private Set<String> unknownFileIds = new HashSet<>();
+
+    /**
+     * @param databaseParameters provides the names of the files and variants collections
+     * @param mongoTemplate      access to the database; its converter is used to map variant documents
+     * @param studyId            identifier of the study whose files get their statistics calculated
+     * @param chunkSize          batch size of the cursor used to read the variants
+     */
+    public FileStatsTasklet(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate, String studyId,
+                            int chunkSize) {
+        this.databaseParameters = databaseParameters;
+        this.mongoTemplate = mongoTemplate;
+        this.studyId = studyId;
+        this.chunkSize = chunkSize;
+        this.converter = mongoTemplate.getConverter();
+    }
+
+    /**
+     * Calculates the statistics of all the files of the study and stores them in the files collection.
+     *
+     * @return always {@link RepeatStatus#FINISHED}, as the whole calculation is done in a single invocation
+     */
+    @Override
+    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
+        fileIdCountsMap = new HashMap<>();
+        unknownFileIds = new HashSet<>();
+        populateFilesIdAndNumberOfSamplesMap();
+        initializeFileIdCountsMap();
+
+        // The cursor is opened without timeout, so it must always be closed: try-with-resources guarantees it
+        try (MongoCursor<Document> cursor = initializeCursor()) {
+            while (cursor.hasNext()) {
+                processCounts(getVariant(cursor.next()));
+            }
+        }
+
+        writeStatsInTheDB();
+
+        return RepeatStatus.FINISHED;
+    }
+
+    /** Reads, for every file of the study, its number of samples from the files collection. */
+    private void populateFilesIdAndNumberOfSamplesMap() {
+        Bson matchStage = match(Filters.eq("sid", studyId));
+        Bson projectStage = project(fields(
+                computed("fid", "$fid"),
+                computed("numOfSamples", new Document("$size", new Document("$objectToArray", "$samp")))
+        ));
+        // group by file id, adding up the sample counts of the documents that share it
+        Bson groupStage = group("$fid", sum("totalNumOfSamples", "$numOfSamples"));
+
+        fileIdNumberOfSamplesMap = mongoTemplate.getCollection(databaseParameters.getCollectionFilesName())
+                .aggregate(asList(matchStage, projectStage, groupStage))
+                .into(new ArrayList<>())
+                .stream()
+                .collect(Collectors.toMap(doc -> doc.getString("_id"), doc -> doc.getInteger("totalNumOfSamples")));
+    }
+
+    /** Creates the counters of every file of the study: its number of samples and all other counts at zero. */
+    private void initializeFileIdCountsMap() {
+        for (Map.Entry<String, Integer> entry : fileIdNumberOfSamplesMap.entrySet()) {
+            Map<String, Integer> countsMap = new HashMap<>();
+
+            countsMap.put(KEY_NO_OF_SAMPLES, entry.getValue());
+            countsMap.put(KEY_NO_OF_VARIANTS, 0);
+            countsMap.put(KEY_NO_OF_SNP, 0);
+            countsMap.put(KEY_NO_OF_INDEL, 0);
+            countsMap.put(KEY_NO_OF_PASS, 0);
+            countsMap.put(KEY_NO_OF_TRANSITION, 0);
+            countsMap.put(KEY_NO_OF_TRANSVERSION, 0);
+
+            fileIdCountsMap.put(entry.getKey(), countsMap);
+        }
+    }
+
+    /** Opens a cursor over all the variants that have at least one source entry of the study. */
+    private MongoCursor<Document> initializeCursor() {
+        Bson query = Filters.elemMatch(VariantDocument.FILES_FIELD,
+                                       Filters.eq(VariantSourceEntryMongo.STUDYID_FIELD, studyId));
+        logger.info("Issuing find: {}", query);
+
+        FindIterable<Document> statsVariantDocuments = mongoTemplate
+                .getCollection(databaseParameters.getCollectionVariantsName())
+                .find(query)
+                .noCursorTimeout(true)
+                .batchSize(chunkSize);
+
+        return statsVariantDocuments.iterator();
+    }
+
+    /** Converts a raw variant document into a {@link VariantDocument} with the template's converter. */
+    private VariantDocument getVariant(Document variantDocument) {
+        return converter.read(VariantDocument.class, new BasicDBObject(variantDocument));
+    }
+
+    /**
+     * Adds one variant to the counters of every file of the study it belongs to.
+     * <p>
+     * A variant counts as "PASS" for a file only when all its source entries for that file have the FILTER
+     * attribute set to "PASS". File ids without counters (i.e. not found in the files collection for the
+     * study) are skipped and reported once in the log.
+     */
+    private void processCounts(VariantDocument variantDocument) {
+        // get all fileIds this variant belongs to
+        Set<String> fileIds = variantDocument.getVariantSources().stream()
+                .filter(vse -> vse.getStudyId().equals(studyId))
+                .map(VariantSourceEntryMongo::getFileId)
+                .collect(Collectors.toSet());
+
+        // constant-first comparison: a variant without type is counted neither as SNV nor as INDEL
+        boolean isSNV = Variant.VariantType.SNV.equals(variantDocument.getVariantType());
+        boolean isINDEL = Variant.VariantType.INDEL.equals(variantDocument.getVariantType());
+        boolean isTransition = isSNV
+                && transitions.contains(variantDocument.getReference() + variantDocument.getAlternate());
+        boolean isTransversion = isSNV && !isTransition;
+
+        for (String fileId : fileIds) {
+            Map<String, Integer> countsMap = fileIdCountsMap.get(fileId);
+            if (countsMap == null) {
+                // no files document to update for this file id: skip it instead of failing with a NPE
+                if (unknownFileIds.add(fileId)) {
+                    logger.warn("File '{}' of study '{}' is referenced by variants but was not found in "
+                                        + "collection '{}': no statistics will be stored for it", fileId, studyId,
+                                databaseParameters.getCollectionFilesName());
+                }
+                continue;
+            }
+
+            countsMap.merge(KEY_NO_OF_VARIANTS, 1, Integer::sum);
+
+            if (isSNV) {
+                countsMap.merge(KEY_NO_OF_SNP, 1, Integer::sum);
+            } else if (isINDEL) {
+                countsMap.merge(KEY_NO_OF_INDEL, 1, Integer::sum);
+            }
+
+            if (isTransition) {
+                countsMap.merge(KEY_NO_OF_TRANSITION, 1, Integer::sum);
+            } else if (isTransversion) {
+                countsMap.merge(KEY_NO_OF_TRANSVERSION, 1, Integer::sum);
+            }
+
+            boolean hasPass = variantDocument.getVariantSources().stream()
+                    .filter(vse -> vse.getStudyId().equals(studyId) && vse.getFileId().equals(fileId))
+                    .map(vse -> (vse.getAttrs() != null) ? vse.getAttrs().getOrDefault("FILTER", "") : "")
+                    .allMatch("PASS"::equals);
+            if (hasPass) {
+                countsMap.merge(KEY_NO_OF_PASS, 1, Integer::sum);
+            }
+        }
+    }
+
+    /** Stores the counters of every file in the "st" field of its documents in the files collection. */
+    private void writeStatsInTheDB() {
+        if (fileIdCountsMap.isEmpty()) {
+            // executing a bulk operation with no queued updates is rejected by the MongoDB driver
+            logger.warn("No files found for study '{}' in collection '{}': no statistics to store", studyId,
+                        databaseParameters.getCollectionFilesName());
+            return;
+        }
+
+        BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED,
+                                                              databaseParameters.getCollectionFilesName());
+
+        for (Map.Entry<String, Map<String, Integer>> entry : fileIdCountsMap.entrySet()) {
+            Query query = new Query(Criteria.where("sid").is(studyId).and("fid").is(entry.getKey()));
+            bulkOperations.updateMulti(query, new Update().set("st", entry.getValue()));
+        }
+
+        bulkOperations.execute();
+    }
+}
diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepTest.java
new file mode 100644
index 00000000..d4aea46e
--- /dev/null
+++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/FileStatsStepTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2024 EMBL - European Bioinformatics Institute
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package uk.ac.ebi.eva.pipeline.configuration.jobs.steps;
+
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.test.JobLauncherTestUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+import uk.ac.ebi.eva.pipeline.configuration.BeanNames;
+import uk.ac.ebi.eva.pipeline.configuration.MongoConfiguration;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.FileStatsJobConfiguration;
+import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration;
+import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration;
+import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule;
+import uk.ac.ebi.eva.test.rules.TemporaryMongoRule;
+import uk.ac.ebi.eva.utils.EvaJobParameterBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted;
+import static uk.ac.ebi.eva.test.utils.TestFileUtils.getResourceUrl;
+
+/**
+ * Test for {@link FileStatsStepConfiguration}
+ */
+@RunWith(SpringRunner.class)
+@TestPropertySource({"classpath:test-stats.properties"})
+@ContextConfiguration(classes = {FileStatsJobConfiguration.class, BatchTestConfiguration.class,
+ TemporaryRuleConfiguration.class, MongoConfiguration.class})
+public class FileStatsStepTest {
+ private static final String MONGO_DUMP = "/dump/VariantStatsConfigurationTest_vl";
+
+ private static final String COLLECTION_VARIANTS_NAME = "variants";
+
+ private static final String COLLECTION_FILES_NAME = "files";
+
+ private static final String DATABASE_NAME = "file_stats_test_db";
+
+ private static final String STUDY_ID = "1";
+
+ @Autowired
+ @Rule
+ public TemporaryMongoRule mongoRule;
+
+ @Rule
+ public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule();
+
+ @Autowired
+ private JobLauncherTestUtils jobLauncherTestUtils;
+
+ @Before
+ public void setUp() throws Exception {
+ mongoRule.getTemporaryDatabase(DATABASE_NAME).drop();
+ mongoRule.restoreDump(getResourceUrl(MONGO_DUMP), DATABASE_NAME);
+ }
+
+ @After
+ public void cleanUp() {
+ mongoRule.getTemporaryDatabase(DATABASE_NAME).drop();
+ }
+
+    /** Launches the file stats step and checks the statistics stored in the files collection. */
+    @Test
+    public void fileStatsStepShouldCalculateAndLoadStats() {
+        JobParameters jobParameters = new EvaJobParameterBuilder()
+                .collectionFilesName(COLLECTION_FILES_NAME)
+                .collectionVariantsName(COLLECTION_VARIANTS_NAME)
+                .databaseName(DATABASE_NAME)
+                .inputStudyId(STUDY_ID)
+                .chunkSize("100")
+                .toJobParameters();
+
+        JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.FILE_STATS_STEP, jobParameters);
+        // check job completed successfully
+        assertCompleted(jobExecution);
+        List<Document> documents = mongoRule.getTemporaryDatabase(DATABASE_NAME).getCollection(COLLECTION_FILES_NAME)
+                .find().into(new ArrayList<>());
+        assertEquals(1, documents.size());
+        // assert all statistics are calculated for all documents
+        Assert.assertTrue(documents.stream().allMatch(doc -> doc.containsKey("st")));
+        // assert statistics for the study id 1 and file id 1 (fail with an AssertionError if it is missing)
+        Document fileStats = documents.stream()
+                .filter(doc -> STUDY_ID.equals(doc.get("sid")) && "1".equals(doc.get("fid")))
+                .findFirst().orElseThrow(AssertionError::new).get("st", Document.class);
+        assertEquals(2504, fileStats.get("nSamp"));
+        assertEquals(300, fileStats.get("nVar"));
+        assertEquals(281, fileStats.get("nSnp"));
+        assertEquals(19, fileStats.get("nIndel"));
+        assertEquals(300, fileStats.get("nPass"));
+        assertEquals(178, fileStats.get("nTi"));
+        assertEquals(103, fileStats.get("nTv"));
+    }
+}
diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/VariantStatsStepTest.java
similarity index 89%
rename from src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java
rename to src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/VariantStatsStepTest.java
index f44291f4..fae03763 100644
--- a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java
+++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/VariantStatsStepTest.java
@@ -31,7 +31,7 @@
import org.springframework.test.context.junit4.SpringRunner;
import uk.ac.ebi.eva.pipeline.configuration.BeanNames;
import uk.ac.ebi.eva.pipeline.configuration.MongoConfiguration;
-import uk.ac.ebi.eva.pipeline.configuration.jobs.CalculateAndLoadStatisticsJobConfiguration;
+import uk.ac.ebi.eva.pipeline.configuration.jobs.VariantStatsJobConfiguration;
import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration;
import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration;
import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule;
@@ -46,20 +46,20 @@
import static uk.ac.ebi.eva.test.utils.TestFileUtils.getResourceUrl;
/**
- * Test for {@link CalculateAndLoadStatisticsStepConfiguration}
+ * Test for {@link VariantStatsStepConfiguration}
*/
@RunWith(SpringRunner.class)
@TestPropertySource({"classpath:test-stats.properties"})
-@ContextConfiguration(classes = {CalculateAndLoadStatisticsJobConfiguration.class, BatchTestConfiguration.class,
+@ContextConfiguration(classes = {VariantStatsJobConfiguration.class, BatchTestConfiguration.class,
TemporaryRuleConfiguration.class, MongoConfiguration.class})
-public class CalculateAndLoadStatisticsStepTest {
+public class VariantStatsStepTest {
private static final String MONGO_DUMP = "/dump/VariantStatsConfigurationTest_vl";
private static final String COLLECTION_VARIANTS_NAME = "variants";
private static final String COLLECTION_FILES_NAME = "files";
- private static final String DATABASE_NAME = "calculate_load_stats_test_db";
+ private static final String DATABASE_NAME = "variant_stats_test_db";
private static final String STUDY_ID = "1";
@@ -85,7 +85,7 @@ public void cleanUp() {
}
@Test
- public void calculateAndLoadStatisticsStepShouldCalculateAndLoadStats() {
+ public void variantStatsStepShouldCalculateAndLoadStats() {
JobParameters jobParameters = new EvaJobParameterBuilder()
.collectionFilesName(COLLECTION_FILES_NAME)
.collectionVariantsName(COLLECTION_VARIANTS_NAME)
@@ -94,7 +94,7 @@ public void calculateAndLoadStatisticsStepShouldCalculateAndLoadStats() {
.chunkSize("100")
.toJobParameters();
- JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP, jobParameters);
+ JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.VARIANT_STATS_STEP, jobParameters);
// check job completed successfully
assertCompleted(jobExecution);