diff --git a/config.sample b/config.sample index 8073515..ab876e3 100644 --- a/config.sample +++ b/config.sample @@ -248,33 +248,33 @@ program_dashboard_data = {{ ml_program_dashboard_data }} cloud_module_path = {{ ml_analytics_cloud_package_path }} -observation_blob_path = {{ ml_analytics_observation_cloud_blob_path }} +observation_blob_path = {{ ml_analytics_observation_azure_blob_path }} -projects_blob_path = {{ ml_analytics_project_cloud_blob_path }} +projects_blob_path = {{ ml_analytics_project_azure_blob_path }} -observation_distinctCount_blob_path = {{ ml_analytics_obs_distinctCnt_cloud_blob_path }} +observation_distinctCount_blob_path = {{ ml_analytics_obs_distinctCnt_azure_blob_path }} -observation_distinctCount_domain_blob_path = {{ ml_analytics_obs_distinctCnt_domain_cloud_blob_path }} +observation_distinctCount_domain_blob_path = {{ ml_analytics_obs_distinctCnt_domain_azure_blob_path }} -observation_distinctCount_domain_criteria_blob_path = {{ ml_analytics_obs_distinctCnt_domain_criteria_cloud_blob_path }} +observation_distinctCount_domain_criteria_blob_path = {{ ml_analytics_obs_distinctCnt_domain_criteria_azure_blob_path }} -projects_distinctCnt_blob_path = {{ ml_analytics_projects_distinctCnt_cloud_blob_path }} +projects_distinctCnt_blob_path = {{ ml_analytics_projects_distinctCnt_azure_blob_path }} -projects_distinctCnt_prgmlevel_blob_path = {{ ml_analytics_projects_distinctCnt_prglevel_cloud_blob_path }} +projects_distinctCnt_prgmlevel_blob_path = {{ ml_analytics_projects_distinctCnt_prglevel_azure_blob_path }} -projects_rollup_blob_path = {{ ml_analytics_project_rollup_cloud_blob_path }} +projects_rollup_blob_path = {{ ml_analytics_project_rollup_azure_blob_path }} -observation_rollup_blob_path = {{ ml_analytics_observation_rollup_cloud_blob_path }} +observation_rollup_blob_path = {{ ml_analytics_observation_rollup_azure_blob_path }} -survey_rollup_blob_path = {{ ml_analytics_survey_rollup_cloud_blob_path }} +survey_rollup_blob_path = {{ ml_analytics_survey_rollup_azure_blob_path }} -survey_blob_path = {{ ml_analytics_survey_cloud_blob_path }} +survey_blob_path = {{ ml_analytics_survey_azure_blob_path }} -projects_program_csv = {{ ml_analytics_program_dashboard_cloud_blob_path }} +projects_program_csv = {{ ml_analytics_program_dashboard_azure_blob_path }} -observation_batch_ingestion_data_del = {{ ml_analytics_observation_batchupdate_cloud_blob_path }} +observation_batch_ingestion_data_del = {{ ml_analytics_observation_batchupdate_azure_blob_path }} -survey_batch_ingestion_data_del = {{ml_analytics_survey_batchupdate_cloud_blob_path}} +survey_batch_ingestion_data_del = {{ml_analytics_survey_batchupdate_azure_blob_path}} cname_url = {{ ml_analytics_cname_url }} diff --git a/observations/py_observation_streaming.py b/observations/py_observation_streaming.py index 91b55e8..7788132 100755 --- a/observations/py_observation_streaming.py +++ b/observations/py_observation_streaming.py @@ -1,6 +1,6 @@ # ----------------------------------------------------------------- # Name : sl_py_observation_streaming.py -# Author : Ashwini.E , Shakthieshwari.A, Snehangsu De +# Author : Ashwini.E , Shakthieshwari.A, Snehangsu De, Sachin # Description : Program to read data from one kafka topic and # produce it to another kafka topic # ----------------------------------------------------------------- @@ -260,34 +260,38 @@ def creatingObj( observationSubQuestionsObj['programId'] = '' try: - for pgm in programsCollec.find({"_id":ObjectId(obSub['programId'])}): - observationSubQuestionsObj['programName'] = pgm['name'] - observationSubQuestionsObj['programDescription'] = pgm['description'] - except KeyError : + if 'programInfo' in obSub: + observationSubQuestionsObj['programName'] = obSub['programInfo']['name'] + observationSubQuestionsObj['programDescription'] = obSub['programInfo']['description'] + except KeyError: observationSubQuestionsObj['programName'] = '' observationSubQuestionsObj['programDescription'] = '' observationSubQuestionsObj['solutionExternalId'] = obSub['solutionExternalId'] observationSubQuestionsObj['solutionId'] = str(obSub['solutionId']) observationSubQuestionsObj['observationId'] = str(obSub['observationId']) - for critQues in criteriaQuestionsCollec.find({'_id':ObjectId(answer["criteriaId"])}): - observationSubQuestionsObj['criteriaExternalId'] = critQues['externalId'] - observationSubQuestionsObj['criteriaName'] = critQues['name'] - observationSubQuestionsObj['criteriaDescription'] = critQues['description'] - for eviCQ in critQues["evidences"] : - for secCQ in eviCQ["sections"] : - for quesCQ in secCQ["questions"] : - if str(quesCQ["_id"]) == str(answer["qid"]) : - observationSubQuestionsObj['section'] = secCQ["code"] - solutionObj = {} - for solu in solCollec.find({'_id':ObjectId(obSub['solutionId'])}): - solutionObj = solu - - if solutionObj: - observationSubQuestionsObj['solutionName'] = solutionObj['name'] - observationSubQuestionsObj['scoringSystem'] = solutionObj['scoringSystem'] - observationSubQuestionsObj['solutionDescription'] = solutionObj['description'] - observationSubQuestionsObj['questionSequenceByEcm'] = sequenceNumber(quesexternalId,answer,observationSubQuestionsObj['section'],solutionObj) + + if "criteria" in obSub: + for critQue in obSub["criteria"]: + if critQue["_id"] == answer["criteriaId"]: + observationSubQuestionsObj['criteriaExternalId'] = critQue['externalId'] + observationSubQuestionsObj['criteriaName'] = critQue['name'] + observationSubQuestionsObj['criteriaDescription'] = critQue['description'] + + + if 'solutionInfo' in obSub: + qse = obSub['solutionInfo']['questionSequenceByEcm'] + for obs_key, obs_val in qse.items(): + for key, val in obs_val.items(): + for ids in val: + if ids == answer["externalId"]: + observationSubQuestionsObj['section'] = key + + solutionObj = obSub['solutionInfo'] + observationSubQuestionsObj['solutionName'] = obSub['solutionInfo']['name'] + observationSubQuestionsObj['scoringSystem'] = obSub['solutionInfo']['scoringSystem'] + observationSubQuestionsObj['solutionDescription'] = obSub['solutionInfo']['description'] + observationSubQuestionsObj['questionSequenceByEcm'] = sequenceNumber(quesexternalId,answer,observationSubQuestionsObj['section'],solutionObj) try: if solutionObj['scoringSystem'] == 'pointsBasedScoring': @@ -337,18 +341,8 @@ def creatingObj( if 'observationInformation' in obSub : if 'name' in obSub['observationInformation']: observationSubQuestionsObj['observationName'] = obSub['observationInformation']['name'] - else : - try: - for ob in obsCollec.find({'_id':obSub['observationId']},{'name':1}): - observationSubQuestionsObj['observationName'] = ob['name'] - except KeyError : - observationSubQuestionsObj['observationName'] = '' - else : - try: - for ob in obsCollec.find({'_id':obSub['observationId']},{'name':1}): - observationSubQuestionsObj['observationName'] = ob['name'] - except KeyError : - observationSubQuestionsObj['observationName'] = '' + else: + observationSubQuestionsObj['observationName'] = '' observationSubQuestionsObj['questionId'] = str(answer['qid']) observationSubQuestionsObj['questionAnswer'] = ans_val @@ -398,20 +392,24 @@ def creatingObj( observationSubQuestionsObj['instanceParentId'] = ans['qid'] observationSubQuestionsObj['instanceParentResponsetype'] =ans['responseType'] observationSubQuestionsObj['instanceParentCriteriaId'] =ans['criteriaId'] - for critQuesInst in criteriaQuestionsCollec.find({'_id':ObjectId(ans["criteriaId"])}): - observationSubQuestionsObj['instanceParentCriteriaExternalId'] = critQuesInst['externalId'] - observationSubQuestionsObj['instanceParentCriteriaExternalId'] = critQuesInst['name'] - for eviCQInst in critQuesInst["evidences"] : - for secCQInst in eviCQInst["sections"] : - for quesCQInst in secCQInst["questions"] : - if str(quesCQInst["_id"]) == str(ans["qid"]) : - observationSubQuestionsObj['instanceParentSection'] = secCQInst["code"] - observationSubQuestionsObj['instanceId'] = instNumber - observationSubQuestionsObj['instanceParentExternalId'] = quesexternalId - observationSubQuestionsObj['instanceParentEcmSequence']= sequenceNumber( - observationSubQuestionsObj['instanceParentExternalId'], answer, - observationSubQuestionsObj['instanceParentSection'], solutionObj - ) + + for critQuesInst in obSub["criteria"]: + if critQuesInst["_id"] == ans["criteriaId"]: + observationSubQuestionsObj['instanceParentCriteriaExternalId'] = critQuesInst['externalId'] + observationSubQuestionsObj['instanceParentCriteriaExternalId'] = critQuesInst['name'] + + if 'solutionInfo' in obSub: + qse = obSub['solutionInfo']['questionSequenceByEcm'] + for obs_key, obs_val in qse.items(): + for key, val in obs_val.items(): + for ids in val: + if ids == ans["externalId"]: + observationSubQuestionsObj['instanceParentSection'] = key + observationSubQuestionsObj['instanceId'] = instNumber + observationSubQuestionsObj['instanceParentExternalId'] = quesexternalId + observationSubQuestionsObj['instanceParentEcmSequence']= sequenceNumber(observationSubQuestionsObj['instanceParentExternalId'], + answer,observationSubQuestionsObj['instanceParentSection'], solutionObj) + else: observationSubQuestionsObj['instanceParentQuestion'] = '' observationSubQuestionsObj['instanceParentId'] = '' @@ -424,132 +422,145 @@ def creatingObj( ### Assessment Domain Logic - Start ### domainArr = [] - for domain in solutionObj['themes']: - parent = None - builder = None - parent = domain['name'] - builder = implementation() - domObj = {} - domObj['name'] = domain['name'] - domObj['type'] = domain['type'] - domObj['externalId']=str(domain['externalId']) - - try: - if domain['criteria']: - domObj['theme']=builder.buildnode(domain, parent, str(answer['criteriaId'])) - except KeyError: - domObj['theme'] = builder.buildnode(domain, parent, str(answer['criteriaId'])) - - domainArr.append(domObj) - domArr.clear() - - for dom in domainArr: - if dom['theme']: - for obj in dom['theme']: - try: - if obj['type'] == 'criteria': - if (str(obj['externalId']) == str(answer['criteriaId'])): - for criteria in obSub['criteria'] : - if str(criteria["_id"]) == str(answer['criteriaId']) : - obj['name'] = criteria['name'] - obj['score'] = criteria['score'] - try: - obj['score_achieved'] = criteria['scoreAchieved'] - except KeyError : - obj['score_achieved'] = '' - obj['description'] = criteria['description'] - try: - levelArray = [] - levelArray = criteria['rubric']['levels'].values() - for labelValue in levelArray: - if (str((criteria['score'])) == labelValue['level']): - obj['label'] = labelValue['label'] - except Exception: - obj['label'] = '' - - try: - prj_id = [] - title = [] - goal = [] - externalId =[] - for prj in criteria['improvement-projects']: - prj_id.append(str(prj['_id'])) - title.append(prj['title']) - goal.append(prj['goal']) - externalId.append(prj['externalId']) - obj['imp_project_id'] = prj_id - obj['imp_project_title'] = title - obj['imp_project_goal'] = goal - obj['imp_project_externalId'] = externalId - except KeyError: - obj['imp_project_id'] = [] - obj['imp_project_title'] = [] - obj['imp_project_goal'] = [] - obj['imp_project_externalId'] = [] - if type(obj['externalId']) != str: - for cri in criteriaCollec.find({'_id':ObjectId(str(obj['externalId']))}): - obj['externalId'] = cri['externalId'] - obj['name']=cri['name'] - obj['score']=cri['score'] - obj['score_achieved'] = criteria['scoreAchieved'] - obj['description'] = cri['description'] - try: - levelArray = [] - levelArray = cri['rubric']['levels'].values() - for labelValue in levelArray: - if (str((cri['score'])) == labelValue['level']): - obj['label'] = labelValue['label'] - except Exception: - obj['label'] = '' - except KeyError: - pass - - for themes in domainArr: - for st in themes["theme"]: - if (st["type"] == "criteria") and (observationSubQuestionsObj['criteriaId'] == str(st["externalId"])): - observationSubQuestionsObj['domainName'] = themes['name'] - observationSubQuestionsObj['domainExternalId'] = themes['externalId'] - try : - for submTheme in obSub["themes"]: - if submTheme["externalId"] == themes['externalId'] : - observationSubQuestionsObj['domainLevel'] = submTheme["pointsBasedLevel"] - observationSubQuestionsObj['domainScore'] = submTheme["scoreAchieved"] - except KeyError : - observationSubQuestionsObj['domainLevel'] = '' - observationSubQuestionsObj['domainScore'] = '' - for theme in themes['theme']: - observationSubQuestionsObj['childName'] = theme['name'] - observationSubQuestionsObj['ancestorName'] = theme['parent'] - observationSubQuestionsObj['childType'] = theme['type'] - observationSubQuestionsObj['childExternalid'] = theme['externalId'] + if len(obSub['themes']) >= 1: + for domain in obSub['themes']: + parent = None + builder = None + parent = domain['name'] + builder = implementation() + domObj = {} + domObj['name'] = domain['name'] + domObj['type'] = domain['type'] + domObj['externalId']=str(domain['externalId']) + + try: + if domain['criteria']: + domObj['theme']=builder.buildnode(domain, parent, str(answer['criteriaId'])) + except KeyError: + domObj['theme'] = builder.buildnode(domain, parent, str(answer['criteriaId'])) - try: - observationSubQuestionsObj['level'] = theme['score'] - except KeyError: - observationSubQuestionsObj['level'] = '' + domainArr.append(domObj) + domArr.clear() + for dom in domainArr: + if dom['theme']: + for obj in dom['theme']: try: - observationSubQuestionsObj['criteriaScore'] = theme['score_achieved'] + if obj['type'] == 'criteria': + if (str(obj['externalId']) == str(answer['criteriaId'])): + for criteria in obSub['criteria'] : + if str(criteria["_id"]) == str(answer['criteriaId']) : + obj['name'] = criteria['name'] + obj['score'] = criteria['score'] + try: + obj['score_achieved'] = criteria['scoreAchieved'] + except KeyError : + obj['score_achieved'] = '' + obj['description'] = criteria['description'] + try: + levelArray = [] + levelArray = criteria['rubric']['levels'].values() + for labelValue in levelArray: + if (str((criteria['score'])) == labelValue['level']): + obj['label'] = labelValue['label'] + except Exception: + obj['label'] = '' + + try: + prj_id = [] + title = [] + goal = [] + externalId =[] + for prj in criteria['improvement-projects']: + prj_id.append(str(prj['_id'])) + title.append(prj['title']) + goal.append(prj['goal']) + externalId.append(prj['externalId']) + obj['imp_project_id'] = prj_id + obj['imp_project_title'] = title + obj['imp_project_goal'] = goal + obj['imp_project_externalId'] = externalId + except KeyError: + obj['imp_project_id'] = [] + obj['imp_project_title'] = [] + obj['imp_project_goal'] = [] + obj['imp_project_externalId'] = [] + if type(obj['externalId']) != str: + for critQueDom in obSub["criteria"]: + if critQueDom["_id"] == answer["criteriaId"]: + obj['externalId'] = critQueDom['externalId'] + obj['name']=critQueDom['name'] + obj['score']=critQueDom['score'] + obj['score_achieved'] = critQueDom['scoreAchieved'] + obj['description'] = critQueDom['description'] + try: + levelArray = [] + levelArray = critQueDom['rubric']['levels'].values() + for labelValue in levelArray: + if (str((critQueDom['score'])) == labelValue['level']): + obj['label'] = labelValue['label'] + except Exception: + obj['label'] = '' except KeyError: - observationSubQuestionsObj['criteriaScore'] = '' + pass + + for themes in domainArr: + for st in themes["theme"]: + if (st["type"] == "criteria") and (observationSubQuestionsObj['criteriaId'] == str(st["externalId"])): + observationSubQuestionsObj['domainName'] = themes['name'] + observationSubQuestionsObj['domainExternalId'] = themes['externalId'] + try : + for submTheme in obSub["themes"]: + if submTheme["externalId"] == themes['externalId'] : + observationSubQuestionsObj['domainLevel'] = submTheme["pointsBasedLevel"] + observationSubQuestionsObj['domainScore'] = submTheme["scoreAchieved"] + except KeyError : + observationSubQuestionsObj['domainLevel'] = '' + observationSubQuestionsObj['domainScore'] = '' + for theme in themes['theme']: + observationSubQuestionsObj['childName'] = theme['name'] + observationSubQuestionsObj['ancestorName'] = theme['parent'] + observationSubQuestionsObj['childType'] = theme['type'] + observationSubQuestionsObj['childExternalid'] = theme['externalId'] + + try: + observationSubQuestionsObj['level'] = theme['score'] + except KeyError: + observationSubQuestionsObj['level'] = '' + + try: + observationSubQuestionsObj['criteriaScore'] = theme['score_achieved'] + except KeyError: + observationSubQuestionsObj['criteriaScore'] = '' + + try: + observationSubQuestionsObj['label'] = theme['label'] + except KeyError: + observationSubQuestionsObj['label'] = '' + + try: + if (len(theme['imp_project_id']) >=0): + for i in range(len(theme['imp_project_id'])): + observationSubQuestionsObj['imp_project_id'] = theme['imp_project_id'][i] + observationSubQuestionsObj['imp_project_title'] = theme['imp_project_title'][i] + observationSubQuestionsObj['imp_project_goal'] = theme['imp_project_goal'][i] + observationSubQuestionsObj['imp_project_externalId'] = theme['imp_project_externalId'][i] + except KeyError: + observationSubQuestionsObj['imp_project_id'] = "" + observationSubQuestionsObj['imp_project_title'] = "" + observationSubQuestionsObj['imp_project_goal'] = "" + observationSubQuestionsObj['imp_project_externalId'] = "" - try: - observationSubQuestionsObj['label'] = theme['label'] - except KeyError: - observationSubQuestionsObj['label'] = '' - - try: - if (len(theme['imp_project_id']) >=0): - for i in range(len(theme['imp_project_id'])): - observationSubQuestionsObj['imp_project_id'] = theme['imp_project_id'][i] - observationSubQuestionsObj['imp_project_title'] = theme['imp_project_title'][i] - observationSubQuestionsObj['imp_project_goal'] = theme['imp_project_goal'][i] - observationSubQuestionsObj['imp_project_externalId'] = theme['imp_project_externalId'][i] - except KeyError: - observationSubQuestionsObj['imp_project_id'] = "" - observationSubQuestionsObj['imp_project_title'] = "" - observationSubQuestionsObj['imp_project_goal'] = "" - observationSubQuestionsObj['imp_project_externalId'] = "" + else: + observationSubQuestionsObj['domainName'] = '' + observationSubQuestionsObj['domainExternalId'] = '' + observationSubQuestionsObj['childName'] = '' + observationSubQuestionsObj['ancestorName'] = '' + observationSubQuestionsObj['childType'] = '' + observationSubQuestionsObj['childExternalid'] = '' + observationSubQuestionsObj['level'] = '' + observationSubQuestionsObj['criteriaScore'] = '' + observationSubQuestionsObj['label'] = '' if usrRolFn : observationSubQuestionsObj = {**usrRolFn, **observationSubQuestionsObj} diff --git a/projects/pyspark_project_batch.py b/projects/pyspark_project_batch.py index d32e63a..4714825 100644 --- a/projects/pyspark_project_batch.py +++ b/projects/pyspark_project_batch.py @@ -641,15 +641,15 @@ def orgName(val): entities_df_res.unpersist() projects_df_cols.unpersist() final_projects_df = projects_df_final.dropDuplicates() -try: - null_projects = final_projects_df["project_id", "state_name"].where(col("state_name").isNull()) - null_projects = null_projects.select(null_projects["project_id"]).rdd.flatMap(lambda x: x).collect() - errorLogger.error(f"For ProgramID: {program_unique_id}- \nProject IDs not uploaded: {list(set(null_projects))} \nThe above project_ids state_name/userProfile is null") - final_projects_df = final_projects_df.na.drop(subset=["state_name"]) -except ut.AnalysisException as e: - error_trim = (str(e).split('?'))[0] - errorLogger.error(f"For Program ID: {program_unique_id}: {error_trim}") - sys.exit() + +necessary_columns = ["state_name","state_externalId","district_name","district_externalId","block_name", + "block_externalId","organisation_name","organisation_id"] +final_df_columns = final_projects_df.columns +for miss_cols in necessary_columns: + if miss_cols not in final_df_columns: + bot.api_call("chat.postMessage",channel=config.get("SLACK","channel"),text=f"MISSED: {miss_cols}") + final_projects_df = final_projects_df.withColumn(miss_cols, lit(None).cast(StringType())) + bot.api_call("chat.postMessage",channel=config.get("SLACK","channel"),text=f"UPDATED: {final_projects_df.columns}") projects_df_final.unpersist() successLogger.debug(