Skip to content

Commit

Permalink
feat: add aggregated data for plano verao
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-milan committed Oct 31, 2024
1 parent 924bee1 commit 975b0bb
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pipelines/mapa_realizacoes/infopref/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,19 @@
all_realizacoes = merge_lists(list_a=realizacoes_filtered, list_b=realizacoes_pin_only)

aggregated_data = compute_aggregate_data(realizacoes=realizacoes_nova_gestao)
aggregated_data_plano_verao = compute_aggregate_data(
realizacoes=realizacoes_nova_gestao, version="PLANO_VERAO"
)

upload_aggregated_data_task = upload_aggregated_data_to_firestore(
data=aggregated_data, db=db, collection="aggregated_data", clear=clear
)
upload_aggregated_data_planoverao_task = upload_aggregated_data_to_firestore(
data=aggregated_data_plano_verao,
db=db,
collection="aggregated_data__planoverao",
clear=clear,
)
upload_cidades_task = upload_infopref_data_to_firestore(
data=clean_cidades, db=db, collection="cidade", clear=clear
)
Expand Down Expand Up @@ -226,6 +235,7 @@

log_task_ref = log_task(msg="This is the end.")
log_task_ref.set_upstream(upload_aggregated_data_task)
log_task_ref.set_upstream(upload_aggregated_data_planoverao_task)
log_task_ref.set_upstream(upload_cidades_task)
log_task_ref.set_upstream(upload_orgaos_task)
log_task_ref.set_upstream(upload_programas_task)
Expand Down
11 changes: 10 additions & 1 deletion pipelines/mapa_realizacoes/infopref/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,19 @@ def cleanup_unused(


@task
def compute_aggregate_data(realizacoes: list[dict]):
def compute_aggregate_data(realizacoes: list[dict], version: str = None):
"""
Pre-computes aggregated data for Firebase.
"""
# If we have a version, filter out the realizacoes that do not match the version
if version:
version = version.upper()
if version == "PLANO_VERAO":
realizacoes = [
realizacao for realizacao in realizacoes if realizacao["data"]["plano_verao"]
]
else:
raise ValueError(f"Invalid version for aggregating data: {version}.")
aggregated_data = collections.defaultdict(lambda: {"count": 0, "investment": 0})
for realizacao in realizacoes:
data = realizacao["data"]
Expand Down

0 comments on commit 975b0bb

Please sign in to comment.