diff --git a/datasets/flwr_datasets/partitioner/__init__.py b/datasets/flwr_datasets/partitioner/__init__.py index 8770d5b8b76e..583c48efee93 100644 --- a/datasets/flwr_datasets/partitioner/__init__.py +++ b/datasets/flwr_datasets/partitioner/__init__.py @@ -29,6 +29,7 @@ from .shard_partitioner import ShardPartitioner from .size_partitioner import SizePartitioner from .square_partitioner import SquarePartitioner +from .vertical_even_partitioner import VerticalEvenPartitioner from .vertical_size_partitioner import VerticalSizePartitioner __all__ = [ @@ -46,5 +47,6 @@ "ShardPartitioner", "SizePartitioner", "SquarePartitioner", + "VerticalEvenPartitioner", "VerticalSizePartitioner", ] diff --git a/datasets/flwr_datasets/partitioner/vertical_even_partitioner.py b/datasets/flwr_datasets/partitioner/vertical_even_partitioner.py new file mode 100644 index 000000000000..03a8631b86cb --- /dev/null +++ b/datasets/flwr_datasets/partitioner/vertical_even_partitioner.py @@ -0,0 +1,220 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""VerticalEvenPartitioner class.""" +# flake8: noqa: E501 +# pylint: disable=C0301, R0902, R0913 +from typing import Literal, Optional, Union + +import numpy as np + +import datasets +from flwr_datasets.partitioner.partitioner import Partitioner +from flwr_datasets.partitioner.vertical_partitioner_utils import ( + _add_active_party_columns, + _init_optional_str_or_list_str, + _list_split, +) + + +class VerticalEvenPartitioner(Partitioner): + """Partitioner that splits features (columns) evenly into vertical partitions. + + Enables selection of "active party" column(s) and placement into + a specific partition or creation of a new partition just for it. + Also enables droping columns and sharing specified columns across + all partitions. + + Parameters + ---------- + num_partitions : int + Number of partitions to create. + active_party_column : Optional[Union[str, list[str]]] + Column(s) (typically representing labels) associated with the + "active party" (which can be the server). + active_party_columns_mode : Union[Literal[["add_to_first", "add_to_last", "create_as_first", "create_as_last", "add_to_all"], int] + Determines how to assign the active party columns: + + - `"add_to_first"`: Append active party columns to the first partition. + - `"add_to_last"`: Append active party columns to the last partition. + - `"create_as_first"`: Create a new partition at the start containing only these columns. + - `"create_as_last"`: Create a new partition at the end containing only these columns. + - `"add_to_all"`: Append active party columns to all partitions. + - int: Append active party columns to the specified partition index. + drop_columns : Optional[Union[str, list[str]]] + Columns to remove entirely from the dataset before partitioning. + shared_columns : Optional[Union[str, list[str]]] + Columns to duplicate into every partition after initial partitioning. + shuffle : bool + Whether to shuffle the order of columns before partitioning. + seed : Optional[int] + Random seed for shuffling columns. Has no effect if `shuffle=False`. + + Examples + -------- + >>> from flwr_datasets import FederatedDataset + >>> from flwr_datasets.partitioner import VerticalEvenPartitioner + >>> + >>> partitioner = VerticalEvenPartitioner( + ... num_partitions=3, + ... active_party_columns="income", + ... active_party_columns_mode="add_to_last", + ... shuffle=True, + ... seed=42 + ... ) + >>> fds = FederatedDataset( + ... dataset="scikit-learn/adult-census-income", + ... partitioners={"train": partitioner} + ... ) + >>> partitions = [fds.load_partition(i) for i in range(fds.partitioners["train"].num_partitions)] + >>> print([partition.column_names for partition in partitions]) + """ + + def __init__( + self, + num_partitions: int, + active_party_columns: Optional[Union[str, list[str]]] = None, + active_party_columns_mode: Union[ + Literal[ + "add_to_first", + "add_to_last", + "create_as_first", + "create_as_last", + "add_to_all", + ], + int, + ] = "add_to_last", + drop_columns: Optional[Union[str, list[str]]] = None, + shared_columns: Optional[Union[str, list[str]]] = None, + shuffle: bool = True, + seed: Optional[int] = 42, + ) -> None: + super().__init__() + + self._num_partitions = num_partitions + self._active_party_columns = _init_optional_str_or_list_str( + active_party_columns + ) + self._active_party_columns_mode = active_party_columns_mode + self._drop_columns = _init_optional_str_or_list_str(drop_columns) + self._shared_columns = _init_optional_str_or_list_str(shared_columns) + self._shuffle = shuffle + self._seed = seed + self._rng = np.random.default_rng(seed=self._seed) + + self._partition_columns: Optional[list[list[str]]] = None + self._partitions_determined = False + + self._validate_parameters_in_init() + + def _determine_partitions_if_needed(self) -> None: + if self._partitions_determined: + return + + if self.dataset is None: + raise ValueError("No dataset is set for this partitioner.") + + all_columns = list(self.dataset.column_names) + self._validate_parameters_while_partitioning( + all_columns, self._shared_columns, self._active_party_columns + ) + columns = [column for column in all_columns if column not in self._drop_columns] + columns = [column for column in columns if column not in self._shared_columns] + columns = [ + column for column in columns if column not in self._active_party_columns + ] + + if self._shuffle: + self._rng.shuffle(columns) + partition_columns = _list_split(columns, self._num_partitions) + partition_columns = _add_active_party_columns( + self._active_party_columns, + self._active_party_columns_mode, + partition_columns, + ) + + # Add shared columns to all partitions + for partition in partition_columns: + for column in self._shared_columns: + partition.append(column) + + self._partition_columns = partition_columns + self._partitions_determined = True + + def load_partition(self, partition_id: int) -> datasets.Dataset: + """Load a partition based on the partition index. + + Parameters + ---------- + partition_id : int + The index that corresponds to the requested partition. + + Returns + ------- + dataset_partition : Dataset + Single partition of a dataset. + """ + self._determine_partitions_if_needed() + assert self._partition_columns is not None + if partition_id < 0 or partition_id >= len(self._partition_columns): + raise ValueError(f"Invalid partition_id {partition_id}.") + columns = self._partition_columns[partition_id] + return self.dataset.select_columns(columns) + + @property + def num_partitions(self) -> int: + """Number of partitions.""" + self._determine_partitions_if_needed() + assert self._partition_columns is not None + return len(self._partition_columns) + + def _validate_parameters_in_init(self) -> None: + if self._num_partitions < 1: + raise ValueError("`column_distribution` as int must be >= 1.") + + valid_modes = { + "add_to_first", + "add_to_last", + "create_as_first", + "create_as_last", + "add_to_all", + } + if not ( + isinstance(self._active_party_columns_mode, int) + or self._active_party_columns_mode in valid_modes + ): + raise ValueError( + "`active_party_column_mode` must be an int or one of " + "'add_to_first', 'add_to_last', 'create_as_first', 'create_as_last', " + "'add_to_all'." + ) + + def _validate_parameters_while_partitioning( + self, + all_columns: list[str], + shared_columns: list[str], + active_party_column: Union[str, list[str]], + ) -> None: + if isinstance(active_party_column, str): + active_party_column = [active_party_column] + # Shared columns existance check + for column in shared_columns: + if column not in all_columns: + raise ValueError(f"Shared column '{column}' not found in the dataset.") + # Active party columns existence check + for column in active_party_column: + if column not in all_columns: + raise ValueError( + f"Active party column '{column}' not found in the dataset." + ) diff --git a/datasets/flwr_datasets/partitioner/vertical_even_partitioner_test.py b/datasets/flwr_datasets/partitioner/vertical_even_partitioner_test.py new file mode 100644 index 000000000000..aa93db7a9fd5 --- /dev/null +++ b/datasets/flwr_datasets/partitioner/vertical_even_partitioner_test.py @@ -0,0 +1,202 @@ +# Copyright 2024 Flower Labs GmbH. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""VerticalEvenPartitioner class tests.""" +# mypy: disable-error-code=list-item +import unittest + +import numpy as np + +from datasets import Dataset +from flwr_datasets.partitioner.vertical_even_partitioner import VerticalEvenPartitioner + + +def _create_dummy_dataset(column_names: list[str], num_rows: int = 100) -> Dataset: + """Create a dummy dataset with random data for testing.""" + data = {} + rng = np.random.default_rng(seed=42) + for col in column_names: + # Just numeric data; could also be strings, categoricals, etc. + data[col] = rng.integers(0, 100, size=num_rows).tolist() + return Dataset.from_dict(data) + + +class TestVerticalEvenPartitioner(unittest.TestCase): + """Unit tests for VerticalEvenPartitioner.""" + + def test_init_with_invalid_num_partitions(self) -> None: + """Test that initializing with an invalid number of partitions.""" + with self.assertRaises(ValueError): + VerticalEvenPartitioner(num_partitions=0) + + def test_init_with_invalid_active_party_mode(self) -> None: + """Test initialization with invalid active_party_columns_mode.""" + with self.assertRaises(ValueError): + VerticalEvenPartitioner( + num_partitions=2, + active_party_columns_mode="invalid_mode", # type: ignore[arg-type] + ) + + def test_init_with_non_string_drop_columns(self) -> None: + """Test initialization with non-string elements in drop_columns.""" + with self.assertRaises(TypeError): + VerticalEvenPartitioner(num_partitions=2, drop_columns=[1, "a", 3]) + + def test_init_with_non_string_shared_columns(self) -> None: + """Test initialization with non-string elements in shared_columns.""" + with self.assertRaises(TypeError): + VerticalEvenPartitioner(num_partitions=2, shared_columns=["col1", 123]) + + def test_init_with_non_string_active_party_column(self) -> None: + """Test initialization with non-string elements in active_party_column.""" + with self.assertRaises(TypeError): + VerticalEvenPartitioner( + num_partitions=2, active_party_columns=["col1", None] + ) + + def test_partitioning_basic(self) -> None: + """Test basic partitioning with no special columns or dropping.""" + columns = ["feature1", "feature2", "feature3", "feature4"] + dataset = _create_dummy_dataset(columns, num_rows=50) + partitioner = VerticalEvenPartitioner(num_partitions=2, shuffle=False) + partitioner.dataset = dataset + + self.assertEqual(partitioner.num_partitions, 2) + + p0 = partitioner.load_partition(0) + p1 = partitioner.load_partition(1) + + self.assertEqual(len(p0.column_names), 2) + self.assertEqual(len(p1.column_names), 2) + self.assertIn("feature1", p0.column_names) + self.assertIn("feature2", p0.column_names) + self.assertIn("feature3", p1.column_names) + self.assertIn("feature4", p1.column_names) + + def test_partitioning_with_drop_columns(self) -> None: + """Test partitioning while dropping some columns.""" + columns = ["feature1", "feature2", "drop_me", "feature3", "feature4"] + dataset = _create_dummy_dataset(columns, num_rows=50) + partitioner = VerticalEvenPartitioner( + num_partitions=2, drop_columns=["drop_me"], shuffle=False, seed=42 + ) + partitioner.dataset = dataset + + p0 = partitioner.load_partition(0) + p1 = partitioner.load_partition(1) + all_partition_columns = p0.column_names + p1.column_names + + # The drop_me should not be in any partition + self.assertNotIn("drop_me", all_partition_columns) + # The rest of columns should be distributed + self.assertIn("feature1", all_partition_columns) + self.assertIn("feature2", all_partition_columns) + self.assertIn("feature3", all_partition_columns) + self.assertIn("feature4", all_partition_columns) + + def test_partitioning_with_shared_columns(self) -> None: + """Test that shared columns are present in all partitions.""" + columns = ["f1", "f2", "f3", "f4", "shared_col"] + dataset = _create_dummy_dataset(columns, num_rows=50) + partitioner = VerticalEvenPartitioner( + num_partitions=2, shared_columns=["shared_col"], shuffle=False, seed=42 + ) + partitioner.dataset = dataset + + p0 = partitioner.load_partition(0) + p1 = partitioner.load_partition(1) + + self.assertIn("shared_col", p0.column_names) + self.assertIn("shared_col", p1.column_names) + + def test_partitioning_with_active_party_columns_add_to_last(self) -> None: + """Test active party columns are appended to the last partition.""" + columns = ["f1", "f2", "f3", "f4", "income"] + dataset = _create_dummy_dataset(columns, num_rows=50) + partitioner = VerticalEvenPartitioner( + num_partitions=2, + active_party_columns=["income"], + active_party_columns_mode="add_to_last", + shuffle=False, + seed=42, + ) + partitioner.dataset = dataset + + p0 = partitioner.load_partition(0) + p1 = partitioner.load_partition(1) + + # The income should be only in the last partition + self.assertNotIn("income", p0.column_names) + self.assertIn("income", p1.column_names) + + def test_partitioning_with_active_party_columns_create_as_first(self) -> None: + """Test creating a new partition solely for active party columns.""" + columns = ["f1", "f2", "f3", "f4", "income"] + dataset = _create_dummy_dataset(columns, num_rows=50) + partitioner = VerticalEvenPartitioner( + num_partitions=2, + active_party_columns=["income"], + active_party_columns_mode="create_as_first", + shuffle=False, + ) + partitioner.dataset = dataset + + # The first partition should be just the active party columns + # and then two more partitions from original splitting. + self.assertEqual(partitioner.num_partitions, 3) + + p0 = partitioner.load_partition(0) # active party partition + p1 = partitioner.load_partition(1) + p2 = partitioner.load_partition(2) + + self.assertEqual(p0.column_names, ["income"]) + self.assertIn("f1", p1.column_names) + self.assertIn("f2", p1.column_names) + self.assertIn("f3", p2.column_names) + self.assertIn("f4", p2.column_names) + + def test_partitioning_with_nonexistent_active_party_column(self) -> None: + """Test that a ValueError is raised if active party column does not exist.""" + columns = ["f1", "f2", "f3", "f4"] + dataset = _create_dummy_dataset(columns, num_rows=50) + partitioner = VerticalEvenPartitioner( + num_partitions=2, + active_party_columns=["income"], # Not present in dataset + active_party_columns_mode="add_to_last", + shuffle=False, + ) + partitioner.dataset = dataset + + with self.assertRaises(ValueError) as context: + partitioner.load_partition(0) + self.assertIn("Active party column 'income' not found", str(context.exception)) + + def test_partitioning_with_nonexistent_shared_columns(self) -> None: + """Test that a ValueError is raised if shared column does not exist.""" + columns = ["f1", "f2", "f3"] + dataset = _create_dummy_dataset(columns, num_rows=50) + partitioner = VerticalEvenPartitioner( + num_partitions=2, shared_columns=["nonexistent_col"], shuffle=False + ) + partitioner.dataset = dataset + + with self.assertRaises(ValueError) as context: + partitioner.load_partition(0) + self.assertIn( + "Shared column 'nonexistent_col' not found", str(context.exception) + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/datasets/flwr_datasets/partitioner/vertical_partitioner_utils.py b/datasets/flwr_datasets/partitioner/vertical_partitioner_utils.py index e9e7e3855ef4..f9c40a1b0554 100644 --- a/datasets/flwr_datasets/partitioner/vertical_partitioner_utils.py +++ b/datasets/flwr_datasets/partitioner/vertical_partitioner_utils.py @@ -47,8 +47,8 @@ def _list_split(lst: list[Any], num_sublists: int) -> list[list[Any]]: return sublists -def _add_active_party_columns( - active_party_columns: list[str], +def _add_active_party_columns( # pylint: disable=R0912 + active_party_columns: Union[str, list[str]], active_party_columns_mode: Union[ Literal[ "add_to_first", @@ -65,7 +65,7 @@ def _add_active_party_columns( Parameters ---------- - active_party_columns : list[str] + active_party_columns : Union[str, list[str]] List of active party columns. active_party_columns_mode : Union[Literal["add_to_first", "add_to_last", "create_as_first", "create_as_last", "add_to_all"], int] Mode to add active party columns to partition columns. @@ -75,6 +75,8 @@ def _add_active_party_columns( partition_columns: list[list[str]] List of partition columns after the modyfication. """ + if isinstance(active_party_columns, str): + active_party_columns = [active_party_columns] if isinstance(active_party_columns_mode, int): partition_id = active_party_columns_mode if partition_id < 0 or partition_id >= len(partition_columns): @@ -101,3 +103,29 @@ def _add_active_party_columns( for partition in partition_columns: partition.append(column) return partition_columns + + +def _init_optional_str_or_list_str(parameter: Union[str, list[str], None]) -> list[str]: + """Initialize a parameter as a list of strings. + + Parameters + ---------- + parameter : Union[str, list[str], None] + A parameter that should be a string, a list of strings, or None. + + Returns + ------- + parameter: list[str] + The parameter as a list of strings. + """ + if parameter is None: + return [] + if not isinstance(parameter, (str, list)): + raise TypeError("Parameter must be a string or a list of strings") + if isinstance(parameter, list) and not all( + isinstance(single_param, str) for single_param in parameter + ): + raise TypeError("All elements in the list must be strings") + if isinstance(parameter, str): + return [parameter] + return parameter diff --git a/datasets/flwr_datasets/partitioner/vertical_size_partitioner.py b/datasets/flwr_datasets/partitioner/vertical_size_partitioner.py index 462a76a2e3f5..31fee5bf3724 100644 --- a/datasets/flwr_datasets/partitioner/vertical_size_partitioner.py +++ b/datasets/flwr_datasets/partitioner/vertical_size_partitioner.py @@ -24,6 +24,7 @@ from flwr_datasets.partitioner.partitioner import Partitioner from flwr_datasets.partitioner.vertical_partitioner_utils import ( _add_active_party_columns, + _init_optional_str_or_list_str, ) @@ -51,7 +52,7 @@ class VerticalSizePartitioner(Partitioner): toward the partition sizes. In case fo list[int]: sum(partition_sizes) == len(columns) - len(drop_columns) - len(shared_columns) - len(active_party_columns) - active_party_column : Optional[Union[str, list[str]]] + active_party_columns : Optional[Union[str, list[str]]] Column(s) (typically representing labels) associated with the "active party" (which can be the server). active_party_columns_mode : Union[Literal[["add_to_first", "add_to_last", "create_as_first", "create_as_last", "add_to_all"], int] @@ -63,9 +64,9 @@ class VerticalSizePartitioner(Partitioner): - `"create_as_last"`: Create a new partition at the end containing only these columns. - `"add_to_all"`: Append active party columns to all partitions. - int: Append active party columns to the specified partition index. - drop_columns : Optional[list[str]] + drop_columns : Optional[Union[str, list[str]]] Columns to remove entirely from the dataset before partitioning. - shared_columns : Optional[list[str]] + shared_columns : Optional[Union[str, list[str]]] Columns to duplicate into every partition after initial partitioning. shuffle : bool Whether to shuffle the order of columns before partitioning. @@ -79,7 +80,7 @@ class VerticalSizePartitioner(Partitioner): >>> >>> partitioner = VerticalSizePartitioner( ... partition_sizes=[8, 4, 2], - ... active_party_column="income", + ... active_party_columns="income", ... active_party_columns_mode="create_as_last" ... ) >>> fds = FederatedDataset( @@ -93,7 +94,7 @@ class VerticalSizePartitioner(Partitioner): def __init__( self, partition_sizes: Union[list[int], list[float]], - active_party_column: Optional[Union[str, list[str]]] = None, + active_party_columns: Optional[Union[str, list[str]]] = None, active_party_columns_mode: Union[ Literal[ "add_to_first", @@ -104,18 +105,20 @@ def __init__( ], int, ] = "add_to_last", - drop_columns: Optional[list[str]] = None, - shared_columns: Optional[list[str]] = None, + drop_columns: Optional[Union[str, list[str]]] = None, + shared_columns: Optional[Union[str, list[str]]] = None, shuffle: bool = True, seed: Optional[int] = 42, ) -> None: super().__init__() self._partition_sizes = partition_sizes - self._active_party_columns = self._init_active_party_column(active_party_column) + self._active_party_columns = _init_optional_str_or_list_str( + active_party_columns + ) self._active_party_columns_mode = active_party_columns_mode - self._drop_columns = drop_columns or [] - self._shared_columns = shared_columns or [] + self._drop_columns = _init_optional_str_or_list_str(drop_columns) + self._shared_columns = _init_optional_str_or_list_str(shared_columns) self._shuffle = shuffle self._seed = seed self._rng = np.random.default_rng(seed=self._seed) @@ -201,8 +204,17 @@ def _validate_parameters_in_init(self) -> None: raise ValueError("partition_sizes must be a list.") if all(isinstance(fraction, float) for fraction in self._partition_sizes): fraction_sum = sum(self._partition_sizes) + # Tolerance 0.01 for the floating point numerical problems + if 0.99 < fraction_sum < 1.01: + self._partition_sizes = self._partition_sizes[:-1] + [ + 1.0 - self._partition_sizes[-1] + ] + fraction_sum = 1.0 if fraction_sum != 1.0: - raise ValueError("Float ratios in `partition_sizes` must sum to 1.0.") + raise ValueError( + "Float ratios in `partition_sizes` must sum to 1.0. " + f"Instead got: {fraction_sum}." + ) if any( fraction < 0.0 or fraction > 1.0 for fraction in self._partition_sizes ): @@ -276,17 +288,6 @@ def _validate_parameters_while_partitioning( "active_party_columns are not included in the division." ) - def _init_active_party_column( - self, active_party_column: Optional[Union[str, list[str]]] - ) -> list[str]: - if active_party_column is None: - return [] - if isinstance(active_party_column, str): - return [active_party_column] - if isinstance(active_party_column, list): - return active_party_column - raise ValueError("active_party_column must be a string or a list of strings.") - def _count_split(columns: list[str], counts: list[int]) -> list[list[str]]: partition_columns = [] diff --git a/datasets/flwr_datasets/partitioner/vertical_size_partitioner_test.py b/datasets/flwr_datasets/partitioner/vertical_size_partitioner_test.py index d2c483c2be88..444105d676d0 100644 --- a/datasets/flwr_datasets/partitioner/vertical_size_partitioner_test.py +++ b/datasets/flwr_datasets/partitioner/vertical_size_partitioner_test.py @@ -67,8 +67,8 @@ def test_init_invalid_mode(self) -> None: def test_init_active_party_column_invalid_type(self) -> None: """Check ValueError if active_party_column is not str/list.""" - with self.assertRaises(ValueError): - VerticalSizePartitioner(partition_sizes=[2, 2], active_party_column=123) + with self.assertRaises(TypeError): + VerticalSizePartitioner(partition_sizes=[2, 2], active_party_columns=123) def test_partitioning_with_int_sizes(self) -> None: """Check correct partitioning with integer sizes.""" @@ -124,7 +124,7 @@ def test_partitioning_with_active_party_add_to_last(self) -> None: dataset = _create_dummy_dataset(columns) partitioner = VerticalSizePartitioner( partition_sizes=[2], - active_party_column="label", + active_party_columns="label", active_party_columns_mode="add_to_last", shuffle=False, ) @@ -138,7 +138,7 @@ def test_partitioning_with_active_party_create_as_first(self) -> None: dataset = _create_dummy_dataset(columns) partitioner = VerticalSizePartitioner( partition_sizes=[2], - active_party_column="label", + active_party_columns="label", active_party_columns_mode="create_as_first", shuffle=False, ) @@ -166,7 +166,7 @@ def test_partitioning_with_nonexistent_active_party_column(self) -> None: columns = ["f1", "f2"] dataset = _create_dummy_dataset(columns) partitioner = VerticalSizePartitioner( - partition_sizes=[1], active_party_column="missing_label", shuffle=False + partition_sizes=[1], active_party_columns="missing_label", shuffle=False ) partitioner.dataset = dataset with self.assertRaises(ValueError): diff --git a/src/py/flwr/client/supernode/app.py b/src/py/flwr/client/supernode/app.py index 0aad227695b9..c7dad2dfd426 100644 --- a/src/py/flwr/client/supernode/app.py +++ b/src/py/flwr/client/supernode/app.py @@ -114,9 +114,8 @@ def run_client_app() -> None: event(EventType.RUN_CLIENT_APP_ENTER) log( ERROR, - "The command `flower-client-app` has been replaced by `flower-supernode`.", + "The command `flower-client-app` has been replaced by `flwr run`.", ) - log(INFO, "Execute `flower-supernode --help` to learn how to use it.") register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) diff --git a/src/py/flwr/common/telemetry.py b/src/py/flwr/common/telemetry.py index 9f6a51901f54..53c32d64ce5f 100644 --- a/src/py/flwr/common/telemetry.py +++ b/src/py/flwr/common/telemetry.py @@ -151,6 +151,16 @@ def _generate_next_value_(name: str, start: int, count: int, last_values: list[A # Not yet implemented + # --- `flwr-*` commands ------------------------------------------------------------ + + # CLI: flwr-simulation + FLWR_SIMULATION_RUN_ENTER = auto() + FLWR_SIMULATION_RUN_LEAVE = auto() + + # CLI: flwr-serverapp + FLWR_SERVERAPP_RUN_ENTER = auto() + FLWR_SERVERAPP_RUN_LEAVE = auto() + # --- Simulation Engine ------------------------------------------------------------ # CLI: flower-simulation @@ -171,12 +181,12 @@ def _generate_next_value_(name: str, start: int, count: int, last_values: list[A RUN_SUPERNODE_ENTER = auto() RUN_SUPERNODE_LEAVE = auto() - # CLI: `flower-server-app` + # --- DEPRECATED ------------------------------------------------------------------- + + # [DEPRECATED] CLI: `flower-server-app` RUN_SERVER_APP_ENTER = auto() RUN_SERVER_APP_LEAVE = auto() - # --- DEPRECATED ------------------------------------------------------------------- - # [DEPRECATED] CLI: `flower-client-app` RUN_CLIENT_APP_ENTER = auto() RUN_CLIENT_APP_LEAVE = auto() diff --git a/src/py/flwr/server/run_serverapp.py b/src/py/flwr/server/run_serverapp.py index 23d4102e77dd..49660c5ff077 100644 --- a/src/py/flwr/server/run_serverapp.py +++ b/src/py/flwr/server/run_serverapp.py @@ -15,12 +15,12 @@ """Run ServerApp.""" -import sys from logging import DEBUG, ERROR from typing import Optional -from flwr.common import Context -from flwr.common.logger import log, warn_unsupported_feature +from flwr.common import Context, EventType, event +from flwr.common.exit_handlers import register_exit_handlers +from flwr.common.logger import log from flwr.common.object_ref import load_app from .driver import Driver @@ -66,12 +66,11 @@ def _load() -> ServerApp: return context -# pylint: disable-next=too-many-branches,too-many-statements,too-many-locals def run_server_app() -> None: """Run Flower server app.""" - warn_unsupported_feature( - "The command `flower-server-app` is deprecated and no longer in use. " - "Use the `flwr-serverapp` exclusively instead." + event(EventType.RUN_SERVER_APP_ENTER) + log( + ERROR, + "The command `flower-server-app` has been replaced by `flwr run`.", ) - log(ERROR, "`flower-server-app` used.") - sys.exit() + register_exit_handlers(event_type=EventType.RUN_SERVER_APP_LEAVE) diff --git a/src/py/flwr/server/serverapp/app.py b/src/py/flwr/server/serverapp/app.py index 8fbfec00272d..70b0308da2b0 100644 --- a/src/py/flwr/server/serverapp/app.py +++ b/src/py/flwr/server/serverapp/app.py @@ -25,6 +25,7 @@ from flwr.cli.config_utils import get_fab_metadata from flwr.cli.install import install_from_fab +from flwr.cli.utils import get_sha256_hash from flwr.common.args import add_args_flwr_app_common from flwr.common.config import ( get_flwr_dir, @@ -51,6 +52,7 @@ run_from_proto, run_status_to_proto, ) +from flwr.common.telemetry import EventType, event from flwr.common.typing import RunNotRunningException, RunStatus from flwr.proto.run_pb2 import UpdateRunStatusRequest # pylint: disable=E0611 from flwr.proto.serverappio_pb2 import ( # pylint: disable=E0611 @@ -113,7 +115,7 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915 # Resolve directory where FABs are installed flwr_dir_ = get_flwr_dir(flwr_dir) log_uploader = None - + success = True while True: try: @@ -129,6 +131,8 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915 run = run_from_proto(res.run) fab = fab_from_proto(res.fab) + hash_run_id = get_sha256_hash(run.run_id) + driver.set_run(run.run_id) # Start log uploader for this run @@ -171,6 +175,11 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915 UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto) ) + event( + EventType.FLWR_SERVERAPP_RUN_ENTER, + event_details={"run-id-hash": hash_run_id}, + ) + # Load and run the ServerApp with the Driver updated_context = run_( driver=driver, @@ -187,17 +196,18 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915 _ = driver._stub.PushServerAppOutputs(out_req) run_status = RunStatus(Status.FINISHED, SubStatus.COMPLETED, "") - except RunNotRunningException: log(INFO, "") log(INFO, "Run ID %s stopped.", run.run_id) log(INFO, "") run_status = None + success = False except Exception as ex: # pylint: disable=broad-exception-caught exc_entity = "ServerApp" log(ERROR, "%s raised an exception", exc_entity, exc_info=ex) run_status = RunStatus(Status.FINISHED, SubStatus.FAILED, str(ex)) + success = False finally: # Stop log uploader for this run and upload final logs @@ -213,6 +223,10 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915 run_id=run.run_id, run_status=run_status_proto ) ) + event( + EventType.FLWR_SERVERAPP_RUN_LEAVE, + event_details={"run-id-hash": hash_run_id, "success": success}, + ) # Stop the loop if `flwr-serverapp` is expected to process a single run if run_once: diff --git a/src/py/flwr/simulation/app.py b/src/py/flwr/simulation/app.py index 4aab3d75156f..a60b9a5e9150 100644 --- a/src/py/flwr/simulation/app.py +++ b/src/py/flwr/simulation/app.py @@ -24,7 +24,8 @@ from flwr.cli.config_utils import get_fab_metadata from flwr.cli.install import install_from_fab -from flwr.common import EventType +from flwr.cli.utils import get_sha256_hash +from flwr.common import EventType, event from flwr.common.args import add_args_flwr_app_common from flwr.common.config import ( get_flwr_dir, @@ -202,6 +203,15 @@ def run_simulation_process( # pylint: disable=R0914, disable=W0212, disable=R09 verbose: bool = fed_opt.get("verbose", False) enable_tf_gpu_growth: bool = fed_opt.get("enable_tf_gpu_growth", False) + event( + EventType.FLWR_SIMULATION_RUN_ENTER, + event_details={ + "backend": "ray", + "num-supernodes": num_supernodes, + "run-id-hash": get_sha256_hash(run.run_id), + }, + ) + # Launch the simulation updated_context = _run_simulation( server_app_attr=server_app_attr, @@ -214,7 +224,7 @@ def run_simulation_process( # pylint: disable=R0914, disable=W0212, disable=R09 verbose_logging=verbose, server_app_run_config=fused_config, is_app=True, - exit_event=EventType.CLI_FLOWER_SIMULATION_LEAVE, + exit_event=EventType.FLWR_SIMULATION_RUN_LEAVE, ) # Send resulting context diff --git a/src/py/flwr/simulation/run_simulation.py b/src/py/flwr/simulation/run_simulation.py index cc8844405b0b..1c53ff2a93e9 100644 --- a/src/py/flwr/simulation/run_simulation.py +++ b/src/py/flwr/simulation/run_simulation.py @@ -28,6 +28,7 @@ from typing import Any, Optional from flwr.cli.config_utils import load_and_validate +from flwr.cli.utils import get_sha256_hash from flwr.client import ClientApp from flwr.common import Context, EventType, RecordSet, event, log, now from flwr.common.config import get_fused_config_from_dir, parse_config_args @@ -394,7 +395,13 @@ def _main_loop( finally: # Trigger stop event f_stop.set() - event(exit_event, event_details={"success": success}) + event( + exit_event, + event_details={ + "run-id-hash": get_sha256_hash(run.run_id), + "success": success, + }, + ) if serverapp_th: serverapp_th.join() if server_app_thread_has_exception.is_set():