Skip to content

Commit

Permalink
Merge pull request #104 from poggio84/MonitorProgressVariantLoaderSte…
Browse files Browse the repository at this point in the history
…p_EVA-675

* New listener to estimate the total number of rows in a VCF file created
* Estimator wired with the step listener reporting statistics about the chunk
* Listener statistics reporter wired into VariantLoader step
  • Loading branch information
Cristina Yenyxe Gonzalez Garcia authored Mar 20, 2017
2 parents 52182e1 + 2e5a9bc commit 5202fb5
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected void doWrite(List<? extends Variant> variants) {

private void executeBulk(BulkWriteOperation bulk, int currentBulkSize) {
if (currentBulkSize != 0) {
logger.debug("Execute bulk. BulkSize : " + currentBulkSize);
logger.trace("Execute bulk. BulkSize : " + currentBulkSize);
bulk.execute();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import uk.ac.ebi.eva.pipeline.configuration.ChunkSizeCompletionPolicyConfiguration;
import uk.ac.ebi.eva.pipeline.configuration.readers.VcfReaderConfiguration;
import uk.ac.ebi.eva.pipeline.configuration.writers.VariantWriterConfiguration;
import uk.ac.ebi.eva.pipeline.listeners.VariantLoaderStepStatisticsListener;
import uk.ac.ebi.eva.pipeline.listeners.SkippedItemListener;
import uk.ac.ebi.eva.pipeline.listeners.StepProgressListener;
import uk.ac.ebi.eva.pipeline.parameters.JobOptions;

import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.LOAD_VARIANTS_STEP;
Expand Down Expand Up @@ -73,6 +75,8 @@ public Step loadVariantsStep(StepBuilderFactory stepBuilderFactory, JobOptions j
.faultTolerant().skipLimit(50).skip(FlatFileParseException.class)
.allowStartIfComplete(jobOptions.isAllowStartIfComplete())
.listener(new SkippedItemListener())
.listener(new StepProgressListener())
.listener(new VariantLoaderStepStatisticsListener())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 EMBL - European Bioinformatics Institute
* Copyright 2016-2017 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,10 +20,11 @@
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

import uk.ac.ebi.eva.pipeline.parameters.ExecutionContextParametersNames;

/**
* Log the number of read, write and skip items for each chunk.
* Should be wired into a {@link org.springframework.batch.core.Step}
*
*/
public class StepProgressListener implements ChunkListener {
private static final Logger logger = LoggerFactory.getLogger(StepProgressListener.class);
Expand All @@ -34,13 +35,25 @@ public void beforeChunk(ChunkContext context) {

@Override
public void afterChunk(ChunkContext context) {
logger.info("Chunk stats: Items read count {}, items write count {}, items skip count{}",
context.getStepContext().getStepExecution().getReadCount(),
context.getStepContext().getStepExecution().getWriteCount(),
context.getStepContext().getStepExecution().getReadSkipCount()
+ context.getStepContext().getStepExecution().getProcessSkipCount()
+ context.getStepContext().getStepExecution().getWriteSkipCount()
);

long estimatedTotalNumberOfLines = (long)context.getStepContext().getStepExecutionContext()
.get(ExecutionContextParametersNames.NUMBER_OF_LINES);

long read = context.getStepContext().getStepExecution().getReadCount();
long write = context.getStepContext().getStepExecution().getWriteCount();
long skip = context.getStepContext().getStepExecution().getReadSkipCount()
+ context.getStepContext().getStepExecution().getProcessSkipCount()
+ context.getStepContext().getStepExecution().getWriteSkipCount();

String chunkStatisticsMessage = "Items read =" + read + ", items written = " + write + ", items skipped = " + skip;

if (estimatedTotalNumberOfLines != 0) {
int percent = (int) ((read * 100) / estimatedTotalNumberOfLines);
logger.info(percent + "% complete: " + chunkStatisticsMessage);
} else {
logger.info(chunkStatisticsMessage);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2017 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.ebi.eva.pipeline.listeners;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

import uk.ac.ebi.eva.pipeline.parameters.ExecutionContextParametersNames;
import uk.ac.ebi.eva.pipeline.parameters.JobParametersNames;
import uk.ac.ebi.eva.utils.VcfNumberOfLinesEstimator;

/**
* - Estimate the number of lines in the VCF file before the step. This will be used in {@link StepProgressListener}
* - Log a statistics summary after the step
*/
public class VariantLoaderStepStatisticsListener implements StepExecutionListener {
private static final Logger logger = LoggerFactory.getLogger(VariantLoaderStepStatisticsListener.class);

@Override
public void beforeStep(StepExecution stepExecution) {
String vcfFilePath = stepExecution.getJobExecution().getJobParameters().getString(JobParametersNames.INPUT_VCF);
long estimatedTotalNumberOfLines = new VcfNumberOfLinesEstimator().estimateVcfNumberOfLines(vcfFilePath);
stepExecution.getExecutionContext().put(ExecutionContextParametersNames.NUMBER_OF_LINES, estimatedTotalNumberOfLines);
}

@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.info("Items read = " + stepExecution.getReadCount()
+ ", items written = " + stepExecution.getWriteCount()
+ ", items skipped = " + stepExecution.getSkipCount());

return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2017 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.ebi.eva.pipeline.parameters;

/**
* Class that holds the names (keys) of the parameters used into {@link org.springframework.batch.item.ExecutionContext}
*/
public class ExecutionContextParametersNames {
public static final String NUMBER_OF_LINES = "line";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ public class JobParametersNames {
/*
* Input
*/

public static final String INPUT_VCF = "input.vcf";

public static final String INPUT_VCF_ID = "input.vcf.id";

public static final String INPUT_VCF_AGGREGATION = "input.vcf.aggregation";

public static final String INPUT_STUDY_NAME = "input.study.name";

public static final String INPUT_STUDY_ID = "input.study.id";

public static final String INPUT_STUDY_TYPE = "input.study.type";

public static final String INPUT_PEDIGREE = "input.pedigree";

public static final String INPUT_GTF = "input.gtf";

public static final String INPUT_FASTA = "input.fasta";
Expand All @@ -46,101 +46,102 @@ public class JobParametersNames {
/*
* Output
*/

public static final String OUTPUT_DIR = "output.dir";

public static final String OUTPUT_DIR_ANNOTATION = "output.dir.annotation";

public static final String OUTPUT_DIR_STATISTICS = "output.dir.statistics";


/*
* Database infrastructure (Spring Data)
*/

public static final String CONFIG_DB_HOSTS = "spring.data.mongodb.host";

public static final String CONFIG_DB_AUTHENTICATIONDB = "spring.data.mongodb.authentication-database";

public static final String CONFIG_DB_USER = "spring.data.mongodb.username";

public static final String CONFIG_DB_PASSWORD = "spring.data.mongodb.password";

public static final String CONFIG_DB_READPREFERENCE = "config.db.read-preference";


/*
* Database and collections
*/

public static final String DB_NAME = "spring.data.mongodb.database";

public static final String DB_COLLECTIONS_VARIANTS_NAME = "db.collections.variants.name";

public static final String DB_COLLECTIONS_FILES_NAME = "db.collections.files.name";

public static final String DB_COLLECTIONS_FEATURES_NAME = "db.collections.features.name";

public static final String DB_COLLECTIONS_STATISTICS_NAME = "db.collections.stats.name";


/*
* Skip and overwrite steps
*/

public static final String ANNOTATION_SKIP = "annotation.skip";

public static final String STATISTICS_SKIP = "statistics.skip";

public static final String STATISTICS_OVERWRITE = "statistics.overwrite";


/*
* OpenCGA (parameters read from OpenCGA "conf" folder)
*/

public static final String APP_OPENCGA_PATH = "app.opencga.path";

public static final String OPENCGA_DB_NAME = MongoDBVariantStorageManager.OPENCGA_STORAGE_MONGODB_VARIANT_DB_NAME;

public static final String OPENCGA_DB_HOSTS = MongoDBVariantStorageManager.OPENCGA_STORAGE_MONGODB_VARIANT_DB_HOSTS;

public static final String OPENCGA_DB_AUTHENTICATIONDB = MongoDBVariantStorageManager.OPENCGA_STORAGE_MONGODB_VARIANT_DB_AUTHENTICATION_DB;

public static final String OPENCGA_DB_USER = MongoDBVariantStorageManager.OPENCGA_STORAGE_MONGODB_VARIANT_DB_USER;

public static final String OPENCGA_DB_PASSWORD = MongoDBVariantStorageManager.OPENCGA_STORAGE_MONGODB_VARIANT_DB_PASS;

public static final String OPENCGA_DB_COLLECTIONS_VARIANTS_NAME = MongoDBVariantStorageManager.OPENCGA_STORAGE_MONGODB_VARIANT_DB_COLLECTION_VARIANTS;

public static final String OPENCGA_DB_COLLECTIONS_FILES_NAME = MongoDBVariantStorageManager.OPENCGA_STORAGE_MONGODB_VARIANT_DB_COLLECTION_FILES;


/*
* Variant Effect Predictor (VEP)
*/

public static final String APP_VEP_PATH = "app.vep.path";

public static final String APP_VEP_CACHE_PATH = "app.vep.cache.path";

public static final String APP_VEP_CACHE_VERSION = "app.vep.cache.version";

public static final String APP_VEP_CACHE_SPECIES = "app.vep.cache.species";

public static final String APP_VEP_NUMFORKS = "app.vep.num-forks";


/*
* Other configuration
*/

public static final String CONFIG_RESTARTABILITY_ALLOW = "config.restartability.allow";

public static final String CONFIG_CHUNK_SIZE = "config.chunk.size";


public static final String PROPERTY_FILE_PROPERTY = "parameters.path";

public static final String RESTART_PROPERTY = "force.restart";

}
23 changes: 22 additions & 1 deletion src/main/java/uk/ac/ebi/eva/utils/FileUtils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 EMBL - European Bioinformatics Institute
* Copyright 2016-2017 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,11 +23,15 @@

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;
import java.util.zip.GZIPOutputStream;

public abstract class FileUtils {

Expand Down Expand Up @@ -74,4 +78,21 @@ public static Properties getPropertiesFile(InputStream propertiesInputStream) th
properties.load(propertiesInputStream);
return properties;
}

/**
* Creates a temporary GzipFile withe the content at {@param content}.
* @param content
* @param name how the temporal file will be called
* @return
* @throws IOException
*/
public static File newGzipFile(String content, String name) throws IOException {
File tempFile = File.createTempFile(name, ".gz");
try (FileOutputStream output = new FileOutputStream(tempFile)) {
try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(output), "UTF-8")) {
writer.write(content);
}
}
return tempFile;
}
}
Loading

0 comments on commit 5202fb5

Please sign in to comment.