diff --git a/sourcecode/scoring/constants.py b/sourcecode/scoring/constants.py index 2311325..214344a 100644 --- a/sourcecode/scoring/constants.py +++ b/sourcecode/scoring/constants.py @@ -3,7 +3,7 @@ from enum import Enum import os import time -from typing import Dict, Optional +from typing import Dict, Optional, Set import numpy as np import pandas as pd @@ -34,12 +34,17 @@ intervalHalfWidth = 0.3 # Max flip rates -prescoringAllUnlockedNotesMaxCrhChurn = 0.04 -finalUnlockedNotesWithNoNewRatingsMaxCrhChurn = 0.03 +prescoringAllUnlockedNotesMaxCrhChurn = 0.2 +prescoringAllNotesCreatedThreeToThirteenDaysAgoMaxChurn = 0.06 +finalUnlockedNotesWithNoNewRatingsMaxCrhChurn = 0.05 finalNotesWithNewRatingsMaxNewCrhChurn = 0.80 finalNotesWithNewRatingsMaxOldCrhChurn = 0.25 finalNotesThatJustFlippedStatusMaxCrhChurn = 1e8 finalNotesThatFlippedRecentlyMaxCrhChurn = 1e8 +# TODO(jiansongc): adjust these 2 below +finalNotesNmrDueToMinStableCrhTimeMaxOldCrhChurn = 1.0 +finalNotesNmrDueToMinStableCrhTimeMaxNewCrhChurn = 1.0 + # Data Filenames scoredNotesOutputPath = "scoredNotes.tsv" @@ -59,17 +64,14 @@ authorTopNotHelpfulTagValues = "authorTopNotHelpfulTagValues" modelingPopulationKey = "modelingPopulation" modelingGroupKey = "modelingGroup" +modelingMultiGroupKey = "modelingMultiGroup" numberOfTimesEarnedOutKey = "numberOfTimesEarnedOut" defaultIndexKey = "index" # Scoring Groups -coreGroups = {1, 2, 3, 6, 8, 9, 10, 11, 13, 14, 19, 21, 25} -expansionGroups = ( - # Divide into 3 grouping aggregates to prepare for multi-group models, - # and a 4th group containing leftovers - {0, 15, 17, 24, 29, 30} | {4, 5, 7, 12, 26} | {27} | {16, 20, 22, 23, 28} -) -expansionPlusGroups = {18} +coreGroups: Set[int] = {1, 2, 3, 6, 8, 9, 10, 11, 13, 14, 19, 21, 25} +expansionGroups: Set[int] = {0, 4, 5, 7, 12, 16, 18, 20, 22, 23, 24, 26, 27, 28} +expansionPlusGroups: Set[int] = {15, 17, 29, 30} # TSV Values notHelpfulValueTsv = "NOT_HELPFUL" @@ -193,6 +195,14 @@ def rater_factor_key(i): 
groupRaterFactor1Key = "groupRaterFactor1" groupInternalActiveRulesKey = "groupActiveRules" groupNumFinalRoundRatingsKey = "groupNumFinalRoundRatings" +# MultiGroup Model +multiGroupNoteInterceptKey = "multiGroupNoteIntercept" +multiGroupNoteFactor1Key = "multiGroupNoteFactor1" +multiGroupRatingStatusKey = "multiGroupRatingStatus" +multiGroupRaterInterceptKey = "multiGroupRaterIntercept" +multiGroupRaterFactor1Key = "multiGroupRaterFactor1" +multiGroupInternalActiveRulesKey = "multiGroupActiveRules" +multiGroupNumFinalRoundRatingsKey = "multiGroupNumFinalRoundRatings" # Topic Model topicNoteInterceptKey = "topicNoteIntercept" topicNoteFactor1Key = "topicNoteFactor1" @@ -445,6 +455,12 @@ def rater_factor_key(i): currentDecidedByKey = "currentDecidedBy" currentModelingGroupKey = "currentModelingGroup" timestampMillisOfMostRecentStatusChangeKey = "timestampMillisOfMostRecentStatusChange" +currentMultiGroupStatusKey = "currentMultiGroupStatus" +currentModelingMultiGroupKey = "currentModelingMultiGroup" +timestampMillisOfNmrDueToMinStableCrhTimeKey = "timestampMillisOfNmrDueToMinStableCrhTime" +updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey = ( + "updatedTimestampMillisOfNmrDueToMinStableCrhTime" +) noteStatusHistoryTSVColumnsAndTypes = [ (noteIdKey, np.int64), @@ -465,12 +481,22 @@ def rater_factor_key(i): (currentDecidedByKey, "category"), (currentModelingGroupKey, np.double), # TODO: int (timestampMillisOfMostRecentStatusChangeKey, np.double), # double because nullable. + (timestampMillisOfNmrDueToMinStableCrhTimeKey, np.double), # double because nullable. 
+ (currentMultiGroupStatusKey, "category"), + (currentModelingMultiGroupKey, np.double), # TODO: int ] noteStatusHistoryTSVColumns = [col for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes] noteStatusHistoryTSVTypes = [dtype for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes] noteStatusHistoryTSVTypeMapping = { col: dtype for (col, dtype) in noteStatusHistoryTSVColumnsAndTypes } +# TODO(jiansongc): clean up after new column is in production. +noteStatusHistoryTSVColumnsOld = noteStatusHistoryTSVColumns[:-1] +noteStatusHistoryTSVColumnsAndTypesOld = noteStatusHistoryTSVColumnsAndTypes[:-1] +noteStatusHistoryTSVTypeMappingOld = { + col: dtype for (col, dtype) in noteStatusHistoryTSVColumnsAndTypesOld +} + # Earn In + Earn Out enrollmentState = "enrollmentState" @@ -587,6 +613,8 @@ def rater_factor_key(i): { coverageNoteInterceptMinKey, coverageNoteInterceptMaxKey, + groupNoteInterceptMinKey, + groupNoteInterceptMaxKey, } ) @@ -610,58 +638,64 @@ def rater_factor_key(i): (noteIdKey, np.int64), (coreNoteInterceptKey, np.double), (coreNoteFactor1Key, np.double), - (finalRatingStatusKey, str), - (firstTagKey, str), - (secondTagKey, str), + (finalRatingStatusKey, "category"), + (firstTagKey, "category"), + (secondTagKey, "category"), # Note that this column was formerly named "activeRules" and the name is now # updated to "coreActiveRules". The data values remain the compatible, # but the new column only contains rules that ran when deciding status based on # the core model. 
- (coreActiveRulesKey, str), - (activeFilterTagsKey, str), - (classificationKey, str), + (coreActiveRulesKey, "category"), + (activeFilterTagsKey, "category"), + (classificationKey, "category"), (createdAtMillisKey, np.int64), - (coreRatingStatusKey, str), - (metaScorerActiveRulesKey, str), - (decidedByKey, str), + (coreRatingStatusKey, "category"), + (metaScorerActiveRulesKey, "category"), + (decidedByKey, "category"), (expansionNoteInterceptKey, np.double), (expansionNoteFactor1Key, np.double), - (expansionRatingStatusKey, str), + (expansionRatingStatusKey, "category"), (coverageNoteInterceptKey, np.double), (coverageNoteFactor1Key, np.double), - (coverageRatingStatusKey, str), + (coverageRatingStatusKey, "category"), (coreNoteInterceptMinKey, np.double), (coreNoteInterceptMaxKey, np.double), - (expansionNoteInterceptMinKey, np.double), - (expansionNoteInterceptMaxKey, np.double), - (coverageNoteInterceptMinKey, np.double), - (coverageNoteInterceptMaxKey, np.double), + (expansionNoteInterceptMinKey, "category"), # category because always nan + (expansionNoteInterceptMaxKey, "category"), # category because always nan + (coverageNoteInterceptMinKey, "category"), # category because always nan + (coverageNoteInterceptMaxKey, "category"), # category because always nan (groupNoteInterceptKey, np.double), (groupNoteFactor1Key, np.double), - (groupRatingStatusKey, str), - (groupNoteInterceptMaxKey, np.double), - (groupNoteInterceptMinKey, np.double), + (groupRatingStatusKey, "category"), + (groupNoteInterceptMaxKey, "category"), # category because always nan + (groupNoteInterceptMinKey, "category"), # category because always nan (modelingGroupKey, np.float64), (numRatingsKey, np.int64), (timestampMillisOfNoteCurrentLabelKey, np.double), (expansionPlusNoteInterceptKey, np.double), (expansionPlusNoteFactor1Key, np.double), - (expansionPlusRatingStatusKey, str), + (expansionPlusRatingStatusKey, "category"), (topicNoteInterceptKey, np.double), (topicNoteFactor1Key, 
np.double), - (topicRatingStatusKey, str), - (noteTopicKey, str), + (topicRatingStatusKey, "category"), + (noteTopicKey, "category"), (topicNoteConfidentKey, pd.BooleanDtype()), - (expansionInternalActiveRulesKey, str), - (expansionPlusInternalActiveRulesKey, str), - (groupInternalActiveRulesKey, str), - (topicInternalActiveRulesKey, str), + (expansionInternalActiveRulesKey, "category"), + (expansionPlusInternalActiveRulesKey, "category"), + (groupInternalActiveRulesKey, "category"), + (topicInternalActiveRulesKey, "category"), (coreNumFinalRoundRatingsKey, np.double), # double because nullable. (expansionNumFinalRoundRatingsKey, np.double), # double because nullable. (expansionPlusNumFinalRoundRatingsKey, np.double), # double because nullable. (groupNumFinalRoundRatingsKey, np.double), # double because nullable. (topicNumFinalRoundRatingsKey, np.double), # double because nullable. - (rescoringActiveRulesKey, str), + (rescoringActiveRulesKey, "category"), + (multiGroupNoteInterceptKey, np.double), + (multiGroupNoteFactor1Key, np.double), + (multiGroupRatingStatusKey, str), + (modelingMultiGroupKey, np.float64), + (multiGroupInternalActiveRulesKey, str), + (multiGroupNumFinalRoundRatingsKey, np.double), # double because nullable. 
] noteModelOutputTSVColumns = [col for (col, dtype) in noteModelOutputTSVColumnsAndTypes] noteModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in noteModelOutputTSVColumnsAndTypes} @@ -733,6 +767,9 @@ def rater_factor_key(i): (expansionRaterFactor1Key, np.double), (expansionPlusRaterInterceptKey, np.double), (expansionPlusRaterFactor1Key, np.double), + (multiGroupRaterInterceptKey, np.double), + (multiGroupRaterFactor1Key, np.double), + (modelingMultiGroupKey, np.float64), ] raterModelOutputTSVColumns = [col for (col, dtype) in raterModelOutputTSVColumnsAndTypes] raterModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in raterModelOutputTSVColumnsAndTypes} @@ -781,6 +818,8 @@ def rater_factor_key(i): inputPathsTSVColumns = [col for (col, _) in inputPathsTSVColumnsAndTypes] inputPathsTSVTypeMapping = {col: dtype for (col, dtype) in inputPathsTSVColumnsAndTypes} +timestampMinuteOfFinalScoringOutput = "timestampMinuteOfFinalScoringOutput" + @contextmanager def time_block(label): @@ -888,6 +927,8 @@ class RescoringRuleID(Enum): NOTES_FLIPPED_PREVIOUS_RUN = 3 NEW_NOTES_NOT_RESCORED_RECENTLY_ENOUGH = 4 RECENTLY_FLIPPED_NOTES_NOT_RESCORED_RECENTLY_ENOUGH = 5 + NMR_DUE_TO_MIN_STABLE_CRH_TIME = 6 + NOTES_CREATED_SOMEWHAT_RECENTLY = 7 @dataclass diff --git a/sourcecode/scoring/enums.py b/sourcecode/scoring/enums.py index cff342f..ea1cf6c 100644 --- a/sourcecode/scoring/enums.py +++ b/sourcecode/scoring/enums.py @@ -13,6 +13,7 @@ class Scorers(Enum): MFExpansionPlusScorer = auto() ReputationScorer = auto() MFTopicScorer = auto() + MFMultiGroupScorer = auto() class Topics(Enum): diff --git a/sourcecode/scoring/mf_group_scorer.py b/sourcecode/scoring/mf_group_scorer.py index 69447f1..3a01b8c 100644 --- a/sourcecode/scoring/mf_group_scorer.py +++ b/sourcecode/scoring/mf_group_scorer.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Set, Tuple from . 
import constants as c from .mf_base_scorer import MFBaseScorer, coalesce_columns @@ -13,7 +13,7 @@ trialScoringGroup = 14 # Mapping of how many threads to assign to each group scorer -_groupScorerParalleism = { +groupScorerParalleism = { # Group model 13 is larger and benefits from more threads. # Others can default to 4. 13: 8 @@ -32,8 +32,6 @@ def coalesce_group_model_scored_notes(scoredNotes: pd.DataFrame) -> pd.DataFrame c.groupNoteInterceptKey, c.groupNoteFactor1Key, c.groupRatingStatusKey, - c.groupNoteInterceptMaxKey, - c.groupNoteInterceptMinKey, c.modelingGroupKey, c.groupInternalActiveRulesKey, c.groupNumFinalRoundRatingsKey, @@ -59,9 +57,9 @@ def coalesce_group_model_helpfulness_scores(helpfulnessScores: pd.DataFrame) -> class MFGroupScorer(MFBaseScorer): def __init__( self, - groupNumber: int, + includedGroups: Set[int], + groupId: int, seed: Optional[int] = None, - pseudoraters: Optional[bool] = False, groupThreshold: float = 0.8, saveIntermediateState: bool = False, userFactorLambda=None, @@ -86,6 +84,7 @@ def __init__( tagConsensusHarassmentHelpfulRatingPenalty: int = 10, tagFilterPercentile: int = 95, incorrectFilterThreshold: float = 2.5, + threads: int = 4, ) -> None: """Configure MFGroupScorer object. @@ -104,14 +103,14 @@ def __init__( for the model to be active """ super().__init__( - includedGroups={groupNumber}, + includedGroups=includedGroups, includeUnassigned=False, captureThreshold=groupThreshold, seed=seed, - pseudoraters=pseudoraters, + pseudoraters=False, useStableInitialization=False, saveIntermediateState=saveIntermediateState, - threads=_groupScorerParalleism.get(groupNumber, 4), + threads=threads, userFactorLambda=userFactorLambda, noteFactorLambda=noteFactorLambda, userInterceptLambda=userInterceptLambda, @@ -135,22 +134,23 @@ def __init__( tagFilterPercentile=tagFilterPercentile, incorrectFilterThreshold=incorrectFilterThreshold, ) - assert groupNumber > 0, "groupNumber must be positive. 0 is reserved for unassigned." 
- assert groupNumber <= groupScorerCount, "groupNumber exceeds maximum expected groups." - self._groupNumber = groupNumber - self._groupNoteInterceptKey = f"{c.groupNoteInterceptKey}_{self._groupNumber}" - self._groupNoteFactor1Key = f"{c.groupNoteFactor1Key}_{self._groupNumber}" - self._groupRatingStatusKey = f"{c.groupRatingStatusKey}_{self._groupNumber}" - self._groupNoteInterceptMaxKey = f"{c.groupNoteInterceptMaxKey}_{self._groupNumber}" - self._groupNoteInterceptMinKey = f"{c.groupNoteInterceptMinKey}_{self._groupNumber}" - self._groupInternalActiveRulesKey = f"{c.groupInternalActiveRulesKey}_{self._groupNumber}" - self._groupNumFinalRoundRatingsKey = f"{c.groupNumFinalRoundRatingsKey}_{self._groupNumber}" - self._groupRaterInterceptKey = f"{c.groupRaterInterceptKey}_{self._groupNumber}" - self._groupRaterFactor1Key = f"{c.groupRaterFactor1Key}_{self._groupNumber}" - self._modelingGroupKey = f"{c.modelingGroupKey}_{self._groupNumber}" + assert groupId > 0, "groupNumber must be positive. 0 is reserved for unassigned." 
+ self._groupId = groupId + self._init_column_names() + + def _init_column_names(self): + """Initialize column names based on prefixes and groupId.""" + self._groupNoteInterceptKey = f"{c.groupNoteInterceptKey}_{self._groupId}" + self._groupNoteFactor1Key = f"{c.groupNoteFactor1Key}_{self._groupId}" + self._groupRatingStatusKey = f"{c.groupRatingStatusKey}_{self._groupId}" + self._groupInternalActiveRulesKey = f"{c.groupInternalActiveRulesKey}_{self._groupId}" + self._groupNumFinalRoundRatingsKey = f"{c.groupNumFinalRoundRatingsKey}_{self._groupId}" + self._groupRaterInterceptKey = f"{c.groupRaterInterceptKey}_{self._groupId}" + self._groupRaterFactor1Key = f"{c.groupRaterFactor1Key}_{self._groupId}" + self._modelingGroupKey = f"{c.modelingGroupKey}_{self._groupId}" def get_name(self): - return f"MFGroupScorer_{self._groupNumber}" + return f"MFGroupScorer_{self._groupId}" def _get_note_col_mapping(self) -> Dict[str, str]: """Returns a dict mapping default note column names to custom names for a specific model.""" @@ -158,8 +158,6 @@ def _get_note_col_mapping(self) -> Dict[str, str]: c.internalNoteInterceptKey: self._groupNoteInterceptKey, c.internalNoteFactor1Key: self._groupNoteFactor1Key, c.internalRatingStatusKey: self._groupRatingStatusKey, - c.noteInterceptMinKey: self._groupNoteInterceptMinKey, - c.noteInterceptMaxKey: self._groupNoteInterceptMaxKey, c.internalActiveRulesKey: self._groupInternalActiveRulesKey, c.numFinalRoundRatingsKey: self._groupNumFinalRoundRatingsKey, c.lowDiligenceNoteInterceptKey: c.lowDiligenceLegacyNoteInterceptKey, @@ -179,8 +177,6 @@ def get_scored_notes_cols(self) -> List[str]: self._groupNoteInterceptKey, self._groupNoteFactor1Key, self._groupRatingStatusKey, - self._groupNoteInterceptMaxKey, - self._groupNoteInterceptMinKey, self._groupInternalActiveRulesKey, self._modelingGroupKey, self._groupNumFinalRoundRatingsKey, @@ -205,6 +201,8 @@ def _get_dropped_note_cols(self) -> List[str]: [ c.activeFilterTagsKey, c.ratingWeightKey, + 
c.noteInterceptMinKey, + c.noteInterceptMaxKey, ] + c.notHelpfulTagsAdjustedColumns + c.notHelpfulTagsAdjustedRatioColumns @@ -261,9 +259,9 @@ def _postprocess_output( ), how="left", ) - userScores = userScores[userScores[c.modelingGroupKey] == self._groupNumber] + userScores = userScores[userScores[c.modelingGroupKey].isin(self._includedGroups)] userScores = userScores.drop(columns=c.modelingGroupKey) # Set the modelingGroupKey column in each output - noteScores[self._modelingGroupKey] = self._groupNumber - userScores[self._modelingGroupKey] = self._groupNumber + noteScores[self._modelingGroupKey] = self._groupId + userScores[self._modelingGroupKey] = self._groupId return noteScores, userScores diff --git a/sourcecode/scoring/mf_multi_group_scorer.py b/sourcecode/scoring/mf_multi_group_scorer.py new file mode 100644 index 0000000..736e6f9 --- /dev/null +++ b/sourcecode/scoring/mf_multi_group_scorer.py @@ -0,0 +1,54 @@ +from . import constants as c +from .mf_base_scorer import coalesce_columns +from .mf_group_scorer import MFGroupScorer + +import pandas as pd + + +def coalesce_multi_group_model_scored_notes(scoredNotes: pd.DataFrame) -> pd.DataFrame: + """Coalesce all multi group modeling columns across note scoring. + + Since each Scorer must have distinct output columns, we use coalescing to run + multiple instances of MFGroupScorer objects and then condense the results into + a single set of columns. This approach works because each note will be scored + by at most one MFGroupScorer instance. + """ + for col in [ + c.multiGroupNoteInterceptKey, + c.multiGroupNoteFactor1Key, + c.multiGroupRatingStatusKey, + c.modelingMultiGroupKey, + c.multiGroupInternalActiveRulesKey, + c.multiGroupNumFinalRoundRatingsKey, + ]: + scoredNotes = coalesce_columns(scoredNotes, col) + + return scoredNotes + + +def coalesce_multi_group_model_helpfulness_scores(helpfulnessScores: pd.DataFrame) -> pd.DataFrame: + """Coalesce all group modeling columns across user scoring. 
+ + Since each Scorer must have distinct output columns, we use coalescing to run + multiple instances of MFGroupScorer objects and then condense the results into + a single set of columns. This approach works because each note will be scored + by at most one MFGroupScorer instance. + """ + for col in [c.multiGroupRaterInterceptKey, c.multiGroupRaterFactor1Key, c.modelingMultiGroupKey]: + helpfulnessScores = coalesce_columns(helpfulnessScores, col) + return helpfulnessScores + + +class MFMultiGroupScorer(MFGroupScorer): + def _init_column_names(self): + self._groupNoteInterceptKey = f"{c.multiGroupNoteInterceptKey}_{self._groupId}" + self._groupNoteFactor1Key = f"{c.multiGroupNoteFactor1Key}_{self._groupId}" + self._groupRatingStatusKey = f"{c.multiGroupRatingStatusKey}_{self._groupId}" + self._groupInternalActiveRulesKey = f"{c.multiGroupInternalActiveRulesKey}_{self._groupId}" + self._groupNumFinalRoundRatingsKey = f"{c.multiGroupNumFinalRoundRatingsKey}_{self._groupId}" + self._groupRaterInterceptKey = f"{c.multiGroupRaterInterceptKey}_{self._groupId}" + self._groupRaterFactor1Key = f"{c.multiGroupRaterFactor1Key}_{self._groupId}" + self._modelingGroupKey = f"{c.modelingMultiGroupKey}_{self._groupId}" + + def get_name(self): + return f"MFMultiGroupScorer_{self._groupId}" diff --git a/sourcecode/scoring/note_status_history.py b/sourcecode/scoring/note_status_history.py index 298dee5..b408c6b 100644 --- a/sourcecode/scoring/note_status_history.py +++ b/sourcecode/scoring/note_status_history.py @@ -89,6 +89,11 @@ def _update_single_note_status_history(mergedNote, currentTimeMillis, newScoredN Returns: row of pd.DataFrame """ + if not pd.isna(mergedNote[c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey]): + mergedNote[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] = mergedNote[ + c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey + ] + if mergedNote[c.finalRatingStatusKey] != mergedNote[c.currentLabelKey]: # Changed status vs. 
previous run: mergedNote[c.timestampMillisOfMostRecentStatusChangeKey] = currentTimeMillis @@ -110,6 +115,8 @@ def _update_single_note_status_history(mergedNote, currentTimeMillis, newScoredN mergedNote[c.currentDecidedByKey] = mergedNote[c.decidedByKey] mergedNote[c.currentModelingGroupKey] = mergedNote[c.modelingGroupKey] mergedNote[c.timestampMillisOfNoteCurrentLabelKey] = currentTimeMillis + mergedNote[c.currentMultiGroupStatusKey] = mergedNote[c.multiGroupRatingStatusKey] + mergedNote[c.currentModelingMultiGroupKey] = mergedNote[c.modelingMultiGroupKey] # Lock notes which are (1) not already locked, (2) old enough to lock and (3) # were decided by logic which has global display impact. Criteria (3) guarantees @@ -196,7 +203,10 @@ def check_flips(mergedStatuses: pd.DataFrame, noteSubset: c.NoteSubset) -> None: def _check_flips( - mergedStatuses: pd.DataFrame, maxNewCrhChurn: float, maxOldCrhChurn: Optional[float] = None + mergedStatuses: pd.DataFrame, + maxNewCrhChurn: float, + maxOldCrhChurn: Optional[float] = None, + smoothingCount: int = 100, ) -> None: if maxOldCrhChurn is None: maxOldCrhChurn = maxNewCrhChurn @@ -210,19 +220,23 @@ def _check_flips( ) if len(oldCrhNotes) > 0 and len(newCrhNotes) > 0: # Validate that changes are within allowable bounds. + smoothedNewNoteRatio = (len(newCrhNotes - oldCrhNotes)) / (len(oldCrhNotes) + smoothingCount) + rawNewNoteRatio = (len(newCrhNotes - oldCrhNotes)) / len(oldCrhNotes) print( - f"new note ratio: {(len(newCrhNotes - oldCrhNotes) / len(oldCrhNotes))}. (newCrhNotes={len(newCrhNotes)}, oldCrhNotes={len(oldCrhNotes)}, delta={len(newCrhNotes - oldCrhNotes)}" + f"Raw new note ratio: {rawNewNoteRatio}, smoothed new note ratio: {smoothedNewNoteRatio}. 
(newCrhNotes={len(newCrhNotes)}, oldCrhNotes={len(oldCrhNotes)}, delta={len(newCrhNotes - oldCrhNotes)}" ) + smoothedOldNoteRatio = (len(oldCrhNotes - newCrhNotes)) / (len(oldCrhNotes) + smoothingCount) + rawOldNoteRatio = (len(oldCrhNotes - newCrhNotes)) / len(oldCrhNotes) print( - f"old note ratio: {len(oldCrhNotes - newCrhNotes) / len(oldCrhNotes)} (newCrhNotes={len(newCrhNotes)}, oldCrhNotes={len(oldCrhNotes)}, delta={len(oldCrhNotes - newCrhNotes)}" + f"Raw old note ratio: {rawOldNoteRatio}, smoothed old note ratio: {smoothedOldNoteRatio}. (newCrhNotes={len(newCrhNotes)}, oldCrhNotes={len(oldCrhNotes)}, delta={len(oldCrhNotes - newCrhNotes)}" ) assert ( - (len(newCrhNotes - oldCrhNotes) / len(oldCrhNotes)) < maxNewCrhChurn + smoothedNewNoteRatio < maxNewCrhChurn ), f"Too many new CRH notes: newCrhNotes={len(newCrhNotes)}, oldCrhNotes={len(oldCrhNotes)}, delta={len(newCrhNotes - oldCrhNotes)}" assert ( - (len(oldCrhNotes - newCrhNotes) / len(oldCrhNotes)) < maxOldCrhChurn + smoothedOldNoteRatio < maxOldCrhChurn ), f"Too many notes lost CRH status: oldCrhNotes={len(oldCrhNotes)}, newCrhNotes={len(newCrhNotes)}, delta={len(oldCrhNotes - newCrhNotes)}" @@ -242,6 +256,9 @@ def merge_old_and_new_note_statuses( c.expansionRatingStatusKey, c.groupRatingStatusKey, c.modelingGroupKey, + c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey, + c.multiGroupRatingStatusKey, + c.modelingMultiGroupKey, ] ].rename( { diff --git a/sourcecode/scoring/process_data.py b/sourcecode/scoring/process_data.py index 9e4cf42..ef0409e 100644 --- a/sourcecode/scoring/process_data.py +++ b/sourcecode/scoring/process_data.py @@ -188,19 +188,36 @@ def read_from_tsv( if noteStatusHistoryPath is None: noteStatusHistory = None else: - noteStatusHistory = tsv_reader( - noteStatusHistoryPath, - c.noteStatusHistoryTSVTypeMapping, - c.noteStatusHistoryTSVColumns, - header=headers, - convertNAToNone=False, - ) - assert len(noteStatusHistory.columns.values) == len(c.noteStatusHistoryTSVColumns) and 
all( - noteStatusHistory.columns == c.noteStatusHistoryTSVColumns - ), ( - f"noteStatusHistory columns don't match: \n{[col for col in noteStatusHistory.columns if not col in c.noteStatusHistoryTSVColumns]} are extra columns, " - + f"\n{[col for col in c.noteStatusHistoryTSVColumns if not col in noteStatusHistory.columns]} are missing." - ) + # TODO(jiansongc): clean up after new column is in production. + try: + noteStatusHistory = tsv_reader( + noteStatusHistoryPath, + c.noteStatusHistoryTSVTypeMapping, + c.noteStatusHistoryTSVColumns, + header=headers, + convertNAToNone=False, + ) + assert len(noteStatusHistory.columns.values) == len(c.noteStatusHistoryTSVColumns) and all( + noteStatusHistory.columns == c.noteStatusHistoryTSVColumns + ), ( + f"noteStatusHistory columns don't match: \n{[col for col in noteStatusHistory.columns if not col in c.noteStatusHistoryTSVColumns]} are extra columns, " + + f"\n{[col for col in c.noteStatusHistoryTSVColumns if not col in noteStatusHistory.columns]} are missing." + ) + except ValueError: + noteStatusHistory = tsv_reader( + noteStatusHistoryPath, + c.noteStatusHistoryTSVTypeMappingOld, + c.noteStatusHistoryTSVColumnsOld, + header=headers, + convertNAToNone=False, + ) + noteStatusHistory[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] = np.nan + assert len(noteStatusHistory.columns.values) == len(c.noteStatusHistoryTSVColumns) and all( + noteStatusHistory.columns == c.noteStatusHistoryTSVColumns + ), ( + f"noteStatusHistory columns don't match: \n{[col for col in noteStatusHistory.columns if not col in c.noteStatusHistoryTSVColumns]} are extra columns, " + + f"\n{[col for col in c.noteStatusHistoryTSVColumns if not col in noteStatusHistory.columns]} are missing." 
+ ) if userEnrollmentPath is None: userEnrollment = None diff --git a/sourcecode/scoring/run_scoring.py b/sourcecode/scoring/run_scoring.py index 9df92b7..a2bd35b 100644 --- a/sourcecode/scoring/run_scoring.py +++ b/sourcecode/scoring/run_scoring.py @@ -26,8 +26,14 @@ coalesce_group_model_helpfulness_scores, coalesce_group_model_scored_notes, groupScorerCount, + groupScorerParalleism, trialScoringGroup, ) +from .mf_multi_group_scorer import ( + MFMultiGroupScorer, + coalesce_multi_group_model_helpfulness_scores, + coalesce_multi_group_model_scored_notes, +) from .mf_topic_scorer import MFTopicScorer, coalesce_topic_models from .pandas_utils import get_df_info, keep_columns from .post_selection_similarity import ( @@ -48,7 +54,6 @@ def _get_scorers( seed: Optional[int], pseudoraters: Optional[bool], - enabledScorers: Optional[Set[Scorers]], useStableInitialization: bool = True, ) -> Dict[Scorers, List[Scorer]]: """Instantiate all Scorer objects which should be used for note ranking. @@ -56,73 +61,72 @@ def _get_scorers( Args: seed (int, optional): if not None, base distinct seeds for the first and second MF rounds on this value pseudoraters (bool, optional): if True, compute optional pseudorater confidence intervals - enabledScorers: if not None, set of which scorers should be instantiated and enabled Returns: Dict[Scorers, List[Scorer]] containing instantiated Scorer objects for note ranking. 
""" scorers: Dict[Scorers, List[Scorer]] = dict() - - if enabledScorers is None or Scorers.MFCoreScorer in enabledScorers: - scorers[Scorers.MFCoreScorer] = [ - MFCoreScorer(seed, pseudoraters, useStableInitialization=useStableInitialization, threads=12) - ] - if enabledScorers is None or Scorers.MFExpansionScorer in enabledScorers: - scorers[Scorers.MFExpansionScorer] = [ - MFExpansionScorer(seed, useStableInitialization=useStableInitialization, threads=12) - ] - if enabledScorers is None or Scorers.MFExpansionPlusScorer in enabledScorers: - scorers[Scorers.MFExpansionPlusScorer] = [ - MFExpansionPlusScorer(seed, useStableInitialization=useStableInitialization, threads=12) - ] - if enabledScorers is None or Scorers.ReputationScorer in enabledScorers: - scorers[Scorers.ReputationScorer] = [ - ReputationScorer(seed, useStableInitialization=useStableInitialization, threads=12) - ] - if enabledScorers is None or Scorers.MFGroupScorer in enabledScorers: - # Note that index 0 is reserved, corresponding to no group assigned, so scoring group - # numbers begin with index 1. - scorers[Scorers.MFGroupScorer] = [ - # Scoring Group 13 is currently the largest by far, so total runtime benefits from - # adding the group scorers in descending order so we start work on Group 13 first. 
- MFGroupScorer(groupNumber=i, seed=seed) - for i in range(groupScorerCount, 0, -1) - if i != trialScoringGroup - ] - scorers[Scorers.MFGroupScorer].append( - MFGroupScorer( - groupNumber=trialScoringGroup, - seed=seed, - noteInterceptLambda=0.03 * 30, - userInterceptLambda=0.03 * 5, - globalInterceptLambda=0.03 * 5, - noteFactorLambda=0.03 / 3, - userFactorLambda=0.03 / 4, - diamondLambda=0.03 * 25, - normalizedLossHyperparameters=NormalizedLossHyperparameters( - globalSignNorm=True, noteSignAlpha=None, noteNormExp=0, raterNormExp=-0.25 - ), - maxFinalMFTrainError=0.16, - groupThreshold=0.4, - minMeanNoteScore=-0.01, - crhThreshold=0.15, - crhSuperThreshold=None, - crnhThresholdIntercept=-0.01, - crnhThresholdNoteFactorMultiplier=0, - crnhThresholdNMIntercept=-0.02, - lowDiligenceThreshold=1000, - factorThreshold=0.4, - multiplyPenaltyByHarassmentScore=False, - minimumHarassmentScoreToPenalize=2.5, - tagConsensusHarassmentHelpfulRatingPenalty=10, - tagFilterPercentile=90, - incorrectFilterThreshold=1.5, - ) + scorers[Scorers.MFCoreScorer] = [ + MFCoreScorer(seed, pseudoraters, useStableInitialization=useStableInitialization, threads=12) + ] + scorers[Scorers.MFExpansionScorer] = [ + MFExpansionScorer(seed, useStableInitialization=useStableInitialization, threads=12) + ] + scorers[Scorers.MFExpansionPlusScorer] = [ + MFExpansionPlusScorer(seed, useStableInitialization=useStableInitialization, threads=12) + ] + scorers[Scorers.ReputationScorer] = [ + ReputationScorer(seed, useStableInitialization=useStableInitialization, threads=12) + ] + # Note that index 0 is reserved, corresponding to no group assigned, so scoring group + # numbers begin with index 1. + scorers[Scorers.MFGroupScorer] = [ + # Scoring Group 13 is currently the largest by far, so total runtime benefits from + # adding the group scorers in descending order so we start work on Group 13 first. 
+ MFGroupScorer(includedGroups={i}, groupId=i, threads=groupScorerParalleism.get(i, 4), seed=seed) + for i in range(groupScorerCount, 0, -1) + if i != trialScoringGroup + ] + scorers[Scorers.MFGroupScorer].append( + MFGroupScorer( + includedGroups={trialScoringGroup}, + groupId=trialScoringGroup, + threads=groupScorerParalleism.get(trialScoringGroup, 4), + seed=seed, + noteInterceptLambda=0.03 * 30, + userInterceptLambda=0.03 * 5, + globalInterceptLambda=0.03 * 5, + noteFactorLambda=0.03 / 3, + userFactorLambda=0.03 / 4, + diamondLambda=0.03 * 25, + normalizedLossHyperparameters=NormalizedLossHyperparameters( + globalSignNorm=True, noteSignAlpha=None, noteNormExp=0, raterNormExp=-0.25 + ), + maxFinalMFTrainError=0.16, + groupThreshold=0.4, + minMeanNoteScore=-0.01, + crhThreshold=0.15, + crhSuperThreshold=None, + crnhThresholdIntercept=-0.01, + crnhThresholdNoteFactorMultiplier=0, + crnhThresholdNMIntercept=-0.02, + lowDiligenceThreshold=1000, + factorThreshold=0.4, + multiplyPenaltyByHarassmentScore=False, + minimumHarassmentScoreToPenalize=2.5, + tagConsensusHarassmentHelpfulRatingPenalty=10, + tagFilterPercentile=90, + incorrectFilterThreshold=1.5, ) - if enabledScorers is None or Scorers.MFTopicScorer in enabledScorers: - scorers[Scorers.MFTopicScorer] = [ - MFTopicScorer(topicName=topic.name, seed=seed) for topic in Topics - ] + ) + scorers[Scorers.MFTopicScorer] = [ + MFTopicScorer(topicName=topic.name, seed=seed) for topic in Topics + ] + scorers[Scorers.MFMultiGroupScorer] = [ + MFMultiGroupScorer(includedGroups={4, 5, 7, 12, 26}, groupId=1, threads=4, seed=seed), + MFMultiGroupScorer(includedGroups={24, 27}, groupId=2, threads=4, seed=seed), + MFMultiGroupScorer(includedGroups={16, 20, 22, 23, 28}, groupId=3, threads=4, seed=seed), + ] return scorers @@ -539,6 +543,7 @@ def combine_final_scorer_results( modelResult.auxiliaryNoteInfo, ) scoredNotes = coalesce_group_model_scored_notes(scoredNotes) + scoredNotes = 
coalesce_multi_group_model_scored_notes(scoredNotes) scoredNotes = coalesce_topic_models(scoredNotes) return scoredNotes, auxiliaryNoteInfo @@ -561,7 +566,7 @@ def convert_prescoring_rater_model_output_to_coalesced_helpfulness_scores( ] ].drop_duplicates() - scorersEnumDict = _get_scorers(seed=None, pseudoraters=None, enabledScorers=None) + scorersEnumDict = _get_scorers(seed=None, pseudoraters=None) scorers = chain(*scorersEnumDict.values()) uniqueScorerNames = prescoringRaterModelOutput[c.scorerNameKey].unique() for scorer in scorers: @@ -576,10 +581,10 @@ def convert_prescoring_rater_model_output_to_coalesced_helpfulness_scores( columns=scorer._get_user_col_mapping() ) if isinstance(scorer, MFGroupScorer): - scorerOutputExternalNames[scorer._modelingGroupKey] = scorer._groupNumber + scorerOutputExternalNames[scorer._modelingGroupKey] = scorer._groupId # Raters may appear in multiple groups due to authorship -- filter out rows not from this group scorerOutputExternalNames = scorerOutputExternalNames[ - scorerOutputExternalNames[c.modelingGroupKey] == scorer._groupNumber + scorerOutputExternalNames[c.modelingGroupKey].isin(scorer._includedGroups) ] finalCols = scorer.get_helpfulness_scores_cols() @@ -606,8 +611,9 @@ def meta_score( scorers: Dict[Scorers, List[Scorer]], scoredNotes: pd.DataFrame, auxiliaryNoteInfo: pd.DataFrame, - lockedStatus: pd.DataFrame, + noteStatusHistory: pd.DataFrame, enabledScorers: Optional[Set[Scorers]], + enableNmrDueToMinStableCrhTime: bool = False, ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Determine final note status based on individual scoring results. @@ -619,7 +625,7 @@ def meta_score( Args: scoredNotes: pd.DataFrame containing all scored note results. 
auxiliaryNoteInfo: pd.DataFrame containing tag aggregates - lockedStatus: pd.DataFrame containing {noteId, status} pairs for all notes + noteStatusHistory: pd.DataFrame containing {noteId, lockedStatus, timestampMillisOfNmrDueToMinStableCrhTime} for all notes enabledScorers: if not None, set of which scorers should be instantiated and enabled Returns: @@ -631,7 +637,13 @@ def meta_score( with c.time_block("Post-scorers: Meta Score: Setup"): assert len(scoredNotes) == len(auxiliaryNoteInfo) scoredNotes = scoredNotes.merge( - auxiliaryNoteInfo[[c.noteIdKey] + c.helpfulTagsTSVOrder + c.notHelpfulTagsTSVOrder], + auxiliaryNoteInfo[ + [c.noteIdKey, c.currentLabelKey] + c.helpfulTagsTSVOrder + c.notHelpfulTagsTSVOrder + ], + on=c.noteIdKey, + ) + scoredNotes = scoredNotes.merge( + noteStatusHistory[[c.noteIdKey, c.timestampMillisOfNmrDueToMinStableCrhTimeKey]], on=c.noteIdKey, ) assert len(scoredNotes) == len(auxiliaryNoteInfo) @@ -668,6 +680,17 @@ def meta_score( c.coreRatingStatusKey, ) ) + if enabledScorers is None or Scorers.MFMultiGroupScorer in enabledScorers: + for i in range(1, 4): + rules.append( + scoring_rules.ApplyModelResult( + RuleID[f"MULTI_GROUP_MODEL_{i}"], + {RuleID.CORE_MODEL}, + c.multiGroupRatingStatusKey, + checkFirmReject=True, + filterColumnPairs=[(c.modelingMultiGroupKey, i)], + ) + ) if enabledScorers is None or Scorers.MFGroupScorer in enabledScorers: # TODO: modify this code to work when MFExpansionScorer is disabled by the system test assert len(scorers[Scorers.MFCoreScorer]) == 1 @@ -711,10 +734,17 @@ def meta_score( topic, ) ) + if enableNmrDueToMinStableCrhTime: + rules.append( + scoring_rules.NmrDueToMinStableCrhTime( + RuleID.NMR_DUE_TO_MIN_STABLE_CRH_TIME, + {RuleID.CORE_MODEL}, + ) + ) rules.extend( [ scoring_rules.ScoringDriftGuard( - RuleID.SCORING_DRIFT_GUARD, {RuleID.CORE_MODEL}, lockedStatus + RuleID.SCORING_DRIFT_GUARD, {RuleID.CORE_MODEL}, noteStatusHistory ), # TODO: The rule below both sets tags for notes which are CRH / CRNH 
and unsets status for # any notes which are CRH / CRNH but don't have enough ratings to assign two tags. The later @@ -742,6 +772,8 @@ def meta_score( c.metaScorerActiveRulesKey, decidedByColumn=c.decidedByKey, ) + if not enableNmrDueToMinStableCrhTime: + scoringResult[c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey] = np.nan # Validate that nothing that was a FIRM_REJECT or CRNH from Core or Expansion is rated CRH coreRejects = scoringResult[c.coreRatingStatusKey].isin( {c.firmReject, c.currentlyRatedNotHelpful} @@ -757,7 +789,6 @@ def meta_score( scoringResult[blockedRows & crhRows][c.metaScorerActiveRulesKey].value_counts(dropna=False) ) print(scoringResult[blockedRows & crhRows][c.decidedByKey].value_counts(dropna=False)) - with c.time_block("Post-scorers: Meta Score: Preparing Return Values"): scoredNotesCols = scoringResult[ [ @@ -767,6 +798,7 @@ def meta_score( c.firstTagKey, c.secondTagKey, c.decidedByKey, + c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey, ] ] auxiliaryNoteInfoCols = scoringResult[ @@ -954,6 +986,8 @@ def _add_deprecated_columns(scoredNotes: pd.DataFrame) -> pd.DataFrame: scoredNotes[column] = np.nan elif columnType == str: scoredNotes[column] = "" + elif columnType == "category": + scoredNotes[column] = np.nan else: assert False, f"column type {columnType} unsupported" return scoredNotes @@ -1009,6 +1043,8 @@ def run_prescoring( useStableInitialization: bool = True, pseudoraters: bool = True, checkFlips: bool = True, + enableNmrDueToMinStableCrhTime: bool = False, + previousRatingCutoffTimestampMillis: Optional[int] = None, ) -> Tuple[ pd.DataFrame, pd.DataFrame, sklearn.pipeline.Pipeline, c.PrescoringMetaOutput, pd.DataFrame ]: @@ -1040,7 +1076,6 @@ def run_prescoring( scorers = _get_scorers( seed=seed, pseudoraters=False, - enabledScorers=enabledScorers, useStableInitialization=useStableInitialization, ) @@ -1147,7 +1182,7 @@ def run_prescoring( userEnrollment=userEnrollment, seed=seed, pseudoraters=pseudoraters, - 
enabledScorers=None, + enabledScorers=enabledScorers, runParallel=runParallel, useStableInitialization=useStableInitialization, prescoringNoteModelOutput=prescoringNoteModelOutput, @@ -1155,6 +1190,8 @@ def run_prescoring( noteTopicClassifier=noteTopicClassifierPipe, prescoringMetaOutput=prescoringMetaOutput, checkFlips=checkFlips, + enableNmrDueToMinStableCrhTime=enableNmrDueToMinStableCrhTime, + previousRatingCutoffTimestampMillis=previousRatingCutoffTimestampMillis, ) else: scoredNotes = None @@ -1181,6 +1218,7 @@ def run_contributor_scoring( prescoringRaterModelOutput, userEnrollment ) helpfulnessScores = coalesce_group_model_helpfulness_scores(helpfulnessScores) + helpfulnessScores = coalesce_multi_group_model_helpfulness_scores(helpfulnessScores) # Compute contribution statistics and enrollment state for users. with c.time_block("Post-scorers: Compute helpfulness scores"): @@ -1316,12 +1354,38 @@ def determine_which_notes_to_rescore( ) ) + + # 5. Rescore all notes that were NMRed due to MinStableCrhTime was not met. + nmrDueToMinStableCrhTimeNotes = set( + noteStatusHistory.loc[ + ( + ~noteStatusHistory[c.timestampMillisOfNmrDueToMinStableCrhTimeKey].isna() + & (noteStatusHistory[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] + > 0) + ), + c.noteIdKey, + ] + ) + print( + "5. Rescore all notes that were NMRed due to MinStableCrhTime was not met.", + len(nmrDueToMinStableCrhTimeNotes), + ) + notesToRescoreSet.update(nmrDueToMinStableCrhTimeNotes) + noteSubsets.append( + c.NoteSubset( + noteSet=nmrDueToMinStableCrhTimeNotes, + maxNewCrhChurnRate=c.finalNotesNmrDueToMinStableCrhTimeMaxNewCrhChurn, + maxOldCrhChurnRate=c.finalNotesNmrDueToMinStableCrhTimeMaxOldCrhChurn, + description=c.RescoringRuleID.NMR_DUE_TO_MIN_STABLE_CRH_TIME, + ) + ) + print( f"""----\nNotes to rescore: * {len(notesWithNewRatings)} notes with new ratings since last scoring run. * {len(newNotesNotRescoredRecentlyEnough)} notes created recently and not rescored recently enough.
* {len(justFlippedNotes)} notes that flipped status in the previous scoring run. * {len(recentlyFlippedNotesNotRescoredRecentlyEnough)} notes that flipped status recently and not rescored recently enough. + * {len(nmrDueToMinStableCrhTimeNotes)} notes that were NMRed due to MinStableCrhTime was not met. Overall: {len(notesToRescoreSet)} notes to rescore, out of {len(notes)} total.\n----""" ) @@ -1348,6 +1412,7 @@ def run_final_note_scoring( previousScoredNotes: Optional[pd.DataFrame] = None, previousAuxiliaryNoteInfo: Optional[pd.DataFrame] = None, previousRatingCutoffTimestampMillis: Optional[int] = 0, + enableNmrDueToMinStableCrhTime: bool = False, ): with c.time_block("Logging Final Scoring RAM usage"): print(get_df_info(notes, "notes")) @@ -1361,14 +1426,49 @@ def run_final_note_scoring( print("No previous scored notes passed; scoring all notes.") notesToRescoreSet: Set[int] = set() scoredNotesPassthrough = None + currentMillis = int(time.time() * 1000) + recentNotesAgeTooOldCutoffMillis = ( + 1000 * 60 * 60 * 24 * 13 + ) # 13 days: one less than final scoring to avoid boundary issues + recentNotesAgeTooRecentCutoffMillis = ( + 1000 * 60 * 60 * 24 * 3 + ) # 3 days, to avoid notes with too many new ratings + + noteSubsets: List[c.NoteSubset] = [ c.NoteSubset( noteSet=None, maxNewCrhChurnRate=c.prescoringAllUnlockedNotesMaxCrhChurn, maxOldCrhChurnRate=c.prescoringAllUnlockedNotesMaxCrhChurn, description=c.RescoringRuleID.ALL_NOTES, - ) + ), + c.NoteSubset( + noteSet=set( + noteStatusHistory.loc[ + ( + ( + noteStatusHistory[c.createdAtMillisKey] + >= currentMillis - recentNotesAgeTooOldCutoffMillis + ) + & ( + noteStatusHistory[c.createdAtMillisKey] + < currentMillis - recentNotesAgeTooRecentCutoffMillis + ) + ), + c.noteIdKey, + ] + ), + maxNewCrhChurnRate=c.prescoringAllNotesCreatedThreeToThirteenDaysAgoMaxChurn, + maxOldCrhChurnRate=c.prescoringAllNotesCreatedThreeToThirteenDaysAgoMaxChurn, + description=c.RescoringRuleID.NOTES_CREATED_SOMEWHAT_RECENTLY, + ), ] + 
+ noteSubsetsForProdScoring, _ = determine_which_notes_to_rescore( + notes, ratings, noteStatusHistory, previousRatingCutoffTimestampMillis + ) + for noteSubset in noteSubsetsForProdScoring: + if noteSubset.description == c.RescoringRuleID.NEW_NOTES_NOT_RESCORED_RECENTLY_ENOUGH: + noteSubsets.append(noteSubset) else: assert previousAuxiliaryNoteInfo is not None assert previousRatingCutoffTimestampMillis is not None @@ -1420,9 +1520,7 @@ def run_final_note_scoring( ) print(f"Post Selection Similarity Final Scoring: {len(ratings)} ratings remaining.") - scorers = _get_scorers( - seed, pseudoraters, enabledScorers, useStableInitialization=useStableInitialization - ) + scorers = _get_scorers(seed, pseudoraters, useStableInitialization=useStableInitialization) modelResults = _run_scorers( scorers=list(chain(*scorers.values())), @@ -1455,6 +1553,7 @@ def run_final_note_scoring( enabledScorers, strictColumns, checkFlips, + enableNmrDueToMinStableCrhTime, ) # Concat final scoring results for newly-scored notes with the results for old notes not scores. @@ -1497,6 +1596,7 @@ def post_note_scoring( enabledScorers: Optional[Set[Scorers]] = None, strictColumns: bool = True, checkFlips: bool = True, + enableNmrDueToMinStableCrhTime: bool = False, ): """ Apply individual scoring models and obtained merged result. @@ -1517,8 +1617,11 @@ def post_note_scoring( scorers, scoredNotes, auxiliaryNoteInfo, - noteStatusHistory[[c.noteIdKey, c.lockedStatusKey]], + noteStatusHistory[ + [c.noteIdKey, c.lockedStatusKey, c.timestampMillisOfNmrDueToMinStableCrhTimeKey] + ], enabledScorers, + enableNmrDueToMinStableCrhTime, ) with c.time_block("Post-scorers: Join scored notes"): @@ -1539,6 +1642,8 @@ def post_note_scoring( mergedNoteStatuses = note_status_history.merge_old_and_new_note_statuses( noteStatusHistory, scoredNotes ) + # Not needed anymore, has been merged into note_status_history. 
+ scoredNotes = scoredNotes.drop(columns=[c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey]) scoredNotes[c.rescoringActiveRulesKey] = "" for noteSubset in noteSubsetsAndMaxFlipRates: @@ -1667,6 +1772,7 @@ def run_scoring( dataLoader=dataLoader, useStableInitialization=useStableInitialization, checkFlips=False, + previousRatingCutoffTimestampMillis=previousRatingCutoffTimestampMillis, ) print("We invoked run_scoring and are now in between prescoring and scoring.") diff --git a/sourcecode/scoring/scoring_rules.py b/sourcecode/scoring/scoring_rules.py index c9fa926..aca0745 100644 --- a/sourcecode/scoring/scoring_rules.py +++ b/sourcecode/scoring/scoring_rules.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod from collections import namedtuple from enum import Enum -from typing import Callable, Dict, List, Optional, Set, Tuple +from typing import Any, Callable, Dict, List, Optional, Set, Tuple from . import constants as c from .enums import Topics @@ -53,11 +53,15 @@ class RuleID(Enum): GROUP_MODEL_12 = RuleAndVersion("GroupModel12", "1.1", False) GROUP_MODEL_13 = RuleAndVersion("GroupModel13", "1.1", True) GROUP_MODEL_14 = RuleAndVersion("GroupModel14", "1.1", True) - INSUFFICIENT_EXPLANATION = RuleAndVersion("InsufficientExplanation", "1.0", True) - SCORING_DRIFT_GUARD = RuleAndVersion("ScoringDriftGuard", "1.0", False) TOPIC_MODEL_1 = RuleAndVersion("TopicModel01", "1.0", False) TOPIC_MODEL_2 = RuleAndVersion("TopicModel02", "1.0", False) TOPIC_MODEL_3 = RuleAndVersion("TopicModel03", "1.0", False) + MULTI_GROUP_MODEL_1 = RuleAndVersion("MultiGroupModel01", "1.0", False) + MULTI_GROUP_MODEL_2 = RuleAndVersion("MultiGroupModel02", "1.0", False) + MULTI_GROUP_MODEL_3 = RuleAndVersion("MultiGroupModel03", "1.0", False) + INSUFFICIENT_EXPLANATION = RuleAndVersion("InsufficientExplanation", "1.0", True) + SCORING_DRIFT_GUARD = RuleAndVersion("ScoringDriftGuard", "1.0", False) + NMR_DUE_TO_MIN_STABLE_CRH_TIME = RuleAndVersion("NmrDueToMinStableCrhTime", "1.0", 
False) def get_name(self) -> str: """Returns a string combining the name and version to uniquely name the logic of the ScoringRule.""" @@ -179,6 +183,8 @@ def __init__( ruleID: RuleID, dependencies: Set[RuleID], sourceColumn: str, + checkFirmReject: bool = False, + filterColumnPairs: List[Tuple[str, Any]] = [], ): """Propagate the note status from sourceColumn when the status is not NaN. @@ -189,11 +195,28 @@ def __init__( """ super().__init__(ruleID, dependencies) self._sourceColumn = sourceColumn + self._checkFirmReject = checkFirmReject + self._filterColumnPairs = filterColumnPairs def score_notes( self, noteStats: pd.DataFrame, currentLabels: pd.DataFrame, statusColumn: str ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]: """Propagates any status set in sourceColumn when it is non-NaN.""" + # If necessary, prune noteStats according to prior firm rejects + if self._checkFirmReject: + coreRejects = noteStats[c.coreRatingStatusKey].isin( + {c.firmReject, c.currentlyRatedNotHelpful} + ) + expansionRejects = noteStats[c.expansionRatingStatusKey].isin( + {c.firmReject, c.currentlyRatedNotHelpful} + ) + crhBlocked = coreRejects | (noteStats[c.coreRatingStatusKey].isna() & expansionRejects) + crhNotes = noteStats[self._sourceColumn] == c.currentlyRatedHelpful + noteStats = noteStats[~(crhBlocked & crhNotes)] + # If necessary, prune noteStatus based on filter column pairs + if self._filterColumnPairs: + for col, value in self._filterColumnPairs: + noteStats = noteStats[noteStats[col] == value] # Generate the set of note status updates statusUpdateRows = ~noteStats[self._sourceColumn].isna() noteStatusUpdates = noteStats[statusUpdateRows][[c.noteIdKey, self._sourceColumn]].rename( @@ -427,6 +450,118 @@ def score_notes( return (noteStatusUpdates, None) +class NmrDueToMinStableCrhTime(ScoringRule): + def __init__( + self, + ruleID: RuleID, + dependencies: Set[RuleID], + requiredStableCrhMinutesThreshold: int = 30, + ): + """Make CRH notes NMR if it hasn't been stably CRH 
>= requiredStableCrhMinutesThreshold. + + Args: + rule: enum corresponding to a namedtuple defining a rule name and version string for the + ScoringRule. + dependencies: Rules which must run before this rule can run. + requiredStableCrhMinutesThreshold: threshold for required stable CRH time, in minutes. + """ + super().__init__(ruleID, dependencies) + self.requiredStableCrhMinutesThreshold = requiredStableCrhMinutesThreshold + + def score_notes( + self, noteStats: pd.DataFrame, currentLabels: pd.DataFrame, statusColumn: str + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + # Prune noteStats to exclude CRH notes (CRHed before current scoring run). + noteStats = noteStats[noteStats[c.currentLabelKey] != c.currentlyRatedHelpful] + noteStats = noteStats.merge(currentLabels, on=c.noteIdKey, how="inner") + + # Identify impacted notes: + # (1) CRH from current run + # (A) If timestampMillisOfNmrDueToMinStableCrhTime doesn't exist: + # Set status to NMR, set timestampMillisOfNmrDueToMinStableCrhTime to now. + # (B) Otherwise: + # (a) If it has been long enough since timestampMillisOfNmrDueToMinStableCrhTime, + # set status to CRH, clear timestampMillisOfNmrDueToMinStableCrhTime + # (b) Otherwise, set status to NMR. + # (2) Non-CRH from current run and timestampMillisOfNmrDueToMinStableCrhTime exists. + # Clear timestampMillisOfNmrDueToMinStableCrhTime. 
+ noteStatusUpdates = noteStats.loc[ + (noteStats[statusColumn] == c.currentlyRatedHelpful) + | ( + ~noteStats[c.timestampMillisOfNmrDueToMinStableCrhTimeKey].isna() + & (noteStats[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] > 0) + ) + ][[c.noteIdKey, c.timestampMillisOfNmrDueToMinStableCrhTimeKey, statusColumn]] + + pd.testing.assert_frame_equal(noteStatusUpdates, noteStatusUpdates.drop_duplicates()) + + newStatusColumn = statusColumn + "_new" + noteStatusUpdates[newStatusColumn] = np.nan + noteStatusUpdates[c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey] = noteStatusUpdates[ + c.timestampMillisOfNmrDueToMinStableCrhTimeKey + ] + # (1)-(A) + noteStatusUpdates.loc[ + (noteStatusUpdates[statusColumn] == c.currentlyRatedHelpful) + & ( + noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey].isna() + | (noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] <= 0) + ), + [newStatusColumn, c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey], + ] = [c.needsMoreRatings, c.epochMillis] + # (1)-(B)-(a) + noteStatusUpdates.loc[ + (noteStatusUpdates[statusColumn] == c.currentlyRatedHelpful) + & ( + ~noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey].isna() + & (noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] > 0) + ) + & ( + c.epochMillis - noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] + >= self.requiredStableCrhMinutesThreshold * 60 * 1000 + ), + [newStatusColumn, c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey], + ] = [c.currentlyRatedHelpful, -1] + # (1)-(B)-(b) + noteStatusUpdates.loc[ + (noteStatusUpdates[statusColumn] == c.currentlyRatedHelpful) + & ( + ~noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey].isna() + & (noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] > 0) + ) + & ( + c.epochMillis - noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] + < self.requiredStableCrhMinutesThreshold * 60 * 1000 + ), + newStatusColumn, + ] = 
c.needsMoreRatings + # (2) + noteStatusUpdates.loc[ + (noteStatusUpdates[statusColumn] != c.currentlyRatedHelpful) + & ( + ~noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey].isna() + & (noteStatusUpdates[c.timestampMillisOfNmrDueToMinStableCrhTimeKey] > 0) + ), + c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey, + ] = -1 + + noteStatusUpdatesWithStatusChange = noteStatusUpdates.loc[ + (noteStatusUpdates[statusColumn] == c.currentlyRatedHelpful) + & (noteStatusUpdates[newStatusColumn] == c.needsMoreRatings) + ][[c.noteIdKey, newStatusColumn]] + noteStatusUpdatesWithStatusChange.rename(columns={newStatusColumn: statusColumn}, inplace=True) + + print( + f"Total notes impacted (CRH->NMR) by NmrDueToMinStableCrhTime: " + f"{len(noteStatusUpdatesWithStatusChange)}" + ) + + return ( + noteStatusUpdatesWithStatusChange, + noteStatusUpdates[[c.noteIdKey, c.updatedTimestampMillisOfNmrDueToMinStableCrhTimeKey]], + ) + + class RejectLowIntercept(ScoringRule): def __init__( self, @@ -890,8 +1025,14 @@ def apply_scoring_rules( ruleIDs.add(rule.get_rule_id()) with c.time_block(f"Calling score_notes: {rule.get_name()}"): noteStatusUpdates, additionalColumns = rule.score_notes(noteStats, noteLabels, statusColumn) - if additionalColumns is not None: + if ( + additionalColumns is not None + # This rule updates both status and NmrDueToStableCrhTime (in additional column), they can + # be on different rows. + and rule.get_rule_id() != RuleID.NMR_DUE_TO_MIN_STABLE_CRH_TIME + ): assert set(noteStatusUpdates[c.noteIdKey]) == set(additionalColumns[c.noteIdKey]) + # Update noteLabels, which will always hold at most one label per note. unsafeAllowed = {c.internalRatingStatusKey, c.finalRatingStatusKey, c.defaultIndexKey} noteLabels = (