Skip to content

Commit

Permalink
fix: checkpoint a few things
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-milan committed Nov 1, 2024
1 parent 975b0bb commit 0cfd8af
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
9 changes: 6 additions & 3 deletions pipelines/mapa_realizacoes/infopref/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
load_firestore_credential_to_file,
log_task,
merge_lists,
merge_lists_no_checkpoint,
split_by_gestao,
transform_csv_to_pin_only_realizacoes,
transform_infopref_realizacao_to_firebase,
Expand Down Expand Up @@ -107,7 +108,7 @@
gestao="3",
)

realizacoes_alarme_alertario = merge_lists(
realizacoes_alarme_alertario = merge_lists_no_checkpoint(
list_a=realizacoes_alarme_sonoro, list_b=realizacoes_alertario
)

Expand All @@ -120,7 +121,7 @@
gestao="3",
)

realizacoes_pin_only = merge_lists(
realizacoes_pin_only = merge_lists_no_checkpoint(
list_a=realizacoes_alarme_alertario, list_b=realizacoes_cameras
)

Expand Down Expand Up @@ -186,7 +187,9 @@
],
)

all_realizacoes = merge_lists(list_a=realizacoes_filtered, list_b=realizacoes_pin_only)
all_realizacoes = merge_lists_no_checkpoint(
list_a=realizacoes_filtered, list_b=realizacoes_pin_only
)

aggregated_data = compute_aggregate_data(realizacoes=realizacoes_nova_gestao)
aggregated_data_plano_verao = compute_aggregate_data(
Expand Down
27 changes: 16 additions & 11 deletions pipelines/mapa_realizacoes/infopref/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from pipelines.utils import authenticated_task as task


@task(nout=5, checkpoint=False)
@task(nout=5)
def cleanup_unused(
cidades: List[dict],
orgaos: List[dict],
Expand Down Expand Up @@ -216,12 +216,12 @@ def get_infopref_url(url_secret: str = "INFOPREF_URL") -> str:
return get_secret(url_secret)[url_secret]


@task(checkpoint=False)
@task
def get_infopref_bairro() -> list[dict]:
return [] # TODO: Implement (future)


@task(checkpoint=False)
@task
def get_infopref_cidade() -> list[dict]:
return [
{
Expand All @@ -233,7 +233,7 @@ def get_infopref_cidade() -> list[dict]:
]


@task(checkpoint=False)
@task
def get_infopref_orgao(url_orgao: str, headers: dict) -> list[dict]:
raw_data = fetch_data(url_orgao, headers)
data = []
Expand All @@ -256,7 +256,7 @@ def get_infopref_orgao(url_orgao: str, headers: dict) -> list[dict]:
return data


@task(checkpoint=False)
@task
def get_infopref_programa(url_programa: str, headers: dict) -> list[dict]:
raw_data = fetch_data(url_programa, headers)
data = []
Expand All @@ -280,13 +280,13 @@ def get_infopref_programa(url_programa: str, headers: dict) -> list[dict]:
return data


@task(checkpoint=False)
@task
def get_infopref_realizacao_raw(url_realizacao: str, headers: dict) -> list[dict]:
realizacoes = fetch_data(url_realizacao, headers)
return realizacoes


@task(checkpoint=False)
@task
def get_infopref_status() -> list[dict]:
return [
{
Expand Down Expand Up @@ -322,12 +322,12 @@ def get_infopref_status() -> list[dict]:
]


@task(checkpoint=False)
@task
def get_infopref_subprefeitura() -> list[dict]:
return [] # TODO: Implement (future)


@task(checkpoint=False)
@task
def get_infopref_tema(url_tema: str, headers: dict) -> list[dict]:
raw_data = fetch_data(url_tema, headers)
data = []
Expand All @@ -349,7 +349,7 @@ def get_infopref_tema(url_tema: str, headers: dict) -> list[dict]:
return data


@task(checkpoint=False)
@task
def get_infopref_tipo() -> list[dict]:
return [] # TODO: Implement (future)

Expand All @@ -368,11 +368,16 @@ def load_firestore_credential_to_file(secret_name: str = "FIRESTORE_CREDENTIALS"
f.write(credentials)


@task(checkpoint=False)
@task
def merge_lists(list_a: list, list_b: list) -> list:
return list_a + list_b


@task(checkpoint=False)
def merge_lists_no_checkpoint(list_a: list, list_b: list) -> list:
return list_a + list_b


@task(checkpoint=False)
def get_firestore_client() -> FirestoreClient:
"""
Expand Down

0 comments on commit 0cfd8af

Please sign in to comment.