diff --git a/experiments/data.zip b/experiments/data.zip
new file mode 100644
index 0000000..4e0afc7
Binary files /dev/null and b/experiments/data.zip differ
diff --git a/experiments/hyperparams.py b/experiments/hyperparams.py
new file mode 100644
index 0000000..d391e7b
--- /dev/null
+++ b/experiments/hyperparams.py
@@ -0,0 +1,143 @@
+
+LogRegClient_params = {
+    'default': {
+        'lr_patience': 5,
+        'es_patience': 15,
+        'n_rounds': 10,
+        'eval_every': 10,
+        'batch_size': 512,
+        'optimizer': {
+            'SGD': {
+                'lr': 0.01,
+                'momentum': 0.0,
+                'weight_decay': 0.0,
+                'dampening': 0.0,
+            }
+        },
+        'criterion': {
+            'ClassBalancedLoss': {
+                'gamma': 0.9
+            }
+        }
+    },
+    'search_space': {
+        'batch_size': [128, 256, 512],
+        'optimizer': {
+            'SGD': {
+                'lr': (0.001, 0.1),
+                'momentum': (0.0, 1.0),
+                'weight_decay': (0.0, 1.0),
+                'dampening': (0.0, 1.0),
+            },
+            'Adam': {
+                'lr': (0.001, 0.1),
+                'weight_decay': (0.0, 1.0),
+                'amsgrad': [True, False]
+            }
+        },
+        'criterion': {
+            'ClassBalancedLoss': {
+                'gamma': (0.5, 0.9999)
+            },
+            'CrossEntropyLoss': {}
+        }
+    }
+}
+
+DecisionTreeClient_params = {
+    'default': {
+        'criterion': 'gini',
+        'splitter': 'best',
+        'max_depth': None,
+        'random_state': 42,
+        'class_weight': 'balanced'
+    },
+    'search_space': {
+        'criterion': ['gini', 'entropy', 'log_loss'],
+        'splitter': ['best', 'random'],
+        'max_depth': (1, 100),
+        'class_weight': ['balanced', None]
+    }
+}
+
+RandomForestClient_params = {
+    'default': {
+        'n_estimators': 100,
+        'criterion': 'gini',
+        'max_depth': None,
+        'random_state': 42,
+        'class_weight': 'balanced'
+    },
+    'search_space': {
+        'n_estimators': (10, 1000),
+        'criterion': ['gini', 'entropy', 'log_loss'],
+        'max_depth': (1, 100),
+        'class_weight': ['balanced', None]
+    }
+}
+
+GradientBoostingClient_params = {
+    'default': {
+        'loss': 'log_loss',
+        'learning_rate': 0.1,
+        'n_estimators': 100,
+        'criterion': 'friedman_mse',
+        'max_depth': 3,
+        'random_state': 42
+    },
+    'search_space': {
+        'loss': ['log_loss', 'exponential'],
+        'learning_rate': (0.01, 1.0),
+        'n_estimators': (10, 200),
+        'criterion': ['friedman_mse', 'squared_error'],
+        'max_depth': (2, 100),
+        'random_state': 42
+    }
+}
+
+SVMClient_params = {
+    'default': {
+        'C': 1.0,
+        'kernel': 'rbf',
+        'degree': 3,
+        'gamma': 'scale',
+        'coef0': 0.0,
+        'shrinking': True,
+        'probability': False,
+        'class_weight': 'balanced',
+        'cache_size': 7000,
+        'max_iter': 1000,
+        'random_state': 42
+    },
+    'search_space': {
+        'C': (0.1, 10.0),
+        'kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
+        'degree': (2, 5),
+        'gamma': ['scale', 'auto'],
+        'coef0': (0.0, 1.0),
+        'shrinking': [True, False],
+        'probability': [False, True],
+        'class_weight': ['balanced', None],
+        'random_state': 42
+    }
+}
+
+KNNClient_params = {
+    'default': {
+        'n_neighbors': 5,
+        'weights': 'uniform',
+        'algorithm': 'auto',
+        'leaf_size': 30,
+        'p': 2,
+        'metric': 'minkowski',
+        'n_jobs': -1
+    },
+    'search_space': {
+        'n_neighbors': (3, 100),
+        'weights': ['uniform', 'distance'],
+        'algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute'],
+        'leaf_size': (10, 50),
+        'p': [1, 2],
+        'metric': 'minkowski'
+    }
+}
\ No newline at end of file
diff --git a/experiments/train.py b/experiments/train.py
index 47029bf..d2d5132 100644
--- a/experiments/train.py
+++ b/experiments/train.py
@@ -33,16 +33,17 @@ def main():
     parser.add_argument('--seed', type=int, help='Seed.', default=42)
     parser.add_argument('--n_rounds', type=int, help='Number of traning rounds.', default=3)
     parser.add_argument('--eval_every', type=int, help='Number of rounds between evaluations.', default=1)
-    parser.add_argument('--local_epochs', type=int, help='Number of local epochs at clients.', default=1)
+    #parser.add_argument('--local_epochs', type=int, help='Number of local epochs at clients.', default=1)
     parser.add_argument('--batch_size', type=int, help='Batch size.', default=512)
     parser.add_argument('--lr', type=float, help='Learning rate.', default=0.02)
     parser.add_argument('--n_workers', type=int, help='Number of processes.', default=3)
     parser.add_argument('--device', type=str, help='Device for computations. Can be "cpu" or cuda device, e.g. "cuda:0".', default="cuda:0")
+    parser.add_argument('--lr_patience', type=int, help='Number of epochs to wait before reducing learning rate.', default=5)
+    parser.add_argument('--es_patience', type=int, help='Number of epochs to wait before early stopping.', default=15)
     parser.add_argument('--results_dir', type=str, default='/home/edvin/Desktop/flib/experiments/results/3_banks_homo_easy/')
     args = parser.parse_args()
 
-    print()
-    print(f'clients: {args.clients}')
+    print(f'\nclients: {args.clients}')
     print(f'settings: {args.settings}')
     print(f'traindata files:')
     for traindata_file in args.traindata_files:
@@ -60,13 +61,12 @@ def main():
     print(f'seed: {args.seed}')
     print(f'n_rounds: {args.n_rounds}')
     print(f'eval_every: {args.eval_every}')
-    print(f'local_epochs: {args.local_epochs}')
+    #print(f'local_epochs: {args.local_epochs}')
     print(f'batch_size: {args.batch_size}')
     print(f'lr: {args.lr}')
     print(f'n_workers: {args.n_workers}')
     print(f'device: {args.device}')
-    print(f'results_dir: {args.results_dir}')
-    print()
+    print(f'results_dir: {args.results_dir}\n')
 
     train_dfs = []
     val_dfs = []
@@ -101,8 +101,8 @@ def main():
                 criterion=args.criterion,
                 n_epochs=args.n_rounds,
                 eval_every=args.eval_every,
-                lr_patience=5,
-                es_patience=15,
+                lr_patience=args.lr_patience,
+                es_patience=args.es_patience,
                 optimizer=args.optimizer,
                 beta=args.beta,
                 batch_size=args.batch_size,
@@ -117,8 +117,7 @@ def main():
         os.makedirs(results_dir, exist_ok=True)
         with open(os.path.join(results_dir, 'results.pkl'), 'wb') as f:
             pickle.dump(results, f)
-        print(f'Saved results to {results_dir}/results.pkl')
-        print()
+        print(f'Saved results to {results_dir}/results.pkl\n')
     if 'federated' in args.settings:
         print(f'Training {client} in federated setting.')
         t = time.time()
@@ -145,8 +144,7 @@ def main():
         os.makedirs(results_dir, exist_ok=True)
         with open(os.path.join(results_dir, 'results.pkl'), 'wb') as f:
             pickle.dump(results, f)
-        print(f'Saved results to {results_dir}/results.pkl')
-        print()
+        print(f'Saved results to {results_dir}/results.pkl\n')
     if 'isolated' in args.settings:
         print(f'Training {client} in isolated setting')
         t = time.time()
@@ -175,8 +173,7 @@ def main():
         os.makedirs(results_dir, exist_ok=True)
         with open(os.path.join(results_dir, 'results.pkl'), 'wb') as f:
             pickle.dump(results, f)
-        print(f'Saved results to {results_dir}/results.pkl')
-        print()
+        print(f'Saved results to {results_dir}/results.pkl\n')
 
 if __name__ == '__main__':
     main()
diff --git a/experiments/tune_hyperparams.py b/experiments/tune_hyperparams.py
index 7b61bc8..cd9a0da 100644
--- a/experiments/tune_hyperparams.py
+++ b/experiments/tune_hyperparams.py
@@ -1,77 +1,171 @@
 import argparse
-from flib.train.federated import HyperparamTuner
+import os
+import time
+
+import pandas as pd
+
+from flib.train import centralized, federated, isolated, HyperparamTuner
+import hyperparams
 
 def main():
     parser = argparse.ArgumentParser()
-    parser.add_argument('--models', nargs='+', help='Types of models to train.', default=['LogisticRegressor'])
-    parser.add_argument('--settings', nargs='+', help='Types of settings to use. Can be "iso", "cen" or "fed".', default=['fed'])
-    parser.add_argument('--trainsets', nargs='+', help='Paths to trainsets.', default=[
-        '/home/edvin/Desktop/flib/experiments/data/3_banks_homo_hard/preprocessed/a_nodes_train.csv',
-        '/home/edvin/Desktop/flib/experiments/data/3_banks_homo_hard/preprocessed/b_nodes_train.csv',
-        '/home/edvin/Desktop/flib/experiments/data/3_banks_homo_hard/preprocessed/c_nodes_train.csv'
+    parser.add_argument('--clients', nargs='+', help='Types of clients to train.', default=['LogRegClient', 'DecisionTreeClient', 'RandomForestClient', 'GradientBoostingClient', 'SVMClient', 'KNNClient']) # LogRegClient, DecisionTreeClient, RandomForestClient, GradientBoostingClient, SVMClient, KNNClient
+    parser.add_argument('--settings', nargs='+', help='Types of settings to use. Can be "isolated", "centralized" or "federated".', default=['centralized', 'federated', 'isolated'])
+    parser.add_argument('--traindata_files', nargs='+', help='Paths to trainsets.', default=[
+        '/home/edvin/Desktop/flib/experiments/data/3_banks_homo_mid/preprocessed/a_nodes_train.csv',
+        '/home/edvin/Desktop/flib/experiments/data/3_banks_homo_mid/preprocessed/b_nodes_train.csv',
+        '/home/edvin/Desktop/flib/experiments/data/3_banks_homo_mid/preprocessed/c_nodes_train.csv'
     ])
-    parser.add_argument('--optimizer', nargs='+', help='', default=['SGD'])
-    parser.add_argument('--criterion', nargs='+', help='', default=['ClassBalancedLoss'])
-    parser.add_argument('--beta', nargs='+', help='Value of beta for ClassBalancedLoss.', default=[0.9999, 0.9999999999])
-    parser.add_argument('--seed', type=int, help='Seed.', default=42)
-    parser.add_argument('--n_rounds', type=int, help='Number of traning rounds.', default=30)
-    parser.add_argument('--local_epochs', nargs='+', help='Number of local epochs at clients.', default=[1])
-    parser.add_argument('--batch_size', nargs='+', help='Batch size.', default=[128, 256, 512])
-    parser.add_argument('--lr', nargs='+', help='Learning rate.', default=[0.0001, 1.0])
+    parser.add_argument('--valdata_files', nargs='+', help='Paths to valsets.', default=[
+        None,
+        None,
+        None
+    ])
+    parser.add_argument('--valset_size', type=float, default=0.2)
+    parser.add_argument('--n_workers', type=int, help='Number of processes.', default=3)
     parser.add_argument('--device', type=str, help='Device for computations. Can be "cpu" or cuda device, e.g. "cuda:0".', default="cuda:0")
-    parser.add_argument('--results_file', type=str, default='/home/edvin/Desktop/flib/experiments/results/3_banks_homo_hard/federated/best_params.txt')
-    parser.add_argument('--storage', type=str, default='sqlite:////home/edvin/Desktop/flib/experiments/results/3_banks_homo_hard/federated/study.db')
-    args = parser.parse_args()
+    parser.add_argument('--seed', type=int, help='Seed.', default=42)
+    parser.add_argument('--n_trials', type=int, help='Number of trials.', default=10)
+    parser.add_argument('--results_dir', type=str, default='/home/edvin/Desktop/flib/experiments/results/3_banks_homo_mid/')
 
-    args.beta = tuple(args.beta)
-    args.lr = tuple(args.lr)
+    args = parser.parse_args()
 
     print()
-    print(f'models: {args.models}')
+    print(f'clients: {args.clients}')
     print(f'settings: {args.settings}')
-    print(f'trainsets:')
-    for trainset in args.trainsets:
-        print(f'    {trainset}')
-    print(f'optimizer: {args.optimizer}')
-    print(f'criterion: {args.criterion}')
-    print(f'beta: {args.beta}')
-    print(f'seed: {args.seed}')
-    print(f'n_rounds: {args.n_rounds}')
-    print(f'local_epochs: {args.local_epochs}')
-    print(f'batch_size: {args.batch_size}')
-    print(f'lr: {args.lr}')
+    print(f'traindata files:')
+    for traindata_file in args.traindata_files:
+        print(f'    {traindata_file}')
+    print(f'valdata files:')
+    for valdata_file in args.valdata_files:
+        print(f'    {valdata_file}')
+    print(f'valset_size: {args.valset_size}')
     print(f'n_workers: {args.n_workers}')
     print(f'device: {args.device}')
-    print(f'results_file: {args.results_file}')
-    print(f'storage: {args.storage}')
+    print(f'results_dir: {args.results_dir}')
+    print(f'n_trials: {args.n_trials}')
     print()
 
-    for model in args.models:
-        if 'cen' in args.settings:
-            pass
-        if 'fed' in args.settings:
-            print(f'Turning hyperparameters for {model} in a federated setting.')
+    train_dfs = []
+    val_dfs = []
+    for traindata_file, valdata_file in zip(args.traindata_files, args.valdata_files):
+        train_df = pd.read_csv(traindata_file).drop(columns=['account', 'bank'])
+        if valdata_file is not None:
+            val_df = pd.read_csv(valdata_file).drop(columns=['account', 'bank'])
+        elif args.valset_size is not None:
+            val_df = train_df.sample(frac=args.valset_size, random_state=args.seed)
+            train_df = train_df.drop(val_df.index)
+        else:
+            val_dfs = None
+        train_dfs.append(train_df)
+        val_dfs.append(val_df)
+
+    for client in args.clients:
+        if 'centralized' in args.settings:
+            print(f'\nTuning hyperparameters for {client} in a centralized setting.')
+            t = time.time()
+            study_name = f'{client}_centralized'
+            os.makedirs(os.path.join(args.results_dir, f'centralized/{client}'), exist_ok=True)
+            storage = 'sqlite:///' + os.path.join(args.results_dir, f'centralized/{client}/hp_study.db')
+            hyperparamtuner = HyperparamTuner(
+                study_name=study_name,
+                obj_fn=centralized,
+                train_dfs=train_dfs,
+                val_dfs=val_dfs,
+                seed=args.seed,
+                storage=storage,
+                client=client,
+                n_workers = args.n_workers,
+                device = args.device,
+                params = getattr(hyperparams, f'{client}_params')
+            )
+            best_trials = hyperparamtuner.optimize(n_trials=args.n_trials)
+            t = time.time() - t
+            print('Done')
+            print(f'Exec time: {t:.2f}s')
+            best_trials_file = os.path.join(args.results_dir, f'centralized/{client}/best_trials.txt')
+            with open(best_trials_file, 'w') as f:
+                for trial in best_trials:
+                    print(f'\ntrial: {trial.number}')
+                    f.write(f'\ntrial: {trial.number}\n')
+                    print(f'values: {trial.values}')
+                    f.write(f'values: {trial.values}\n')
+                    for param in trial.params:
+                        f.write(f'{param}: {trial.params[param]}\n')
+                        print(f'{param}: {trial.params[param]}')
+            print()
+
+        if 'federated' in args.settings and client == 'LogRegClient':
+            print(f'\nTuning hyperparameters for {client} in a federated setting.')
+            t = time.time()
+            study_name = f'{client}_federated'
+            os.makedirs(os.path.join(args.results_dir, f'federated/{client}'), exist_ok=True)
+            storage = 'sqlite:///' + os.path.join(args.results_dir, f'federated/{client}/hp_study.db')
             hyperparamtuner = HyperparamTuner(
+                study_name=study_name,
+                obj_fn=federated,
+                train_dfs=train_dfs,
+                val_dfs=val_dfs,
                 seed=args.seed,
-                trainsets=args.trainsets,
-                n_rounds=args.n_rounds,
-                model=model,
-                optimizer=args.optimizer,
-                criterion=args.criterion,
-                batch_size=args.batch_size,
                 n_workers=args.n_workers,
-                device=args.device,
-                storage=args.storage,
-                results_file=args.results_file
+                device = args.device,
+                storage=storage,
+                client=client,
+                params = getattr(hyperparams, f'{client}_params')
             )
-            best_params, best_value = hyperparamtuner.optimize(n_trials=20)
-            print(f'Best hyperparameters: {best_params}')
-            print(f'Best value: {best_value}')
+            best_trials = hyperparamtuner.optimize(n_trials=args.n_trials)
+            t = time.time() - t
+            print('Done')
+            print(f'Exec time: {t:.2f}s')
+            best_trials_file = os.path.join(args.results_dir, f'federated/{client}/best_trials.txt')
+            with open(best_trials_file, 'w') as f:
+                for trial in best_trials:
+                    print(f'\ntrial: {trial.number}')
+                    f.write(f'\ntrial: {trial.number}\n')
+                    print(f'values: {trial.values}')
+                    f.write(f'values: {trial.values}\n')
+                    for param in trial.params:
+                        f.write(f'{param}: {trial.params[param]}\n')
+                        print(f'{param}: {trial.params[param]}')
+            print()
+
+        if 'isolated' in args.settings:
+            print(f'\nTuning hyperparameters for {client} in an isolated setting.')
+            t = time.time()
+            for i, (train_df, val_df) in enumerate(zip(train_dfs, val_dfs)):
+                study_name = f'{client}_isolated'
+                os.makedirs(os.path.join(args.results_dir, f'isolated/{client}/c{i}'), exist_ok=True)
+                storage = 'sqlite:///' + os.path.join(args.results_dir, f'isolated/{client}/c{i}/hp_study.db')
+                hyperparamtuner = HyperparamTuner(
+                    study_name=study_name,
+                    obj_fn=isolated,
+                    train_dfs=[train_df],
+                    val_dfs=[val_df],
+                    seed=args.seed,
+                    storage=storage,
+                    client=client,
+                    n_workers = args.n_workers,
+                    device = args.device,
+                    params = getattr(hyperparams, f'{client}_params')
+                )
+                best_trials = hyperparamtuner.optimize(n_trials=args.n_trials)
+
+                best_trials_file = os.path.join(args.results_dir, f'isolated/{client}/c{i}/best_trials.txt')
+                with open(best_trials_file, 'w') as f:
+                    for trial in best_trials:
+                        print(f'\ntrial: {trial.number}')
+                        f.write(f'\ntrial: {trial.number}\n')
+                        print(f'values: {trial.values}')
+                        f.write(f'values: {trial.values}\n')
+                        for param in trial.params:
+                            f.write(f'{param}: {trial.params[param]}\n')
+                            print(f'{param}: {trial.params[param]}')
-        if 'iso' in args.settings:
-            pass
+            t = time.time() - t
+            print('Done')
+            print(f'Exec time: {t:.2f}s\n')
 
 if __name__ == '__main__':
     main()
diff --git a/flib/train/Clients/clients.py b/flib/train/Clients/clients.py
index b6581fb..f36f8df 100644
--- a/flib/train/Clients/clients.py
+++ b/flib/train/Clients/clients.py
@@ -1,15 +1,18 @@
 import torch
-from sklearn.metrics import confusion_matrix, roc_curve
+from sklearn.metrics import confusion_matrix, roc_auc_score
 from flib.utils import tensordatasets, dataloaders, decrease_lr
 from flib.train.models import LogisticRegressor
 from flib.train import criterions
 from sklearn.tree import DecisionTreeClassifier
+from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
+from sklearn.svm import SVC
+from sklearn.neighbors import KNeighborsClassifier
 import pandas as pd
 from sklearn.preprocessing import StandardScaler
 from tqdm import tqdm
 
 class LogRegClient():
-    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, device:str='cpu', batch_size=64, optimizer='SGD', criterion='ClassBalancedLoss', lr=0.01, **kwargs):
+    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, device:str='cpu', batch_size=64, optimizer='SGD', optimizer_params={}, criterion='ClassBalancedLoss', criterion_params={}, **kwargs):
         self.name = name
         self.device = device
@@ -20,9 +23,12 @@ def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, te
         output_dim = self.trainset.tensors[1].unique().shape[0]
 
         self.model = LogisticRegressor(input_dim=input_dim, output_dim=output_dim).to(self.device)
-        self.optimizer = getattr(torch.optim, optimizer)(self.model.parameters(), lr=lr)
-        n_samples_per_classes = [sum(self.trainset.tensors[1] == 0).detach().cpu().numpy(), sum(self.trainset.tensors[1] == 1).detach().cpu().numpy()]
-        self.criterion = getattr(criterions, criterion)(n_samples_per_classes=n_samples_per_classes, **kwargs)
+        self.optimizer = getattr(torch.optim, optimizer)(self.model.parameters(), **optimizer_params)
+        if criterion == 'ClassBalancedLoss':
+            n_samples_per_classes = [sum(self.trainset.tensors[1] == 0).detach().cpu().numpy(), sum(self.trainset.tensors[1] == 1).detach().cpu().numpy()]
+            self.criterion = criterions.ClassBalancedLoss(n_samples_per_classes=n_samples_per_classes, **criterion_params)
+        else:
+            self.criterion = getattr(torch.nn, criterion)(**criterion_params)
 
     def train(self, state_dict=None):
         if state_dict:
@@ -53,6 +59,8 @@ def evaluate(self, state_dict=None, dataset='testset'):
         elif dataset == 'valset':
             dataloader = self.valloader
         elif dataset == 'testset':
+            if self.testloader is None:
+                return None, None
             dataloader = self.testloader
         self.model.eval()
         loss = 0.0
@@ -69,42 +77,46 @@ def evaluate(self, state_dict=None, dataset='testset'):
                     tpfptnfn[threshold]['fn'] += cm[1,0] / len(dataloader)
         return loss, tpfptnfn
 
-    def run(self, state_dict=None, n_epochs=100, eval_every=10, lr_patience=5, es_patience=15, **kwargs):
+    def run(self, state_dict=None, n_rounds=100, eval_every=10, lr_patience=5, es_patience=15, **kwargs):
         if state_dict:
             self.model.load_state_dict(state_dict)
+        lr_patience_reset = lr_patience
+        es_patience_reset = es_patience
         results_dict = {0: {}}
         loss, tpfptnfn = self.evaluate(dataset='trainset')
         results_dict[0]['train'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
-        previous_loss = loss
-        if eval_every is not None:
-            if self.valset is not None:
-                loss, tpfptnfn = self.evaluate(dataset='valset')
-                results_dict[0]['val'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
+        previous_train_loss = loss
+        if eval_every is not None and self.valset is not None:
+            loss, tpfptnfn = self.evaluate(dataset='valset')
+            results_dict[0]['val'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
+            previous_val_loss = loss
 
-        for epoch in tqdm(range(1, n_epochs+1), desc='progress', leave=False):
+        for epoch in tqdm(range(1, n_rounds+1), desc='progress', leave=False):
 
             loss, tpfptnfn = self.train()
             results_dict[epoch] = {'train': {'loss': loss, 'tpfptnfn': tpfptnfn}}
-
-            if loss >= previous_loss - 0.0005:
+            if loss >= previous_train_loss - 0.0005:
                 lr_patience -= 1
-                es_patience -= 1
             else:
-                lr_patience = 5
-                es_patience = 15
-                previous_loss = loss
-
-            if eval_every is not None and epoch % eval_every == 0:
-                if self.valset is not None:
-                    loss, tpfptnfn = self.evaluate(dataset='valset')
-                    results_dict[epoch]['val'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
-
+                lr_patience = lr_patience_reset
             if lr_patience <= 0:
+                tqdm.write('Decreasing learning rate.')
                 decrease_lr(self.optimizer, factor=0.5)
+                lr_patience = lr_patience_reset
+            previous_train_loss = loss
 
-            if es_patience <= 0 and (eval_every is None or epoch % eval_every == 0):
-                break
+            if eval_every is not None and epoch % eval_every == 0 and self.valset is not None:
+                loss, tpfptnfn = self.evaluate(dataset='valset')
+                results_dict[epoch]['val'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
+                if loss >= previous_val_loss - 0.0005:
+                    es_patience -= eval_every
+                else:
+                    es_patience = es_patience_reset
+                if es_patience <= 0:
+                    tqdm.write('Early stopping.')
+                    break
+                previous_val_loss = loss
 
         if eval_every is not None and self.testset is not None:
             loss, tpfptnfn = self.evaluate(dataset='testset')
@@ -123,186 +135,359 @@ def get_state_dict(self):
             model[key] = value.detach().cpu()
         return model
 
-class OldLogRegClient():
-    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, device:str='cpu', n_epochs:int=30, eval_every:int=10, batch_size:int=64, optimizer:str='SGD', criterion:str='ClassBalancedLoss', lr:float=0.01, **kwargs):
+
+class DecisionTreeClient():
+    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, criterion='gini', splitter='best', max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0, max_features=None, max_leaf_nodes=None, min_impurity_decrease=0, class_weight='balanced', random_state=42, **kwargs):
         self.name = name
-        self.device = device
-        self.n_epochs = n_epochs
-        self.eval_every = eval_every
-        self.batch_size = batch_size
-        self.lr = lr
-        self.trainset, self.valset, self.testset = tensordatasets(train_df, val_df, test_df, normalize=True, device=self.device)
-        self.trainloader, self.valloader, self.testloader = dataloaders(self.trainset, self.valset, self.testset, self.batch_size)
-        input_dim = self.trainset.tensors[0].shape[1]
-        output_dim = self.trainset.tensors[1].unique().shape[0]
-        
-        self.model = LogisticRegressor(input_dim=input_dim, output_dim=output_dim).to(self.device)
-        self.optimizer = getattr(torch.optim, optimizer)(self.model.parameters(), lr=self.lr)
-        if criterion == 'ClassBalancedLoss':
-            n_samples_per_classes = [sum(self.trainset.tensors[1] == 0).detach().cpu().numpy(), sum(self.trainset.tensors[1] == 1).detach().cpu().numpy()]
-            self.criterion = getattr(criterions, criterion)(beta=kwargs.get('beta', 0.9), n_samples_per_classes=n_samples_per_classes, loss_type='sigmoid')
-        else:
-            self.criterion = getattr(criterions, criterion)()
+
+        self.X_train = train_df.drop(columns=['is_sar']).to_numpy()
+        self.y_train = train_df['is_sar'].to_numpy()
+        scaler = StandardScaler()
+        self.X_train = scaler.fit_transform(self.X_train)
+        if val_df is not None:
+            self.X_val = val_df.drop(columns=['is_sar']).to_numpy()
+            self.X_val = scaler.transform(self.X_val)
+            self.y_val = val_df['is_sar'].to_numpy()
+        else:
+            self.X_val = None
+            self.y_val = None
+        if test_df is not None:
+            self.X_test = test_df.drop(columns=['is_sar']).to_numpy()
+            self.X_test = scaler.transform(self.X_test)
+            self.y_test = test_df['is_sar'].to_numpy()
+        else:
+            self.X_test = None
+            self.y_test = None
+
+        self.model = DecisionTreeClassifier(
+            criterion=criterion,
+            splitter=splitter,
+            max_depth=max_depth,
+            min_samples_split=min_samples_split,
+            min_samples_leaf=min_samples_leaf,
+            min_weight_fraction_leaf=min_weight_fraction_leaf,
+            max_features=max_features,
+            max_leaf_nodes=max_leaf_nodes,
+            min_impurity_decrease=min_impurity_decrease,
+            class_weight=class_weight,
+            random_state=random_state
+        )
+
+    def run(self, **kwargs):
+        self.train()
+        train_roc_auc, train_tpfptnfn = self.evaluate(dataset='trainset')
+        val_roc_auc, val_tpfptnfn = self.evaluate(dataset='valset')
+        test_roc_auc, test_tpfptnfn = self.evaluate(dataset='testset')
+        results = {0: {'train': {'loss': train_roc_auc, 'tpfptnfn': train_tpfptnfn}, 'val': {'loss': val_roc_auc, 'tpfptnfn': val_tpfptnfn}, 'test': {'loss': test_roc_auc, 'tpfptnfn': test_tpfptnfn}}}
+        return results
+
+    def train(self):
+        self.model.fit(self.X_train, self.y_train)
+        return
+
+    def evaluate(self, dataset='trainset'):
+        if dataset == 'trainset':
+            X, y = self.X_train, self.y_train
+        elif dataset == 'valset':
+            if self.X_val is None:
+                return None, None
+            else:
+                X, y = self.X_val, self.y_val
+        elif dataset == 'testset':
+            if self.X_test is None:
+                return None, None
+            else:
+                X, y = self.X_test, self.y_test
+        y_pred = self.model.predict_proba(X)
+        roc_auc = roc_auc_score(y, y_pred[:,1])
+        tpfptnfn = {threshold: {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0} for threshold in range(0, 101)}
+        for threshold in range(0, 101):
+            cm = confusion_matrix(y, (y_pred[:,1] > (threshold / 100)), labels=[0, 1], normalize='all')
+            tpfptnfn[threshold]['tp'] = cm[1,1]
+            tpfptnfn[threshold]['fp'] = cm[0,1]
+            tpfptnfn[threshold]['tn'] = cm[0,0]
+            tpfptnfn[threshold]['fn'] = cm[1,0]
+        return -roc_auc, tpfptnfn
+
+    def get_state_dict(self):
+        return None
+
+    def load_state_dict(self, state_dict):
+        return
 
-    def run(self, state_dict=None):
-        lr_patience = 5
-        es_patience = 15
-        
-        if state_dict:
-            self.model.load_state_dict(state_dict)
-        
-        results_dict = {0: {}}
-        loss, tpfptnfn = self.evaluate(dataset='trainset')
-        results_dict[0]['train'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
-        previous_loss = loss
-        if self.eval_every is not None:
-            if self.valset is not None:
-                loss, tpfptnfn = self.evaluate(dataset='valset')
-                results_dict[0]['val'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
-        
-        for epoch in range(1, self.n_epochs+1):
-            
-            loss, tpfptnfn = self.train()
-            results_dict[epoch] = {'train': {'loss': loss, 'tpfptnfn': tpfptnfn}}
-            
-            if loss >= previous_loss - 0.0005:
-                lr_patience -= 1
-                es_patience -= 1
-            else:
-                lr_patience = 5
-                es_patience = 15
-                previous_loss = loss
-            
-            if self.eval_every is not None and epoch % self.eval_every == 0:
-                if self.valset is not None:
-                    loss, tpfptnfn = self.evaluate(dataset='valset')
-                    results_dict[epoch]['val'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
-            
-            
-            if lr_patience <= 0:
-                for param_group in self.optimizer.param_groups:
-                    param_group['lr'] *= 0.5
-                self.lr *= 0.5
-            
-            if es_patience <= 0 and (self.eval_every is None or epoch % self.eval_every == 0):
-                break
-        
-        if self.eval_every is not None and self.testset is not None:
-            loss, tpfptnfn = self.evaluate(dataset='testset')
-            results_dict[epoch]['test'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
-        
-        return results_dict
+
+class RandomForestClient():
+    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, n_estimators=100, criterion='gini', max_depth=None, class_weight='balanced', random_state=42, **kwargs):
+        self.name = name
+
+        self.X_train = train_df.drop(columns=['is_sar']).to_numpy()
+        self.y_train = train_df['is_sar'].to_numpy()
+        scaler = StandardScaler()
+        self.X_train = scaler.fit_transform(self.X_train)
+        if val_df is not None:
+            self.X_val = val_df.drop(columns=['is_sar']).to_numpy()
+            self.X_val = scaler.transform(self.X_val)
+            self.y_val = val_df['is_sar'].to_numpy()
+        else:
+            self.X_val = None
+            self.y_val = None
+        if test_df is not None:
+            self.X_test = test_df.drop(columns=['is_sar']).to_numpy()
+            self.X_test = scaler.transform(self.X_test)
+            self.y_test = test_df['is_sar'].to_numpy()
+        else:
+            self.X_test = None
+            self.y_test = None
+
+        self.model = RandomForestClassifier(
+            n_estimators=n_estimators,
+            criterion=criterion,
+            max_depth=max_depth,
+            class_weight=class_weight,
+            random_state=random_state
+        )
+
+    def run(self, **kwargs):
+        self.train()
+        train_roc_auc, train_tpfptnfn = self.evaluate(dataset='trainset')
+        val_roc_auc, val_tpfptnfn = self.evaluate(dataset='valset')
+        test_roc_auc, test_tpfptnfn = self.evaluate(dataset='testset')
+        results = {0: {'train': {'loss': train_roc_auc, 'tpfptnfn': train_tpfptnfn}, 'val': {'loss': val_roc_auc, 'tpfptnfn': val_tpfptnfn}, 'test': {'loss': test_roc_auc, 'tpfptnfn': test_tpfptnfn}}}
+        return results
+
+    def train(self):
+        self.model.fit(self.X_train, self.y_train)
+        return
 
-    def train(self, state_dict=None):
-        if state_dict:
-            self.model.load_state_dict(state_dict)
-        self.model.train()
-        loss = 0.0
-        tpfptnfn = {threshold: {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0} for threshold in range(0, 101)}
-        for x_batch, y_batch in self.trainloader:
-            self.optimizer.zero_grad()
-            y_pred = self.model(x_batch)
-            l = self.criterion(y_pred, y_batch)
-            l.backward()
-            self.optimizer.step()
-            loss += l.item() / len(self.trainloader)
-            for threshold in range(0, 101):
-                cm = confusion_matrix(y_batch.cpu(), (y_pred[:,1] > (threshold / 100)).to(torch.int64).cpu(), labels=[0, 1], normalize='all')
-                tpfptnfn[threshold]['tp'] += cm[1,1] / len(self.trainloader)
-                tpfptnfn[threshold]['fp'] += cm[0,1] / len(self.trainloader)
-                tpfptnfn[threshold]['tn'] += cm[0,0] / len(self.trainloader)
-                tpfptnfn[threshold]['fn'] += cm[1,0] / len(self.trainloader)
-        return loss, tpfptnfn
+    def evaluate(self, dataset='trainset'):
+        if dataset == 'trainset':
+            X, y = self.X_train, self.y_train
+        elif dataset == 'valset':
+            if self.X_val is None:
+                return None, None
+            else:
+                X, y = self.X_val, self.y_val
+        elif dataset == 'testset':
+            if self.X_test is None:
+                return None, None
+            else:
+                X, y = self.X_test, self.y_test
+        y_pred = self.model.predict_proba(X)
+        roc_auc = roc_auc_score(y, y_pred[:,1])
+        tpfptnfn = {threshold: {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0} for threshold in range(0, 101)}
+        for threshold in range(0, 101):
+            cm = confusion_matrix(y, (y_pred[:,1] > (threshold / 100)), labels=[0, 1], normalize='all')
+            tpfptnfn[threshold]['tp'] = cm[1,1]
+            tpfptnfn[threshold]['fp'] = cm[0,1]
+            tpfptnfn[threshold]['tn'] = cm[0,0]
+            tpfptnfn[threshold]['fn'] = cm[1,0]
+        return -roc_auc, tpfptnfn
+
+    def get_state_dict(self):
+        return None
+
+    def load_state_dict(self, state_dict):
+        return
 
-    def evaluate(self, state_dict=None, dataset='testset'):
-        if state_dict:
-            self.model.load_state_dict(state_dict)
-        if dataset == 'trainset':
-            dataloader = self.trainloader
-        elif dataset == 'valset':
-            dataloader = self.valloader
-        elif dataset == 'testset':
-            dataloader = self.testloader
-        self.model.eval()
-        loss = 0.0
-        tpfptnfn = {threshold: {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0} for threshold in range(0, 101)}
-        with torch.no_grad():
-            for x_batch, y_batch in dataloader:
-                y_pred = self.model(x_batch)
-                loss += self.criterion(y_pred, y_batch).item() / len(dataloader)
-                for threshold in range(0, 101):
-                    cm = confusion_matrix(y_batch.cpu(), (y_pred[:,1] > (threshold / 100)).to(torch.int64).cpu(), labels=[0, 1], normalize='all')
-                    tpfptnfn[threshold]['tp'] += cm[1,1] / len(dataloader)
-                    tpfptnfn[threshold]['fp'] += cm[0,1] / len(dataloader)
-                    tpfptnfn[threshold]['tn'] += cm[0,0] / len(dataloader)
-                    tpfptnfn[threshold]['fn'] += cm[1,0] / len(dataloader)
-        return loss, tpfptnfn
+
+class GradientBoostingClient():
+    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, loss='log_loss', learning_rate=0.1, n_estimators=100, criterion='friedman_mse', max_depth=3, random_state=42, **kwargs):
+        self.name = name
+
+        self.X_train = train_df.drop(columns=['is_sar']).to_numpy()
+        self.y_train = train_df['is_sar'].to_numpy()
+        scaler = StandardScaler()
+        self.X_train = scaler.fit_transform(self.X_train)
+        if val_df is not None:
+            self.X_val = val_df.drop(columns=['is_sar']).to_numpy()
+            self.X_val = scaler.transform(self.X_val)
+            self.y_val = val_df['is_sar'].to_numpy()
+        else:
+            self.X_val = None
+            self.y_val = None
+        if test_df is not None:
+            self.X_test = test_df.drop(columns=['is_sar']).to_numpy()
+            self.X_test = scaler.transform(self.X_test)
+            self.y_test = test_df['is_sar'].to_numpy()
+        else:
+            self.X_test = None
+            self.y_test = None
+
+        self.model = GradientBoostingClassifier(
+            loss=loss,
+            learning_rate=learning_rate,
+            n_estimators=n_estimators,
+            criterion=criterion,
+            max_depth=max_depth,
+            random_state=random_state
+        )
+
+    def run(self, **kwargs):
+        self.train()
+        train_roc_auc, train_tpfptnfn = self.evaluate(dataset='trainset')
+        val_roc_auc, val_tpfptnfn = self.evaluate(dataset='valset')
+        test_roc_auc, test_tpfptnfn = self.evaluate(dataset='testset')
+        results = {0: {'train': {'loss': train_roc_auc, 'tpfptnfn': train_tpfptnfn}, 'val': {'loss': val_roc_auc, 'tpfptnfn': val_tpfptnfn}, 'test': {'loss': test_roc_auc, 'tpfptnfn': test_tpfptnfn}}}
+        return results
+
+    def train(self):
+        self.model.fit(self.X_train, self.y_train)
+        return
+
+    def evaluate(self, dataset='trainset'):
+        if dataset == 'trainset':
+            X, y = self.X_train, self.y_train
+        elif dataset == 'valset':
+            if self.X_val is None:
+                return None, None
+            else:
+                X, y = self.X_val, self.y_val
+        elif dataset == 'testset':
+            if self.X_test is None:
+                return None, None
+            else:
+                X, y = self.X_test, self.y_test
+        y_pred = self.model.predict_proba(X)
+        roc_auc = roc_auc_score(y, y_pred[:,1])
+        tpfptnfn = {threshold: {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0} for threshold in range(0, 101)}
+        for threshold in range(0, 101):
+            cm = confusion_matrix(y, (y_pred[:,1] > (threshold / 100)), labels=[0, 1], normalize='all')
+            tpfptnfn[threshold]['tp'] = cm[1,1]
+            tpfptnfn[threshold]['fp'] = cm[0,1]
+            tpfptnfn[threshold]['tn'] = cm[0,0]
+            tpfptnfn[threshold]['fn'] = cm[1,0]
+        return -roc_auc, tpfptnfn
+
+    def get_state_dict(self):
+        return None
 
     def load_state_dict(self, state_dict):
-        for key, value in state_dict.items():
-            state_dict[key] = value.to(self.device)
-        self.model.load_state_dict(state_dict)
+        return
+
+
+class SVMClient():
+    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, C=1.0, kernel='rbf', degree=3, gamma='scale', coef0=0.0, shrinking=True, probability=False, class_weight='balanced', random_state=42, cache_size=200, max_iter=-1, **kwargs):
+        self.name = name
+
+        self.X_train = train_df.drop(columns=['is_sar']).to_numpy()
+        self.y_train = train_df['is_sar'].to_numpy()
+        scaler = StandardScaler()
+        self.X_train = scaler.fit_transform(self.X_train)
+        if val_df is not None:
+            self.X_val = val_df.drop(columns=['is_sar']).to_numpy()
+            self.X_val = scaler.transform(self.X_val)
+            self.y_val = val_df['is_sar'].to_numpy()
+        else:
+            self.X_val = None
+            self.y_val = None
+        if test_df is not None:
+            self.X_test = test_df.drop(columns=['is_sar']).to_numpy()
+            self.X_test = scaler.transform(self.X_test)
+            self.y_test = test_df['is_sar'].to_numpy()
+        else:
+            self.X_test = None
+            self.y_test = None
+
+        self.model = SVC(
+            C=C,
+            kernel=kernel,
+            degree=degree,
+            gamma=gamma,
+            coef0=coef0,
+            shrinking=shrinking,
+            probability=probability,
+            class_weight=class_weight,
+            cache_size=cache_size,
+            max_iter=max_iter,
+            random_state=random_state
+        )
+
+    def run(self, **kwargs):
+        self.train()
+        train_roc_auc, train_tpfptnfn = self.evaluate(dataset='trainset')
+        val_roc_auc, val_tpfptnfn = self.evaluate(dataset='valset')
+        test_roc_auc, test_tpfptnfn = self.evaluate(dataset='testset')
+        results = {0: {'train': {'loss': train_roc_auc, 'tpfptnfn': train_tpfptnfn}, 'val': {'loss': val_roc_auc, 'tpfptnfn': val_tpfptnfn}, 'test': {'loss': test_roc_auc, 'tpfptnfn': test_tpfptnfn}}}
+        return results
+
+    def train(self):
+        self.model.fit(self.X_train, self.y_train)
+        return
+
+    def evaluate(self, dataset='trainset'):
+        if dataset == 'trainset':
+            X, y = self.X_train, self.y_train
+        elif dataset == 'valset':
+            if self.X_val is None:
+                return None, None
+            else:
+                X, y = self.X_val, self.y_val
+        elif dataset == 'testset':
+            if self.X_test is None:
+                return None, None
+            else:
+                X, y = self.X_test, self.y_test
+        y_pred = self.model.predict_proba(X)
+        roc_auc = roc_auc_score(y, y_pred[:,1])
+        tpfptnfn = {threshold: {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0} for threshold in range(0, 101)}
+        for threshold in range(0, 101):
+            cm = confusion_matrix(y, (y_pred[:,1] > (threshold / 100)), labels=[0, 1], normalize='all')
+            tpfptnfn[threshold]['tp'] = cm[1,1]
+            tpfptnfn[threshold]['fp'] = cm[0,1]
+            tpfptnfn[threshold]['tn'] = cm[0,0]
+            tpfptnfn[threshold]['fn'] = cm[1,0]
+        return -roc_auc, tpfptnfn
 
     def get_state_dict(self):
-        model = self.model.state_dict()
-        for key, value in model.items():
-            model[key] = value.detach().cpu()
-        return model
+        return None
+
+    def load_state_dict(self, state_dict):
+        return
+
 
-class DecisionTreeClient():
-    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, **kwargs):
+class KNNClient():
+    def __init__(self, name:str, train_df:pd.DataFrame, val_df:pd.DataFrame=None, test_df:pd.DataFrame=None, n_neighbors=5, weights='uniform', algorithm='auto', leaf_size=30, p=2, metric='minkowski', n_jobs=-1, **kwargs):
         self.name = name
-        valset_size = kwargs.get('valset_size', None)
-        if val_df is None and valset_size is not None:
-            val_df = train_df.sample(frac=valset_size, random_state=42)
-            train_df = train_df.drop(val_df.index)
-        self.X_train = train_df.drop(columns=['account', 'bank', 'is_sar']).to_numpy()
+
+        self.X_train = train_df.drop(columns=['is_sar']).to_numpy()
         self.y_train = train_df['is_sar'].to_numpy()
         scaler = StandardScaler()
         self.X_train = scaler.fit_transform(self.X_train)
         if val_df is not None:
-            self.X_val = val_df.drop(columns=['account', 'bank', 'is_sar']).to_numpy()
+            self.X_val = val_df.drop(columns=['is_sar']).to_numpy()
             self.X_val = scaler.transform(self.X_val)
             self.y_val = val_df['is_sar'].to_numpy()
         else:
             self.X_val = None
             self.y_val = None
         if test_df is not None:
-            self.X_test = test_df.drop(columns=['account', 'bank', 'is_sar']).to_numpy()
+            self.X_test = test_df.drop(columns=['is_sar']).to_numpy()
             self.X_test = scaler.transform(self.X_test)
             self.y_test = test_df['is_sar'].to_numpy()
         else:
             self.X_test = None
             self.y_test = None
 
-        self.model = DecisionTreeClassifier(
-            criterion=kwargs.get('criterion', 'gini'),
-            splitter=kwargs.get('splitter', 'best'),
-            max_depth=kwargs.get('max_depth', None),
-            min_samples_split=kwargs.get('min_samples_split', 2),
-            min_samples_leaf=kwargs.get('min_samples_leaf', 1),
-            min_weight_fraction_leaf=kwargs.get('min_weight_fraction_leaf', 0.0),
-            max_features=kwargs.get('max_features', None),
-            max_leaf_nodes=kwargs.get('max_leaf_nodes', None),
-            min_impurity_decrease=kwargs.get('min_impurity_decrease', 0.0),
-            class_weight=kwargs.get('class_weight', 'balanced'),
-            random_state=kwargs.get('random_state', 42)
+        self.model = KNeighborsClassifier(
+            n_neighbors=n_neighbors,
+            weights=weights,
+            algorithm=algorithm,
+            leaf_size=leaf_size,
+            p=p,
+            metric=metric,
+            n_jobs=n_jobs
         )
 
-    def run(self):
+    def run(self, **kwargs):
         self.train()
-        _, train_tpfptnfn = self.evaluate(dataset='trainset')
-        _, val_tpfptnfn = self.evaluate(dataset='valset')
-        _, test_tpfptnfn = self.evaluate(dataset='testset')
-        results = {0: {'train': {'loss': None, 'tpfptnfn': train_tpfptnfn}, 'val': {'loss': None, 'tpfptnfn': val_tpfptnfn}, 'test': {'loss': None, 'tpfptnfn': test_tpfptnfn}}}
+        train_roc_auc, train_tpfptnfn = self.evaluate(dataset='trainset')
+        val_roc_auc, val_tpfptnfn = self.evaluate(dataset='valset')
+        test_roc_auc, test_tpfptnfn = self.evaluate(dataset='testset')
+        results = {0: {'train': {'loss': train_roc_auc, 'tpfptnfn': train_tpfptnfn}, 'val': {'loss': val_roc_auc, 'tpfptnfn': val_tpfptnfn}, 'test': {'loss': test_roc_auc, 'tpfptnfn': test_tpfptnfn}}}
         return results
 
-    def train(self, tune_hyperparameters=False):
-        if tune_hyperparameters:
-            pass
-        else:
-            self.model.fit(self.X_train, self.y_train)
+    def train(self):
+        self.model.fit(self.X_train, self.y_train)
         return
 
     def evaluate(self, dataset='trainset'):
@@ -319,8 +504,7 @@ def evaluate(self, dataset='trainset'):
             else:
                 X, y = self.X_test, self.y_test
         y_pred = self.model.predict_proba(X)
-        curve = roc_curve(y, y_pred[:,1])
-        print(curve)
+        roc_auc = roc_auc_score(y, y_pred[:,1])
         tpfptnfn = {threshold: {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0} for threshold in range(0, 101)}
         for threshold in range(0, 101):
             cm = confusion_matrix(y, (y_pred[:,1] > (threshold / 100)), labels=[0, 1], normalize='all')
@@ -328,10 +512,12 @@ def evaluate(self, dataset='trainset'):
             tpfptnfn[threshold]['fp'] = cm[0,1]
             tpfptnfn[threshold]['tn'] = cm[0,0]
             tpfptnfn[threshold]['fn'] = cm[1,0]
-        return None, tpfptnfn
+        return -roc_auc, tpfptnfn
 
     def get_state_dict(self):
         return None
 
     def load_state_dict(self, state_dict):
-        return
\ No newline at end of file
+        return
+
+
diff --git a/flib/train/__init__.py b/flib/train/__init__.py
index 752db64..2387103 100644
--- a/flib/train/__init__.py
+++ b/flib/train/__init__.py
@@ -1,3 +1,4 @@
 from flib.train.centralized import centralized
 from flib.train.federated import federated
 from flib.train.isolated import isolated
+from flib.train.tune_hyperparams import HyperparamTuner
diff --git a/flib/train/centralized.py b/flib/train/centralized.py
index 7c1971c..2132654 100644
--- a/flib/train/centralized.py
+++ b/flib/train/centralized.py
@@ -2,7 +2,7 @@
 from flib.utils import set_random_seed
 from flib.train import Clients
 
-def centralized(seed=42, train_dfs=None, val_dfs=None, test_dfs=None, client='LogRegClient', **kwargs):
+def centralized(train_dfs, val_dfs=[], test_dfs=[], seed=42, client='LogRegClient', **kwargs):
 
     set_random_seed(seed)
 
diff --git a/flib/train/criterions/criterions.py b/flib/train/criterions/criterions.py
index 241fa08..ba93507 100644
--- a/flib/train/criterions/criterions.py
+++ b/flib/train/criterions/criterions.py
@@ -3,17 +3,17 @@ from torch.nn import functional as F
 
 class ClassBalancedLoss(torch.nn.Module):
-    def __init__(self, beta, n_samples_per_classes, loss_type='sigmoid', **kwargs):
+    def __init__(self, gamma, n_samples_per_classes, loss_type='sigmoid'):
         super(ClassBalancedLoss, self).__init__()
-        self.beta = beta
-        self.effective_nums = 1.0 - np.power(beta, n_samples_per_classes)
+        self.gamma = gamma
+        self.effective_nums = 1.0 - np.power(gamma, n_samples_per_classes)
         self.n_classes = len(n_samples_per_classes)
         self.loss_type = loss_type
 
     def forward(self, logits, labels):
         labels = labels.to(torch.int64)
         labels_one_hot = F.one_hot(labels, self.n_classes).float()
-        weights = (1.0 - self.beta) / np.array(self.effective_nums)
+        weights = (1.0 - self.gamma) / np.array(self.effective_nums)
         weights = weights / np.sum(weights) * self.n_classes
         weights = torch.tensor(weights, device=logits.device).float()
         weights = weights.unsqueeze(0)
diff --git a/flib/train/federated.py b/flib/train/federated.py
index e2ed91f..2e00c1c 100644
--- a/flib/train/federated.py
+++ b/flib/train/federated.py
@@ -8,7 +8,7 @@ import optuna
 import time
 
-def federated(seed=42, train_dfs=None, val_dfs=None, test_dfs=None, client='LogRegClient', criterion='ClassBalancedLoss', n_workers=3, n_rounds=100, eval_every=10, **kwargs):
+def federated(train_dfs, val_dfs=[], test_dfs=[], seed=42, n_workers=3, n_rounds=100, eval_every=10, client='LogRegClient', **kwargs):
 
     set_random_seed(seed)
 
@@ -22,13 +22,14 @@ def federated(seed=42, train_dfs=None, val_dfs=None, test_dfs=None, client='LogR
     # init clients
     clients = []
-    for i, train_df, val_df, test_df in zip(range(len(train_dfs)), train_dfs, val_dfs, test_dfs):
+    for i, train_df in enumerate(train_dfs):
+        val_df = val_dfs[i] if i < len(val_dfs) else None
+        test_df = test_dfs[i] if i < len(test_dfs) else None
         client = Client(
             name=f'c{i}',
             train_df=train_df,
             val_df=val_df,
             test_df=test_df,
-            criterion=criterion,
             **kwargs
         )
         clients.append(client)
@@ -40,9 +41,9 @@ def federated(seed=42, train_dfs=None, val_dfs=None, test_dfs=None, client='LogR
     )
 
     # run
-    results, state_dict, avg_loss = server.run(n_rounds=n_rounds, eval_every=eval_every)
+    results = server.run(n_rounds=n_rounds, eval_every=eval_every, **kwargs)
 
-    return results, state_dict, avg_loss
+    return results
 
 class HyperparamTuner():
     def __init__(self, seed=42, trainsets=None, n_rounds=50, model='LogisticRegressor', optimizer=['SGD'], criterion=['ClassBalancedLoss'], beta=(0.9999, 0.99999999), local_epochs=[1], batch_size=[64, 128, 256], lr=(0.001, 0.1), n_workers=3, device='cuda:0', storage=None, results_file=None):
diff --git a/flib/train/isolated.py b/flib/train/isolated.py
index c45041e..f8053c1 100644
--- a/flib/train/isolated.py
+++ b/flib/train/isolated.py
@@ -1,12 +1,6 @@
 from flib.train import Clients
 from flib.utils import set_random_seed
 import multiprocessing as mp
-import torch
-import os
-import pickle
-import optuna
-import time
-from tqdm import tqdm
 import numpy as np
 import copy
 
@@ -18,7 +12,7 @@ def train_clients(clients, kwargs):
         results.append(client.run(**kwargs))
     return client_names, results
 
-def isolated(seed=42, train_dfs=None, val_dfs=None, test_dfs=None, client='LogRegClient', criterion='ClassBalancedLoss', n_workers=3, **kwargs):
+def isolated(train_dfs, val_dfs=[], test_dfs=[], seed=42, n_workers=3, client='LogRegClient', **kwargs):
 
     set_random_seed(seed)
 
@@ -31,13 +25,14 @@ def isolated(seed=42, train_dfs=None, val_dfs=None, test_dfs=None, client='LogRe
     # init clients
     clients = []
-    for i, train_df, val_df, test_df in zip(range(len(train_dfs)), train_dfs, val_dfs, test_dfs):
+    for i, train_df in enumerate(train_dfs):
+        val_df = val_dfs[i] if i < len(val_dfs) else None
+        test_df = test_dfs[i] if i < len(test_dfs) else None
         client = Client(
             name=f'c{i}',
             train_df=train_df,
             val_df=val_df,
             test_df=test_df,
-            criterion=criterion,
             **kwargs
         )
         clients.append(client)
diff --git a/flib/train/optimizers/Optimizers.py b/flib/train/optimizers/Optimizers.py
new file mode 100644
index 0000000..e69de29
diff --git a/flib/train/optimizers/__init__.py b/flib/train/optimizers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/flib/train/servers/servers.py b/flib/train/servers/servers.py
index 14a3529..7ad376d 100644
--- a/flib/train/servers/servers.py
+++ b/flib/train/servers/servers.py
@@ -53,11 +53,11 @@ def _average_state_dicts(self, state_dicts:OrderedDict, weights:list=None):
             avg_state_dict[key] = avg
         return avg_state_dict
 
-    def run(self, n_rounds=100, eval_every=10, state_dict=None, n_no_aggregation_rounds=0):
+    def run(self, n_rounds=100, eval_every=10, state_dict=None, n_no_aggregation_rounds=0, lr_patience=5, es_patience=15, **kwargs):
 
-        results_dict = {client.name: {round: {} for round in range(n_rounds+1)} for client in self.clients}
-        lr_patience = 5
-        es_patience = 15
+        results_dict = {client.name: {0: {}} for client in self.clients}
+        lr_patience_reset = lr_patience
+        es_patience_reset = es_patience
         avg_state_dict = None
 
         with mp.Pool(self.n_workers) as p:
@@ -72,16 +72,18 @@ def run(self, n_rounds=100, eval_every=10, state_dict=None, n_no_aggregation_rou
             # evaluate initial model
             results = p.starmap(self._evaluate_clients, [(client_split, 'trainset') for client_split in client_splits])
-            previous_avg_loss = 0.0
+            previous_train_loss = 0.0
             for result in results:
                 for client, loss, tpfptnfn in zip(result[0], result[1], result[2]):
                     results_dict[client][0]['train'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
-                    previous_avg_loss += loss / len(self.clients)
+                    previous_train_loss += loss / len(self.clients)
             if eval_every is not None:
                 results = p.starmap(self._evaluate_clients, [(client_split, 'valset') for client_split in client_splits])
+                previous_val_loss = 0.0
                 for result in results:
                     for client, loss, tpfptnfn in zip(result[0], result[1], result[2]):
                         results_dict[client][0]['val'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
+                        previous_val_loss += loss / len(self.clients)
 
             for round in tqdm(range(1, n_rounds+1), desc='progress', leave=False):
@@ -90,9 +92,18 @@ def run(self, n_rounds=100, eval_every=10, state_dict=None, n_no_aggregation_rou
                 avg_loss = 0.0
                 for result in results:
                     for client, loss, tpfptnfn, state_dict in zip(result[0], result[1], result[2], result[3]):
-                        results_dict[client][round]['train'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
+                        results_dict[client][round] = {'train': {'loss': loss, 'tpfptnfn': tpfptnfn}}
                         state_dicts.append(state_dict)
                        avg_loss += loss / len(self.clients)
+                if avg_loss >= previous_train_loss - 0.0005:
+                    lr_patience -= 1
+                else:
+                    lr_patience = lr_patience_reset
+                if lr_patience <= 0:
+                    tqdm.write('Decreasing learning rate.')
+                    for client in self.clients:
+                        decrease_lr(client.optimizer, factor=0.5)
+                previous_train_loss = avg_loss
 
                 if round > n_no_aggregation_rounds:
                     avg_state_dict = self._average_state_dicts(state_dicts)
@@ -101,28 +112,19 @@ def run(self, n_rounds=100, eval_every=10, state_dict=None, n_no_aggregation_rou
                 if eval_every is not None and round % eval_every == 0:
                     results = p.starmap(self._evaluate_clients, [(client_split, 'valset') for client_split in client_splits])
+                    avg_loss = 0.0
                     for result in results:
                         for client, loss, tpfptnfn in zip(result[0], result[1], result[2]):
                             results_dict[client][round]['val'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
-
-                if avg_loss >= previous_avg_loss - 0.0005:
-                    lr_patience -= 1
-                    es_patience -= 1
-                else:
-                    lr_patience = 5
-                    es_patience = 15
-
-                if lr_patience <= 0:
-                    tqdm.write('Decreasing learning rate.')
-                    for client in self.clients:
-                        decrease_lr(client.optimizer, factor=0.5)
-                    lr_patience = 5
-
-                if es_patience <= 0 and (eval_every is None or round % eval_every == 0):
-                    tqdm.write('Early stopping.')
-                    break
-
-                previous_avg_loss = avg_loss
+                            avg_loss += loss / len(self.clients)
+                    if avg_loss >= previous_val_loss - 0.0005:
+                        es_patience -= eval_every
+                    else:
+                        es_patience = es_patience_reset
+                    if es_patience <= 0:
+                        tqdm.write('Early stopping.')
+                        break
+                    previous_val_loss = avg_loss
 
             if eval_every is not None:
                 results = p.starmap(self._evaluate_clients, [(client_split, 'testset') for client_split in client_splits])
@@ -130,4 +132,4 @@ def run(self, n_rounds=100, eval_every=10, state_dict=None, n_no_aggregation_rou
                 for client, loss, tpfptnfn in zip(result[0], result[1], result[2]):
                     results_dict[client][round]['test'] = {'loss': loss, 'tpfptnfn': tpfptnfn}
 
-        return results_dict, avg_state_dict, avg_loss
\ No newline at end of file
+        return results_dict
\ No newline at end of file
diff --git a/flib/train/tune_hyperparams.py b/flib/train/tune_hyperparams.py
new file mode 100644
index 0000000..96c567b
--- /dev/null
+++ b/flib/train/tune_hyperparams.py
@@ -0,0 +1,66 @@
+import optuna
+import inspect
+from flib.train import Clients
+
+class HyperparamTuner():
+    def __init__(self, study_name, obj_fn, train_dfs, val_dfs, seed=42, device='cpu', n_workers=1, storage=None, client=None, params=None):
+        self.study_name = study_name
+        self.obj_fn = obj_fn
+        self.train_dfs = train_dfs
+        self.val_dfs = val_dfs
+        self.seed = seed
+        self.device = device
+        self.n_workers = n_workers
+        self.storage = storage
+        self.client = client
+        self.params = params
+
+    def objective(self, trial: optuna.Trial):
+        params = {}
+        for param in self.params['search_space']:
+            if isinstance(self.params['search_space'][param], list):
+                params[param] = trial.suggest_categorical(param, self.params['search_space'][param])
+            elif isinstance(self.params['search_space'][param], tuple):
+                if type(self.params['search_space'][param][0]) == int:
+                    params[param] = trial.suggest_int(param, self.params['search_space'][param][0], self.params['search_space'][param][1])
+                elif type(self.params['search_space'][param][0]) == float:
+                    params[param] = trial.suggest_float(param, self.params['search_space'][param][0], self.params['search_space'][param][1])
+            elif isinstance(self.params['search_space'][param], dict):
+                params[param] = trial.suggest_categorical(param, list(self.params['search_space'][param].keys()))
+                params[param+'_params'] = {}
+                for subparam in self.params['search_space'][param][params[param]]:
+                    if isinstance(self.params['search_space'][param][params[param]][subparam], list):
+                        params[param+'_params'][subparam] = trial.suggest_categorical(params[param]+'_'+subparam, self.params['search_space'][param][params[param]][subparam])
+                    elif isinstance(self.params['search_space'][param][params[param]][subparam], tuple):
+                        if type(self.params['search_space'][param][params[param]][subparam][0]) == int:
+                            params[param+'_params'][subparam] = trial.suggest_int(params[param]+'_'+subparam, self.params['search_space'][param][params[param]][subparam][0], self.params['search_space'][param][params[param]][subparam][1])
+                        elif type(self.params['search_space'][param][params[param]][subparam][0]) == float:
+                            params[param+'_params'][subparam] = trial.suggest_float(params[param]+'_'+subparam, self.params['search_space'][param][params[param]][subparam][0], self.params['search_space'][param][params[param]][subparam][1])
+                    else:
+                        params[param+'_params'][subparam] = self.params['search_space'][param][params[param]][subparam]
+            else:
+                params[param] = self.params['search_space'][param]
+
+        for param in self.params['default']:
+            if param not in params:
+                if isinstance(self.params['default'][param], dict):
+                    params[param] = next(iter(self.params['default'][param]))
+                    params[param+'_params'] = {}
+                    for subparam in self.params['default'][param][params[param]]:
+                        params[param+'_params'][subparam] = self.params['default'][param][params[param]][subparam]
+                else:
+                    params[param] = self.params['default'][param]
+
+        results = self.obj_fn(seed=self.seed, train_dfs=self.train_dfs, val_dfs=self.val_dfs, n_workers=self.n_workers, device=self.device, client=self.client, **params)
+        avg_loss = 0.0
+        for client in results:
+            round = max(results[client].keys())
+            avg_loss += results[client][round]['val']['loss'] / len(results)
+        return avg_loss
+
+    def optimize(self, n_trials=10):
+        study = optuna.create_study(storage=self.storage, sampler=optuna.samplers.TPESampler(), study_name=self.study_name, direction='minimize', load_if_exists=True)
+        study.optimize(self.objective, n_trials=n_trials)
+        return study.best_trials
+
+
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 5922e1a..d311c02 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,6 +6,6 @@ tqdm==4.66.5
 scikit-learn==1.5.1
 optuna==3.6.1
 matplotlib==3.9.2
-#torch==2.4.0
-#torchaudio==2.4.0
-#torchvision==0.19.0
\ No newline at end of file
+torch==2.4.0
+torchaudio==2.4.0
+torchvision==0.19.0
\ No newline at end of file
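
A minimal usage sketch of the new HyperparamTuner, mirroring the centralized branch of experiments/tune_hyperparams.py: the nested 'optimizer'/'criterion' dicts in experiments/hyperparams.py become a categorical suggestion plus conditional '<choice>_<subparam>' sub-suggestions, any 'default' entries not in the search space are filled in as-is, and each trial minimizes the average validation loss returned by the objective function. The CSV path and storage URL below are placeholders, not files from this repository.

import pandas as pd
from flib.train import centralized, HyperparamTuner
import hyperparams

# Placeholder input; any preprocessed *_nodes_train.csv works here.
train_df = pd.read_csv('a_nodes_train.csv').drop(columns=['account', 'bank'])
val_df = train_df.sample(frac=0.2, random_state=42)  # same valset split as tune_hyperparams.py
train_df = train_df.drop(val_df.index)

tuner = HyperparamTuner(
    study_name='LogRegClient_centralized',
    obj_fn=centralized,                      # objective() calls obj_fn(..., client=..., **suggested_params)
    train_dfs=[train_df],
    val_dfs=[val_df],
    seed=42,
    storage='sqlite:///hp_study.db',         # placeholder Optuna storage
    client='LogRegClient',
    n_workers=1,
    device='cpu',
    params=hyperparams.LogRegClient_params,  # 'default' filled in, 'search_space' suggested per trial
)
best_trials = tuner.optimize(n_trials=10)    # returns study.best_trials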