Merge pull request #142 from KarhouTam/dev
Periodic update
KarhouTam authored Dec 22, 2024
2 parents acfd5e3 + a393db5 commit d0168f6
Showing 45 changed files with 1,545 additions and 469 deletions.
552 changes: 539 additions & 13 deletions .env/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions .env/pyproject.toml
@@ -34,6 +34,7 @@ ray = { extras = ["default"], version = "2.36.1" }
tensorboard = "^2.17.1"
cvxpy = "^1.5.1"
hydra-core = "^1.3.2"
flwr-datasets = "^0.4.0"
statsmodels = "^0.14.4"
pytorch-minimize = "^0.0.2"

1 change: 1 addition & 0 deletions .env/requirements.txt
@@ -15,5 +15,6 @@ ray[default]~=2.38.0
tensorboard~=2.17.1
cvxpy~=1.5.1
hydra-core~=1.3.2
flwr-datasets~=0.4.0
statsmodels~=0.14.4
pytorch-minimize~=0.0.2
1 change: 1 addition & 0 deletions .gitignore
@@ -15,6 +15,7 @@ out
multirun

# datasets

data/cifar10
data/cifar100
data/mnist
1 change: 1 addition & 0 deletions README.md
@@ -86,6 +86,7 @@ FL-bench welcomes PRs on everything that can make this project better.
- ***FLUTE*** -- [Federated Representation Learning in the Under-Parameterized Regime](https://openreview.net/forum?id=LIQYhV45D4) (ICML'24)
- ***FedAS*** -- [FedAS: Bridging Inconsistency in Personalized Federated Learning](https://openaccess.thecvf.com/content/CVPR2024/html/Yang_FedAS_Bridging_Inconsistency_in_Personalized_Federated_Learning_CVPR_2024_paper.html) (CVPR'24)
- ***pFedFDA*** -- [pFedFDA: Personalized Federated Learning via Feature Distribution Adaptation](http://arxiv.org/abs/2411.00329) (NeurIPS 2024)
- ***Floco*** -- [Federated Learning over Connected Modes](https://openreview.net/forum?id=JL2eMCfDW8) (NeurIPS'24)
<!-- </details> -->


45 changes: 28 additions & 17 deletions config/defaults.yaml
@@ -81,11 +81,7 @@ common:
local_epoch: 5 # Number of epochs of client local training.
finetune_epoch: 0 # Number of epochs of clients fine-tuning their models before test.
batch_size: 32 # Data batch size for client local training.
test_interval: 100 # Interval round of performing test on all test clients.
test_server_interval: -1 # Interval round of performing test on centralized server model.
reset_optimizer_on_global_epoch: true # Whether to reset optimizer on each global epoch.
test_server_in_train_mode: false # Whether to evaluate the server model in train mode or eval mode. Evaluating in train mode can yield better batchnorm statistics, but the result depends on the order of the data.


# The ratio of stragglers (set in [0, 1]).
# Stragglers would not perform full-epoch local training as normal clients.
@@ -99,25 +95,40 @@ common:
# drop: clients will drop their buffers after training done.
buffers: global # [local, global, drop]

# Set eval_<...> as true for performing evaluation on <...>sets held by
# this round's joined clients before and after local training.
eval_test: true
eval_val: false
eval_train: false

# Set test_<...> as true for performing testing on <...>sets on
# all clients or the server.
test_test: false
test_val: false
test_train: false
# Whether to evaluate client local models (before and after local training) on the client side.
# You can deactivate this to accelerate training.
# NOTE: deactivating this feature will affect features like logging and monitoring.
client_side_evaluation: true

# The evaluation settings for client side and server side.
test:
# For example, set client.test to true to evaluate on the local testsets of this round's selected clients
# with their local models before and after local training.
# Frequency is set by `client.interval`. Negative value for disabling.
client:
interval: 100
train: false
val: false
test: true
# For example, set server.test to true to evaluate on a centralized testset (created by aggregating all clients' local testsets)
# with the updated global model at the end of a communication round.
# Frequency is set by `server.interval`. Negative value for disabling.
server:
interval: -1
train: false
val: false
test: false
# Whether to evaluate the global model in train mode or eval mode.
# Evaluating in train mode can yield better batchnorm statistics, but the result depends on the order of the data.
model_in_train_mode: false

verbose_gap: 10 # Interval round of displaying clients' training performance on the terminal.
monitor: null # [null, visdom, tensorboard]
use_cuda: true # Whether to use cuda for training.

save_log: true # Whether to save log files in out/<method>/<start_time>.
save_model: false # Whether to save model weights (*.pt) in out/<method>/<start_time>.
save_fig: true # Whether to save learning curve figure (*.png) in out/<method>/<start_time>.
save_learning_curve_plot: true # Whether to save learning curve figure (*.png) in out/<method>/<start_time>.
save_metrics: true # Whether to save metrics (*.csv) in out/<method>/<start_time>.

# Whether to delete output files after user press `Ctrl + C`,
@@ -132,4 +143,4 @@ common:
fedprox:
mu: 0.01
pfedsim:
warmup_round: 0.5
warmup_round: 0.5
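For context, a minimal sketch of how the reworked evaluation settings above could be composed and overridden with OmegaConf (the repository depends on hydra-core; the override values here are purely illustrative and not part of this PR):

```python
# Minimal sketch (assumed, not part of this PR): merging an override onto the new
# `common.test` structure from config/defaults.yaml using OmegaConf.
from omegaconf import OmegaConf

defaults = OmegaConf.load("config/defaults.yaml")
override = OmegaConf.create(
    {"common": {"test": {"server": {"interval": 10, "test": True}}}}
)
cfg = OmegaConf.merge(defaults, override)

print(cfg.common.test.client.interval)  # 100, from the defaults above
print(cfg.common.test.server.interval)  # 10: test the global model every 10 rounds
print(cfg.common.test.server.test)      # True
```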
13 changes: 12 additions & 1 deletion data/README.md
@@ -58,7 +58,7 @@ python generate_data.py -d cifar10 --iid 0.5 --alpha 0.1 -cn 20
Refers to [Measuring the Effects of Non-Identical Data Distribution for Federated Visual Classification (*FedAvgM*)](https://arxiv.org/abs/1909.06335). The dataset is split according to $Dir(\alpha)$; a smaller $\alpha$ means stronger label heterogeneity.

- `--alpha, -a`: The parameter for controlling intensity of label heterogeneity.
- `--least_samples, -ls`: The parameter defining the minimum number of samples each client receives. *A small `--least_samples` along with a small `--alpha` or a big `--client_num` might considerably prolong the partitioning.*
- `--min_samples_per_client, -ms`: The parameter defining the minimum number of samples each client receives. *A small `--min_samples_per_client` along with a small `--alpha` or a big `--client_num` might considerably prolong the partitioning.*

```shell
python generate_data.py -d cifar10 -a 0.1 -cn 20
@@ -109,6 +109,17 @@ python generate_data.py -d cifar10 -sm 1 -cn 20
```
<img src="../.github/images/distributions/semantic.png" alt="Image" width="350"/>

## Flower Partitioner 🌼

This benchmark also supports the external partitioners provided by [flwr_datasets](https://flower.ai/docs/datasets/), enabling comparison between the built-in partitioning schemes and the additional schemes available in flwr_datasets. To use a flwr partitioner, specify the class path of the partitioner you want and all of its parameters in a separate dictionary.
> \[!NOTE\]
> To use flwr's partitioners, a mock dataset with a column called `label` is created internally. If the partitioning scheme depends on label information, pass `label` as the label column (e.g. via `partition_by`).

This is how you would use the `DirichletPartitioner` from flwr:
```shell
python generate_data.py -d cifar10 -cn 10 -fpc "flwr_datasets.partitioner.DirichletPartitioner" -fpk '{"alpha": 100.0, "partition_by": "label"}'
```
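For reference, a rough Python sketch of the mechanism the note above describes: a mock dataset with a `label` column handed to a flwr partitioner. The dataset here is a random stand-in; the real logic lives in `data/utils/schemes/flower.py`, added by this PR.

```python
# Rough sketch (illustrative stand-in data): how a flwr partitioner is driven
# by a mock Hugging Face dataset that exposes a `label` column.
import numpy as np
import datasets
from flwr_datasets.partitioner import DirichletPartitioner

labels = np.random.randint(0, 10, size=50_000)  # stand-in for CIFAR-10 targets
mock_dataset = datasets.Dataset.from_dict(
    {"data_indices": np.arange(len(labels)), "label": labels}
)

partitioner = DirichletPartitioner(num_partitions=10, alpha=100.0, partition_by="label")
partitioner.dataset = mock_dataset  # the partitioner reads labels from the mock dataset

for client_id in range(10):
    partition = partitioner.load_partition(client_id)
    print(client_id, len(partition))  # number of samples assigned to this client
```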

# Usage 🚀

## Synthetic Dataset in FedProx
3 changes: 1 addition & 2 deletions data/tune_ratios_manually.ipynb
@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -29,7 +29,6 @@
" ],\n",
" axis=0,\n",
" )\n",
" np.random.shuffle(indices)\n",
" new_testset_size = int(len(indices) * new_testset_ratio)\n",
" new_valset_size = int(len(indices) * new_valset_ratio)\n",
"\n",
22 changes: 20 additions & 2 deletions data/utils/process.py
@@ -1,3 +1,4 @@
import importlib
import json
import os
from argparse import Namespace
@@ -52,7 +53,7 @@ def prune_args(args: Namespace) -> dict:
args_dict["split"] = preprocess_args["t"]
args_dict["sample_seed"] = preprocess_args["smplseed"]
args_dict["split_seed"] = preprocess_args["spltseed"]
args_dict["least_samples"] = preprocess_args["k"]
args_dict["min_samples_per_client"] = preprocess_args["k"]
args_dict["test_ratio"] = 1.0 - preprocess_args["tf"]
args_dict["val_ratio"] = 0.0
args_dict["monitor_window_name_suffix"] = "{}-{}clients-k{}-{}".fotmat(
@@ -70,7 +71,7 @@ def prune_args(args: Namespace) -> dict:
# Dirchlet
if args.alpha > 0:
args_dict["alpha"] = args.alpha
args_dict["least_samples"] = args.least_samples
args_dict["min_samples_per_client"] = args.min_samples_per_client
args_dict["monitor_window_name_suffix"] += f"-Dir({args.alpha})"
# randomly assign classes
elif args.classes > 0:
@@ -551,3 +552,20 @@ def plot_distribution(client_num: int, label_counts: np.ndarray, save_path: str)
ax.spines["top"].set_visible(False)
ax.legend(bbox_to_anchor=(1.2, 1))
plt.savefig(save_path, bbox_inches="tight")

def class_from_string(class_string: str) -> type:
"""
Dynamically loads a class from a string representation.
Args:
class_string (str): The string representation of the class, including the module path.
Returns:
type: The loaded class.
Example:
class_from_string('path.to.module.ClassName') returns the class 'ClassName' from the module 'path.to.module'.
"""
module = importlib.import_module('.'.join(class_string.split('.')[:-1]))
class_ = getattr(module, class_string.split('.')[-1])
return class_
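A short usage sketch for this helper, resolving the partitioner class path that `generate_data.py` now accepts via `-fpc` (per `data/README.md` above); it assumes `flwr_datasets` is installed and the values are illustrative:

```python
# Illustrative use of class_from_string (assumes flwr_datasets is installed).
from data.utils.process import class_from_string

partitioner_cls = class_from_string("flwr_datasets.partitioner.DirichletPartitioner")
partitioner = partitioner_cls(num_partitions=10, alpha=100.0, partition_by="label")
print(type(partitioner).__name__)  # DirichletPartitioner
```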
93 changes: 53 additions & 40 deletions data/utils/schemes/dirichlet.py
@@ -1,68 +1,81 @@
from collections import Counter
from typing import Any, Dict, Set

import numpy as np


def dirichlet(
targets: np.ndarray,
target_indices: np.ndarray,
label_set: set,
label_set: Set[int],
client_num: int,
alpha: float,
least_samples: int,
partition: dict,
stats: dict,
min_samples_per_client: int,
partition: Dict[str, Any],
stats: Dict[int, Dict[str, Any]],
):
"""Partition dataset according to Dirichlet with concentration parameter
`alpha`.
"""Partition the dataset according to the Dirichlet distribution using a
specified concentration parameter, `alpha`.
Args:
targets (np.ndarray): Data label array.
target_indices (np.ndarray): Indices of targets. If you haven't set `--iid`, then it will be np.arange(len(targets))
Otherwise, it will be the absolute indices of the full targets.
label_set (set): Label set.
targets (np.ndarray): Array of data labels.
target_indices (np.ndarray): Indices of targets. If `--iid` is not set, it will be np.arange(len(targets)).
Otherwise, it holds the absolute indices of the full targets.
label_set (Set[int]): Set of unique labels.
client_num (int): Number of clients.
alpha (float): Concentration parameter. Smaller alpha indicates strong data heterogeneity.
least_samples (int): Least number of data samples each client should have.
partition (Dict): Output data indices dict.
stats (Dict): Output dict recording clients' data distribution.
alpha (float): Concentration parameter; smaller values indicate stronger data heterogeneity.
min_samples_per_client (int): Minimum number of data samples each client should have.
partition (Dict[str, Any]): Dictionary to hold output data indices for each client.
stats (Dict[int, Dict[str, Any]]): Dictionary to record clients' data distribution.
"""

min_size = 0
indices_4_labels = {i: np.where(targets == i)[0] for i in label_set}
# Map each label to its corresponding indices in the target array
indices_per_label = {label: np.where(targets == label)[0] for label in label_set}

while min_size < least_samples:
# Initialize data indices for each client
while min_size < min_samples_per_client:
# Initialize empty lists to hold data indices for each client
partition["data_indices"] = [[] for _ in range(client_num)]

# Iterate over each label in the label set
# Iterate through each label in the label_set
for label in label_set:
# Shuffle the indices associated with the current label
np.random.shuffle(indices_4_labels[label])
# Generate a Dirichlet distribution for splitting data among clients
# Shuffle the indices corresponding to the current label
np.random.shuffle(indices_per_label[label])

# Generate a Dirichlet distribution for partitioning data among clients
distribution = np.random.dirichlet(np.repeat(alpha, client_num))

# Calculate split indices based on the generated distribution
cumulative_indices = np.cumsum(distribution) * len(indices_4_labels[label])
split_indices_position = cumulative_indices.astype(int)[:-1]

# Split the indices for the current label
split_indices = np.split(indices_4_labels[label], split_indices_position)

# Assign split indices to each client

# Calculate the cumulative distribution to get split indices
cumulative_distribution = np.cumsum(distribution) * len(
indices_per_label[label]
)
split_indices_position = cumulative_distribution.astype(int)[:-1]

# Split the indices based on the calculated positions
split_indices = np.split(indices_per_label[label], split_indices_position)

# Assign the split indices to each client
for client_id in range(client_num):
partition["data_indices"][client_id].extend(split_indices[client_id])

# Update the minimum size of the data across all clients
min_size = min(len(idx) for idx in partition["data_indices"])
# Update the minimum number of samples across all clients
min_size = min(len(indices) for indices in partition["data_indices"])

# Gather statistics and prepare the output for each client
for client_id in range(client_num):
stats[client_id]["x"] = len(targets[partition["data_indices"][client_id]])
stats[client_id]["y"] = dict(
Counter(targets[partition["data_indices"][client_id]].tolist())
)

for i in range(client_num):
stats[i]["x"] = len(targets[partition["data_indices"][i]])
stats[i]["y"] = dict(Counter(targets[partition["data_indices"][i]].tolist()))
partition["data_indices"][i] = target_indices[partition["data_indices"][i]]
# Update the data indices to use the original target indices
partition["data_indices"][client_id] = target_indices[
partition["data_indices"][client_id]
]

sample_num = np.array(list(map(lambda stat_i: stat_i["x"], stats.values())))
# Calculate the number of samples for each client and update statistics
sample_counts = np.array([stat["x"] for stat in stats.values()])
stats["samples_per_client"] = {
"std": sample_num.mean().item(),
"stddev": sample_num.std().item(),
"mean": sample_counts.mean().item(),
"stddev": sample_counts.std().item(),
}
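A small usage sketch for the refactored partitioner, using synthetic labels and an assumed import path that follows the repository layout:

```python
# Illustrative call of the refactored dirichlet() on synthetic labels.
import numpy as np
from data.utils.schemes.dirichlet import dirichlet  # assumed import path

targets = np.random.randint(0, 10, size=10_000)
client_num = 20
partition = {}                                   # "data_indices" is filled in by dirichlet()
stats = {cid: {} for cid in range(client_num)}   # per-client entries are filled in too

dirichlet(
    targets=targets,
    target_indices=np.arange(len(targets)),
    label_set=set(range(10)),
    client_num=client_num,
    alpha=0.1,                                   # small alpha -> strong label heterogeneity
    min_samples_per_client=20,
    partition=partition,
    stats=stats,
)
print(stats["samples_per_client"])               # {'mean': ..., 'stddev': ...}
```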
52 changes: 52 additions & 0 deletions data/utils/schemes/flower.py
@@ -0,0 +1,52 @@

from collections import Counter
import json
import numpy as np
import datasets

from data.utils.process import class_from_string


def flower_partition(
targets: np.ndarray,
target_indices: np.ndarray,
label_set: set,
client_num: int,
flower_partitioner_class: str,
flower_partitioner_kwargs: str,
partition: dict,
stats: dict,
):
target_indices = [i for i in range(len(target_indices)) if targets[i] in label_set]
targets = targets[target_indices]
data = {
"data_indices": target_indices,
"label": targets
}

# Create a Hugging Face Dataset
dataset = datasets.Dataset.from_dict(data)

flower_partitioner_kwargs = json.loads(flower_partitioner_kwargs)
partitioner_class = class_from_string(flower_partitioner_class)
partitioner = partitioner_class(num_partitions=client_num, **flower_partitioner_kwargs)

# Assign the dataset to the partitioner
partitioner.dataset = dataset
num_samples = []

# Load each partition and record the samples it contains
for i in range(client_num):
partition_i = partitioner.load_partition(i)
indices = partition_i["data_indices"]
partition["data_indices"][i] = indices
stats[i] = {"x": None, "y": None}
stats[i]["x"] = len(indices)
stats[i]["y"] = dict(Counter(targets[indices].tolist()))
num_samples.append(len(partition_i))

num_samples = np.array(num_samples)
stats["samples_per_client"] = {
"std": num_samples.mean().item(),
"stddev": num_samples.std().item(),
}
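And a similar usage sketch for this new scheme (illustrative values and an assumed import path; `flower_partitioner_kwargs` is a JSON string, mirroring the `-fpk` flag shown in `data/README.md` above):

```python
# Illustrative call of flower_partition (assumes flwr_datasets is installed;
# import path follows the repository layout).
import numpy as np
from data.utils.schemes.flower import flower_partition  # assumed import path

client_num = 10
targets = np.random.randint(0, 10, size=10_000)
partition = {"data_indices": [[] for _ in range(client_num)]}  # pre-sized, filled per client
stats = {}

flower_partition(
    targets=targets,
    target_indices=np.arange(len(targets)),
    label_set=set(range(10)),
    client_num=client_num,
    flower_partitioner_class="flwr_datasets.partitioner.DirichletPartitioner",
    flower_partitioner_kwargs='{"alpha": 100.0, "partition_by": "label"}',
    partition=partition,
    stats=stats,
)
print(stats["samples_per_client"])
```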