From 975b0bb543d5cbe8902b54deca6f1d4ac3667834 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Thu, 31 Oct 2024 17:48:19 -0300 Subject: [PATCH] feat: add aggregated data for plano verao --- pipelines/mapa_realizacoes/infopref/flows.py | 10 ++++++++++ pipelines/mapa_realizacoes/infopref/tasks.py | 11 ++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pipelines/mapa_realizacoes/infopref/flows.py b/pipelines/mapa_realizacoes/infopref/flows.py index 3d450a9..b384f39 100644 --- a/pipelines/mapa_realizacoes/infopref/flows.py +++ b/pipelines/mapa_realizacoes/infopref/flows.py @@ -189,10 +189,19 @@ all_realizacoes = merge_lists(list_a=realizacoes_filtered, list_b=realizacoes_pin_only) aggregated_data = compute_aggregate_data(realizacoes=realizacoes_nova_gestao) + aggregated_data_plano_verao = compute_aggregate_data( + realizacoes=realizacoes_nova_gestao, version="PLANO_VERAO" + ) upload_aggregated_data_task = upload_aggregated_data_to_firestore( data=aggregated_data, db=db, collection="aggregated_data", clear=clear ) + upload_aggregated_data_planoverao_task = upload_aggregated_data_to_firestore( + data=aggregated_data_plano_verao, + db=db, + collection="aggregated_data__planoverao", + clear=clear, + ) upload_cidades_task = upload_infopref_data_to_firestore( data=clean_cidades, db=db, collection="cidade", clear=clear ) @@ -226,6 +235,7 @@ log_task_ref = log_task(msg="This is the end.") log_task_ref.set_upstream(upload_aggregated_data_task) + log_task_ref.set_upstream(upload_aggregated_data_planoverao_task) log_task_ref.set_upstream(upload_cidades_task) log_task_ref.set_upstream(upload_orgaos_task) log_task_ref.set_upstream(upload_programas_task) diff --git a/pipelines/mapa_realizacoes/infopref/tasks.py b/pipelines/mapa_realizacoes/infopref/tasks.py index b363e9c..9b8d42f 100644 --- a/pipelines/mapa_realizacoes/infopref/tasks.py +++ b/pipelines/mapa_realizacoes/infopref/tasks.py @@ -138,10 +138,19 @@ def cleanup_unused( @task -def compute_aggregate_data(realizacoes: list[dict]): +def compute_aggregate_data(realizacoes: list[dict], version: str = None): """ Pre-computes aggregated data for Firebase. """ + # If we have a version, filter out the realizacoes that do not match the version + if version: + version = version.upper() + if version == "PLANO_VERAO": + realizacoes = [ + realizacao for realizacao in realizacoes if realizacao["data"]["plano_verao"] + ] + else: + raise ValueError(f"Invalid version for aggregating data: {version}.") aggregated_data = collections.defaultdict(lambda: {"count": 0, "investment": 0}) for realizacao in realizacoes: data = realizacao["data"]