Skip to content

Commit

Permalink
Merge pull request #291 from NOAA-OWP/issue66
Browse files Browse the repository at this point in the history
Consistently scrub threshold values from the summary statistics for g…
  • Loading branch information
james-d-brown authored Aug 23, 2024
2 parents 1d5b305 + 3a105c3 commit fbb5ab8
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 71 deletions.
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2122,5 +2122,4 @@ configurations.runtimeOnly {
exclude group: 'org.jboss.logmanager', module: 'jboss-logmanager'
}

defaultTasks 'installDist', 'test', 'javadoc'

defaultTasks 'installDist', 'test', 'javadoc'
166 changes: 110 additions & 56 deletions src/wres/pipeline/EvaluationUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -122,10 +123,6 @@ class EvaluationUtilities
private static final String PERFORMING_RETRIEVAL_WITH_AN_IN_MEMORY_RETRIEVER_FACTORY =
"Performing retrieval with an in-memory retriever factory.";

/** Metadata adapter for thresholds. */
private static final BinaryOperator<Statistics> METADATA_ADAPTER_FOR_THRESHOLDS =
EvaluationUtilities.getMetadataAdapterForThresholds();

/** Re-used string. */
private static final String PERFORMING_RETRIEVAL_WITH_A_RETRIEVER_FACTORY_BACKED_BY_A_PERSISTENT_STORE =
"Performing retrieval with a retriever factory backed by a persistent store.";
Expand Down Expand Up @@ -184,57 +181,37 @@ static void createAndPublishStatistics( EvaluationDetails evaluationDetails,
}

/**
* Creates and publishes the summary statistics.
* @param summaryStatistics the summary statistics calculators, mapped by message group identifier
* @param messager the evaluation messager
* @throws NullPointerException if any input is null
* Create and publish the summary statistics.
*
* @param evaluationDetails the evaluation details
*/
static void createAndPublishSummaryStatistics( Map<String, List<SummaryStatisticsCalculator>> summaryStatistics,
EvaluationMessager messager )
static void createAndPublishSummaryStatistics( EvaluationDetails evaluationDetails )
{
Objects.requireNonNull( summaryStatistics );
Objects.requireNonNull( messager );

LOGGER.debug( "Publishing summary statistics from {} summary statistics calculators.",
summaryStatistics.size() );

// Publish the summary statistics per message group
Set<String> groupIds = new HashSet<>();
for ( Map.Entry<String, List<SummaryStatisticsCalculator>> next : summaryStatistics.entrySet() )
{
// Generate the summary statistics
String groupId = next.getKey();
List<SummaryStatisticsCalculator> calculators = next.getValue();
List<Statistics> nextStatistics = calculators.stream()
.flatMap( c -> c.get()
.stream() )
.toList();

nextStatistics.forEach( m -> messager.publish( m, groupId ) );

groupIds.add( groupId );

LOGGER.debug( "Published {} summary statistics for group {}", nextStatistics.size(), groupId );
}
Objects.requireNonNull( evaluationDetails );

// Mark the publication complete for all groups
groupIds.forEach( messager::markGroupPublicationCompleteReportedSuccess );
// Main dataset
EvaluationUtilities.createAndPublishSummaryStatistics( evaluationDetails.summaryStatistics(),
evaluationDetails.evaluation() );

LOGGER.debug( "Finished publishing summary statistics." );
// Baseline
EvaluationUtilities.createAndPublishSummaryStatistics( evaluationDetails.summaryStatisticsForBaseline(),
evaluationDetails.evaluation() );
}

/**
* Generates a collection of {@link SummaryStatisticsCalculator} from an {@link EvaluationDeclaration}. Currently,
* supports only {@link wres.statistics.generated.SummaryStatistic.StatisticDimension#FEATURES}.
* @param declaration the evaluation declaration
* @param poolCount the number of pools for which raw (non-summary) statistics are required
* @param clearThresholdValues is true to clear event threshold values from the summary statistics, false otherwise
* @return the summary statistics calculators
* @throws NullPointerException if any input is null
* @throws IllegalArgumentException if the dimension is unsupported
*/

static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisticsCalculators( EvaluationDeclaration declaration,
long poolCount )
long poolCount,
boolean clearThresholdValues )
{
Objects.requireNonNull( declaration );

Expand All @@ -248,6 +225,7 @@ static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisticsCalcul
}

// Collect the geographic feature dimensions to aggregate, which are the only supported dimensions
// Note that clearing threshold values is linked to these aggregation dimensions.
Set<SummaryStatistic.StatisticDimension> dimensions =
declaration.summaryStatistics()
.stream()
Expand All @@ -256,7 +234,10 @@ static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisticsCalcul
|| d == SummaryStatistic.StatisticDimension.FEATURES )
.collect( Collectors.toSet() );

return EvaluationUtilities.getSummaryStatisticsForFeatures( declaration, dimensions, poolCount );
return EvaluationUtilities.getSummaryStatisticsForFeatures( declaration,
dimensions,
poolCount,
clearThresholdValues );
}

/**
Expand Down Expand Up @@ -738,6 +719,74 @@ static Set<FeatureGroup> getFeatureGroupsForSummaryStatisticsOnly( Set<FeatureGr
return Collections.unmodifiableSet( groups );
}

/**
 * Inspects the event thresholds attached to the supplied metrics and reports whether any single threshold name is
 * associated with more than one distinct set of threshold values.
 * @param metricsAndThresholds the metrics and thresholds
 * @return true if there are event thresholds that vary across features, false if they are fixed
 * @throws NullPointerException if the input is null
 */
static boolean hasEventThresholdsThatVaryAcrossFeatures( Set<MetricsAndThresholds> metricsAndThresholds )
{
    Objects.requireNonNull( metricsAndThresholds );

    // Pool every threshold from every entry and index the pooled thresholds by threshold name
    Map<String, List<ThresholdOuter>> thresholdsByName =
            metricsAndThresholds.stream()
                                .flatMap( next -> next.thresholds()
                                                      .values()
                                                      .stream() )
                                .flatMap( Collection::stream )
                                .collect( Collectors.groupingBy( next -> next.getThreshold()
                                                                             .getName() ) );

    // A name that maps to two or more distinct values indicates thresholds that are not fixed
    for ( List<ThresholdOuter> sameName : thresholdsByName.values() )
    {
        long distinctValues = sameName.stream()
                                      .map( ThresholdOuter::getValues )
                                      .distinct()
                                      .count();

        if ( distinctValues > 1 )
        {
            return true;
        }
    }

    return false;
}

/**
 * Generates the summary statistics for each message group and publishes them with the supplied messager. Once
 * every group has been published, publication is marked complete for each published group.
 * @param summaryStatistics the summary statistics calculators, mapped by message group identifier
 * @param messager the evaluation messager
 * @throws NullPointerException if any input is null
 */
private static void createAndPublishSummaryStatistics( Map<String, List<SummaryStatisticsCalculator>> summaryStatistics,
                                                       EvaluationMessager messager )
{
    Objects.requireNonNull( summaryStatistics );
    Objects.requireNonNull( messager );

    LOGGER.debug( "Publishing summary statistics from {} summary statistics calculators.",
                  summaryStatistics.size() );

    // Remember which groups were published so that they can be marked complete afterwards
    Set<String> publishedGroups = new HashSet<>();
    for ( Map.Entry<String, List<SummaryStatisticsCalculator>> entry : summaryStatistics.entrySet() )
    {
        String group = entry.getKey();

        // Compute all of the statistics for this group before publishing any of them
        List<Statistics> groupStatistics = entry.getValue()
                                                .stream()
                                                .flatMap( calculator -> calculator.get()
                                                                                  .stream() )
                                                .toList();

        for ( Statistics statistics : groupStatistics )
        {
            messager.publish( statistics, group );
        }

        publishedGroups.add( group );

        LOGGER.debug( "Published {} summary statistics for group {}", groupStatistics.size(), group );
    }

    // Completion is only signalled after the statistics for every group have been published
    for ( String group : publishedGroups )
    {
        messager.markGroupPublicationCompleteReportedSuccess( group );
    }

    LOGGER.debug( "Finished publishing summary statistics." );
}

/**
* Creates one pool task for each pool request and then chains them together, such that all of the pools complete
* nominally or one completes exceptionally.
Expand Down Expand Up @@ -1518,13 +1567,15 @@ private static List<Predicate<Statistics>> join( List<Predicate<Statistics>> fir
* @param dimensions the feature dimensions over which to perform aggregation
* @return the summary statistics calculators
* @param poolCount the number of pools for which raw (non-summary) statistics are required
* @param clearThresholdValues is true to clear event threshold values, false otherwise
* @throws NullPointerException if any input is null
* @throws IllegalArgumentException if the dimension is unsupported
*/

private static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisticsForFeatures( EvaluationDeclaration declaration,
Set<SummaryStatistic.StatisticDimension> dimensions,
long poolCount )
long poolCount,
boolean clearThresholdValues )
{
Objects.requireNonNull( declaration );

Expand All @@ -1538,7 +1589,8 @@ private static Map<String, List<SummaryStatisticsCalculator>> getSummaryStatisti
List<TimeWindowAndThresholdFilterAdapter> timeWindowAndThresholdFilters =
EvaluationUtilities.getTimeWindowAndThresholdFilters( timeWindows,
thresholds,
separateMetricsForBaseline );
separateMetricsForBaseline,
clearThresholdValues );

// Get the geographic feature filters and metadata adapters
List<FeatureGroupFilterAdapter> featureFilters = new ArrayList<>();
Expand Down Expand Up @@ -1656,9 +1708,9 @@ else if ( behavioralName.isInGroup( MetricConstants.StatisticType.BOXPLOT_PER_PO
BinaryOperator<Statistics> poolNumberAdapter =
EvaluationUtilities.getMetadataAdapterForPoolNumber( poolNumber );

BinaryOperator<Statistics> thresholdAdapter = nextInnerFilter.adapter();
UnaryOperator<Statistics> thresholdAdapter = nextInnerFilter.adapter();
BinaryOperator<Statistics> metadataAdapter = ( p, q ) ->
poolNumberAdapter.apply( featureAdapter.apply( thresholdAdapter.apply( p, q ), q ), q );
poolNumberAdapter.apply( featureAdapter.apply( thresholdAdapter.apply( p ), q ), q );

SummaryStatisticsCalculator calculator = SummaryStatisticsCalculator.of( nextScalar,
nextDiagrams,
Expand Down Expand Up @@ -1714,12 +1766,13 @@ private static BinaryOperator<Statistics> getMetadataAdapterForFeatureGroup( Geo
/**
* Creates a metadata adapter that, when requested, sets the event threshold values to missing unless the threshold
* represents all data.
*
* @param clearThresholds is true to clear event threshold values, false otherwise
* @return the metadata adapter
*/

private static BinaryOperator<Statistics> getMetadataAdapterForThresholds()
private static UnaryOperator<Statistics> getMetadataAdapterForThresholds( boolean clearThresholds )
{
return ( existing, latest ) ->
return existing ->
{
boolean isBaselinePool = !existing.hasPool()
&& existing.hasBaselinePool();
Expand All @@ -1729,15 +1782,11 @@ private static BinaryOperator<Statistics> getMetadataAdapterForThresholds()
wres.statistics.generated.Pool.Builder existingPool =
isBaselinePool ? adjusted.getBaselinePoolBuilder() : adjusted.getPoolBuilder();

wres.statistics.generated.Pool latestPool =
isBaselinePool ? latest.getBaselinePool() : latest.getPool();

// Clear the threshold values unless they are equal across statistics
// Clear the threshold values unless they are equal across statistics or represent all data
if ( existingPool.hasEventThreshold()
&& !Objects.equals( existingPool.getEventThreshold()
.getLeftThresholdValue(),
latestPool.getEventThreshold()
.getLeftThresholdValue() ) )
&& !ThresholdOuter.of( existingPool.getEventThreshold() )
.isAllDataThreshold()
&& clearThresholds )
{
// Set to missing rather than clearing: #126545
Threshold.Builder builder = existingPool.getEventThresholdBuilder()
Expand Down Expand Up @@ -1975,17 +2024,22 @@ private static FeatureGroupFilterAdapter getBaselineFeatureGroupForSummaryStatis
* @param timeWindows the time windows
* @param thresholds the thresholds
* @param separateMetricsForBaseline whether separate metrics are required for a baseline dataset
* @param clearThresholdValues is true to clear event threshold values, false otherwise
* @return the filters
*/
private static List<TimeWindowAndThresholdFilterAdapter> getTimeWindowAndThresholdFilters( Set<TimeWindow> timeWindows,
Set<wres.config.yaml.components.Threshold> thresholds,
boolean separateMetricsForBaseline )
boolean separateMetricsForBaseline,
boolean clearThresholdValues )
{
// Get the time window filters
List<Predicate<Statistics>> timeWindowFilters =
EvaluationUtilities.getTimeWindowFilters( timeWindows,
separateMetricsForBaseline );

UnaryOperator<Statistics> adapter =
EvaluationUtilities.getMetadataAdapterForThresholds( clearThresholdValues );

LOGGER.debug( "Discovered {} time windows, which produced {} filters.",
timeWindows.size(),
timeWindowFilters.size() );
Expand All @@ -2008,7 +2062,7 @@ private static List<TimeWindowAndThresholdFilterAdapter> getTimeWindowAndThresho
List<TimeWindowAndThresholdFilterAdapter> nextFilters
= joined.stream()
.map( n -> new TimeWindowAndThresholdFilterAdapter( n,
METADATA_ADAPTER_FOR_THRESHOLDS,
adapter,
nextCount ) )
.toList();
filters.addAll( nextFilters );
Expand Down Expand Up @@ -2045,7 +2099,7 @@ private record FeatureGroupFilterAdapter( GeometryGroup geometryGroup,
* @param timeWindowNumber the time window number
*/
private record TimeWindowAndThresholdFilterAdapter( Predicate<Statistics> filter,
BinaryOperator<Statistics> adapter,
UnaryOperator<Statistics> adapter,
long timeWindowNumber )
{
}
Expand Down
Loading

0 comments on commit fbb5ab8

Please sign in to comment.