New data model (#166)
* fix ruff

* update fetch data

* prevent empty

* fix display

* drop media_url

* device name

* drop useless

* drop useless

* fix vision cone

* put back sites

* fix docstring

* update api ref

* put back past n days

* use docker compose v2

* faster iteration

* fix docstring
MateoLostanlen authored Sep 5, 2024
1 parent f115345 commit df9d2ac
Showing 8 changed files with 117 additions and 412 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml

@@ -64,6 +64,6 @@ jobs:
           API_PWD: ${{ secrets.API_PWD }}
         run: |
           docker network create web
-          docker-compose up -d --build
+          docker compose up -d --build
       - name: Check docker sanity
-        run: sleep 200 && docker-compose logs && curl http://platform.localhost:8050/
+        run: sleep 200 && docker compose logs && curl http://platform.localhost:8050/
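
The workflow now assumes the Compose v2 plugin (`docker compose`) is available on the runner rather than the standalone v1 `docker-compose` binary, which is end-of-life. As an illustration only (this helper is not part of the repo), a small Python sketch that picks whichever form is installed:

```python
import shutil
import subprocess


def compose_command() -> list[str]:
    """Return the Compose invocation available on this machine, preferring v2."""
    if shutil.which("docker"):
        probe = subprocess.run(["docker", "compose", "version"], capture_output=True)
        if probe.returncode == 0:  # the v2 plugin responds to `docker compose`
            return ["docker", "compose"]
    if shutil.which("docker-compose"):  # legacy standalone v1 binary
        return ["docker-compose"]
    raise RuntimeError("No Docker Compose installation found")


# Usage: subprocess.run([*compose_command(), "up", "-d", "--build"], check=True)
```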
2 changes: 1 addition & 1 deletion Makefile

@@ -8,7 +8,7 @@ quality:
 # this target runs checks on all files and potentially modifies some of them
 style:
 	black .
-	ruff --fix .
+	ruff check --fix .

 # Build the docker
 build:

(Recent Ruff releases run the linter through the `ruff check` subcommand; the bare `ruff --fix .` form was deprecated and later removed, which is why the old target broke on up-to-date toolchains.)
245 changes: 41 additions & 204 deletions app/callbacks/data_callbacks.py

@@ -21,7 +21,6 @@
     past_ndays_api_events,
     process_bbox,
     read_stored_DataFrame,
-    retrieve_site_from_device_id,
 )

 logger = logging_config.configure_logging(cfg.DEBUG, cfg.SENTRY_DSN)
@@ -41,6 +40,22 @@
     ],
 )
 def login_callback(n_clicks, username, password, user_headers):
+    """
+    Callback to handle user login.
+
+    Parameters:
+        n_clicks (int): Number of times the login button has been clicked.
+        username (str or None): The value entered in the username input field.
+        password (str or None): The value entered in the password input field.
+        user_headers (dict or None): Existing user headers, if any, containing authentication details.
+
+    This function is triggered when the login button is clicked. It verifies the provided username and password,
+    attempts to authenticate the user via the API, and updates the user credentials and headers.
+    If authentication fails or credentials are missing, it provides appropriate feedback.
+
+    Returns:
+        dash.dependencies.Output: Updated user credentials and headers, and form feedback.
+    """
     if user_headers is not None:
         return dash.no_update, dash.no_update, dash.no_update

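The guard shown above is the stock Dash idiom for a no-op: returning `dash.no_update` for every declared Output leaves the stores untouched once a user is already authenticated. A minimal, self-contained sketch of that pattern; the component ids and the placeholder token are illustrative, not the app's real layout or auth flow:

```python
import dash
from dash import Dash, Input, Output, State, dcc, html

app = Dash(__name__)
app.layout = html.Div(
    [
        html.Button("Login", id="send_form_button"),
        dcc.Store(id="user_headers"),
        html.Div(id="form_feedback"),
    ]
)


@app.callback(
    [Output("user_headers", "data"), Output("form_feedback", "children")],
    Input("send_form_button", "n_clicks"),
    State("user_headers", "data"),
    prevent_initial_call=True,
)
def login_callback(n_clicks, user_headers):
    # Already authenticated: emit no_update for every Output so nothing re-renders.
    if user_headers is not None:
        return dash.no_update, dash.no_update
    return {"Authorization": "Bearer <token>"}, "Logged in"


if __name__ == "__main__":
    app.run(debug=True)
```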
@@ -77,239 +92,61 @@ def login_callback(n_clicks, username, password, user_headers):

 @app.callback(
     [
-        Output("store_api_events_data", "data"),
         Output("store_api_alerts_data", "data"),
-        Output("trigger_no_events", "data"),
     ],
-    [Input("main_api_fetch_interval", "n_intervals")],
+    [Input("main_api_fetch_interval", "n_intervals"), Input("user_credentials", "data")],
     [
-        State("store_api_events_data", "data"),
         State("store_api_alerts_data", "data"),
         State("user_headers", "data"),
-        State("user_credentials", "data"),
     ],
     prevent_initial_call=True,
 )
-def api_watcher(n_intervals, local_events, local_alerts, user_headers, user_credentials):
+def api_watcher(n_intervals, user_credentials, local_alerts, user_headers):
     """
-    Fetches and processes live event and alert data from the API at regular intervals.
-
-    This callback periodically checks for new event and alert data from the API.
-    It processes the new data, updates local storage with the latest information,
-    and prepares it for displaying in the application.
+    Callback to periodically fetch alerts data from the API.

     Parameters:
-    - n_intervals (int): Number of intervals passed since the start of the app,
-      used to trigger the periodic update.
-    - local_events (json): Currently stored events data in JSON format.
-    - local_alerts (json): Currently stored alerts data in JSON format.
-    - user_headers (dict): User authorization headers for API requests.
-    - user_credentials (tuple): User credentials (username, password).
+        n_intervals (int): Number of times the interval has been triggered.
+        user_credentials (dict or None): Current user credentials for API authentication.
+        local_alerts (dict or None): Locally stored alerts data, serialized as JSON.
+        user_headers (dict or None): Current user headers containing authentication details.
+
+    This function is triggered at specified intervals and when user credentials are updated.
+    It retrieves unacknowledged events from the API, processes the data, and stores it locally.
+    If the local data matches the API data, no updates are made.

     Returns:
-    - json: Updated events data in JSON format.
-    - json: Updated alerts data in JSON format.
+        dash.dependencies.Output: Serialized JSON data of alerts and a flag indicating if data is loaded.
     """
     if user_headers is None:
         raise PreventUpdate
     user_token = user_headers["Authorization"].split(" ")[1]
     api_client.token = user_token

     # Read local data
-    local_events, event_data_loaded = read_stored_DataFrame(local_events)
     local_alerts, alerts_data_loaded = read_stored_DataFrame(local_alerts)
     logger.info("Start Fetching the events")

-    # Fetch events
-    api_events = pd.DataFrame(call_api(api_client.get_unacknowledged_events, user_credentials)())
-    api_events["created_at"] = convert_time(api_events)
-    if len(api_events) == 0:
+    api_alerts = pd.DataFrame(call_api(api_client.get_unacknowledged_events, user_credentials)())
+    api_alerts["created_at"] = convert_time(api_alerts)
+    api_alerts = past_ndays_api_events(api_alerts, n_days=0)
+
+    if len(api_alerts) == 0:
         return [
             json.dumps(
                 {
                     "data": pd.DataFrame().to_json(orient="split"),
                     "data_loaded": True,
                 }
-            ),
-            json.dumps(
-                {
-                    "data": pd.DataFrame().to_json(orient="split"),
-                    "data_loaded": True,
-                }
-            ),
-            dash.no_update,
+            )
         ]

-    else:
-        api_events = past_ndays_api_events(api_events, n_days=0)  # keep only events from today
-        if api_events.empty:
-            return dash.no_update, dash.no_update, True
-        api_events = api_events[::-1]  # Display the last alert first
-
-        # Drop acknowledged
-        if not local_events.empty:
-            local_events = local_events[local_events["id"].isin(api_events["id"])]
-
-        if event_data_loaded and api_events.empty and local_events.empty:
-            new_api_events = api_events[~api_events["id"].isin(local_events["id"])].copy()
-        else:
-            new_api_events = api_events.copy()
-
-        if alerts_data_loaded and not local_alerts.empty:
-            # drop old alerts
-            local_alerts = local_alerts[local_alerts["event_id"].isin(api_events["id"])]
-
-            # Find ongoing alerts for the events started within 30 minutes;
-            # after that, any new alert is part of a new event
-            local_alerts["created_at"] = pd.to_datetime(local_alerts["created_at"])
-
-            # Define the end_event timestamp as timezone-naive
-            end_event = pd.Timestamp.utcnow().replace(tzinfo=None) - pd.Timedelta(minutes=30)
-
-            # Get ongoing alerts
-            ongoing_local_alerts = local_alerts[local_alerts["created_at"] > end_event].copy()
-            get_alerts = call_api(api_client.get_alerts_for_event, user_credentials)
-            ongoing_alerts = pd.DataFrame()
-
-            # Iterate over each unique event_id
-            for event_id in ongoing_local_alerts["event_id"].drop_duplicates():
-                # Get the alerts for the current event_id and convert to DataFrame
-                alerts_df = pd.DataFrame(get_alerts(event_id))
-
-                # Concatenate the current alerts DataFrame to the ongoing_alerts DataFrame
-                ongoing_alerts = pd.concat([ongoing_alerts, alerts_df], ignore_index=True)
-
-            if not ongoing_alerts.empty:
-                ongoing_alerts = (
-                    ongoing_alerts.groupby(["event_id"]).head(cfg.MAX_ALERTS_PER_EVENT).reset_index(drop=True)
-                )
-                ongoing_alerts = ongoing_alerts[~ongoing_alerts["id"].isin(local_alerts["id"])].copy()
-                ongoing_alerts["processed_loc"] = ongoing_alerts["localization"].apply(process_bbox)
-
-            # Get new alerts
-            new_alerts = pd.DataFrame()
-
-            # Iterate over each unique event_id
-            for event_id in new_api_events["id"].drop_duplicates():
-                # Get the alerts for the current event_id and convert to DataFrame
-                alerts_df = pd.DataFrame(get_alerts(event_id))
-
-                # Concatenate the current alerts DataFrame to the new_alerts DataFrame
-                new_alerts = pd.concat([new_alerts, alerts_df], ignore_index=True)
-
-            if not new_alerts.empty:
-                new_alerts = new_alerts.groupby(["event_id"]).head(cfg.MAX_ALERTS_PER_EVENT).reset_index(drop=True)
-                new_alerts["processed_loc"] = new_alerts["localization"].apply(process_bbox)
-                new_alerts = pd.concat([new_alerts, ongoing_alerts], join="outer")
-                local_alerts = pd.concat([local_alerts, new_alerts], join="outer")
-                local_alerts = local_alerts.drop_duplicates(subset=["id"])
-
-        else:
-            get_alerts = call_api(api_client.get_alerts_for_event, user_credentials)
-            _ = api_events["id"].apply(lambda x: pd.DataFrame(get_alerts(x)))  # type: ignore[arg-type, return-value]
-            local_alerts = (
-                pd.concat(_.values).groupby(["event_id"]).head(cfg.MAX_ALERTS_PER_EVENT).reset_index(drop=True)
-            )
-            local_alerts["created_at"] = pd.to_datetime(local_alerts["created_at"])
-            local_alerts["processed_loc"] = local_alerts["localization"].apply(process_bbox)
-
-        if len(new_api_events):
-            alerts_data = new_api_events.merge(local_alerts, left_on="id", right_on="event_id").drop_duplicates(
-                subset=["id_x"]
-            )[["azimuth", "device_id"]]
-
-            new_api_events["device_name"] = [
-                f"{retrieve_site_from_device_id(api_client, user_credentials, device_id)} - {int(azimuth)}°".title()
-                for _, (azimuth, device_id) in alerts_data.iterrows()
-            ]
-
-        if event_data_loaded:
-            local_events = pd.concat([local_events, new_api_events], join="outer")
-            local_events = local_events.drop_duplicates()
-        else:
-            local_events = new_api_events
-
-        return [
-            json.dumps({"data": local_events.to_json(orient="split"), "data_loaded": True}),
-            json.dumps({"data": local_alerts.to_json(orient="split"), "data_loaded": True}),
-            dash.no_update,
-        ]
+    else:
+        api_alerts["processed_loc"] = api_alerts["localization"].apply(process_bbox)
+        if alerts_data_loaded and not local_alerts.empty:
+            aligned_api_alerts, aligned_local_alerts = api_alerts["alert_id"].align(local_alerts["alert_id"])
+            if all(aligned_api_alerts == aligned_local_alerts):
+                return [dash.no_update]

-
-@app.callback(
-    Output("media_url", "data"),
-    Input("store_api_alerts_data", "data"),
-    [
-        State("media_url", "data"),
-        State("user_headers", "data"),
-        State("user_credentials", "data"),
-    ],
-    prevent_initial_call=True,
-)
-def get_media_url(
-    local_alerts,
-    media_url,
-    user_headers,
-    user_credentials,
-):
-    """
-    Retrieves media URLs for alerts and manages the fetching process from the API.
-
-    This callback is designed to efficiently load media URLs during app initialization
-    and subsequently update them. Initially, it focuses on loading URLs event by event
-    to quickly provide data for visualization. Once URLs for all events are loaded, the
-    callback then methodically checks for and retrieves any missing URLs.
-
-    The callback is triggered by two inputs: a change in the data to load and a regular
-    interval check. It includes a cleanup step to remove event IDs no longer present in
-    local alerts.
-
-    Parameters:
-    - interval (int): Current interval for fetching URLs.
-    - local_alerts (json): Currently stored alerts data in JSON format.
-    - media_url (dict): Dictionary holding media URLs for alerts.
-    - user_headers (dict): User authorization headers for API requests.
-    - user_credentials (tuple): User credentials (username, password).
-
-    Returns:
-    - dict: Updated dictionary with media URLs for each alert.
-    """
-    if user_headers is None:
-        raise PreventUpdate
-    user_token = user_headers["Authorization"].split(" ")[1]
-    api_client.token = user_token
-
-    local_alerts, alerts_data_loaded = read_stored_DataFrame(local_alerts)
-
-    if not alerts_data_loaded:
-        raise PreventUpdate
-
-    if local_alerts.empty:
-        return {}
-
-    current_event_ids = set(local_alerts["event_id"].astype(str))
-
-    # Cleanup: Remove any event_ids from media_url not present in local_alerts
-    media_url_keys = set(media_url.keys())
-    for event_id in media_url_keys - current_event_ids:
-        del media_url[event_id]
-
-    # Loop through each row in local_alerts
-    for _, row in local_alerts.iterrows():
-        event_id = str(row["event_id"])
-        media_id = str(row["media_id"])
-        if event_id not in media_url:
-            media_url[event_id] = {}
-
-        # Check if the URL for this event_id and media_id already exists
-        if media_id not in media_url[event_id]:
-            # Fetch the URL for this media_id
-            try:
-                media_url[event_id][media_id] = call_api(api_client.get_media_url, user_credentials)(media_id)["url"]
-            except Exception:  # General catch-all for other exceptions
-                media_url[event_id][media_id] = ""  # Handle potential exceptions
-
-    return media_url
+    return [json.dumps({"data": api_alerts.to_json(orient="split"), "data_loaded": True})]
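
The new model keeps a single dcc.Store payload: a JSON envelope holding a split-oriented DataFrame plus a `data_loaded` flag. Below is a minimal sketch of that round trip and of the `align`-based comparison used to skip redundant store writes; `to_store` and this `read_stored_DataFrame` are hypothetical stand-ins written to match how the callback consumes the data (the app's real helper lives in its utils and may differ):

```python
import io
import json

import pandas as pd


def to_store(df: pd.DataFrame) -> str:
    """Serialize a DataFrame into the JSON envelope written to the dcc.Store."""
    return json.dumps({"data": df.to_json(orient="split"), "data_loaded": True})


def read_stored_DataFrame(payload: str):
    """Inverse of to_store: return the DataFrame and the data_loaded flag."""
    envelope = json.loads(payload)
    return pd.read_json(io.StringIO(envelope["data"]), orient="split"), envelope["data_loaded"]


api_alerts = pd.DataFrame({"alert_id": [101, 102], "azimuth": [47.0, 132.5]})
local_alerts, loaded = read_stored_DataFrame(to_store(api_alerts))

# Series.align reindexes both sides onto a shared index so they can be compared
# element-wise; identical alert ids mean the API returned nothing new.
aligned_api, aligned_local = api_alerts["alert_id"].align(local_alerts["alert_id"])
print(loaded, all(aligned_api == aligned_local))  # True True
```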