Skip to content

Commit

Permalink
Continue to wire in event detection using a placeholder for the final…
Browse files Browse the repository at this point in the history
… detection request, #130
  • Loading branch information
james-d-brown committed Dec 12, 2024
1 parent 6e3fcdc commit 38a8a21
Show file tree
Hide file tree
Showing 12 changed files with 970 additions and 157 deletions.
4 changes: 2 additions & 2 deletions src/wres/pipeline/EvaluationDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @param caches the database caches/ORMs
* @param metricsAndThresholds the metrics and thresholds
* @param project the project
* @param evaluation the evaluation
* @param evaluationMessager the evaluation messager
* @param timeSeriesStore the time-series data store
* @param summaryStatistics the summary statistics calculators
* @param summaryStatisticsForBaseline the summary statistics calculators for baseline datasets
Expand All @@ -49,7 +49,7 @@ record EvaluationDetails( SystemSettings systemSettings,
DatabaseCaches caches,
Set<MetricsAndThresholds> metricsAndThresholds,
Project project,
EvaluationMessager evaluation,
EvaluationMessager evaluationMessager,
TimeSeriesStore timeSeriesStore,
Map<String, List<SummaryStatisticsCalculator>> summaryStatistics,
Map<String, List<SummaryStatisticsCalculator>> summaryStatisticsForBaseline,
Expand Down
112 changes: 69 additions & 43 deletions src/wres/pipeline/EvaluationUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import wres.reading.netcdf.grid.GriddedFeatures;
import wres.io.retrieving.database.EnsembleSingleValuedRetrieverFactory;
import wres.io.retrieving.memory.EnsembleSingleValuedRetrieverFactoryInMemory;
import wres.statistics.generated.Evaluation;
import wres.writing.csv.pairs.EnsemblePairsWriter;
import wres.metrics.BoxplotSummaryStatisticFunction;
import wres.metrics.DiagramSummaryStatisticFunction;
Expand All @@ -75,7 +76,6 @@
import wres.pipeline.statistics.EnsembleStatisticsProcessor;
import wres.pipeline.statistics.SingleValuedStatisticsProcessor;
import wres.statistics.generated.Consumer.Format;
import wres.statistics.generated.Evaluation;
import wres.statistics.generated.GeometryGroup;
import wres.statistics.generated.GeometryTuple;
import wres.statistics.generated.Outputs;
Expand Down Expand Up @@ -120,12 +120,12 @@ class EvaluationUtilities
private static final String SUMMARY_STATISTICS_ACROSS_FEATURES = "ALL FEATURES";

/** Re-used string. */
private static final String PERFORMING_RETRIEVAL_WITH_AN_IN_MEMORY_RETRIEVER_FACTORY =
"Performing retrieval with an in-memory retriever factory.";
private static final String CREATED_AN_IN_MEMORY_RETRIEVER_FACTORY =
"Created an in-memory retriever factory.";

/** Re-used string. */
private static final String PERFORMING_RETRIEVAL_WITH_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE =
"Performing retrieval with a retriever factory backed by a persistent store.";
private static final String CREATED_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE =
"Created a retriever factory backed by a persistent store.";

/** Maximum number of time windows to log. */
private static final int MAXIMUM_TIME_WINDOWS_TO_LOG = 1000;
Expand Down Expand Up @@ -191,11 +191,11 @@ static void createAndPublishSummaryStatistics( EvaluationDetails evaluationDetai

// Main dataset
EvaluationUtilities.createAndPublishSummaryStatistics( evaluationDetails.summaryStatistics(),
evaluationDetails.evaluation() );
evaluationDetails.evaluationMessager() );

// Baseline
EvaluationUtilities.createAndPublishSummaryStatistics( evaluationDetails.summaryStatisticsForBaseline(),
evaluationDetails.evaluation() );
evaluationDetails.evaluationMessager() );
}

/**
Expand Down Expand Up @@ -520,15 +520,26 @@ static Set<Format> getFormatsDeliveredByExternalSubscribers()
/**
* Creates the pool requests from the project.
*
* @param evaluationDescription the evaluation description
* @param poolFactory the pool factory
* @param evaluation the evaluation description
* @param evaluationDetails the evaluation details
* @return the pool requests
*/

static List<PoolRequest> getPoolRequests( PoolFactory poolFactory,
Evaluation evaluationDescription )
Evaluation evaluation,
EvaluationDetails evaluationDetails )
{
List<PoolRequest> poolRequests = poolFactory.getPoolRequests( evaluationDescription );
RetrieverFactory<Double, Double, Double> retriever = null;

// Event detection supports single-valued datasets only
if ( Objects.nonNull( evaluationDetails.declaration()
.eventDetection() ) )
{
retriever = EvaluationUtilities.getSingleValuedRetrieverFactory( evaluationDetails );
}

List<PoolRequest> poolRequests = poolFactory.getPoolRequests( evaluation, retriever );

// Log some information about the pools
if ( LOGGER.isInfoEnabled() )
Expand Down Expand Up @@ -627,18 +638,18 @@ static GriddedFeatures.Builder getGriddedFeaturesCache( EvaluationDeclaration de

/**
* Forcibly stops an evaluation messager on encountering an error, if already created.
* @param evaluation the evaluation messager
* @param evaluationMessager the evaluation messager
* @param error the error
* @param evaluationId the evaluation identifier
*/
static void forceStop( EvaluationMessager evaluation, Exception error, String evaluationId )
static void forceStop( EvaluationMessager evaluationMessager, Exception error, String evaluationId )
{
if ( Objects.nonNull( evaluation ) )
if ( Objects.nonNull( evaluationMessager ) )
{
// Stop forcibly
LOGGER.debug( FORCIBLY_STOPPING_EVALUATION_UPON_ENCOUNTERING_AN_INTERNAL_ERROR, evaluationId );

evaluation.stop( error );
evaluationMessager.stop( error );
}
}

Expand Down Expand Up @@ -747,6 +758,33 @@ static boolean hasEventThresholdsThatVaryAcrossFeatures( Set<MetricsAndThreshold
.anyMatch( c -> c.size() > 1 );
}

/**
* Returns a {@link RetrieverFactory} for single-valued datasets.
* @param details the evaluation details
* @return the retriever factory
*/
static RetrieverFactory<Double, Double, Double> getSingleValuedRetrieverFactory( EvaluationDetails details )
{
// Create a retriever factory to support retrieval for this project
RetrieverFactory<Double, Double, Double> retrieverFactory;
if ( details.hasInMemoryStore() )
{
LOGGER.debug( CREATED_AN_IN_MEMORY_RETRIEVER_FACTORY );
retrieverFactory = SingleValuedRetrieverFactoryInMemory.of( details.project(),
details.timeSeriesStore() );
}
else
{
LOGGER.debug( CREATED_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE );
retrieverFactory = SingleValuedRetrieverFactory.of( details.project(),
details.databaseServices()
.database(),
details.caches() );
}

return retrieverFactory;
}

/**
* Creates and publishes the summary statistics.
* @param summaryStatistics the summary statistics calculators, mapped by message group identifier
Expand Down Expand Up @@ -878,21 +916,8 @@ private static List<PoolProcessor<Double, Double>> getSingleValuedPoolProcessors
executors.metricExecutor() );

// Create a retriever factory to support retrieval for this project
RetrieverFactory<Double, Double, Double> retrieverFactory;
if ( evaluationDetails.hasInMemoryStore() )
{
LOGGER.debug( PERFORMING_RETRIEVAL_WITH_AN_IN_MEMORY_RETRIEVER_FACTORY );
retrieverFactory = SingleValuedRetrieverFactoryInMemory.of( evaluationDetails.project(),
evaluationDetails.timeSeriesStore() );
}
else
{
LOGGER.debug( PERFORMING_RETRIEVAL_WITH_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE );
retrieverFactory = SingleValuedRetrieverFactory.of( project,
evaluationDetails.databaseServices()
.database(),
evaluationDetails.caches() );
}
RetrieverFactory<Double, Double, Double> retrieverFactory =
EvaluationUtilities.getSingleValuedRetrieverFactory( evaluationDetails );

// Create the pool suppliers for all pools in this evaluation
PoolFactory poolFactory = poolDetails.poolFactory();
Expand Down Expand Up @@ -952,7 +977,7 @@ private static List<PoolProcessor<Double, Double>> getSingleValuedPoolProcessors
.setSamplingUncertaintyExecutor( executors.samplingUncertaintyExecutor() )
.setPoolRequest( poolRequest )
.setPoolSupplier( poolSupplier )
.setEvaluation( evaluationDetails.evaluation() )
.setEvaluation( evaluationDetails.evaluationMessager() )
.setMonitor( evaluationDetails.monitor() )
.setTraceCountEstimator( SINGLE_VALUED_TRACE_COUNT_ESTIMATOR )
.setSeparateMetricsForBaseline( separateMetrics )
Expand Down Expand Up @@ -1021,26 +1046,27 @@ private static List<PoolProcessor<Double, Ensemble>> getEnsemblePoolProcessors(
List<GeneratedBaselines> supported = Arrays.stream( GeneratedBaselines.values() )
.filter( GeneratedBaselines::isEnsemble )
.toList();
throw new DeclarationException( "Discovered an evaluation with ensemble forecasts and a generated "
+ "'baseline' with a 'method' of '"
+ method
+ "'. However, this 'method' produces single-valued forecasts, which "
+ "is not allowed. Please declare a baseline that contains ensemble "
+ "forecasts and try again. The following 'method' options support "
+ "ensemble forecasts: "
+ supported );
throw new DeclarationException(
"Discovered an evaluation with ensemble forecasts and a generated "
+ "'baseline' with a 'method' of '"
+ method
+ "'. However, this 'method' produces single-valued forecasts, which "
+ "is not allowed. Please declare a baseline that contains ensemble "
+ "forecasts and try again. The following 'method' options support "
+ "ensemble forecasts: "
+ supported );
}

RetrieverFactory<Double, Ensemble, Double> retrieverFactory;
if ( evaluationDetails.hasInMemoryStore() )
{
LOGGER.debug( PERFORMING_RETRIEVAL_WITH_AN_IN_MEMORY_RETRIEVER_FACTORY );
LOGGER.debug( CREATED_AN_IN_MEMORY_RETRIEVER_FACTORY );
retrieverFactory = EnsembleSingleValuedRetrieverFactoryInMemory.of( evaluationDetails.project(),
evaluationDetails.timeSeriesStore() );
}
else
{
LOGGER.debug( PERFORMING_RETRIEVAL_WITH_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE );
LOGGER.debug( CREATED_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE );
retrieverFactory = EnsembleSingleValuedRetrieverFactory.of( project,
evaluationDetails.databaseServices()
.database(),
Expand All @@ -1057,13 +1083,13 @@ private static List<PoolProcessor<Double, Ensemble>> getEnsemblePoolProcessors(
RetrieverFactory<Double, Ensemble, Ensemble> retrieverFactory;
if ( evaluationDetails.hasInMemoryStore() )
{
LOGGER.debug( PERFORMING_RETRIEVAL_WITH_AN_IN_MEMORY_RETRIEVER_FACTORY );
LOGGER.debug( CREATED_AN_IN_MEMORY_RETRIEVER_FACTORY );
retrieverFactory = EnsembleRetrieverFactoryInMemory.of( evaluationDetails.project(),
evaluationDetails.timeSeriesStore() );
}
else
{
LOGGER.debug( PERFORMING_RETRIEVAL_WITH_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE );
LOGGER.debug( CREATED_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE );
retrieverFactory = EnsembleRetrieverFactory.of( project,
evaluationDetails.databaseServices()
.database(),
Expand Down Expand Up @@ -1136,7 +1162,7 @@ private static List<PoolProcessor<Double, Ensemble>> getEnsemblePoolProcessors(
.setSamplingUncertaintyExecutor( executors.samplingUncertaintyExecutor() )
.setPoolRequest( poolRequest )
.setPoolSupplier( poolSupplier )
.setEvaluation( evaluationDetails.evaluation() )
.setEvaluation( evaluationDetails.evaluationMessager() )
.setMonitor( evaluationDetails.monitor() )
.setTraceCountEstimator( ENSEMBLE_TRACE_COUNT_ESTIMATOR )
.setSeparateMetricsForBaseline( separateMetrics )
Expand Down
14 changes: 10 additions & 4 deletions src/wres/pipeline/Evaluator.java
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,11 @@ private Pair<Set<Path>, String> evaluate( SystemSettings systemSettings,
// Re-assign the declaration augmented by the ingested data
declarationWithFeaturesAndThresholds = project.getDeclaration();

// Update the evaluation details with the newly created project
evaluationDetails = EvaluationDetailsBuilder.builder( evaluationDetails )
.project( project )
.build();

LOGGER.debug( "Finished ingest of time-series data." );

// Set the project hash for identification
Expand Down Expand Up @@ -706,7 +711,9 @@ private Pair<Set<Path>, String> evaluate( SystemSettings systemSettings,
evaluationMessager.start();

PoolFactory poolFactory = PoolFactory.of( project );
List<PoolRequest> poolRequests = EvaluationUtilities.getPoolRequests( poolFactory, evaluationDescription );
List<PoolRequest> poolRequests = EvaluationUtilities.getPoolRequests( poolFactory,
evaluationDescription,
evaluationDetails );

int poolCount = poolRequests.size();
monitor.setPoolCount( poolCount );
Expand Down Expand Up @@ -748,11 +755,10 @@ private Pair<Set<Path>, String> evaluate( SystemSettings systemSettings,
clearThresholdValues );
}

// Set the project and evaluation, metrics and thresholds and summary statistics
// Set the project and evaluation messager, metrics and thresholds and summary statistics
evaluationDetails =
EvaluationDetailsBuilder.builder( evaluationDetails )
.project( project )
.evaluation( evaluationMessager )
.evaluationMessager( evaluationMessager )
.declaration( declarationWithFeaturesAndThresholds )
.metricsAndThresholds( metricsAndThresholds )
.summaryStatistics( summaryStatsCalculators )
Expand Down
2 changes: 1 addition & 1 deletion src/wres/pipeline/package-info.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* This is where an evaluation pipeline is created. An evaluation pipeline involves reading and ingesting time-series,
* This is where an evaluation pipeline is created. An evaluation pipeline involves reading and ingesting time-series,
* retrieving time-series and placing them into pools (performing rescaling and pairing, as needed), calculating
* statistics by applying metric functions to pools and, finally, writing statistics to formats. The application of a
* metric to a pool produces a statistic.
Expand Down
Loading

0 comments on commit 38a8a21

Please sign in to comment.