Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(framework) Add enter and leave event for Simulation and ServerApp #4716

Merged
merged 20 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/py/flwr/client/supernode/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ def run_client_app() -> None:
event(EventType.RUN_CLIENT_APP_ENTER)
log(
ERROR,
"The command `flower-client-app` has been replaced by `flower-supernode`.",
"The command `flower-client-app` has been replaced by `flwr run`.",
)
log(INFO, "Execute `flower-supernode --help` to learn how to use it.")
register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE)


Expand Down
16 changes: 13 additions & 3 deletions src/py/flwr/common/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ def _generate_next_value_(name: str, start: int, count: int, last_values: list[A

# Not yet implemented

# --- `flwr-*` commands ------------------------------------------------------------

# CLI: flwr-simulation
FLWR_SIMULATION_RUN_ENTER = auto()
FLWR_SIMULATION_RUN_LEAVE = auto()

# CLI: flwr-serverapp
FLWR_SERVERAPP_RUN_ENTER = auto()
FLWR_SERVERAPP_RUN_LEAVE = auto()

# --- Simulation Engine ------------------------------------------------------------

# CLI: flower-simulation
Expand All @@ -171,12 +181,12 @@ def _generate_next_value_(name: str, start: int, count: int, last_values: list[A
RUN_SUPERNODE_ENTER = auto()
RUN_SUPERNODE_LEAVE = auto()

# CLI: `flower-server-app`
# --- DEPRECATED -------------------------------------------------------------------

# [DEPRECATED] CLI: `flower-server-app`
RUN_SERVER_APP_ENTER = auto()
RUN_SERVER_APP_LEAVE = auto()

# --- DEPRECATED -------------------------------------------------------------------

# [DEPRECATED] CLI: `flower-client-app`
RUN_CLIENT_APP_ENTER = auto()
RUN_CLIENT_APP_LEAVE = auto()
Expand Down
17 changes: 8 additions & 9 deletions src/py/flwr/server/run_serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
"""Run ServerApp."""


import sys
from logging import DEBUG, ERROR
from typing import Optional

from flwr.common import Context
from flwr.common.logger import log, warn_unsupported_feature
from flwr.common import Context, EventType, event
from flwr.common.exit_handlers import register_exit_handlers
from flwr.common.logger import log
from flwr.common.object_ref import load_app

from .driver import Driver
Expand Down Expand Up @@ -66,12 +66,11 @@ def _load() -> ServerApp:
return context


# pylint: disable-next=too-many-branches,too-many-statements,too-many-locals
def run_server_app() -> None:
"""Run Flower server app."""
warn_unsupported_feature(
"The command `flower-server-app` is deprecated and no longer in use. "
"Use the `flwr-serverapp` exclusively instead."
event(EventType.RUN_SERVER_APP_ENTER)
log(
ERROR,
"The command `flower-server-app` has been replaced by `flwr run`.",
)
log(ERROR, "`flower-server-app` used.")
sys.exit()
register_exit_handlers(event_type=EventType.RUN_SERVER_APP_LEAVE)
18 changes: 16 additions & 2 deletions src/py/flwr/server/serverapp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.cli.utils import get_sha256_hash
from flwr.common.args import add_args_flwr_app_common
from flwr.common.config import (
get_flwr_dir,
Expand All @@ -51,6 +52,7 @@
run_from_proto,
run_status_to_proto,
)
from flwr.common.telemetry import EventType, event
from flwr.common.typing import RunNotRunningException, RunStatus
from flwr.proto.run_pb2 import UpdateRunStatusRequest # pylint: disable=E0611
from flwr.proto.serverappio_pb2 import ( # pylint: disable=E0611
Expand Down Expand Up @@ -113,7 +115,7 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
# Resolve directory where FABs are installed
flwr_dir_ = get_flwr_dir(flwr_dir)
log_uploader = None

success = True
while True:

try:
Expand All @@ -129,6 +131,8 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
run = run_from_proto(res.run)
fab = fab_from_proto(res.fab)

hash_run_id = get_sha256_hash(run.run_id)

driver.set_run(run.run_id)

# Start log uploader for this run
Expand Down Expand Up @@ -171,6 +175,11 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto)
)

event(
EventType.FLWR_SERVERAPP_RUN_ENTER,
event_details={"run-id-hash": hash_run_id},
)

# Load and run the ServerApp with the Driver
updated_context = run_(
driver=driver,
Expand All @@ -187,17 +196,18 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
_ = driver._stub.PushServerAppOutputs(out_req)

run_status = RunStatus(Status.FINISHED, SubStatus.COMPLETED, "")

except RunNotRunningException:
log(INFO, "")
log(INFO, "Run ID %s stopped.", run.run_id)
log(INFO, "")
run_status = None
success = False

except Exception as ex: # pylint: disable=broad-exception-caught
exc_entity = "ServerApp"
log(ERROR, "%s raised an exception", exc_entity, exc_info=ex)
run_status = RunStatus(Status.FINISHED, SubStatus.FAILED, str(ex))
success = False

finally:
# Stop log uploader for this run and upload final logs
Expand All @@ -213,6 +223,10 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
run_id=run.run_id, run_status=run_status_proto
)
)
event(
EventType.FLWR_SERVERAPP_RUN_LEAVE,
event_details={"run-id-hash": hash_run_id, "success": success},
)

# Stop the loop if `flwr-serverapp` is expected to process a single run
if run_once:
Expand Down
14 changes: 12 additions & 2 deletions src/py/flwr/simulation/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.common import EventType
from flwr.cli.utils import get_sha256_hash
from flwr.common import EventType, event
from flwr.common.args import add_args_flwr_app_common
from flwr.common.config import (
get_flwr_dir,
Expand Down Expand Up @@ -202,6 +203,15 @@ def run_simulation_process( # pylint: disable=R0914, disable=W0212, disable=R09
verbose: bool = fed_opt.get("verbose", False)
enable_tf_gpu_growth: bool = fed_opt.get("enable_tf_gpu_growth", False)

event(
EventType.FLWR_SIMULATION_RUN_ENTER,
event_details={
"backend": "ray",
"num-supernodes": num_supernodes,
"run-id-hash": get_sha256_hash(run.run_id),
},
)

# Launch the simulation
updated_context = _run_simulation(
server_app_attr=server_app_attr,
Expand All @@ -214,7 +224,7 @@ def run_simulation_process( # pylint: disable=R0914, disable=W0212, disable=R09
verbose_logging=verbose,
server_app_run_config=fused_config,
is_app=True,
exit_event=EventType.CLI_FLOWER_SIMULATION_LEAVE,
exit_event=EventType.FLWR_SIMULATION_RUN_LEAVE,
)

# Send resulting context
Expand Down
9 changes: 8 additions & 1 deletion src/py/flwr/simulation/run_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from typing import Any, Optional

from flwr.cli.config_utils import load_and_validate
from flwr.cli.utils import get_sha256_hash
from flwr.client import ClientApp
from flwr.common import Context, EventType, RecordSet, event, log, now
from flwr.common.config import get_fused_config_from_dir, parse_config_args
Expand Down Expand Up @@ -394,7 +395,13 @@ def _main_loop(
finally:
# Trigger stop event
f_stop.set()
event(exit_event, event_details={"success": success})
event(
exit_event,
event_details={
"run-id-hash": get_sha256_hash(run.run_id),
"success": success,
},
)
if serverapp_th:
serverapp_th.join()
if server_app_thread_has_exception.is_set():
Expand Down