Skip to content

Commit

Permalink
Merge branch 'main' into pre-commit-ci-update-config
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored May 6, 2024
2 parents b2c4cdb + 2433238 commit 1e6c756
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 2 deletions.
1 change: 1 addition & 0 deletions pipelines/lgpd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# -*- coding: utf-8 -*-
from pipelines.lgpd.auditlog.flows import * # noqa
from pipelines.lgpd.tables_bindings.flows import * # noqa
Empty file.
67 changes: 67 additions & 0 deletions pipelines/lgpd/auditlog/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.utilities.edges import unmapped
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.constants import constants
from pipelines.lgpd.auditlog.schedules import update_audit_logs_schedule
from pipelines.lgpd.auditlog.tasks import (
get_auditlog_dataframe,
get_last_execution_datetime,
get_now,
get_redis_url,
set_last_execution_datetime,
)
from pipelines.lgpd.tables_bindings.tasks import (
list_projects,
merge_dataframes,
upload_dataframe_to_bigquery,
)

# Daily flow: collects GCP IAM "SetIamPolicy" audit-log entries for every
# project the service account can list, flattens them into one dataframe and
# uploads it to BigQuery. The queried window runs from the last successful
# execution (watermark tracked in Redis) up to "now".
with Flow(
    name="LGPD - Histórico de concessão de permissões no IAM",
    state_handlers=[handler_inject_bd_credentials],
    skip_if_running=True,
    parallelism=10,
) as rj_escritorio__lgpd__auditlog__flow:
    # Parameters
    credentials_secret_name = Parameter("credentials_secret_name")  # secret holding GCP credentials
    dataset_id = Parameter("dataset_id")  # target BigQuery dataset
    dump_mode = Parameter("dump_mode", default="append")  # upload mode (default: append)
    last_execution_redis_key = Parameter("last_execution_redis_key")  # Redis watermark key
    redis_url_secret_name = Parameter("redis_url_secret_name")  # secret holding the Redis URL
    table_id = Parameter("table_id")  # target BigQuery table

    # Flow
    now = get_now()  # upper bound of the query window
    redis_url = get_redis_url(secret_name=redis_url_secret_name)
    # Lower bound; None on the very first run (the mapped task falls back to a default start).
    last_execution = get_last_execution_datetime(redis_url=redis_url, key=last_execution_redis_key)
    project_ids = list_projects(credentials_secret_name=credentials_secret_name)
    # One mapped task per project; the window bounds and credentials are shared (unmapped).
    audit_log_dataframes = get_auditlog_dataframe.map(
        project_id=project_ids,
        credentials_secret_name=unmapped(credentials_secret_name),
        start=unmapped(last_execution),
        end=unmapped(now),
    )
    merged_dataframe = merge_dataframes(dfs=audit_log_dataframes)
    upload_task = upload_dataframe_to_bigquery(
        dataframe=merged_dataframe,
        dataset_id=dataset_id,
        table_id=table_id,
        dump_mode=dump_mode,
    )
    set_last_execution_task = set_last_execution_datetime(
        redis_url=redis_url, key=last_execution_redis_key, value=now
    )
    # Only advance the Redis watermark after the upload succeeds, so a failed
    # upload re-queries the same window on the next run.
    set_last_execution_task.set_upstream(upload_task)


# Deployment configuration: flow code stored in GCS, executed on Kubernetes.
rj_escritorio__lgpd__auditlog__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
rj_escritorio__lgpd__auditlog__flow.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value,
    labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value],
)
rj_escritorio__lgpd__auditlog__flow.schedule = update_audit_logs_schedule
28 changes: 28 additions & 0 deletions pipelines/lgpd/auditlog/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

from pipelines.constants import constants

# Daily schedule for the IAM audit-log flow.
update_audit_logs_schedule = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(days=1),
            # NOTE: pytz timezones must be attached with localize(); passing
            # them as tzinfo= to the datetime constructor uses the zone's
            # first historical (LMT) offset — -03:06 for America/Sao_Paulo —
            # instead of the correct -03:00 wall-clock offset.
            start_date=pytz.timezone("America/Sao_Paulo").localize(datetime(2023, 1, 1)),
            labels=[
                constants.RJ_ESCRITORIO_AGENT_LABEL.value,
            ],
            parameter_defaults={
                "credentials_secret_name": "LGPD_SERVICE_ACCOUNT_B64",
                "dataset_id": "datalake_gestao",
                "dump_mode": "append",
                "last_execution_redis_key": "lgpd_iam_audit_log_last_execution",
                "redis_url_secret_name": "LGPD_REDIS_URL",
                "table_id": "iam_audit_log",
            },
        ),
    ]
)
57 changes: 57 additions & 0 deletions pipelines/lgpd/auditlog/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta

import pandas as pd
from prefect import task
from prefeitura_rio.pipelines_utils.infisical import get_secret
from prefeitura_rio.pipelines_utils.logging import log
from redis import Redis

from pipelines.lgpd.auditlog.utils import extract_iam_audit_logs, parse_iam_audit_logs


@task(
    checkpoint=False,
    max_retries=10,
    retry_delay=timedelta(minutes=5),
)
def get_auditlog_dataframe(
    project_id: str,
    credentials_secret_name: str,
    start: datetime = None,
    end: datetime = None,
) -> pd.DataFrame:
    """Extract and parse IAM audit logs for a single GCP project.

    Args:
        project_id (str): Project whose audit logs will be queried.
        credentials_secret_name (str): Secret holding the GCP credentials.
        start (datetime): Window start; defaults to 2021-01-01 when not given.
        end (datetime): Window end; defaults to the current time when not given.

    Returns:
        pd.DataFrame: Parsed audit-log entries for the window.
    """
    if not start:
        start = datetime(2021, 1, 1)
    if not end:
        end = datetime.now()
    log(f"Getting audit logs from {start} to {end}")
    raw_entries = extract_iam_audit_logs(
        project_id=project_id, secret_name=credentials_secret_name, start=start, end=end
    )
    return parse_iam_audit_logs(entries=raw_entries)


@task(checkpoint=False)
def get_last_execution_datetime(redis_url: str, key: str) -> datetime:
    """Read the datetime of the last successful execution from Redis.

    Args:
        redis_url (str): Redis connection URL.
        key (str): Key under which the ISO-formatted datetime is stored.

    Returns:
        datetime: The stored datetime, or None when the key is absent/empty.
    """
    client = Redis.from_url(redis_url)
    stored = client.get(key)
    if not stored:
        return None
    parsed = datetime.fromisoformat(stored.decode())
    log(f"Last execution: {parsed}")
    return parsed


@task(checkpoint=False)
def get_now() -> datetime:
    """Return the current instant as a timezone-aware UTC datetime.

    The value is later formatted with a literal "Z" (UTC) suffix when building
    the Cloud Logging timestamp filter (see extract_iam_audit_logs), so it must
    actually be UTC: a naive datetime.now() is local time and would mislabel
    the queried window by the local UTC offset.
    """
    from datetime import timezone  # local import keeps this fix self-contained

    return datetime.now(tz=timezone.utc)


@task
def get_redis_url(secret_name: str) -> str:
    """Fetch the Redis connection URL stored under *secret_name*."""
    secret_payload = get_secret(secret_name)
    return secret_payload[secret_name]


@task
def set_last_execution_datetime(redis_url: str, key: str, value: datetime):
    """Persist *value* in ISO format under *key* in Redis."""
    Redis.from_url(redis_url).set(key, value.isoformat())
80 changes: 80 additions & 0 deletions pipelines/lgpd/auditlog/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
from datetime import datetime

import pandas as pd
from google.cloud import logging_v2
from prefeitura_rio.pipelines_utils.logging import log

from pipelines.lgpd.tables_bindings.utils import get_gcp_credentials


def extract_iam_audit_logs(
    *, project_id: str, secret_name: str, start: datetime, end: datetime
) -> list:
    """List IAM "SetIamPolicy" activity audit-log entries for a project.

    Args:
        project_id (str): Project whose audit logs are queried.
        secret_name (str): Secret holding the GCP credentials.
        start (datetime): Inclusive lower bound of the time window.
        end (datetime): Inclusive upper bound of the time window.

    Returns:
        list: The matching log entries.
    """
    client = get_logging_client(secret_name=secret_name)
    parent = f"projects/{project_id}"
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
    filter_ = " AND ".join(
        (
            'log_id("cloudaudit.googleapis.com/activity")',
            'protoPayload.methodName:"SetIamPolicy"',
            f'timestamp>="{start.strftime(timestamp_format)}"',
            f'timestamp<="{end.strftime(timestamp_format)}"',
        )
    )
    log(f"Project: {project_id}")
    log(f"Filter: {filter_}")

    entries = list(client.list_entries(resource_names=[parent], filter_=filter_))

    log(f"Found {len(entries)} entries")
    return entries


def get_logging_client(secret_name: str) -> logging_v2.Client:
    """Build a Cloud Logging client authenticated via the given secret.

    Args:
        secret_name (str): Secret holding the GCP credentials.

    Returns:
        logging_v2.Client: An authenticated logging client.
    """
    return logging_v2.Client(credentials=get_gcp_credentials(secret_name=secret_name))


def parse_iam_audit_logs(entries: list) -> pd.DataFrame:
    """Flatten IAM audit-log entries into one row per (entry, role, member).

    Each entry's `response` carries the full IAM policy set by SetIamPolicy;
    every binding/member pair becomes a row stamped with the entry's timestamp
    and acting principal.

    Args:
        entries (list): Log entries exposing `.payload` (dict) and `.timestamp`.

    Returns:
        pd.DataFrame: Columns data_particao, timestamp, principal, resource,
            role, member. The schema is stable even when no rows are produced.
    """
    columns = ["data_particao", "timestamp", "principal", "resource", "role", "member"]
    parsed_entries = []
    for entry in entries:
        payload = entry.payload
        principal = payload.get("authenticationInfo", {}).get("principalEmail")
        resource = payload.get("resourceName")
        timestamp = entry.timestamp
        response = payload.get("response")
        if response is None:
            # TODO: Why is this happening?
            log("Response is None", level="warning")
            continue
        for binding in response.get("bindings", []):
            role = binding.get("role")
            members = binding.get("members", [])
            for member in members:
                parsed_entries.append(
                    {
                        "data_particao": timestamp.strftime("%Y-%m-%d"),
                        "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                        "principal": principal,
                        "resource": resource,
                        "role": role,
                        "member": member,
                    }
                )
    # Explicit columns guarantee a stable schema (important for the BigQuery
    # upload) even when `entries` is empty or every response was skipped.
    return pd.DataFrame(parsed_entries, columns=columns)
69 changes: 67 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pillow = "^10.1.0"
shapely = "^2.0.2"
h3 = "^3.7.6"
google-cloud-asset = "^3.24.1"
google-cloud-logging = "^3.10.0"

[tool.poetry.group.dev]
optional = true
Expand Down

0 comments on commit 1e6c756

Please sign in to comment.