diff --git a/demo/horizontal/binning_woe_iv/2party/config/__init__.py b/demo/horizontal/binning_woe_iv/2party/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/demo/horizontal/binning_woe_iv/2party/config/fed_conf.json b/demo/horizontal/binning_woe_iv/2party/config/fed_conf.json new file mode 100644 index 0000000..589ad13 --- /dev/null +++ b/demo/horizontal/binning_woe_iv/2party/config/fed_conf.json @@ -0,0 +1,18 @@ +{ + "fed_info": { + "scheduler": { + "scheduler": "localhost:55001" + }, + "trainer": { + "node-1": "localhost:56001", + "node-2": "localhost:56002" + }, + "assist_trainer": { + "assist_trainer": "localhost:55002" + } + }, + "redis_server": "localhost:6379", + "grpc": { + "use_tls": false + } +} \ No newline at end of file diff --git a/demo/horizontal/binning_woe_iv/2party/config/trainer_config_assist_trainer.json b/demo/horizontal/binning_woe_iv/2party/config/trainer_config_assist_trainer.json new file mode 100644 index 0000000..762c171 --- /dev/null +++ b/demo/horizontal/binning_woe_iv/2party/config/trainer_config_assist_trainer.json @@ -0,0 +1,33 @@ +[ + { + "identity": "assist_trainer", + "model_info": { + "name": "horizontal_binning_woe_iv" + }, + "output": { + "path": "/opt/checkpoints/[JOB_ID]/[NODE_ID]", + "result": { + "name": "woe_iv_result_[STAGE_ID].json" + } + }, + "train_info": { + "train_params": { + "aggregation": { + "encryption": { + "method": "otp", + "key_bitlength": 64, + "data_type": "torch.Tensor", + "key_exchange": { + "key_bitlength": 3072, + "optimized": true + }, + "csprng": { + "name": "hmac_drbg", + "method": "sha512" + } + } + } + } + } + } +] \ No newline at end of file diff --git a/demo/horizontal/binning_woe_iv/2party/config/trainer_config_node-1.json b/demo/horizontal/binning_woe_iv/2party/config/trainer_config_node-1.json new file mode 100644 index 0000000..02a50f7 --- /dev/null +++ b/demo/horizontal/binning_woe_iv/2party/config/trainer_config_node-1.json @@ -0,0 +1,43 @@ +[ + { + "identity": "label_trainer", + "model_info": { + "name": "horizontal_binning_woe_iv" + }, + "input": { + "trainset": [ + { + "type": "csv", + "path": "./dataset/breast_cancer_wisconsin_horizontal/2party", + "name": "breast_cancer_wisconsin_horizontal_1.csv", + "has_id": true, + "has_label": true + } + ] + }, + "train_info": { + "train_params": { + "aggregation": { + "encryption": { + "method": "otp", + "key_bitlength": 64, + "data_type": "torch.Tensor", + "key_exchange": { + "key_bitlength": 3072, + "optimized": true + }, + "csprng": { + "name": "hmac_drbg", + "method": "sha512" + } + }, + "weight": 1 + }, + "binning": { + "method": "equal_width", + "bins": 5 + } + } + } + } +] \ No newline at end of file diff --git a/demo/horizontal/binning_woe_iv/2party/config/trainer_config_node-2.json b/demo/horizontal/binning_woe_iv/2party/config/trainer_config_node-2.json new file mode 100644 index 0000000..b4e2498 --- /dev/null +++ b/demo/horizontal/binning_woe_iv/2party/config/trainer_config_node-2.json @@ -0,0 +1,43 @@ +[ + { + "identity": "label_trainer", + "model_info": { + "name": "horizontal_binning_woe_iv" + }, + "input": { + "trainset": [ + { + "type": "csv", + "path": "./dataset/breast_cancer_wisconsin_horizontal/2party", + "name": "breast_cancer_wisconsin_horizontal_2.csv", + "has_id": true, + "has_label": true + } + ] + }, + "train_info": { + "train_params": { + "aggregation": { + "encryption": { + "method": "otp", + "key_bitlength": 64, + "data_type": "torch.Tensor", + "key_exchange": { + "key_bitlength": 3072, + "optimized": true + }, + 
"csprng": { + "name": "hmac_drbg", + "method": "sha512" + } + }, + "weight": 1 + }, + "binning": { + "method": "equal_width", + "bins": 5 + } + } + } + } +] \ No newline at end of file diff --git a/demo/horizontal/binning_woe_iv/2party/run.sh b/demo/horizontal/binning_woe_iv/2party/run.sh new file mode 100755 index 0000000..5342bb6 --- /dev/null +++ b/demo/horizontal/binning_woe_iv/2party/run.sh @@ -0,0 +1,57 @@ +#!/bin/sh + +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "$(uname)" = "Darwin" ]; then + export PROJECT_HOME=$(greadlink -f ../../../../) + echo "PROJECT_HOME:""$PROJECT_HOME" +elif [ "$(uname -s)" = "Linux" ]; then + export PROJECT_HOME=$(readlink -f ../../../../) + echo "PROJECT_HOME:""$PROJECT_HOME" +fi + +export PYTHONPATH=$PYTHONPATH:$PROJECT_HOME/python:$PROJECT_HOME/python/common/communication/gRPC/python + +datapath="${PROJECT_HOME}/dataset" +if [ ! -d "${PROJECT_HOME}/dataset/breast_cancer_wisconsin_horizontal/2party" ]; then + if [ ! -f "${PROJECT_HOME}/python/xfl.py" ]; then + python "${PROJECT_HOME}/common/dataset/breast_cancer_wisconsin.py" --mode "horizontal" --splits 2 --party "1" "2" + else + python "${PROJECT_HOME}/python/common/dataset/breast_cancer_wisconsin.py" --mode "horizontal" --splits 2 --party "1" "2" + fi +fi + +type="horizontal" +operator="binning_woe_iv" +party="2party" +code="${type}.${operator}.${party}" +config_path="${PROJECT_HOME}/demo/${type}/${operator}/${party}/config" + +if [ ! -f "${PROJECT_HOME}/python/xfl.py" ]; then + EXECUTE_PATH=${PROJECT_HOME}/xfl.py +else + EXECUTE_PATH=${PROJECT_HOME}/python/xfl.py +fi + +cd $PROJECT_HOME +python "$EXECUTE_PATH" -t node-1 --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -t node-2 --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -a --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -s --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -c start --config_path ${config_path} & diff --git a/demo/horizontal/binning_woe_iv/2party/stop.sh b/demo/horizontal/binning_woe_iv/2party/stop.sh new file mode 100755 index 0000000..8d08757 --- /dev/null +++ b/demo/horizontal/binning_woe_iv/2party/stop.sh @@ -0,0 +1,20 @@ +#!/bin/sh + +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+
+pid=$(ps -ef | grep xfl | grep -v grep | awk '{print $2}')
+echo "XFL Process ${pid} killed"
+ps -ef | grep xfl | grep -v grep | awk '{print $2}' | xargs kill -9
diff --git a/demo/local/data_statistic/1party/config/fed_conf.json b/demo/local/data_statistic/1party/config/fed_conf.json
new file mode 100644
index 0000000..7374c3d
--- /dev/null
+++ b/demo/local/data_statistic/1party/config/fed_conf.json
@@ -0,0 +1,14 @@
+{
+    "fed_info": {
+        "scheduler": {
+            "node-1": "localhost:55001"
+        },
+        "trainer": {
+            "node-1": "localhost:56001"
+        }
+    },
+    "redis_server": "localhost:6379",
+    "grpc": {
+        "use_tls": false
+    }
+}
\ No newline at end of file
diff --git a/demo/local/data_statistic/1party/config/trainer_config_node-1.json b/demo/local/data_statistic/1party/config/trainer_config_node-1.json
new file mode 100644
index 0000000..ad77a33
--- /dev/null
+++ b/demo/local/data_statistic/1party/config/trainer_config_node-1.json
@@ -0,0 +1,34 @@
+[
+    {
+        "identity": "label_trainer",
+        "model_info": {
+            "name": "local_data_statistic"
+        },
+        "input": {
+            "dataset": [
+                {
+                    "type": "csv",
+                    "path": "./dataset/breast_cancer_wisconsin_vertical/2party",
+                    "name": "breast_cancer_wisconsin_vertical_labeled_train.csv",
+                    "has_label": true,
+                    "has_id": true
+                }
+            ]
+        },
+        "output": {
+            "path": "/opt/checkpoints/[JOB_ID]/[NODE_ID]",
+            "summary": {
+                "name": "data_summary_[STAGE_ID].json"
+            }
+        },
+        "train_info": {
+            "train_params": {
+                "quantile": [
+                    0.5,
+                    0.8,
+                    0.9
+                ]
+            }
+        }
+    }
+]
\ No newline at end of file
diff --git a/demo/local/data_statistic/1party/run.sh b/demo/local/data_statistic/1party/run.sh
new file mode 100644
index 0000000..a4a1395
--- /dev/null
+++ b/demo/local/data_statistic/1party/run.sh
@@ -0,0 +1,53 @@
+#!/bin/sh
+
+# Copyright 2022 The XFL Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "$(uname)" = "Darwin" ]; then
+    export PROJECT_HOME=$(greadlink -f ../../../../)
+    echo "PROJECT_HOME:""$PROJECT_HOME"
+elif [ "$(uname -s)" = "Linux" ]; then
+    export PROJECT_HOME=$(readlink -f ../../../../)
+    echo "PROJECT_HOME:""$PROJECT_HOME"
+fi
+
+export PYTHONPATH=$PYTHONPATH:$PROJECT_HOME/python:$PROJECT_HOME/python/common/communication/gRPC/python
+
+datapath="${PROJECT_HOME}/dataset"
+if [ ! -d "${PROJECT_HOME}/dataset/breast_cancer_wisconsin_vertical/2party" ]; then
+    if [ ! -f "${PROJECT_HOME}/python/xfl.py" ]; then
+        python "${PROJECT_HOME}/common/dataset/breast_cancer_wisconsin.py" --mode "vertical" --splits 2 --party "labeled" "1"
+    else
+        python "${PROJECT_HOME}/python/common/dataset/breast_cancer_wisconsin.py" --mode "vertical" --splits 2 --party "labeled" "1"
+    fi
+fi
+
+type="local"
+operator="data_statistic"
+party="1party"
+code="${type}.${operator}.${party}"
+config_path="${PROJECT_HOME}/demo/${type}/${operator}/${party}/config"
+
+if [ !
-f "${PROJECT_HOME}/python/xfl.py" ]; then + EXECUTE_PATH=${PROJECT_HOME}/xfl.py +else + EXECUTE_PATH=${PROJECT_HOME}/python/xfl.py +fi + +cd $PROJECT_HOME +python "$EXECUTE_PATH" -s --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -t node-1 --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -c start --config_path ${config_path} & diff --git a/demo/local/data_statistic/1party/stop.sh b/demo/local/data_statistic/1party/stop.sh new file mode 100644 index 0000000..ea744fc --- /dev/null +++ b/demo/local/data_statistic/1party/stop.sh @@ -0,0 +1,20 @@ +#!/bin/sh + +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +pid=$(ps -ef | grep xfl | grep -v grep | awk '{print $2}') +echo "XFL Process ${pid} killed" +ps -ef | grep xfl | grep -v grep | awk '{print $2}' | xargs kill -9 diff --git a/demo/vertical/linear_regression/2party/config/fed_conf.json b/demo/vertical/linear_regression/2party/config/fed_conf.json new file mode 100644 index 0000000..5093be5 --- /dev/null +++ b/demo/vertical/linear_regression/2party/config/fed_conf.json @@ -0,0 +1,18 @@ +{ + "fed_info": { + "scheduler": { + "node-1": "localhost:55001" + }, + "trainer": { + "node-1": "localhost:56001", + "node-2": "localhost:56002" + }, + "assist_trainer": { + "assist_trainer": "localhost:55002" + } + }, + "redis_server": "localhost:6379", + "grpc": { + "use_tls": false + } +} \ No newline at end of file diff --git a/demo/vertical/linear_regression/2party/config/trainer_config_assist_trainer.json b/demo/vertical/linear_regression/2party/config/trainer_config_assist_trainer.json new file mode 100644 index 0000000..3afe715 --- /dev/null +++ b/demo/vertical/linear_regression/2party/config/trainer_config_assist_trainer.json @@ -0,0 +1,8 @@ +[ + { + "identity": "assist_trainer", + "model_info": { + "name": "vertical_linear_regression" + } + } +] \ No newline at end of file diff --git a/demo/vertical/linear_regression/2party/config/trainer_config_node-1.json b/demo/vertical/linear_regression/2party/config/trainer_config_node-1.json new file mode 100644 index 0000000..4cf4018 --- /dev/null +++ b/demo/vertical/linear_regression/2party/config/trainer_config_node-1.json @@ -0,0 +1,94 @@ +[ + { + "identity": "label_trainer", + "model_info": { + "name": "vertical_linear_regression" + }, + "input": { + "trainset": [ + { + "type": "csv", + "path": "./dataset/boston_housing_price_vertical/2party", + "name": "boston_housing_price_vertical_labeled_train.csv", + "has_label": true, + "has_id": true + } + ], + "valset": [ + { + "type": "csv", + "path": "./dataset/boston_housing_price_vertical/2party", + "name": "boston_housing_price_vertical_labeled_test.csv", + "has_label": true, + "has_id": true + } + ], + "pretrained_model": { + "path": "", + "name": "" + } + }, + "output": { + "path": "/opt/checkpoints/[JOB_ID]/[NODE_ID]", + "model": { + "name": "vertical_linear_regression_[STAGE_ID].pt" + }, + "metric_train": { + "name": "linear_reg_metric_train_[STAGE_ID].csv" + }, + 
"metric_val": { + "name": "linear_reg_metric_val_[STAGE_ID].csv" + }, + "prediction_train": { + "name": "linear_reg_prediction_train_[STAGE_ID].csv" + }, + "prediction_val": { + "name": "linear_reg_prediction_val_[STAGE_ID].csv" + }, + "feature_importance": { + "name": "linear_reg_feature_importance_[STAGE_ID].csv" + } + }, + "train_info": { + "interaction_params": { + "save_frequency": -1, + "write_training_prediction": true, + "write_validation_prediction": true, + "echo_training_metrics": true + }, + "train_params": { + "global_epoch": 10, + "batch_size": 32, + "encryption": { + "ckks": { + "poly_modulus_degree": 8192, + "coeff_mod_bit_sizes": [ + 60, + 40, + 40, + 60 + ], + "global_scale_bit_size": 40 + } + }, + "optimizer": { + "lr": 0.01, + "p": 2, + "alpha": 1e-4 + }, + "metric": { + "mse": {}, + "mape": {}, + "mae": {}, + "rmse": {} + }, + "early_stopping": { + "key": "loss", + "patience": -1, + "delta": 0 + }, + "random_seed": 50 + } + } + } +] \ No newline at end of file diff --git a/demo/vertical/linear_regression/2party/config/trainer_config_node-2.json b/demo/vertical/linear_regression/2party/config/trainer_config_node-2.json new file mode 100644 index 0000000..0e08fba --- /dev/null +++ b/demo/vertical/linear_regression/2party/config/trainer_config_node-2.json @@ -0,0 +1,68 @@ +[ + { + "identity": "trainer", + "model_info": { + "name": "vertical_linear_regression" + }, + "input": { + "trainset": [ + { + "type": "csv", + "path": "./dataset/boston_housing_price_vertical/2party", + "name": "boston_housing_price_vertical_1_train.csv", + "has_label": false, + "has_id": true + } + ], + "valset": [ + { + "type": "csv", + "path": "./dataset/boston_housing_price_vertical/2party", + "name": "boston_housing_price_vertical_1_test.csv", + "has_label": false, + "has_id": true + } + ], + "pretrained_model": { + "path": "", + "name": "" + } + }, + "output": { + "path": "/opt/checkpoints/[JOB_ID]/[NODE_ID]", + "model": { + "name": "vertical_linear_regression_[STAGE_ID].pt" + } + }, + "train_info": { + "interaction_params": { + "save_frequency": -1, + "write_training_prediction": true, + "write_validation_prediction": true, + "echo_training_metrics": true + }, + "train_params": { + "global_epoch": 10, + "batch_size": 32, + "encryption": { + "ckks": { + "poly_modulus_degree": 8192, + "coeff_mod_bit_sizes": [ + 60, + 40, + 40, + 60 + ], + "global_scale_bit_size": 40 + } + }, + "optimizer": { + "lr": 0.01, + "p": 2, + "alpha": 1e-4 + }, + "random_seed": 50 + } + } + } +] \ No newline at end of file diff --git a/demo/vertical/linear_regression/2party/run.sh b/demo/vertical/linear_regression/2party/run.sh new file mode 100644 index 0000000..6e67aa7 --- /dev/null +++ b/demo/vertical/linear_regression/2party/run.sh @@ -0,0 +1,62 @@ +#!/bin/sh + +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +if [ "$(uname)" = "Darwin" ] +then + export PROJECT_HOME=$(greadlink -f ../../../../) + echo "PROJECT_HOME:""$PROJECT_HOME" +elif [ "$(uname -s)" = "Linux" ] +then + export PROJECT_HOME=$(readlink -f ../../../../) + echo "PROJECT_HOME:""$PROJECT_HOME" +fi + +export PYTHONPATH=$PYTHONPATH:$PROJECT_HOME/python:$PROJECT_HOME/python/common/communication/gRPC/python + +datapath="${PROJECT_HOME}/dataset" +if [ ! -d "${PROJECT_HOME}/dataset/boston_housing_price_vertical/2party" ]; then + if [ ! -f "${PROJECT_HOME}/python/xfl.py" ]; then + python "${PROJECT_HOME}/common/dataset/boston_housing_price.py" --mode "vertical" --splits 2 --party "labeled" "1" --norm true + else + python "${PROJECT_HOME}/python/common/dataset/boston_housing_price.py" --mode "vertical" --splits 2 --party "labeled" "1" --norm true + fi +fi + +type="vertical" +operator="linear_regression" +party="2party" +code="${type}.${operator}.${party}" +config_path="${PROJECT_HOME}/demo/${type}/${operator}/${party}/config" + +if [ ! -f "${PROJECT_HOME}/python/xfl.py" ]; then + EXECUTE_PATH=${PROJECT_HOME}/xfl.py +else + EXECUTE_PATH=${PROJECT_HOME}/python/xfl.py +fi + +cd $PROJECT_HOME +python "$EXECUTE_PATH" -s --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -t node-1 --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -t node-2 --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -a --config_path ${config_path} & +sleep 1 +python "$EXECUTE_PATH" -c start --config_path ${config_path} & + + diff --git a/demo/vertical/linear_regression/2party/stop.sh b/demo/vertical/linear_regression/2party/stop.sh new file mode 100644 index 0000000..ea744fc --- /dev/null +++ b/demo/vertical/linear_regression/2party/stop.sh @@ -0,0 +1,20 @@ +#!/bin/sh + +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +pid=$(ps -ef | grep xfl | grep -v grep | awk '{print $2}') +echo "XFL Process ${pid} killed" +ps -ef | grep xfl | grep -v grep | awk '{print $2}' | xargs kill -9 diff --git a/python/algorithm/config/horizontal_binning_woe_iv/__init__.py b/python/algorithm/config/horizontal_binning_woe_iv/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/algorithm/config/horizontal_binning_woe_iv/assist_trainer.json b/python/algorithm/config/horizontal_binning_woe_iv/assist_trainer.json new file mode 100644 index 0000000..a4b2c14 --- /dev/null +++ b/python/algorithm/config/horizontal_binning_woe_iv/assist_trainer.json @@ -0,0 +1,31 @@ +{ + "identity": "assist_trainer", + "model_info": { + "name": "horizontal_binning_woe_iv" + }, + "output": { + "path": "/opt/checkpoints/[JOB_ID]/[NODE_ID]", + "result": { + "name": "woe_iv_result_[STAGE_ID].json" + } + }, + "train_info": { + "train_params": { + "aggregation": { + "encryption": { + "method": "otp", + "key_bitlength": 64, + "data_type": "torch.Tensor", + "key_exchange": { + "key_bitlength": 3072, + "optimized": true + }, + "csprng": { + "name": "hmac_drbg", + "method": "sha512" + } + } + } + } + } +} \ No newline at end of file diff --git a/python/algorithm/config/horizontal_binning_woe_iv/trainer.json b/python/algorithm/config/horizontal_binning_woe_iv/trainer.json new file mode 100644 index 0000000..cb92a96 --- /dev/null +++ b/python/algorithm/config/horizontal_binning_woe_iv/trainer.json @@ -0,0 +1,41 @@ +{ + "identity": "label_trainer", + "model_info": { + "name": "horizontal_binning_woe_iv" + }, + "input": { + "trainset": [ + { + "type": "csv", + "path": "../../../../dataset/horizontal_breast", + "name": "breast_horizontal_guest.csv", + "has_id": true, + "has_label": true + } + ] + }, + "train_info": { + "train_params": { + "aggregation": { + "encryption": { + "method": "otp", + "key_bitlength": 64, + "data_type": "torch.Tensor", + "key_exchange": { + "key_bitlength": 3072, + "optimized": true + }, + "csprng": { + "name": "hmac_drbg", + "method": "sha512" + } + }, + "weight": 1 + }, + "binning": { + "method": "equal_width", + "bins": 5 + } + } + } +} diff --git a/python/algorithm/config/local_data_statistic/__init__.py b/python/algorithm/config/local_data_statistic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/algorithm/config/local_data_statistic/label_trainer.json b/python/algorithm/config/local_data_statistic/label_trainer.json new file mode 100644 index 0000000..f88686c --- /dev/null +++ b/python/algorithm/config/local_data_statistic/label_trainer.json @@ -0,0 +1,32 @@ +{ + "identity": "label_trainer", + "model_info": { + "name": "local_data_statistic" + }, + "input": { + "dataset": [ + { + "type": "csv", + "path": "/opt/dataset/2party", + "name": "epsilon_2party_guest_with_id.csv", + "has_label": true, + "has_id": true + } + ] + }, + "output": { + "path": "/opt/checkpoints/[JOB_ID]/[NODE_ID]", + "summary": { + "name": "data_summary_[STAGE_ID].json" + } + }, + "train_info": { + "train_params": { + "quantile": [ + 0.5, + 0.8, + 0.9 + ] + } + } +} \ No newline at end of file diff --git a/python/algorithm/config/vertical_linear_regression/__init__.py b/python/algorithm/config/vertical_linear_regression/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/algorithm/config/vertical_linear_regression/assist_trainer.json b/python/algorithm/config/vertical_linear_regression/assist_trainer.json new file mode 100644 index 0000000..6e81a6f --- /dev/null +++ 
b/python/algorithm/config/vertical_linear_regression/assist_trainer.json @@ -0,0 +1,6 @@ + { + "identity": "assist_trainer", + "model_info": { + "name": "vertical_linear_regression" + } + } \ No newline at end of file diff --git a/python/algorithm/config/vertical_linear_regression/label_trainer.json b/python/algorithm/config/vertical_linear_regression/label_trainer.json new file mode 100644 index 0000000..397ae09 --- /dev/null +++ b/python/algorithm/config/vertical_linear_regression/label_trainer.json @@ -0,0 +1,92 @@ +{ + "identity": "label_trainer", + "model_info": { + "name": "vertical_linear_regression" + }, + "input": { + "trainset": [ + { + "type": "csv", + "path": "../../../../dataset/boston_house_price_linear", + "name": "guest_train_norm.csv", + "has_label": true, + "has_id": true + } + ], + "valset": [ + { + "type": "csv", + "path": "../../../../dataset/boston_house_price_linear", + "name": "guest_test_norm.csv", + "has_label": true, + "has_id": true + } + ], + "pretrained_model": { + "path": "", + "name": "" + } + }, + "output": { + "path": "/opt/checkpoints/[JOB_ID]/[NODE_ID]", + "model": { + "name": "vertical_linear_regression_[STAGE_ID].pt" + }, + "metric_train": { + "name": "linear_reg_metric_train_[STAGE_ID].csv" + }, + "metric_val": { + "name": "linear_reg_metric_val_[STAGE_ID].csv" + }, + "prediction_train": { + "name": "linear_reg_prediction_train_[STAGE_ID].csv" + }, + "prediction_val": { + "name": "linear_reg_prediction_val_[STAGE_ID].csv" + }, + "feature_importance": { + "name": "linear_reg_feature_importance_[STAGE_ID].csv" + } + }, + "train_info": { + "interaction_params": { + "save_frequency": -1, + "write_training_prediction": true, + "write_validation_prediction": true, + "echo_training_metrics": true + }, + "train_params": { + "global_epoch": 10, + "batch_size": 32, + "encryption": { + "ckks": { + "poly_modulus_degree": 8192, + "coeff_mod_bit_sizes": [ + 60, + 40, + 40, + 60 + ], + "global_scale_bit_size": 40 + } + }, + "optimizer": { + "lr": 0.01, + "p": 2, + "alpha": 1e-4 + }, + "metric": { + "mse": {}, + "mape": {}, + "mae": {}, + "rmse": {} + }, + "early_stopping": { + "key": "loss", + "patience": -1, + "delta": 0 + }, + "random_seed": 50 + } + } +} \ No newline at end of file diff --git a/python/algorithm/config/vertical_linear_regression/trainer.json b/python/algorithm/config/vertical_linear_regression/trainer.json new file mode 100644 index 0000000..f02e32e --- /dev/null +++ b/python/algorithm/config/vertical_linear_regression/trainer.json @@ -0,0 +1,66 @@ +{ + "identity": "trainer", + "model_info": { + "name": "vertical_linear_regression" + }, + "input": { + "trainset": [ + { + "type": "csv", + "path": "../../../../dataset/boston_house_price_linear", + "name": "host_train_norm.csv", + "has_label": false, + "has_id": true + } + ], + "valset": [ + { + "type": "csv", + "path": "../../../../dataset/boston_house_price_linear", + "name": "host_test_norm.csv", + "has_label": false, + "has_id": true + } + ], + "pretrained_model": { + "path": "", + "name": "" + } + }, + "output": { + "path": "/opt/checkpoints/[JOB_ID]/[NODE_ID]", + "model": { + "name": "vertical_linear_regression_[STAGE_ID].pt" + } + }, + "train_info": { + "interaction_params": { + "save_frequency": -1, + "write_training_prediction": true, + "write_validation_prediction": true, + "echo_training_metrics": true + }, + "train_params": { + "global_epoch": 10, + "batch_size": 32, + "encryption": { + "ckks": { + "poly_modulus_degree": 8192, + "coeff_mod_bit_sizes": [ + 60, + 40, + 40, + 60 + ], + 
"global_scale_bit_size": 40 + } + }, + "optimizer": { + "lr": 0.01, + "p": 2, + "alpha": 1e-4 + }, + "random_seed": 50 + } + } +} \ No newline at end of file diff --git a/python/algorithm/framework/horizontal/binning_woe_iv/assist_trainer.py b/python/algorithm/framework/horizontal/binning_woe_iv/assist_trainer.py new file mode 100644 index 0000000..0a4866f --- /dev/null +++ b/python/algorithm/framework/horizontal/binning_woe_iv/assist_trainer.py @@ -0,0 +1,144 @@ +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import math +import os +from functools import reduce +from pathlib import Path + +import numpy as np +import pandas as pd + +from algorithm.core.horizontal.aggregation.api import get_aggregation_root_inst +from common.communication.gRPC.python.channel import DualChannel +from common.utils.config_parser import TrainConfigParser +from common.utils.logger import logger + +from service.fed_config import FedConfig + + +def equal_width(min_, max_, bins): + if min_ == max_: # adjust end points before binning + min_ -= .001 * abs(min_) if min_ != 0 else .001 + max_ += .001 * abs(max_) if max_ != 0 else .001 + bins = np.linspace(min_, max_, bins + 1, endpoint=True) + else: # adjust end points after binning + bins = np.linspace(min_, max_, bins + 1, endpoint=True) + adj = (max_ - min_) * 0.001 # 0.1% of the range + bins[0] -= adj + return list(bins) + + +class HorizontalBinningWoeIvAssistTrainer(object): + def __init__(self, train_conf: dict): + self.config = TrainConfigParser(train_conf) + self.aggregation = self.config.train_params.get("aggregation", {}) + self.encryption = self.aggregation.get("encryption") + self.nodes = FedConfig.get_label_trainer() + FedConfig.get_trainer() + self.node_self = FedConfig.get_assist_trainer() + self.dual_channel = {"min_max": {}, "woe": {}, "iv": {}} + for node in self.nodes: + self.dual_channel["min_max"][node] = DualChannel(name="min_max_" + node, ids=[self.node_self] + [node]) + tmp_lst = [] + for node in self.nodes: + tmp_lst.append(self.dual_channel["min_max"][node].recv()) + self.binning = tmp_lst[0] + self.fedagg_executor = get_aggregation_root_inst(sec_conf=self.encryption) + + def fit(self): + # compare local min and max + logger.info("Receive local min and max from trainers") + node_lst = [] + min_lst = [] + max_lst = [] + for node in self.nodes: + tmp = self.dual_channel["min_max"][node].recv() + node_lst.append(tmp[0]) + min_lst.append(tmp[1]["min"]) + max_lst.append(tmp[1]["max"]) + logger.info("Compare min and max of all trainers") + index_min = np.argmin(np.array(min_lst), axis=0) + index_max = np.argmax(np.array(max_lst), axis=0) + node_min = np.array([node_lst[i] for i in index_min]) + node_max = np.array([node_lst[j] for j in index_max]) + + # send back min and max signal to trainers + logger.info("Send back signal to all trainers") + for node in self.nodes: + min_msg = np.where(node_min == node, True, False) + max_msg = np.where(node_max == node, True, False) + 
self.dual_channel["min_max"][node].send({"min": min_msg, "max": max_msg}) + + # receive final min and max from all trainers + logger.info("Receive final min and max from all trainers") + min_final = [] + max_final = [] + for node in self.nodes: + final_rest = self.dual_channel["min_max"][node].recv() + min_final.append(final_rest["min"]) + max_final.append(final_rest["max"]) + final_min = np.sum(min_final, axis=0) + final_max = np.sum(max_final, axis=0) + + # split points + split_points = [] + if self.binning["method"] == "equal_width": + logger.info("Calculate split points when method is equal_width") + for ind in range(len(final_min)): + split_points.append(equal_width(final_min[ind], final_max[ind], self.binning["bins"])) + logger.info("Send split points to trainers") + for node in self.nodes: + self.dual_channel["min_max"][node].send(split_points) + + # receive pos_num and neg_num from trainers and calculate pos and neg ratios + logger.info("Receive pos_num and neg_num") + pos_aggr = self.fedagg_executor.aggregate(average=False) + neg_aggr = self.fedagg_executor.aggregate(average=False) + + # calculate total pos and neg + logger.info("Calculate total pos_num and neg_num") + pos_total = reduce(lambda x, y: x + y, list(pos_aggr.values())).sum() / len(pos_aggr) + neg_total = reduce(lambda x, y: x + y, list(neg_aggr.values())).sum() / len(neg_aggr) + assert pos_aggr[0].sum() == pos_total + assert neg_aggr[0].sum() == neg_total + + # calculate woe for features + woe_final = {} + iv_final = {} + for key in pos_aggr: + logger.info("Calculate pos_prob and neg_prob") + pos_prob = pos_aggr[key] / pos_total + neg_prob = neg_aggr[key] / neg_total + pos_prob = pd.Series(np.array(pos_prob).flatten()).apply(lambda x: 1e-7 if x == 0 else x) + neg_prob = pd.Series(np.array(neg_prob).flatten()).apply(lambda x: 1e-7 if x == 0 else x) + pos_aggr[key] = pos_prob + neg_aggr[key] = neg_prob + logger.info("Calculate woe") + woe_pre = pos_prob / neg_prob + woe = woe_pre.apply(lambda x: float("%.6f" % math.log(x))) + woe_final[key] = woe.to_dict() + logger.info("Calculate iv") + iv_final[key] = float("%.6f" % np.sum((pos_prob - neg_prob) * woe)) + + # save results + logger.info("Save results") + save_dir = self.config.output["path"] + if not os.path.exists(save_dir): + os.makedirs(save_dir) + guest_file_path = Path(save_dir)/self.config.output["result"]["name"] + with open(guest_file_path, "w") as wf: + json.dump({"woe": woe_final, "iv": iv_final, "split_points": dict(zip(range(len(split_points)), + split_points))}, wf) diff --git a/python/algorithm/framework/horizontal/binning_woe_iv/label_trainer.py b/python/algorithm/framework/horizontal/binning_woe_iv/label_trainer.py new file mode 100644 index 0000000..bffa51d --- /dev/null +++ b/python/algorithm/framework/horizontal/binning_woe_iv/label_trainer.py @@ -0,0 +1,139 @@ +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import os + +import numpy as np +import pandas as pd + +from algorithm.core.data_io import CsvReader +from algorithm.core.horizontal.aggregation.api import get_aggregation_leaf_inst +from common.communication.gRPC.python.channel import DualChannel +from common.crypto.key_agreement.diffie_hellman import DiffieHellman +from common.utils.config_parser import TrainConfigParser +from common.utils.logger import logger +from typing import OrderedDict + +from service.fed_config import FedConfig + + +class HorizontalBinningWoeIvLabelTrainer(object): + def __init__(self, train_conf: dict): + self.config = TrainConfigParser(train_conf) + self.aggregation = self.config.train_params.get("aggregation", {}) + self.encryption = self.aggregation.get("encryption") + self.data = self.init_data() + self.y = pd.DataFrame(self.data.label(), columns=["y"]) + self.leaf_ids = FedConfig.get_label_trainer() + FedConfig.get_trainer() + # generate random number + if self.encryption["method"] == "otp" and len(self.leaf_ids) == 2: + # key exchange + key_exchange_conf = self.encryption["key_exchange"] + self.random_protocol = DiffieHellman(self.leaf_ids, + key_bitlength=key_exchange_conf['key_bitlength'], + optimized=key_exchange_conf["optimized"], + channel_name="diffie_hellman_random") + + self.entropys = self.random_protocol.exchange(out_bytes=True) + self.random_num = int.from_bytes(self.entropys[:16], "big") + else: + self.random_num = 1 + + self.dual_channel = { + "min_max": DualChannel(name="min_max_" + FedConfig.node_id, + ids=[FedConfig.get_assist_trainer()] + [FedConfig.node_id])} + # send bins to assist_trainer + self.dual_channel["min_max"].send(self.config.train_params["binning"]) + # init input bin_map + self.bin_map = list() + # init aggregation channel + self.fedagg_executor = get_aggregation_leaf_inst(sec_conf=self.encryption) + + def init_data(self): + if len(self.config.input_trainset) == 0: + return None + conf = self.config.input_trainset[0] + if conf["type"] == "csv": + path = os.path.join(conf['path'], conf['name']) + has_label = conf["has_label"] + has_id = conf['has_id'] + return CsvReader(path, has_id, has_label) + else: + return None + + def map_bin(self, x, split_point): + bin_map = list(range(1, len(split_point) + 1)) + split_tile = np.tile(split_point, (len(x), 1)) + index = np.sum(x.to_numpy().reshape(-1, 1) - split_tile > 0, 1) + self.bin_map.append([bin_map[i] for i in index]) + + def bin_group(self, col_name): + data_bin_y = pd.DataFrame(self.bin_map[col_name], columns=[col_name]).join(self.y) + tmp_count = data_bin_y.groupby([col_name])['y'].agg({'count', 'sum'}) + pos_bin_count = tmp_count['sum'] + neg_bin_count = tmp_count['count'] - tmp_count['sum'] + pos_bin_count.name = "pos" + neg_bin_count.name = "neg" + # transform initial group result to the same length + tmp_fill = pd.DataFrame(index=list(range(1, self.config.train_params["binning"]["bins"] + 1))) + pos_bin_count = tmp_fill.join(pos_bin_count).fillna(0) + neg_bin_count = tmp_fill.join(neg_bin_count).fillna(0) + return [pos_bin_count, neg_bin_count] + + def fit(self): + # calculate local min and max + logger.info("Calculate local min and max of initial data") + local_min = self.data.features().min(axis=0) + local_max = self.data.features().max(axis=0) + enc_local_min = local_min * abs(self.random_num) + enc_local_max = local_max * abs(self.random_num) + logger.info("Send local min and max to assist_trainer") + self.dual_channel["min_max"].send((FedConfig.node_id, {"min": enc_local_min, "max": enc_local_max})) + + # receive 
signal from assist_trainer + logger.info("Receive signal from assist_trainer") + signal = self.dual_channel["min_max"].recv() + min_sig = signal["min"] + max_sig = signal["max"] + min_send = [local_min[ind] if i else 0 for ind, i in enumerate(min_sig)] + max_send = [local_max[ind] if j else 0 for ind, j in enumerate(max_sig)] + + # send final min and max to assist_trainer + logger.info("Send final min and max to assist_trainer") + self.dual_channel["min_max"].send({"min": min_send, "max": max_send}) + + # receive split points from assist_trainer + logger.info("Receive split points from assist_trainer") + split_points = self.dual_channel["min_max"].recv() + + # map input into bins + logger.info("Map raw data to bins") + data_df = pd.DataFrame(self.data.features()) + map_tmp = list(range(len(split_points))) + pd.Series(map_tmp).apply(lambda x: self.map_bin(data_df[x], split_points[x][1:])) + + # calculate pos_num and neg_num + logger.info("Calculate pos_num and neg_num") + data_bins_df = pd.DataFrame(self.bin_map).T + pos_neg_bin = list() + pd.Series(data_bins_df.columns).apply(lambda x: pos_neg_bin.append(self.bin_group(x))) + pos_bin = [np.array(i[0]) for i in pos_neg_bin] + neg_bin = [np.array(i[1]) for i in pos_neg_bin] + + # send pos_neg_bin to assist_trainer + pos_bin_dict = OrderedDict(zip(range(len(pos_bin)), pos_bin)) + neg_bin_dict = OrderedDict(zip(range(len(neg_bin)), neg_bin)) + self.fedagg_executor.upload(pos_bin_dict, self.aggregation["weight"]) + self.fedagg_executor.upload(neg_bin_dict, self.aggregation["weight"]) diff --git a/python/algorithm/framework/local/data_statistic/__init__.py b/python/algorithm/framework/local/data_statistic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/algorithm/framework/local/data_statistic/label_trainer.py b/python/algorithm/framework/local/data_statistic/label_trainer.py new file mode 100644 index 0000000..88457ba --- /dev/null +++ b/python/algorithm/framework/local/data_statistic/label_trainer.py @@ -0,0 +1,170 @@ +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import os +from pathlib import Path + +import numpy as np +import pandas as pd + +from algorithm.core.data_io import CsvReader +from common.utils.config_parser import TrainConfigParser +from common.utils.logger import logger + + +def float_transform(data): + if isinstance(data, pd.Series): + return data.apply(lambda x: float("%.6f" % x)) + elif isinstance(data, pd.DataFrame): + col_name = data.columns[0] + return pd.DataFrame(data[col_name].apply(lambda x: float("%.6f" % x))) + + +class LocalDataStatisticLabelTrainer: + def __init__(self, train_conf): + """ + support data statistic: + If more than one file, their header should be the same. 
+        If there are missing values, they will be dropped before statistics are computed.
+        quantile: list of floats; if not set, the quartiles (0.25, 0.5, 0.75) are calculated.
+        """
+        # input config
+        self.config = TrainConfigParser(train_conf)
+        self.input_data = self.config.input.get("dataset", [])
+        self.missing_value = [np.NaN, "", None, " ", "nan", "none", "null", "na", "None"]
+        if self.input_data:
+            if len(self.input_data) == 1:
+                self.input_data_path = self.input_data[0].get("path")
+                self.input_data_name = self.input_data[0].get("name")
+                self.input_data_id = self.input_data[0].get("has_id", False)
+                self.input_data_label = self.input_data[0].get("has_label", False)
+                data_reader = CsvReader(path=os.path.join(self.input_data_path, self.input_data_name),
+                                        has_id=self.input_data_id, has_label=self.input_data_label)
+                self.data = data_reader.table.set_index(data_reader.ids)
+            else:
+                self.data = pd.DataFrame()
+                for dataset_conf in self.input_data:
+                    input_data_path = dataset_conf.get("path")
+                    input_data_name = dataset_conf.get("name")
+                    input_data_id = dataset_conf.get("has_id", False)
+                    self.input_data_label = dataset_conf.get("has_label", False)
+                    data_reader = CsvReader(path=os.path.join(input_data_path, input_data_name),
+                                            has_id=input_data_id, has_label=self.input_data_label)
+                    data = data_reader.table.set_index(data_reader.ids)
+                    self.data = pd.concat([self.data, data])
+        # drop label
+        if self.input_data_label:
+            self.y = pd.DataFrame(self.data.iloc[:, 0])
+            self.data = self.data.iloc[:, 1:]
+
+        # output config
+        self.output_flag = self.config.output.get("summary", None)
+        if self.output_flag is not None:
+            self.output_path = self.config.output["path"]
+            self.output_name = self.config.output["summary"]["name"]
+            self.output_path_name = Path(self.output_path, self.output_name)
+            if not os.path.exists(Path(self.output_path)):
+                Path(self.output_path).mkdir(parents=True, exist_ok=True)
+        # init summary dict
+        self.summary_dict = {}
+        self.indicators = ["mean", "median", "missing_ratio", "min", "max", "variance", "std", "quantile",
+                           "skewness", "kurtosis"]
+        for i in self.indicators:
+            self.summary_dict[i] = {}
+        # missing value flag
+        self.missing_flag = dict(zip(self.data.columns, [False] * len(self.data.columns)))
+        # quantile config
+        self.quantile = self.config.train_params.get("quantile", [0.25, 0.5, 0.75])
+
+    def data_overview(self):
+        data_shape = np.shape(self.data)
+        self.summary_dict.update({"row_num": data_shape[0]})
+        self.summary_dict.update({"column_num": data_shape[1]})
+        self.summary_dict.update({"feature_names": list(self.data.columns)})
+
+        logger.info("The shape of input data is {}*{}".format(data_shape[0], data_shape[1]))
+
+    def missing_overview(self):
+
+        def missing_count(feat):
+            tmp = np.sum(self.data[feat].isin(self.missing_value))
+            if tmp > 0:
+                self.missing_flag[feat] = True
+            self.summary_dict["missing_ratio"][feat] = float("%.6f" % (tmp / self.summary_dict["row_num"]))
+            # replace all missing values with np.NaN
+            self.data[feat] = self.data[feat].replace(self.missing_value, np.NaN)
+
+        pd.Series(self.data.columns).apply(lambda x: missing_count(x))
+
+    def label_overview(self):
+        if self.input_data_label:
+            label_name = self.y.columns[0]
+            self.summary_dict.update({"label_num": self.y.groupby(label_name)[label_name].count().to_dict()})
+
+    def get_mean(self, df):
+        self.summary_dict["mean"].update(float_transform(df.mean()).to_dict())
+
+    def get_median(self, df):
+        self.summary_dict["median"].update(float_transform(df.median()).to_dict())
+
+    def get_min_max(self, df):
+        self.summary_dict["min"].update(float_transform(df.min()).to_dict())
+        self.summary_dict["max"].update(float_transform(df.max()).to_dict())
+
+    def get_variance(self, df):
+        self.summary_dict["variance"].update(float_transform(df.var()).to_dict())
+
+    def get_std(self, df):
+        self.summary_dict["std"].update(float_transform(df.std()).to_dict())
+
+    def get_quantile(self, df):
+        self.summary_dict["quantile"].update(float_transform(df.quantile(self.quantile)).to_dict())
+
+    def get_skewness(self, df):
+        self.summary_dict["skewness"].update(float_transform(df.skew()).to_dict())
+
+    def get_kurtosis(self, df):
+        self.summary_dict["kurtosis"].update(float_transform(df.kurtosis()).to_dict())
+
+    def fit(self):
+        self.data_overview()
+        self.missing_overview()
+        self.label_overview()
+
+        def feat_handle(feat):
+            if self.missing_flag[feat]:
+                # columns that contained missing markers are object-typed after
+                # the NaN replacement; coerce the remaining values back to
+                # numeric (safer than an eval-based conversion)
+                data = pd.DataFrame(pd.to_numeric(self.data[feat].dropna()))
+            else:
+                data = self.data[[feat]]
+            return data
+
+        def feat_statistic(feat):
+            feat_ = feat_handle(feat)
+            self.get_mean(feat_)
+            self.get_median(feat_)
+            self.get_min_max(feat_)
+            self.get_variance(feat_)
+            self.get_std(feat_)
+            self.get_quantile(feat_)
+            self.get_skewness(feat_)
+            self.get_kurtosis(feat_)
+            logger.info("Feature {} calculated!".format(feat))
+
+        pd.Series(self.data.columns).apply(lambda x: feat_statistic(x))
+        # save
+        if self.output_flag is not None:
+            with open(self.output_path_name, "w") as wf:
+                json.dump(self.summary_dict, wf)
diff --git a/python/algorithm/framework/local/data_statistic/trainer.py b/python/algorithm/framework/local/data_statistic/trainer.py
new file mode 100644
index 0000000..b676862
--- /dev/null
+++ b/python/algorithm/framework/local/data_statistic/trainer.py
@@ -0,0 +1,19 @@
+# Copyright 2022 The XFL Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from .label_trainer import LocalDataStatisticLabelTrainer
+
+
+LocalDataStatisticTrainer = LocalDataStatisticLabelTrainer
diff --git a/python/algorithm/framework/vertical/linear_regression/__init__.py b/python/algorithm/framework/vertical/linear_regression/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/python/algorithm/framework/vertical/linear_regression/assist_trainer.py b/python/algorithm/framework/vertical/linear_regression/assist_trainer.py
new file mode 100644
index 0000000..f725037
--- /dev/null
+++ b/python/algorithm/framework/vertical/linear_regression/assist_trainer.py
@@ -0,0 +1,144 @@
+# Copyright 2022 The XFL Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import numpy as np
+import tenseal as ts
+
+from common.communication.gRPC.python.channel import BroadcastChannel, DualChannel
+from common.crypto.paillier.paillier import Paillier
+from common.utils.logger import logger
+from service.fed_config import FedConfig
+
+
+class VerticalLinearRegressionAssistTrainer(object):
+    def __init__(self, *args, **kwargs):
+        """Assist trainer for vertical linear regression: generates the
+        homomorphic-encryption keys and decrypts the masked losses and
+        gradients on behalf of the other parties.
+        """
+        self.broadcast_channel = BroadcastChannel(name="Public keys", root_id=FedConfig.get_assist_trainer())
+        self.dual_channels = {"gradients_loss": {}}
+        self.party_id_list = FedConfig.get_label_trainer() + FedConfig.get_trainer()
+        for party_id in self.party_id_list:
+            self.dual_channels["gradients_loss"][party_id] = DualChannel(name="gradients_loss_" + party_id,
+                                                                         ids=[FedConfig.node_id, party_id])
+        self.batch_num = self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].recv()
+        self.global_epoch = self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].recv()
+        self.batch_size = self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].recv()
+        self.encryption_config = self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].recv()
+        self.encryption_method = self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].recv()
+        self.private_context = None
+        self.public_context = None
+        # generate keys and broadcast the public part to all parties
+        if self.encryption_method == "ckks":
+            self.private_context = ts.context(
+                ts.SCHEME_TYPE.CKKS,
+                poly_modulus_degree=self.encryption_config[self.encryption_method]["poly_modulus_degree"],
+                coeff_mod_bit_sizes=self.encryption_config[self.encryption_method]["coeff_mod_bit_sizes"]
+            )
+            self.private_context.generate_galois_keys()
+            self.private_context.generate_relin_keys()
+            self.private_context.global_scale = 1 << self.encryption_config[self.encryption_method][
+                "global_scale_bit_size"]
+
+            serialized_public_context = self.private_context.serialize(
+                save_public_key=True,
+                save_secret_key=False,
+                save_galois_keys=True,
+                save_relin_keys=True
+            )
+            logger.info("Broadcast ckks public keys.")
+            self.public_context_ser = serialized_public_context
+            self.broadcast_channel.broadcast(self.public_context_ser, use_pickle=False)
+            logger.info("Broadcast completed.")
+        elif self.encryption_method == "paillier":
+            self.num_cores = -1 if self.encryption_config[self.encryption_method]["parallelize_on"] else 1
+            self.private_context = Paillier.context(self.encryption_config[self.encryption_method]["key_bit_size"],
+                                                    djn_on=self.encryption_config[self.encryption_method]["djn_on"])
+            logger.info("Broadcast paillier public keys.")
+            self.public_context_ser = self.private_context.to_public().serialize()
+            self.broadcast_channel.broadcast(self.public_context_ser, use_pickle=False)
+            logger.info("Broadcast completed.")
+        elif self.encryption_method == "plain":
+            pass
+        else:
+            raise ValueError(f"Encryption method {self.encryption_method} not supported! Valid methods are 'paillier', "
+                             f"'ckks', 'plain'.")
+
+    def fit(self):
+        """Run the assist side of training: per batch, decrypt the masked total
+        loss and the noise-masked gradients uploaded by the other parties.
+        """
+        # re-broadcast the public key to all parties
+        if self.encryption_method in ["ckks", "paillier"]:
+            logger.info("Broadcast public keys.")
+            self.broadcast_channel.broadcast(self.public_context_ser, use_pickle=False)
+            logger.info("Broadcast completed.")
+
+        # train
+        for epoch in range(1, self.global_epoch + 1):
+            for batch_idx in range(self.batch_num):
+                # receive the encrypted total loss, decrypt it, and return it to the label_trainer
+                logger.info("Receive and decrypt total loss, then send it back to label_trainer.")
+                if self.encryption_method == "ckks":
+                    enc_loss_batch = self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].recv(
+                        use_pickle=False)
+                    decrypted_loss_batch = ts.ckks_vector_from(self.private_context, enc_loss_batch).decrypt()[0]
+                elif self.encryption_method == "paillier":
+                    enc_loss_batch = self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].recv(
+                        use_pickle=False)
+                    decrypted_loss_batch = Paillier.decrypt(self.private_context, Paillier.ciphertext_from(
+                        None, enc_loss_batch), dtype='float', num_cores=self.num_cores)
+                elif self.encryption_method == "plain":
+                    enc_loss_batch = self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].recv()
+                    decrypted_loss_batch = enc_loss_batch
+                self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].send(decrypted_loss_batch)
+                logger.info(
+                    "Loss of batch {} in epoch {} is {}".format(batch_idx, epoch, decrypted_loss_batch / self.batch_size))
+
+                # receive the encrypted noised gradients from the other parties, decrypt them, and send them back
+                if self.encryption_method == "ckks" or self.encryption_method == "paillier":
+                    # trainer
+                    for party_id in FedConfig.get_trainer():
+                        en_noised_gradient_trainer_w = self.dual_channels["gradients_loss"][party_id].recv(
+                            use_pickle=False)
+                        if self.encryption_method == "ckks":
+                            noised_gradient_trainer_w = ts.ckks_vector_from(self.private_context,
                                                                            en_noised_gradient_trainer_w).decrypt()
+                        elif self.encryption_method == "paillier":
+                            noised_gradient_trainer_w = Paillier.decrypt(self.private_context, Paillier.ciphertext_from(
+                                None, en_noised_gradient_trainer_w), dtype='float', num_cores=self.num_cores)
+                        self.dual_channels["gradients_loss"][party_id].send(noised_gradient_trainer_w)
+                    # label_trainer
+                    en_noised_gradient_label_trainer_w = self.dual_channels["gradients_loss"][
+                        FedConfig.get_label_trainer()[0]].recv(use_pickle=False)
+                    en_noised_gradient_label_trainer_b = self.dual_channels["gradients_loss"][
+                        FedConfig.get_label_trainer()[0]].recv(use_pickle=False)
+                    if self.encryption_method == "ckks":
+                        noised_gradient_label_trainer_w = ts.ckks_vector_from(
+                            self.private_context, en_noised_gradient_label_trainer_w).decrypt()
+                        noised_gradient_label_trainer_b = ts.ckks_vector_from(
+                            self.private_context, en_noised_gradient_label_trainer_b).decrypt()
+                    elif self.encryption_method == "paillier":
+                        noised_gradient_label_trainer_w = Paillier.decrypt(
+                            self.private_context, Paillier.ciphertext_from(None, en_noised_gradient_label_trainer_w),
+                            dtype='float', num_cores=self.num_cores)
+                        noised_gradient_label_trainer_b = Paillier.decrypt(
+                            self.private_context, Paillier.ciphertext_from(None, en_noised_gradient_label_trainer_b),
+                            dtype='float', num_cores=self.num_cores)
+                    # calculate sum of gradient b
+                    noised_gradient_label_trainer_b = np.sum(noised_gradient_label_trainer_b)
+                    grad_send = {"noised_gradient_label_trainer_w": noised_gradient_label_trainer_w,
+                                 "noised_gradient_label_trainer_b": noised_gradient_label_trainer_b}
self.dual_channels["gradients_loss"][FedConfig.get_label_trainer()[0]].send(grad_send) diff --git a/python/algorithm/framework/vertical/linear_regression/base.py b/python/algorithm/framework/vertical/linear_regression/base.py new file mode 100644 index 0000000..43b69ff --- /dev/null +++ b/python/algorithm/framework/vertical/linear_regression/base.py @@ -0,0 +1,156 @@ +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os + +import torch +import torch.nn as nn +from torch.utils.data import DataLoader, TensorDataset + +from algorithm.core.data_io import CsvReader +from algorithm.framework.vertical.vertical_model_base import VerticalModelBase +from common.utils.logger import logger +from common.utils.model_preserver import ModelPreserver +from service.fed_config import FedConfig + + +class VerticalLinearRegression(nn.Module): + def __init__(self, input_dim: int, bias: bool = False): + super(VerticalLinearRegression, self).__init__() + self.linear = torch.nn.Linear(input_dim, 1, bias=bias) + self.linear.requires_grad_(False) + + def forward(self, x): + return self.linear(x) + + +class VerticalLinearRegressionBase(VerticalModelBase): + def __init__(self, train_conf: dict, label: bool = False, *args, **kwargs): + """_summary_ + + Args: + train_conf (dict): _description_ + label (bool, optional): _description_. Defaults to False. + """ + super().__init__(train_conf) + self._parse_config() + self.train_conf = train_conf + self.label = label + self.data_dim = None + self.model = None + self.train_dataloader, self.eval_dataloader = None, None + if FedConfig.node_id != "assist_trainer": + self._init_dataloader() + + def _parse_config(self) -> None: + super()._parse_config() + self.model_name = self.model_info.get("name") + self.save_model_name = self.output.get("model", {}).get("name") + if not os.path.exists(self.save_dir): + os.makedirs(self.save_dir) + self.global_epoch = self.train_params.get("global_epoch") + self.batch_size = self.train_params.get("batch_size") + self.encryption_config = self.train_params.get("encryption") + self.optimizer_config = self.train_params.get("optimizer") + self.pretrain_model_path = self.input.get("pretrained_model", {}).get("path") + self.random_seed = self.train_params.get("random_seed", None) + self.early_stopping_config = self.train_params.get("early_stopping") + self.save_frequency = self.interaction_params.get("save_frequency") + + @staticmethod + def set_seed(seed): + torch.manual_seed(seed) + torch.cuda.manual_seed(seed) + + def _init_model(self, bias: bool = False) -> None: + """ + Init linear regression model. + Returns: None + """ + logger.info("Init model start.") + self.model = VerticalLinearRegression(input_dim=self.data_dim, bias=bias) + # Load pretrained model if needed. 
+ if self.pretrain_model_path is not None and self.pretrain_model_path != "":
+ checkpoint = ModelPreserver.load(os.path.join(self.pretrain_model_path, self.input.get(
+ "pretrained_model").get("name", None)))
+ self.model.load_state_dict(checkpoint["state_dict"])
+ logger.info("Init model completed.")
+
+ def __load_data(self, config) -> CsvReader:
+ config = config[0]
+ if config["type"] == "csv":
+ data_reader = CsvReader(path=os.path.join(config["path"], config["name"]), has_id=config["has_id"],
+ has_label=config["has_label"])
+ else:
+ raise NotImplementedError("Dataset type {} is not supported.".format(config["type"]))
+ return data_reader
+
+ def _init_data(self) -> None:
+ if len(self.input_trainset) > 0:
+ data: CsvReader = self.__load_data(self.input_trainset)
+ self.train = data.features()
+ self.train_label = data.label()
+ self.train_ids = data.ids
+ else:
+ raise NotImplementedError("Trainset was not configured.")
+ if self.label:
+ assert len(self.train) == len(self.train_label)
+
+ if len(self.input_valset) > 0:
+ data: CsvReader = self.__load_data(self.input_valset)
+ self.val = data.features()
+ self.val_label = data.label()
+ self.val_ids = data.ids
+ if self.label:
+ assert len(self.val) == len(self.val_label)
+
+ def _init_dataloader(self) -> None:
+ """
+ Build the train and val dataloaders from the raw data.
+ Returns: None
+ """
+ logger.info("Dataloader initialization start.")
+ self._init_data()
+ if self.label:
+ self.train_dataloader = DataLoader(
+ dataset=TensorDataset(torch.tensor(self.train, dtype=torch.float32),
+ torch.unsqueeze(torch.tensor(self.train_label), dim=-1),
+ torch.unsqueeze(torch.tensor(self.train_ids), dim=-1)),
+ batch_size=self.batch_size, shuffle=True
+ )
+ self.val_dataloader = DataLoader(
+ dataset=TensorDataset(torch.tensor(self.val, dtype=torch.float32),
+ torch.unsqueeze(torch.tensor(self.val_label), dim=-1),
+ torch.unsqueeze(torch.tensor(self.val_ids), dim=-1)),
+ batch_size=self.batch_size, shuffle=False
+ )
+ else:
+ self.train_dataloader = DataLoader(
+ dataset=TensorDataset(torch.tensor(self.train, dtype=torch.float32),
+ torch.unsqueeze(torch.tensor(self.train_ids), dim=-1)),
+ batch_size=self.batch_size, shuffle=True
+ )
+ self.val_dataloader = DataLoader(
+ dataset=TensorDataset(torch.tensor(self.val, dtype=torch.float32),
+ torch.unsqueeze(torch.tensor(self.val_ids), dim=-1)),
+ batch_size=self.batch_size, shuffle=False
+ )
+ self.data_dim = torch.tensor(self.train).shape[-1]
+ logger.info("Train data shape: {}.".format(list(torch.tensor(self.train).shape)))
+ logger.info("Dataloader initialization completed.")
diff --git a/python/algorithm/framework/vertical/linear_regression/label_trainer.py b/python/algorithm/framework/vertical/linear_regression/label_trainer.py
new file mode 100644
index 0000000..8e702d3
--- /dev/null
+++ b/python/algorithm/framework/vertical/linear_regression/label_trainer.py
@@ -0,0 +1,402 @@
+# Copyright 2022 The XFL Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + + +import copy +import secrets +from functools import reduce +from pathlib import Path + +import numpy as np +import pandas as pd +import tenseal as ts +import torch + +from common.communication.gRPC.python.channel import BroadcastChannel, DualChannel +from common.crypto.paillier.paillier import Paillier +from common.utils.algo_utils import earlyStopping +from common.utils.logger import logger +from common.utils.model_preserver import ModelPreserver +from common.utils.utils import save_model_config +from service.fed_config import FedConfig +from service.fed_node import FedNode +from .base import VerticalLinearRegressionBase + + +class VerticalLinearRegressionLabelTrainer(VerticalLinearRegressionBase): + def __init__(self, train_conf: dict, *args, **kwargs): + """ + Vertical Linear Regression + Args: + train_conf: training parameters + *args: + **kwargs: + """ + super().__init__(train_conf, label=True, *args, **kwargs) + self._init_model(bias=True) + self.export_conf = [{ + "class_name": "VerticalLinearRegression", + "identity": self.identity, + "filename": self.save_model_name, + "input_dim": self.data_dim, + "bias": True + }] + if self.random_seed: + self.set_seed(self.random_seed) + self.es = earlyStopping(key=self.early_stopping_config["key"], + patience=self.early_stopping_config["patience"], + delta=self.early_stopping_config["delta"]) + self.best_model = None + self.broadcast_channel = BroadcastChannel(name="Public keys", root_id=FedConfig.get_assist_trainer()) + self.dual_channels = {"intermediate_label_trainer": {}, "gradients_loss": None} + for party_id in FedConfig.get_trainer(): + self.dual_channels["intermediate_label_trainer"][party_id] = DualChannel( + name="intermediate_label_trainer_" + party_id, ids=[FedConfig.node_id, party_id]) + self.dual_channels["gradients_loss"] = DualChannel(name="gradients_loss_" + FedConfig.node_id, + ids=[FedConfig.get_assist_trainer()] + [FedConfig.node_id]) + self.train_result = None + self.val_result = None + self.dual_channels["gradients_loss"].send(len(self.train_dataloader)) + self.dual_channels["gradients_loss"].send(self.global_epoch) + self.dual_channels["gradients_loss"].send(self.batch_size) + self.encryption_method = list(self.encryption_config.keys())[0].lower() + self.dual_channels["gradients_loss"].send(self.encryption_config) + self.dual_channels["gradients_loss"].send(self.encryption_method) + + def predict(self, input_data): + pred_prob_epoch, y_epoch = [], [] + for batch_idx, (x_batch, y_batch, _) in enumerate(input_data): + pred_trainer_list = [] + pred_label_trainer = self.model(x_batch).numpy().astype(np.float32).flatten() + for party_id in FedConfig.get_trainer(): + pred_trainer_list.append(self.dual_channels["intermediate_label_trainer"][party_id].recv( + use_pickle=True)) + # calculate prediction of batch + pred_total = pred_label_trainer + reduce(lambda x, y: x + y, pred_trainer_list) + # calculate prediction of epoch + pred_prob_epoch += pred_total.tolist() + y_epoch += y_batch.numpy().astype(np.float32).flatten().tolist() + return y_epoch, pred_prob_epoch + + def fit(self): + self.check_data() + public_context = None + num_cores = -1 + rng = secrets.SystemRandom() + + logger.info("Vertical linear regression training start") + # receive encryption key from assist trainer + if self.encryption_method == "ckks": + logger.info("Receive ckks public key.") + public_context = self.broadcast_channel.recv(use_pickle=False) 
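+ # rebuild a usable TenSEAL context object from the serialized public key bytes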
+ public_context = ts.context_from(public_context)
+ logger.info("Public key received.")
+ elif self.encryption_method == "paillier":
+ logger.info("Receive paillier public key.")
+ public_context = self.broadcast_channel.recv(use_pickle=False)
+ public_context = Paillier.context_from(public_context)
+ logger.info("Public key received.")
+ elif self.encryption_method == "plain":
+ pass
+ else:
+ raise ValueError(f"Encryption method {self.encryption_method} not supported! Valid methods are "
+ f"'paillier', 'ckks', 'plain'.")
+ # train
+ for epoch in range(1, self.global_epoch + 1):
+ loss_epoch = 0
+
+ for batch_idx, (x_batch, y_batch, _) in enumerate(self.train_dataloader):
+ pred_trainer = []
+ loss_trainer = []
+ loss_between_trainer = 0
+ enc_pred_residual = None
+ enc_loss_label_trainer = None
+ regular_loss_tmp = 0
+ regular_gradient_tmp = 0
+ enc_regular_gradient_tmp = 0
+ # calculate regularization terms
+ if self.optimizer_config['p'] == 1:
+ regular_loss_tmp = torch.abs(self.model.linear.weight).sum() * self.optimizer_config['alpha']
+ # d|w|/dw = sign(w), computed here as |w| / w
+ regular_gradient_tmp = self.optimizer_config['alpha'] * (torch.abs(self.model.linear.weight)
+ / self.model.linear.weight)
+ elif self.optimizer_config['p'] == 2:
+ regular_loss_tmp = (self.model.linear.weight ** 2).sum() * self.optimizer_config['alpha'] / 2
+ regular_gradient_tmp = self.optimizer_config['alpha'] * self.model.linear.weight
+ elif self.optimizer_config['p'] == 0:
+ pass
+
+ # compute theta_label_trainer * x_label_trainer and the label_trainer's share of the loss
+ logger.info("Calculate intermediate result of label trainer.")
+ pred_label_trainer = self.model(x_batch)
+ pred_residual = pred_label_trainer - y_batch
+
+ # receive intermediate results from trainers
+ for party_id in FedConfig.get_trainer():
+ if self.encryption_method == "ckks":
+ pred_trainer.append(ts.ckks_vector_from(public_context, self.dual_channels[
+ "intermediate_label_trainer"][party_id].recv(use_pickle=False)))
+ loss_trainer.append(ts.ckks_vector_from(public_context, self.dual_channels[
+ "intermediate_label_trainer"][party_id].recv(use_pickle=False)))
+ elif self.encryption_method == "paillier":
+ pred_trainer.append(Paillier.ciphertext_from(public_context, self.dual_channels[
+ "intermediate_label_trainer"][party_id].recv(use_pickle=False)))
+ loss_trainer.append(Paillier.ciphertext_from(public_context, self.dual_channels[
+ "intermediate_label_trainer"][party_id].recv(use_pickle=False)))
+ elif self.encryption_method == "plain":
+ pred_trainer.append(self.dual_channels["intermediate_label_trainer"][party_id].recv())
+ loss_trainer.append(self.dual_channels["intermediate_label_trainer"][party_id].recv())
+ logger.info("Received intermediate results from {} trainers."
+ .format(len(pred_trainer)))
+
+ # calculate total loss
+ logger.info("Calculate total loss.")
+ square_tmp = (pred_residual ** 2).sum() / 2
+ loss_label_trainer = square_tmp + regular_loss_tmp
+ if self.encryption_method == "ckks":
+ loss_between_label_trainer = np.sum([pred_t.matmul(pred_residual.numpy())
+ for pred_t in pred_trainer])
+ else:
+ loss_between_label_trainer = np.sum(pred_residual.numpy().flatten() * pred_trainer)
+ # calculate the total loss_between_trainer when there is more than one trainer
+ if len(pred_trainer) > 1:
+ if self.encryption_method == "plain":
+ loss_between_trainer = np.sum([np.sum(i * j) if ind_i != ind_j else 0
+ for ind_i, i in enumerate(pred_trainer)
+ for ind_j, j in enumerate(pred_trainer)]) / 2
+ elif self.encryption_method == "ckks":
+ loss_between_trainer = np.sum([i.dot(j) if ind_i != ind_j else 0
+ for ind_i, i in enumerate(pred_trainer)
+ for ind_j, j in enumerate(pred_trainer)]) * 0.5
+ elif self.encryption_method == "paillier":
+ loss_between_trainer = []
+ for party_id in FedConfig.get_trainer():
+ tmp = self.dual_channels["intermediate_label_trainer"][party_id].recv(use_pickle=False)
+ tmp = Paillier.ciphertext_from(public_context, tmp)
+ loss_between_trainer.append(tmp)
+ loss_between_trainer = np.sum(loss_between_trainer) / 2
+
+ if self.encryption_method == "ckks":
+ enc_loss_label_trainer = ts.ckks_vector(public_context,
+ loss_label_trainer.numpy().astype(np.float32).flatten())
+ elif self.encryption_method == "paillier":
+ enc_loss_label_trainer = Paillier.encrypt(public_context,
+ float(loss_label_trainer),
+ precision=self.encryption_config[self.encryption_method][
+ "precision"],
+ obfuscation=True,
+ num_cores=num_cores)
+ elif self.encryption_method == "plain":
+ enc_loss_label_trainer = loss_label_trainer
+ enc_loss_batch = loss_between_trainer + loss_between_label_trainer + enc_loss_label_trainer + np.sum(
+ loss_trainer)
+
+ # send the total loss to the assist_trainer
+ logger.info("Send total loss to assist_trainer.")
+ if self.encryption_method == "ckks":
+ self.dual_channels["gradients_loss"].send(enc_loss_batch.serialize(), use_pickle=False)
+ elif self.encryption_method == "paillier":
+ self.dual_channels["gradients_loss"].send(Paillier.serialize(enc_loss_batch), use_pickle=False)
+ elif self.encryption_method == "plain":
+ self.dual_channels["gradients_loss"].send(enc_loss_batch)
+ # receive the decrypted loss from the assist_trainer
+ logger.info("Receive total loss from assist_trainer.")
+ loss_batch = self.dual_channels["gradients_loss"].recv()
+ loss_batch = loss_batch / x_batch.shape[0]
+ logger.info("Loss of batch {} is {}".format(batch_idx, loss_batch))
+ loss_epoch += loss_batch * x_batch.shape[0]
+
+ # calculate intermediate result d
+ logger.info("Calculate intermediate result d.")
+ pred_rest_trainer = reduce(lambda x, y: x + y, pred_trainer)
+ if self.encryption_method == "ckks":
+ enc_pred_residual = ts.ckks_vector(public_context,
+ pred_residual.numpy().astype(np.float32).flatten())
+ elif self.encryption_method == "paillier":
+ enc_pred_residual = Paillier.encrypt(public_context,
+ pred_residual.numpy().astype(np.float32).flatten(),
+ precision=self.encryption_config[self.encryption_method][
+ "precision"], obfuscation=True, num_cores=num_cores)
+ elif self.encryption_method == "plain":
+ enc_pred_residual = pred_residual.numpy().astype(np.float32).flatten()
+ enc_d = enc_pred_residual + pred_rest_trainer
+
+ # send intermediate result d to the trainers
+ logger.info("Send intermediate result d to trainer.")
+ for party_id
in FedConfig.get_trainer(): + if self.encryption_method == "ckks": + self.dual_channels["intermediate_label_trainer"][party_id].send(enc_d.serialize(), + use_pickle=False) + elif self.encryption_method == "paillier": + self.dual_channels["intermediate_label_trainer"][party_id].send(Paillier.serialize(enc_d), + use_pickle=False) + elif self.encryption_method == "plain": + self.dual_channels["intermediate_label_trainer"][party_id].send(enc_d) + # calculate gradient for label_trainer + logger.info("Calculate gradients for label_trainer.") + if self.encryption_method == "ckks": + enc_regular_gradient_tmp = ts.ckks_vector(public_context, + regular_gradient_tmp.numpy().astype(np.float32).flatten()) + elif self.encryption_method == "paillier": + enc_regular_gradient_tmp = Paillier.encrypt( + public_context, regular_gradient_tmp.numpy().astype(np.float32).flatten(), + precision=self.encryption_config[self.encryption_method]["precision"], + obfuscation=True, num_cores=num_cores) + elif self.encryption_method == "plain": + enc_regular_gradient_tmp = regular_gradient_tmp.numpy().astype(np.float32).flatten() + + if self.encryption_method == "ckks": + gradient_label_trainer_w = enc_d.matmul(x_batch.numpy()) + enc_regular_gradient_tmp + else: + gradient_label_trainer_w = np.matmul(enc_d.reshape(1, len(enc_d)), x_batch.numpy() + ) + enc_regular_gradient_tmp + gradient_label_trainer_b = enc_d + + if self.encryption_method == "ckks": + # add noise to encrypted gradients and send to assist_trainer + logger.info("Calculate noised gradients for label_trainer.") + noise = np.array([rng.randint(1 << 24, 1 << 26) - (1 << 25) for _ in range(x_batch.shape[1])], + dtype=np.float32) + noise /= 100000 + noise_b = np.array([rng.randint(1 << 24, 1 << 26) - (1 << 25) for _ in range(x_batch.shape[0])], + dtype=np.float32) + noise_b /= 100000 + noised_gradient_label_trainer_w = gradient_label_trainer_w + noise + noised_gradient_label_trainer_b = gradient_label_trainer_b + noise_b + logger.info("Send noised gradient to assist_trainer.") + self.dual_channels["gradients_loss"].send(noised_gradient_label_trainer_w.serialize(), + use_pickle=False) + self.dual_channels["gradients_loss"].send(noised_gradient_label_trainer_b.serialize(), + use_pickle=False) + # receive decrypted gradient from assist_trainer + logger.info("Receive decrypted gradient from assist_trainer.") + noised_decrypt_gradient = self.dual_channels["gradients_loss"].recv() + noised_decrypt_gradient_label_trainer_w = noised_decrypt_gradient["noised_gradient_label_trainer_w"] + noised_decrypt_gradient_label_trainer_b = noised_decrypt_gradient["noised_gradient_label_trainer_b"] + gradient_label_trainer_w = noised_decrypt_gradient_label_trainer_w - noise + gradient_label_trainer_b = noised_decrypt_gradient_label_trainer_b - np.sum(noise_b) + elif self.encryption_method == "paillier": + # add noise to encrypted gradients and send to assist_trainer + logger.info("Calculate noised gradients for label_trainer.") + noise = np.array([rng.randint(1 << 24, 1 << 26) - (1 << 25) for _ in range(x_batch.shape[1])], + dtype=np.float32) + noise /= 100000 + noise_b = np.array([rng.randint(1 << 24, 1 << 26) - (1 << 25) for _ in range(x_batch.shape[0])], + dtype=np.float32) + noise_b /= 100000 + noised_gradient_label_trainer_w = gradient_label_trainer_w + noise + noised_gradient_label_trainer_b = gradient_label_trainer_b + noise_b + logger.info("Send noised gradient to assist_trainer.") + self.dual_channels["gradients_loss"].send(Paillier.serialize(noised_gradient_label_trainer_w), 
+ use_pickle=False)
+ self.dual_channels["gradients_loss"].send(Paillier.serialize(noised_gradient_label_trainer_b),
+ use_pickle=False)
+ # receive the decrypted gradients from the assist_trainer
+ logger.info("Receive decrypted gradient from assist_trainer.")
+ noised_decrypt_gradient = self.dual_channels["gradients_loss"].recv()
+ noised_decrypt_gradient_label_trainer_w = noised_decrypt_gradient["noised_gradient_label_trainer_w"]
+ noised_decrypt_gradient_label_trainer_b = noised_decrypt_gradient["noised_gradient_label_trainer_b"]
+ gradient_label_trainer_w = noised_decrypt_gradient_label_trainer_w - noise
+ gradient_label_trainer_b = noised_decrypt_gradient_label_trainer_b - np.sum(noise_b)
+ elif self.encryption_method == "plain":
+ gradient_label_trainer_b = gradient_label_trainer_b.sum()
+
+ # update w and b of the label_trainer
+ gradient_label_trainer_w = gradient_label_trainer_w / x_batch.shape[0]
+ gradient_label_trainer_b = gradient_label_trainer_b / x_batch.shape[0]
+ logger.info("Update weights of label trainer.")
+ self.model.linear.weight -= (torch.FloatTensor(gradient_label_trainer_w) * self.optimizer_config["lr"])
+ self.model.linear.bias -= (gradient_label_trainer_b * self.optimizer_config["lr"])
+
+ loss_epoch = loss_epoch / len(self.train)
+ logger.info("Loss of epoch {} is {}".format(epoch, loss_epoch))
+
+ # predict train and val results for metrics
+ logger.info("Predict train results of label trainer.")
+ self.train_result = self.predict(self.train_dataloader)
+ loss_train_met = {"loss": loss_epoch}
+ self._calc_metrics(np.array(self.train_result[1], dtype=float), np.array(self.train_result[0]),
+ epoch, stage="train", loss=loss_train_met)
+ logger.info("Predict val results of label trainer.")
+ self.val_result = self.predict(self.val_dataloader)
+ val_residual = np.array(self.val_result[1]) - np.array(self.val_result[0])
+ loss_val_met = {"loss": np.mean((val_residual ** 2) / 2)}  # no regularization term
+ val_metrics = self._calc_metrics(np.array(self.val_result[1], dtype=float), np.array(self.val_result[0]),
+ epoch, stage="val", loss=loss_val_met)
+
+ # early stopping: negate the loss so that a smaller loss counts as an improvement
+ val_metrics["loss"] = -val_metrics["loss"]
+ if self.early_stopping_config["patience"] > 0:
+ early_stop_flag, best_model_flag = self.es(val_metrics)
+ else:
+ early_stop_flag, best_model_flag = False, True
+
+ # update the best model
+ if best_model_flag:
+ self.best_model = copy.deepcopy(self.model)
+ # send flags to trainers
+ for party_id in FedConfig.get_trainer():
+ self.dual_channels["intermediate_label_trainer"][party_id].send(
+ [early_stop_flag, best_model_flag, self.early_stopping_config["patience"]], use_pickle=True)
+ # save results by epoch if needed
+ if self.save_frequency > 0 and epoch % self.save_frequency == 0:
+ ModelPreserver.save(save_dir=self.save_dir, model_name=self.save_model_name,
+ state_dict=self.model.state_dict(), epoch=epoch)
+ # break on early stopping
+ if early_stop_flag:
+ break
+
+ # save model config for inference
+ save_model_config(stage_model_config=self.export_conf, save_path=Path(self.save_dir))
+ # save probabilities and the final model
+ self._save_prob()
+ ModelPreserver.save(save_dir=self.save_dir, model_name=self.save_model_name,
+ state_dict=self.best_model.state_dict(), final=True)
+ # calculate feature importance
+ self._save_feature_importance(self.dual_channels)
+
+ def _save_prob(self):
+ if self.interaction_params.get("write_training_prediction"):
+ self._write_prediction(self.train_result[1], self.train_result[0], self.train_ids.tolist(),
+ stage="train", final=True)
+ if self.interaction_params.get("write_validation_prediction"):
+ self._write_prediction(self.val_result[1], self.val_result[0], self.val_ids.tolist(),
+ stage="val", final=True)
+
+ def check_data(self):
+ dim_channel = BroadcastChannel(name="check_data_com", ids=[FedConfig.node_id] + FedConfig.get_trainer())
+ n = self.data_dim
+ dims = dim_channel.collect()
+ for dim in dims:
+ n += dim
+ if n <= 0:
+ raise ValueError("The total number of features is zero. Stop training.")
+
+ def _save_feature_importance(self, channel):
+ res = {"owner_id": [], "fid": [], "importance": []}
+ other_weight_list = []
+ for party_id in FedConfig.get_trainer():
+ other_weight_list.append(channel["intermediate_label_trainer"][party_id].recv(use_pickle=True))
+ for (owner_id, weights) in other_weight_list:
+ for fid, weight in enumerate(weights):
+ res["owner_id"].append(owner_id)
+ res["fid"].append(fid)
+ res["importance"].append(float(weight))
+ for fid, weight in enumerate(self.best_model.state_dict()["linear.weight"][0]):
+ res["owner_id"].append(FedNode.node_id)
+ res["fid"].append(fid)
+ res["importance"].append(float(weight))
+ res = pd.DataFrame(res).sort_values(by="importance", key=lambda col: np.abs(col), ascending=False)
+ res.to_csv(Path(self.save_dir, self.output["feature_importance"]["name"]), header=True, index=False,
+ float_format="%.6g")
diff --git a/python/algorithm/framework/vertical/linear_regression/trainer.py b/python/algorithm/framework/vertical/linear_regression/trainer.py
new file mode 100644
index 0000000..4e1be57
--- /dev/null
+++ b/python/algorithm/framework/vertical/linear_regression/trainer.py
@@ -0,0 +1,268 @@
+# Copyright 2022 The XFL Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
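+
+# Note on the masking used in this file: each trainer blinds its encrypted
+# gradient with additive noise before asking the assist_trainer (the private-key
+# holder) to decrypt, then removes the noise locally. A minimal sketch, with
+# Enc/Dec standing for the homomorphic encrypt/decrypt operations used below:
+#
+#     g = 7.0; r = 123.456        # true gradient entry and a random mask
+#     masked = Enc(g) + r         # homomorphic addition, sent for decryption
+#     Dec(masked) - r == 7.0      # the mask is subtracted after decryption
+#
+# so the assist_trainer only ever observes g + r, never g itself.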
+
+
+import copy
+import secrets
+from pathlib import Path
+
+import numpy as np
+import tenseal as ts
+import torch
+
+from common.communication.gRPC.python.channel import BroadcastChannel, DualChannel
+from common.crypto.paillier.paillier import Paillier
+from common.utils.logger import logger
+from common.utils.model_preserver import ModelPreserver
+from common.utils.utils import save_model_config
+from service.fed_config import FedConfig
+from service.fed_node import FedNode
+from .base import VerticalLinearRegressionBase
+
+
+class VerticalLinearRegressionTrainer(VerticalLinearRegressionBase):
+ def __init__(self, train_conf: dict, *args, **kwargs):
+ """Trainer party (holding no label) of vertical linear regression.
+
+ Args:
+ train_conf (dict): training configuration of the current stage.
+ """
+ super().__init__(train_conf, label=False, *args, **kwargs)
+ self._init_model()
+ self.export_conf = [{
+ "class_name": "VerticalLinearRegression",
+ "identity": self.identity,
+ "filename": self.save_model_name,
+ "input_dim": self.data_dim,
+ "bias": False
+ }]
+ if self.random_seed:
+ self.set_seed(self.random_seed)
+ self.best_model = None
+ self.broadcast_channel = BroadcastChannel(name="Public keys", root_id=FedConfig.get_assist_trainer())
+ if len(FedConfig.get_trainer()) > 1:
+ self.broadcast_trainer = BroadcastChannel(name="Trainer exchange", root_id=FedConfig.node_id,
+ ids=FedConfig.get_trainer())
+ self.dual_channels = {
+ "intermediate_label_trainer": DualChannel(name="intermediate_label_trainer_" + FedConfig.node_id,
+ ids=FedConfig.get_label_trainer() + [FedConfig.node_id]),
+ "gradients_loss": DualChannel(name="gradients_loss_" + FedConfig.node_id,
+ ids=[FedConfig.get_assist_trainer()] + [FedConfig.node_id])
+ }
+
+ def predict(self, input_data):
+ for batch_idx, (x_batch, _) in enumerate(input_data):
+ # calculate the prediction of the batch
+ pred_trainer = self.model(x_batch)
+ # send it to the label_trainer
+ self.dual_channels["intermediate_label_trainer"].send(pred_trainer.numpy().astype(np.float32).flatten(),
+ use_pickle=True)
+
+ def fit(self):
+ """Train the local sub-model jointly with the label_trainer and assist_trainer."""
+ self.check_data()
+ num_cores = -1
+ encryption_config = self.encryption_config
+ encryption_method = list(self.encryption_config.keys())[0].lower()
+
+ logger.info("Vertical linear regression training start")
+ # receive the encryption key from the assist_trainer
+ public_context = None
+
+ if encryption_method == "ckks":
+ logger.info("Receive ckks public key.")
+ public_context = self.broadcast_channel.recv(use_pickle=False)
+ public_context = ts.context_from(public_context)
+ logger.info("Public key received.")
+ elif encryption_method == "paillier":
+ logger.info("Receive paillier public key.")
+ public_context = self.broadcast_channel.recv(use_pickle=False)
+ public_context = Paillier.context_from(public_context)
+ logger.info("Public key received.")
+ elif encryption_method == "plain":
+ pass
+ else:
+ raise ValueError(
+ f"Encryption method {encryption_method} not supported! Valid methods are 'paillier', 'ckks', 'plain'.")
+
+ rng = secrets.SystemRandom()
+ # train
+ for epoch in range(1, self.global_epoch + 1):
+ for batch_idx, (x_batch, _) in enumerate(self.train_dataloader):
+ regular_loss_tmp = 0
+ regular_gradient_tmp = 0
+ enc_regular_gradient_tmp = 0
+ # calculate regularization terms
+ if self.optimizer_config['p'] == 1:
+ regular_loss_tmp = torch.abs(self.model.linear.weight).sum() * self.optimizer_config['alpha']
+ regular_gradient_tmp = self.optimizer_config['alpha'] * (torch.abs(self.model.linear.weight)
+ / self.model.linear.weight)
+ elif self.optimizer_config['p'] == 2:
+ regular_loss_tmp = (self.model.linear.weight ** 2).sum() * self.optimizer_config['alpha'] / 2
+ regular_gradient_tmp = self.optimizer_config['alpha'] * self.model.linear.weight
+ elif self.optimizer_config['p'] == 0:
+ pass
+
+ # compute theta_trainer * x_trainer and the trainer's share of the loss
+ pred_trainer = self.model(x_batch)
+ square_tmp = (pred_trainer ** 2).sum() / 2
+ loss_trainer = square_tmp + regular_loss_tmp
+
+ # send intermediate results to the label trainer
+ logger.info("Send intermediate result to label trainer.")
+ enc_pred_trainer = None
+ if encryption_method == "ckks":
+ enc_pred_trainer = ts.ckks_vector(public_context, pred_trainer.numpy().astype(np.float32).flatten())
+ enc_loss_trainer = ts.ckks_vector(public_context, loss_trainer.numpy().astype(np.float32).flatten())
+ self.dual_channels["intermediate_label_trainer"].send(enc_pred_trainer.serialize(),
+ use_pickle=False)
+ self.dual_channels["intermediate_label_trainer"].send(enc_loss_trainer.serialize(),
+ use_pickle=False)
+ elif encryption_method == "paillier":
+ enc_pred_trainer = Paillier.encrypt(public_context,
+ pred_trainer.numpy().astype(np.float32).flatten(),
+ precision=encryption_config[encryption_method]["precision"],
+ obfuscation=True,
+ num_cores=num_cores)
+ enc_loss_trainer = Paillier.encrypt(public_context,
+ loss_trainer.numpy().astype(np.float32).flatten(),
+ precision=encryption_config[encryption_method]["precision"],
+ obfuscation=True,
+ num_cores=num_cores)
+ self.dual_channels["intermediate_label_trainer"].send(Paillier.serialize(enc_pred_trainer),
+ use_pickle=False)
+ self.dual_channels["intermediate_label_trainer"].send(Paillier.serialize(enc_loss_trainer),
+ use_pickle=False)
+ elif encryption_method == "plain":
+ enc_pred_trainer = pred_trainer.numpy().astype(np.float32).flatten()
+ enc_loss_trainer = loss_trainer.numpy().astype(np.float32).flatten()
+ self.dual_channels["intermediate_label_trainer"].send(enc_pred_trainer, use_pickle=True)
+ self.dual_channels["intermediate_label_trainer"].send(enc_loss_trainer, use_pickle=True)
+
+ # exchange theta_trainer * x_trainer to calculate loss_between_trainer when encryption is paillier
+ logger.info("Calculate trainer_sum for the label trainer when encryption is paillier.")
+ if encryption_method == "paillier" and len(FedConfig.get_trainer()) > 1:
+ trainer_sum = 0
+ logger.info("Send intermediate result to other trainers when encryption is paillier.")
+ self.broadcast_trainer.broadcast(Paillier.serialize(enc_pred_trainer), use_pickle=False)
+ logger.info("Receive intermediate result from other trainers when encryption is paillier.")
+ trainer_tmp = self.broadcast_trainer.collect(use_pickle=False)
+ for trainer_u in trainer_tmp:
+ trainer_u = Paillier.ciphertext_from(public_context, trainer_u)
+ trainer_sum += np.sum(trainer_u * pred_trainer.numpy().astype(np.float32).flatten())
+ logger.info("Send trainer_sum to label trainer when encryption is
paillier.") + self.dual_channels["intermediate_label_trainer"].send(Paillier.serialize(trainer_sum), + use_pickle=False) + + # receive intermediate result d from label_trainer + logger.info("Receive intermediate result d from label_trainer.") + if encryption_method == "ckks": + enc_d = self.dual_channels["intermediate_label_trainer"].recv(use_pickle=False) + enc_d = ts.ckks_vector_from(public_context, enc_d) + elif encryption_method == "paillier": + enc_d = self.dual_channels["intermediate_label_trainer"].recv(use_pickle=False) + enc_d = Paillier.ciphertext_from(public_context, enc_d) + elif encryption_method == "plain": + enc_d = self.dual_channels["intermediate_label_trainer"].recv() + + # calculate gradient for trainer and send to assist_trainer + logger.info("Calculate gradients for trainer.") + if encryption_method == "ckks": + enc_regular_gradient_tmp = ts.ckks_vector(public_context, + regular_gradient_tmp.numpy().astype(np.float32).flatten()) + elif encryption_method == "paillier": + enc_regular_gradient_tmp = Paillier.encrypt( + public_context, regular_gradient_tmp.numpy().astype(np.float32).flatten(), + precision=encryption_config[encryption_method]["precision"], + obfuscation=True, num_cores=num_cores) + elif encryption_method == "plain": + enc_regular_gradient_tmp = regular_gradient_tmp.numpy().astype(np.float32).flatten() + + if encryption_method == "ckks": + gradient_trainer_w = enc_d.matmul(x_batch.numpy()) + enc_regular_gradient_tmp + else: + gradient_trainer_w = np.matmul(enc_d.reshape(1, len(enc_d)), x_batch.numpy() + ) + enc_regular_gradient_tmp + + # add noise to encrypted gradients and send to assist_trainer + if encryption_method == "ckks": + logger.info("Calculate noised gradient for trainer.") + noise = np.array([rng.randint(1 << 24, 1 << 26) - (1 << 25) for _ in range(x_batch.shape[1])], + dtype=np.float32) + noise /= 100000 + noised_gradient_trainer_w = gradient_trainer_w + noise + logger.info("Send noised gradient to assist_trainer.") + self.dual_channels["gradients_loss"].send(noised_gradient_trainer_w.serialize(), use_pickle=False) + # receive decrypted gradient from assist_trainer + logger.info("Receive decrypted gradient from assist_trainer.") + noised_gradient_trainer_w = self.dual_channels["gradients_loss"].recv() + gradient_trainer_w = noised_gradient_trainer_w - noise + elif encryption_method == "paillier": + logger.info("Calculate noised gradient for trainer.") + noise = np.array([rng.randint(1 << 24, 1 << 26) - (1 << 25) for _ in range(x_batch.shape[1])], + dtype=np.float32) + noise /= 100000 + noised_gradient_trainer_w = gradient_trainer_w + noise + logger.info("Send noised gradient to assist_trainer.") + self.dual_channels["gradients_loss"].send(Paillier.serialize(noised_gradient_trainer_w), + use_pickle=False) + # receive decrypted gradient from assist_trainer + logger.info("Receive decrypted gradient from assist_trainer.") + noised_gradient_trainer_w = self.dual_channels["gradients_loss"].recv() + gradient_trainer_w = noised_gradient_trainer_w - noise + # gradient_trainer_w = torch.FloatTensor(gradient_trainer_w).unsqueeze(-1) + + # update w and b of trainer + gradient_trainer_w = gradient_trainer_w / x_batch.shape[0] + logger.info("Update weights of trainer.") + self.model.linear.weight -= (torch.FloatTensor(gradient_trainer_w) * self.optimizer_config["lr"]) + + # predict train and val for metrics + logger.info("Predict train weights of trainer.") + self.predict(self.train_dataloader) + logger.info("Predict val weights of trainer.") + 
self.predict(self.val_dataloader)
+
+ # receive flags
+ early_stop_flag, best_model_flag, patience = self.dual_channels["intermediate_label_trainer"].recv(
+ use_pickle=True)
+ # update the best model
+ if best_model_flag:
+ self.best_model = copy.deepcopy(self.model)
+ # save results by epoch if needed
+ if self.save_frequency > 0 and epoch % self.save_frequency == 0:
+ ModelPreserver.save(save_dir=self.save_dir,
+ model_name=self.save_model_name,
+ state_dict=self.model.state_dict(),
+ epoch=epoch)
+ # break on early stopping
+ if early_stop_flag:
+ break
+
+ # save model config for inference
+ save_model_config(stage_model_config=self.export_conf, save_path=Path(self.save_dir))
+ # save the final model
+ ModelPreserver.save(save_dir=self.save_dir, model_name=self.save_model_name,
+ state_dict=self.best_model.state_dict(), final=True)
+ # send w to the label trainer
+ self._save_feature_importance(self.dual_channels["intermediate_label_trainer"])
+
+ def _save_feature_importance(self, channel):
+ channel.send((FedNode.node_id, self.best_model.state_dict()["linear.weight"][0]))
+
+ def check_data(self):
+ # the id set must match the one used by the label_trainer's check_data channel
+ dim_channel = BroadcastChannel(name="check_data_com",
+ ids=FedConfig.get_label_trainer() + FedConfig.get_trainer())
+ dim_channel.send(self.data_dim)
diff --git a/test/algorithm/framework/horizontal/test_h_woe_iv.py b/test/algorithm/framework/horizontal/test_h_woe_iv.py
new file mode 100644
index 0000000..1ed8d9e
--- /dev/null
+++ b/test/algorithm/framework/horizontal/test_h_woe_iv.py
@@ -0,0 +1,255 @@
+# Copyright 2022 The XFL Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
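+
+# Reference formulas for the values checked in this test (standard WOE / IV
+# definitions; the implementation may additionally smooth empty bins):
+#
+#     WoE_i = ln( (pos_i / Pos) / (neg_i / Neg) )
+#     IV    = sum_i (pos_i / Pos - neg_i / Neg) * WoE_i
+#
+# where pos_i / neg_i are the positive / negative counts of bin i and
+# Pos / Neg are the totals over all bins.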
+
+
+import json
+import os
+import shutil
+from collections import OrderedDict
+
+import numpy as np
+import pandas as pd
+import pytest
+import torch
+
+import service.fed_config
+from algorithm.framework.horizontal.binning_woe_iv.assist_trainer import \
+ HorizontalBinningWoeIvAssistTrainer
+from algorithm.framework.horizontal.binning_woe_iv.label_trainer import HorizontalBinningWoeIvLabelTrainer
+from common.communication.gRPC.python.channel import DualChannel
+from common.communication.gRPC.python.commu import Commu
+from common.crypto.key_agreement.diffie_hellman import DiffieHellman
+
+
+def map_bin(x, split_point):
+ bin_map = list(range(1, len(split_point) + 1))
+ split_tile = np.tile(split_point, (len(x), 1))
+ index = np.sum(x.to_numpy().reshape(-1, 1) - split_tile > 0, 1)
+ return [bin_map[i] for i in index]
+
+
+def equal_width(min_, max_, bins):
+ if min_ == max_:  # adjust end points before binning
+ min_ -= .001 * abs(min_) if min_ != 0 else .001
+ max_ += .001 * abs(max_) if max_ != 0 else .001
+ bins = np.linspace(min_, max_, bins + 1, endpoint=True)
+ else:  # adjust end points after binning
+ bins = np.linspace(min_, max_, bins + 1, endpoint=True)
+ adj = (max_ - min_) * 0.001  # 0.1% of the range
+ bins[0] -= adj
+ return list(bins)
+
+
+def prepare_data():
+ case_df = pd.DataFrame({
+ 'x0': np.random.random(1000),
+ 'x1': [0] * 1000,
+ 'x2': 2 * np.random.random(1000) + 1.0,
+ 'x3': 3 * np.random.random(1000) - 1.0,
+ 'x4': np.random.random(1000)
+ })
+ case_df['y'] = np.where(case_df['x0'] + case_df['x2'] + case_df['x3'] > 2.5, 1, 0)
+ case_df = case_df[['y', 'x0', 'x1', 'x2', 'x3', 'x4']]
+ case_df.head(800).to_csv(
+ "/opt/dataset/unit_test/train_data.csv", index=True
+ )
+ case_df.tail(200).to_csv(
+ "/opt/dataset/unit_test/test_data.csv", index=True
+ )
+
+
+@pytest.fixture()
+def get_label_trainer_conf():
+ with open("python/algorithm/config/horizontal_binning_woe_iv/trainer.json") as f:
+ conf = json.load(f)
+ conf["input"]["trainset"][0]["path"] = "/opt/dataset/unit_test"
+ conf["input"]["trainset"][0]["name"] = "train_data.csv"
+ yield conf
+
+
+@pytest.fixture()
+def get_assist_trainer_conf():
+ with open("python/algorithm/config/horizontal_binning_woe_iv/assist_trainer.json") as f:
+ conf_a = json.load(f)
+ conf_a["output"]["path"] = "/opt/checkpoints/unit_test1"
+ yield conf_a
+
+
+@pytest.fixture(scope="module", autouse=True)
+def env():
+ if not os.path.exists("/opt/dataset/unit_test"):
+ os.makedirs("/opt/dataset/unit_test")
+ if not os.path.exists("/opt/checkpoints/unit_test"):
+ os.makedirs("/opt/checkpoints/unit_test")
+ prepare_data()
+ yield
+ if os.path.exists("/opt/dataset/unit_test"):
+ shutil.rmtree("/opt/dataset/unit_test")
+ if os.path.exists("/opt/checkpoints/unit_test"):
+ shutil.rmtree("/opt/checkpoints/unit_test")
+ if os.path.exists("/opt/checkpoints/unit_test1"):
+ shutil.rmtree("/opt/checkpoints/unit_test1")
+
+
+class TestHorizontalBinningWoeIv:
+ @pytest.mark.parametrize("encryption_method", ["plain", "otp"])
+ def test_trainer(self, get_label_trainer_conf, get_assist_trainer_conf, encryption_method, mocker):
+ conf = get_label_trainer_conf
+ conf_a = get_assist_trainer_conf
+ conf["train_info"]["train_params"]["aggregation"]["encryption"]["method"] = encryption_method
+ # mock init
+ if encryption_method == "plain":
+ mocker.patch.object(
+ DualChannel, "__init__", return_value=None
+ )
+ elif encryption_method == "otp":
+ pass
+ mocker.patch.object(Commu, "trainer_ids", ['node-1', 'node-2'])
+ mocker.patch.object(Commu, "scheduler_id",
"assist_trainer") + mocker.patch.object(Commu, "node_id", "node-1") + mocker.patch.object(service.fed_config.FedConfig, "node_id", "node-1") + mocker.patch.object( + DualChannel, "send", return_value=0 + ) + mocker.patch.object( + service.fed_config.FedConfig, "get_label_trainer", return_value=['node-1', 'node-2'] + ) + mocker.patch.object( + service.fed_config.FedConfig, "get_assist_trainer", return_value="assist_trainer" + ) + mocker.patch.object( + service.fed_config.FedConfig, "get_trainer", return_value=[] + ) + mocker.patch.object( + DiffieHellman, "exchange", return_value=bytes(30000) + ) + # init label_trainer + h_woe_iv = HorizontalBinningWoeIvLabelTrainer(conf) + local_min = h_woe_iv.data.features().min(axis=0) + local_max = h_woe_iv.data.features().max(axis=0) + tmp = ("node-1", {"min": local_min, "max": local_max}) + node_lst = [tmp[0]] + min_lst = [tmp[1]["min"]] + max_lst = [tmp[1]["max"]] + index_min = np.argmin(np.array(min_lst), axis=0) + index_max = np.argmax(np.array(max_lst), axis=0) + node_min = np.array([node_lst[i] for i in index_min]) + node_max = np.array([node_lst[j] for j in index_max]) + min_msg = np.where(node_min == "node-1", True, False) + max_msg = np.where(node_max == "node-1", True, False) + + min_send = [local_min[ind] if i else 0 for ind, i in enumerate(min_msg)] + max_send = [local_max[ind] if j else 0 for ind, j in enumerate(max_msg)] + + min_final = [min_send] + max_final = [max_send] + final_min = np.sum(min_final, axis=0) + final_max = np.sum(max_final, axis=0) + + split_points = [] + if conf["train_info"]["train_params"]["binning"]["method"] == "equal_width": + for ind in range(len(final_min)): + split_points.append(equal_width(final_min[ind], final_max[ind], + conf["train_info"]["train_params"]["binning"]["bins"])) + + def mock_recv_dual(*args, **kwargs): + if mock_label_recv_dual.call_count == 1: + return {"min": min_msg, "max": max_msg} + elif mock_label_recv_dual.call_count == 2: + return split_points + + mock_label_recv_dual = mocker.patch.object( + h_woe_iv.dual_channel["min_max"], "recv", side_effect=mock_recv_dual + ) + mocker.patch.object( + h_woe_iv.fedagg_executor, "upload", return_value=0 + ) + + # fit label_trainer + h_woe_iv.fit() + + # mock assist_trainer + mocker.patch.object( + service.fed_config.FedConfig, "get_label_trainer", return_value=['node-1'] + ) + mocker.patch.object( + DualChannel, "recv", return_value={ + "method": "equal_width", + "bins": 5 + } + ) + h_woe_iv_assist = HorizontalBinningWoeIvAssistTrainer(conf_a) + + # mock assist_trainer + def mock_recv_dual_a(*args, **kwargs): + if mock_assist_recv_dual.call_count == 1: + return ("node-1", {"min": local_min, "max": local_max}) + elif mock_assist_recv_dual.call_count == 2: + return {"min": min_send, "max": max_send} + + mock_assist_recv_dual = mocker.patch.object( + h_woe_iv_assist.dual_channel["min_max"]["node-1"], "recv", side_effect=mock_recv_dual_a + ) + + def bin_group(col_name, y): + data_bin_y = pd.DataFrame(bin_map[col_name], columns=[col_name]).join(y) + tmp_count = data_bin_y.groupby([col_name])['y'].agg({'count', 'sum'}) + pos_bin_count = tmp_count['sum'] + neg_bin_count = tmp_count['count'] - tmp_count['sum'] + pos_bin_count.name = "pos" + neg_bin_count.name = "neg" + # transform initial group result to the same length + tmp_fill = pd.DataFrame(index=list(range(1, h_woe_iv.config.train_params["binning"]["bins"] + 1))) + pos_bin_count = tmp_fill.join(pos_bin_count).fillna(0) + neg_bin_count = tmp_fill.join(neg_bin_count).fillna(0) + return [pos_bin_count, 
neg_bin_count] + + bin_map = list() + data_df = pd.DataFrame(h_woe_iv.data.features()) + map_tmp = list(range(len(split_points))) + pd.Series(map_tmp).apply(lambda x: bin_map.append(map_bin(data_df[x], split_points[x][1:]))) + data_bins_df = pd.DataFrame(bin_map).T + pos_neg_bin = list() + pd.Series(data_bins_df.columns).apply(lambda x: pos_neg_bin.append(bin_group(x, h_woe_iv.y))) + pos_bin = [np.array(i[0]) for i in pos_neg_bin] + neg_bin = [np.array(i[1]) for i in pos_neg_bin] + pos_bin_dict = OrderedDict(zip(range(len(pos_bin)), pos_bin)) + neg_bin_dict = OrderedDict(zip(range(len(neg_bin)), neg_bin)) + + def mock_aggregation(*args, **kwargs): + return_value = OrderedDict(zip(range(len(pos_bin)), np.zeros(len(pos_bin)))) + if mock_aggregation_a.call_count == 1: + for key in pos_bin_dict.keys(): + return_value[key] += pos_bin_dict[key] + elif mock_aggregation_a.call_count == 2: + for key in neg_bin_dict.keys(): + return_value[key] += neg_bin_dict[key] + return return_value + + mock_aggregation_a = mocker.patch.object( + h_woe_iv_assist.fedagg_executor, "aggregate", side_effect=mock_aggregation + ) + # fit assist_trainer + h_woe_iv_assist.fit() + + assert os.path.exists("/opt/checkpoints/unit_test1/woe_iv_result_[STAGE_ID].json") + with open("/opt/checkpoints/unit_test1/woe_iv_result_[STAGE_ID].json", "r", + encoding='utf-8') as f: + conf = json.loads(f.read()) + for k in ["woe", "iv", "split_points"]: + assert k in conf + assert len(conf[k]) == np.shape(h_woe_iv.data.features())[1] diff --git a/test/algorithm/framework/local/test_local_data_statistic.py b/test/algorithm/framework/local/test_local_data_statistic.py new file mode 100644 index 0000000..788d963 --- /dev/null +++ b/test/algorithm/framework/local/test_local_data_statistic.py @@ -0,0 +1,141 @@ +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
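+
+# The fixture below seeds column x00 with five missing-like values out of 1000
+# rows, so the expected missing ratio asserted later is 5 / 1000 = 0.005.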
+ + +import copy +import json +import os +import shutil +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +from algorithm.framework.local.data_statistic.label_trainer import \ + LocalDataStatisticLabelTrainer as LocalDataStatistic +from algorithm.framework.local.data_statistic.trainer import \ + LocalDataStatisticTrainer as LocalDataStatisticTrainer + + +@pytest.fixture(scope="module", autouse=True) +def env(): + # + if not os.path.exists("/opt/dataset/unit_test"): + os.makedirs("/opt/dataset/unit_test") + if not os.path.exists("/opt/checkpoints/unit_test"): + os.makedirs("/opt/checkpoints/unit_test") + # + case_df = pd.DataFrame({ + 'x01': np.random.random(1000), + 'x00': [np.NaN, '', None, ' ', 'nan'] + [0] * 995, + 'x03': 2 * np.random.random(1000) + 1.0, + 'x02': [0] * 300 + [1] * 700 + }) + case_df['y'] = np.where(case_df['x01'] + case_df['x02'] > 2.5, 1, 0) + case_df[['y', 'x00', 'x01', 'x02', 'x03']].to_csv( + "/opt/dataset/unit_test/data.csv", index=True, index_label='id' + ) + case_df[['y', 'x00', 'x01', 'x02', 'x03']].to_csv( + "/opt/dataset/unit_test/data_opt.csv", index=True, index_label='id' + ) + + yield + # + if os.path.exists("/opt/dataset/unit_test"): + shutil.rmtree("/opt/dataset/unit_test") + if os.path.exists("/opt/checkpoints/unit_test"): + shutil.rmtree("/opt/checkpoints/unit_test") + if os.path.exists("/opt/checkpoints/unit_test_1"): + shutil.rmtree("/opt/checkpoints/unit_test_1") + + +@pytest.fixture() +def get_conf(): + with open("python/algorithm/config/local_data_statistic/label_trainer.json") as f: + conf = json.load(f) + conf["input"]["dataset"][0]["path"] = "/opt/dataset/unit_test" + conf["input"]["dataset"][0]["name"] = "data.csv" + conf["output"]["path"] = "/opt/checkpoints/unit_test_1" + yield conf + + +class TestLocalDataStatistic: + @pytest.mark.parametrize('dataset', [[ + { + "type": "csv", + "path": "/opt/dataset/unit_test", + "name": "data.csv", + "has_label": True, + "has_id": True + } + ], + [ + { + "type": "csv", + "path": "/opt/dataset/unit_test", + "name": "data.csv", + "has_label": True, + "has_id": True + }, + { + "type": "csv", + "path": "/opt/dataset/unit_test", + "name": "data_opt.csv", + "has_label": True, + "has_id": True + } + ] + ]) + def test_default(self, get_conf, dataset): + conf = copy.deepcopy(get_conf) + conf["input"]["dataset"] = dataset + lds = LocalDataStatistic(conf) + + if len(dataset) == 1: + assert len(lds.data) == 1000 + else: + assert len(lds.data) == 2000 + + @pytest.mark.parametrize('train_info_params', [{}, {"quantile": [0.5, 0.8, 0.9]}]) + def test_fit(self, get_conf, train_info_params): + conf = copy.deepcopy(get_conf) + conf["train_info"]["train_params"] = train_info_params + lds = LocalDataStatistic(conf) + if train_info_params == {}: + assert lds.quantile == [0.25, 0.5, 0.75] + # test for no label + conf1 = copy.deepcopy(get_conf) + conf1["input"]["dataset"][0]["has_label"] = False + lds = LocalDataStatistic(conf1) + lds.fit() + assert "label_num" not in lds.summary_dict.keys() + assert lds.summary_dict["row_num"] == 1000 + assert lds.summary_dict["column_num"] == 5 + assert lds.summary_dict["feature_names"] == ["y", "x00", "x01", "x02", "x03"] + assert lds.summary_dict["missing_ratio"]["x00"] == float("%.6f" % (5/1000)) + else: + assert lds.quantile == [0.5, 0.8, 0.9] + lds.fit() + assert len(set(lds.summary_dict.keys()).difference( + {"mean", "median", "missing_ratio", "min", "max", "variance", "std", "quantile", "skewness", "kurtosis", + "quantile", "row_num", "label_num", 
"column_num", "feature_names"})) == 0 + assert lds.summary_dict["column_num"] == 4 + assert lds.summary_dict["feature_names"] == ["x00", "x01", "x02", "x03"] + + def test_trainer(self, get_conf): + LocalDataStatisticTrainer(get_conf) + + + diff --git a/test/algorithm/framework/vertical/test_linear_regression.py b/test/algorithm/framework/vertical/test_linear_regression.py new file mode 100644 index 0000000..3fbbd3e --- /dev/null +++ b/test/algorithm/framework/vertical/test_linear_regression.py @@ -0,0 +1,481 @@ +# Copyright 2022 The XFL Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import copy +import json +import os +import shutil +import tenseal as ts +import numpy as np +import pandas as pd +import pytest +from pytest_mock import mocker + +from common.crypto.paillier.paillier import Paillier +from service.fed_config import FedConfig +import service.fed_config +from algorithm.framework.vertical.linear_regression.assist_trainer import \ + VerticalLinearRegressionAssistTrainer +from algorithm.framework.vertical.linear_regression.trainer import VerticalLinearRegressionTrainer +from algorithm.framework.vertical.linear_regression.label_trainer import VerticalLinearRegressionLabelTrainer +from common.communication.gRPC.python.channel import (BroadcastChannel, DualChannel) + + +def prepare_data(): + case_df = pd.DataFrame({ + 'x0': np.random.random(1000), + 'x1': 2 * np.random.random(1000) + 2.0, + 'x2': 2 * np.random.random(1000) + 1.0, + 'x3': 3 * np.random.random(1000) - 1.0, + 'x4': np.random.random(1000) + }) + case_df['y'] = case_df['x0'] + case_df['x1'] + case_df['x3'] + 0.5 * case_df['x2'] + case_df[['y', 'x0']].head(800).to_csv( + "/opt/dataset/unit_test/train_guest.csv", index=True + ) + case_df[['y', 'x0']].tail(200).to_csv( + "/opt/dataset/unit_test/test_guest.csv", index=True + ) + case_df[['x1', 'x2']].head(800).to_csv( + "/opt/dataset/unit_test/train_host1.csv", index=True + ) + case_df[['x1', 'x2']].tail(200).to_csv( + "/opt/dataset/unit_test/test_host1.csv", index=True + ) + case_df[['x3', 'x4']].head(800).to_csv( + "/opt/dataset/unit_test/train_host2.csv", index=True + ) + case_df[['x3', 'x4']].tail(200).to_csv( + "/opt/dataset/unit_test/test_host2.csv", index=True + ) + + +@pytest.fixture(scope="module", autouse=True) +def env(): + if not os.path.exists("/opt/dataset/unit_test"): + os.makedirs("/opt/dataset/unit_test") + if not os.path.exists("/opt/checkpoints/unit_test"): + os.makedirs("/opt/checkpoints/unit_test") + prepare_data() + yield + if os.path.exists("/opt/dataset/unit_test"): + shutil.rmtree("/opt/dataset/unit_test") + if os.path.exists("/opt/checkpoints/unit_test"): + shutil.rmtree("/opt/checkpoints/unit_test") + + +@pytest.fixture() +def get_label_trainer_conf(): + with open("python/algorithm/config/vertical_linear_regression/label_trainer.json") as f: + conf = json.load(f) + conf["input"]["trainset"][0]["path"] = "/opt/dataset/unit_test" + conf["input"]["trainset"][0]["name"] = "train_guest.csv" + conf["input"]["valset"][0]["path"] = 
"/opt/dataset/unit_test" + conf["input"]["valset"][0]["name"] = "test_guest.csv" + conf["output"]["path"] = "/opt/checkpoints/unit_test" + yield conf + + +@pytest.fixture() +def get_trainer1_conf(): + with open("python/algorithm/config/vertical_linear_regression/trainer.json") as f: + conf = json.load(f) + conf["input"]["trainset"][0]["path"] = "/opt/dataset/unit_test" + conf["input"]["trainset"][0]["name"] = "train_host1.csv" + conf["input"]["valset"][0]["path"] = "/opt/dataset/unit_test" + conf["input"]["valset"][0]["name"] = "test_host1.csv" + conf["output"]["path"] = "/opt/checkpoints/unit_test" + yield conf + + +class TestVerticalLinearRegressionTrainer: + @pytest.mark.parametrize("encryption", [{"ckks": { + "poly_modulus_degree": 8192, "coeff_mod_bit_sizes": [60, 40, 40, 60], "global_scale_bit_size": 40}}, + {"plain": {}}, {"paillier": {"key_bit_size": 2048, "precision": 7, "djn_on": True, "parallelize_on": True}}]) + def test_all_trainers(self, get_label_trainer_conf, encryption, mocker): + conf = get_label_trainer_conf + with open("python/algorithm/config/vertical_linear_regression/trainer.json") as f: + conf_t = json.load(f) + conf_t["input"]["trainset"][0]["path"] = "/opt/dataset/unit_test" + conf_t["input"]["trainset"][0]["name"] = "train_host1.csv" + conf_t["input"]["valset"][0]["path"] = "/opt/dataset/unit_test" + conf_t["input"]["valset"][0]["name"] = "test_host1.csv" + conf_t["output"]["path"] = "/opt/checkpoints/unit_test" + + conf["train_info"]["train_params"]["global_epoch"] = 1 + conf["train_info"]["train_params"]["batch_size"] = 1000 + conf["train_info"]["train_params"]["encryption"] = encryption + conf_t["train_info"]["train_params"]["batch_size"] = 1000 + conf_t["train_info"]["train_params"]["encryption"] = encryption + conf_t["train_info"]["train_params"]["global_epoch"] = 1 + + # test trainset not configured error + conf2 = copy.deepcopy(conf) + conf2["input"]["trainset"] = [] + with pytest.raises(NotImplementedError) as e: + vlr_ = VerticalLinearRegressionLabelTrainer(conf2) + exec_msg = e.value.args[0] + assert exec_msg == "Trainset was not configured." 
+
+ # test unsupported trainset type error
+ conf1 = copy.deepcopy(conf)
+ conf1["input"]["trainset"][0]["type"] = "json"
+ with pytest.raises(NotImplementedError) as e:
+ vlr_ = VerticalLinearRegressionLabelTrainer(conf1)
+ exec_msg = e.value.args[0]
+ # the constructor raises before vlr_ is bound, so check against the conf value
+ assert exec_msg == "Dataset type {} is not supported.".format(conf1["input"]["trainset"][0]["type"])
+
+ # mock label_trainer
+ encryption_config = encryption
+ encryption_method = list(encryption.keys())[0]
+
+ if encryption_method == "ckks":
+ private_context = ts.context(
+ ts.SCHEME_TYPE.CKKS,
+ poly_modulus_degree=encryption_config[encryption_method]["poly_modulus_degree"],
+ coeff_mod_bit_sizes=encryption_config[encryption_method]["coeff_mod_bit_sizes"]
+ )
+ private_context.generate_galois_keys()
+ private_context.generate_relin_keys()
+ private_context.global_scale = 1 << encryption_config[encryption_method][
+ "global_scale_bit_size"]
+
+ serialized_public_context = private_context.serialize(
+ save_public_key=True,
+ save_secret_key=False,
+ save_galois_keys=True,
+ save_relin_keys=True
+ )
+ public_context = ts.context_from(serialized_public_context)
+ elif encryption_method == "paillier":
+ num_cores = -1 if encryption_config[encryption_method]["parallelize_on"] else 1
+ private_context = Paillier.context(encryption_config[encryption_method]["key_bit_size"],
+ djn_on=encryption_config[encryption_method]["djn_on"])
+ paillier_key = private_context.to_public().serialize()
+ public_context = Paillier.context_from(paillier_key)
+
+ mocker.patch.object(
+ DualChannel, "__init__", return_value=None
+ )
+ mocker.patch.object(
+ BroadcastChannel, "__init__", return_value=None
+ )
+ mocker.patch.object(
+ service.fed_config.FedConfig, "get_label_trainer", return_value=['node-1']
+ )
+ mocker.patch.object(
+ service.fed_config.FedConfig, "get_trainer", return_value=['node-2', 'node-3']
+ )
+ mocker.patch.object(
+ service.fed_config.FedConfig, "get_assist_trainer", return_value="assist_trainer"
+ )
+ mocker.patch.object(
+ DualChannel, "send", return_value=0
+ )
+
+ vlr = VerticalLinearRegressionLabelTrainer(conf)
+ vlr_t = VerticalLinearRegressionTrainer(conf_t)
+
+ assert len(vlr.train_dataloader) == int(len(vlr.train) / vlr.batch_size) + 1
+
+ for batch_idx, (_x_batch, _) in enumerate(vlr_t.train_dataloader):
+ x_batch = _x_batch
+ break
+ for batch_idx, (_x_batch, _y_batch, _) in enumerate(vlr.train_dataloader):
+ x_batch_label = _x_batch
+ y_batch = _y_batch
+ break
+ for batch_idx, (_x_batch, _) in enumerate(vlr_t.val_dataloader):
+ x_batch_val = _x_batch
+ break
+
+ pred_trainer = vlr_t.model(x_batch)
+ pred_trainer_val = vlr_t.model(x_batch_val)
+ pred_label_trainer = vlr.model(x_batch_label)
+
+ loss_trainer = (pred_trainer ** 2).sum() / 2
+ pred_residual = pred_label_trainer - y_batch
+ loss_label_trainer = (pred_residual ** 2).sum() / 2
+ loss = loss_trainer + loss_label_trainer
+ d = pred_trainer + pred_residual
+
+ if encryption_method == "paillier":
+ en_pred_trainer_p = Paillier.serialize(Paillier.encrypt(public_context,
+ pred_trainer.numpy().astype(np.float32).flatten(),
+ precision=encryption_config[encryption_method][
+ "precision"],
+ obfuscation=True, num_cores=num_cores))
+ en_loss_trainer_p = Paillier.serialize(Paillier.encrypt(public_context,
+ float(loss_trainer),
+ precision=encryption_config[encryption_method][
+ "precision"],
+ obfuscation=True, num_cores=num_cores))
+
+ def mock_dual_label_t_recv(*args, **kwargs):
+ if encryption_method == "ckks":
+ if mock_label_t_recv.call_count == 1:
+ return
+        def mock_dual_label_t_recv(*args, **kwargs):
+            if encryption_method == "ckks":
+                if mock_label_t_recv.call_count == 1:
+                    return ts.ckks_vector(public_context,
+                                          pred_trainer.numpy().astype(np.float32).flatten()).serialize()
+                elif mock_label_t_recv.call_count == 2:
+                    return ts.ckks_vector(public_context,
+                                          loss_trainer.numpy().astype(np.float32).flatten()).serialize()
+                elif mock_label_t_recv.call_count == 3:
+                    return pred_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv.call_count == 4:
+                    return pred_trainer_val.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv.call_count == 5:
+                    return "node-2", vlr_t.model.state_dict()["linear.weight"][0]
+            elif encryption_method == "paillier":
+                if mock_label_t_recv.call_count == 1:
+                    return en_pred_trainer_p
+                elif mock_label_t_recv.call_count == 2:
+                    return en_loss_trainer_p
+                elif mock_label_t_recv.call_count == 3:
+                    tmp = Paillier.ciphertext_from(public_context, en_pred_trainer_p)
+                    return Paillier.serialize(np.sum(tmp * pred_trainer.numpy().astype(np.float32).flatten()))
+                elif mock_label_t_recv.call_count == 4:
+                    return pred_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv.call_count == 5:
+                    return pred_trainer_val.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv.call_count == 6:
+                    return "node-2", vlr_t.model.state_dict()["linear.weight"][0]
+            elif encryption_method == "plain":
+                if mock_label_t_recv.call_count == 1:
+                    return pred_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv.call_count == 2:
+                    return loss_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv.call_count == 3:
+                    return pred_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv.call_count == 4:
+                    return pred_trainer_val.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv.call_count == 5:
+                    return "node-2", vlr_t.model.state_dict()["linear.weight"][0]
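+        # The node-3 mock below mirrors the node-2 mock above; only the node id
+        # in the final (node, weight) tuple differs.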
+        def mock_dual_label_t_recv_1(*args, **kwargs):
+            if encryption_method == "ckks":
+                if mock_label_t_recv_1.call_count == 1:
+                    return ts.ckks_vector(public_context,
+                                          pred_trainer.numpy().astype(np.float32).flatten()).serialize()
+                elif mock_label_t_recv_1.call_count == 2:
+                    return ts.ckks_vector(public_context,
+                                          loss_trainer.numpy().astype(np.float32).flatten()).serialize()
+                elif mock_label_t_recv_1.call_count == 3:
+                    return pred_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv_1.call_count == 4:
+                    return pred_trainer_val.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv_1.call_count == 5:
+                    return "node-3", vlr_t.model.state_dict()["linear.weight"][0]
+            elif encryption_method == "paillier":
+                if mock_label_t_recv_1.call_count == 1:
+                    return en_pred_trainer_p
+                elif mock_label_t_recv_1.call_count == 2:
+                    return en_loss_trainer_p
+                elif mock_label_t_recv_1.call_count == 3:
+                    tmp = Paillier.ciphertext_from(public_context, en_pred_trainer_p)
+                    return Paillier.serialize(np.sum(tmp * pred_trainer.numpy().astype(np.float32).flatten()))
+                elif mock_label_t_recv_1.call_count == 4:
+                    return pred_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv_1.call_count == 5:
+                    return pred_trainer_val.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv_1.call_count == 6:
+                    return "node-3", vlr_t.model.state_dict()["linear.weight"][0]
+            elif encryption_method == "plain":
+                if mock_label_t_recv_1.call_count == 1:
+                    return pred_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv_1.call_count == 2:
+                    return loss_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv_1.call_count == 3:
+                    return pred_trainer.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv_1.call_count == 4:
+                    return pred_trainer_val.numpy().astype(np.float32).flatten()
+                elif mock_label_t_recv_1.call_count == 5:
+                    return "node-3", vlr_t.model.state_dict()["linear.weight"][0]
+
+        mock_label_t_recv = mocker.patch.object(
+            vlr.dual_channels["intermediate_label_trainer"]["node-2"], "recv",
+            side_effect=mock_dual_label_t_recv)
+        mock_label_t_recv_1 = mocker.patch.object(
+            vlr.dual_channels["intermediate_label_trainer"]["node-3"], "recv",
+            side_effect=mock_dual_label_t_recv_1)
+
+        def mock_broadcast_recv(*args, **kwargs):
+            if vlr.encryption_method == "ckks":
+                return serialized_public_context
+            elif vlr.encryption_method == "paillier":
+                return private_context.to_public().serialize()
+
+        def mock_broadcast_collect(*args, **kwargs):
+            return [2, 2]
+
+        mocker.patch.object(BroadcastChannel, "recv", side_effect=mock_broadcast_recv)
+        mocker.patch.object(BroadcastChannel, "collect", side_effect=mock_broadcast_collect)
+
+        def mock_gradients_loss(*args, **kwargs):
+            if mock_gradients_loss_label.call_count == 1:
+                return loss
+            elif mock_gradients_loss_label.call_count == 2:
+                tmp_w = vlr.model.linear.weight
+                tmp_b = vlr.model.linear.bias
+                return {"noised_gradient_label_trainer_w": tmp_w,
+                        "noised_gradient_label_trainer_b": tmp_b}
+
+        mock_gradients_loss_label = mocker.patch.object(
+            vlr.dual_channels["gradients_loss"], "recv", side_effect=mock_gradients_loss)
+
+        # fit label_trainer
+        vlr.fit()
+
+        # mock for trainer
+        mocker.patch.object(BroadcastChannel, "send", return_value=0)
+        mocker.patch.object(BroadcastChannel, "broadcast", return_value=0)
+
+        def mock_trainer_collect(*args, **kwargs):
+            # en_pred_trainer_p exists only on the paillier path, so presumably
+            # only that path exercises this collect
+            return [en_pred_trainer_p]
+
+        mocker.patch.object(BroadcastChannel, "collect", side_effect=mock_trainer_collect)
+
+        def mock_dual_recv(*args, **kwargs):
+            if mock_trainer_dual_recv.call_count == 1:
+                if encryption_method == "ckks":
+                    return ts.ckks_vector(public_context, d.numpy().astype(np.float32).flatten()).serialize()
+                elif encryption_method == "paillier":
+                    return Paillier.serialize(Paillier.encrypt(
+                        public_context, d.numpy().astype(np.float32).flatten(),
+                        precision=encryption_config[encryption_method]["precision"],
+                        obfuscation=True, num_cores=num_cores))
+                elif encryption_method == "plain":
+                    return d
+            elif mock_trainer_dual_recv.call_count == 2:
+                return [False, True, vlr.early_stopping_config["patience"]]
+
+        mock_trainer_dual_recv = mocker.patch.object(
+            vlr_t.dual_channels["intermediate_label_trainer"], "recv", side_effect=mock_dual_recv)
+
+        def mock_gradients_trainer(*args, **kwargs):
+            return vlr_t.model.linear.weight
+
+        mocker.patch.object(
+            vlr_t.dual_channels["gradients_loss"], "recv", side_effect=mock_gradients_trainer)
+
+        def mock_broadcast_trainer_recv(*args, **kwargs):
+            if encryption_method == "ckks":
+                return serialized_public_context
+            elif encryption_method == "paillier":
+                return private_context.to_public().serialize()
+
+        mocker.patch.object(vlr_t.broadcast_channel, "recv", side_effect=mock_broadcast_trainer_recv)
+
+        # fit vlr_t
+        vlr_t.fit()
+
+        # mock for assist_trainer
+        def mock_dual_init_recv(*args, **kwargs):
+            if mock_dual_init_recv_.call_count in (1, 2):
+                return 1
+            elif mock_dual_init_recv_.call_count == 3:
+                return 1000
+            elif mock_dual_init_recv_.call_count == 4:
+                return encryption_config
+            elif mock_dual_init_recv_.call_count == 5:
+                return encryption_method
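+        # mock_dual_init_recv above feeds the assist trainer's startup handshake
+        # in order; the 1, 1, 1000 values appear to be party counts and the batch
+        # size, followed by the encryption config and method negotiated earlier.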
+        mock_dual_init_recv_ = mocker.patch.object(
+            DualChannel, "recv", side_effect=mock_dual_init_recv)
+        vlr_a = VerticalLinearRegressionAssistTrainer()
+
+        if encryption_method == "paillier":
+            num_cores = -1 if encryption_config[encryption_method]["parallelize_on"] else 1
+            public_context = Paillier.context_from(vlr_a.public_context_ser)
+        elif encryption_method == "ckks":
+            public_context = ts.context_from(vlr_a.public_context_ser)
+
+        def mock_assist_label_t_recv(*args, **kwargs):
+            # label trainer -> assist trainer: encrypted total loss first, then
+            # weight and bias tensors standing in for the noised gradients
+            if mock_dual_label_recv_.call_count == 1:
+                if encryption_method == "ckks":
+                    return ts.ckks_vector(public_context,
+                                          loss.numpy().astype(np.float32).flatten()).serialize()
+                elif encryption_method == "paillier":
+                    return Paillier.serialize(Paillier.encrypt(
+                        public_context, float(loss),
+                        precision=encryption_config[encryption_method]["precision"],
+                        obfuscation=True, num_cores=num_cores))
+                elif encryption_method == "plain":
+                    return loss
+            elif mock_dual_label_recv_.call_count == 2:
+                if encryption_method == "ckks":
+                    return ts.ckks_vector(public_context,
+                                          vlr.model.linear.weight.numpy().astype(np.float32).flatten()).serialize()
+                elif encryption_method == "paillier":
+                    return Paillier.serialize(Paillier.encrypt(
+                        public_context, vlr.model.linear.weight.numpy().astype(np.float32).flatten(),
+                        precision=encryption_config[encryption_method]["precision"],
+                        obfuscation=True, num_cores=num_cores))
+            elif mock_dual_label_recv_.call_count == 3:
+                if encryption_method == "ckks":
+                    return ts.ckks_vector(public_context,
+                                          vlr.model.linear.bias.numpy().astype(np.float32).flatten()).serialize()
+                elif encryption_method == "paillier":
+                    return Paillier.serialize(Paillier.encrypt(
+                        public_context, vlr.model.linear.bias.numpy().astype(np.float32).flatten(),
+                        precision=encryption_config[encryption_method]["precision"],
+                        obfuscation=True, num_cores=num_cores))
+
+        mock_dual_label_recv_ = mocker.patch.object(
+            vlr_a.dual_channels["gradients_loss"]["node-1"], "recv",
+            side_effect=mock_assist_label_t_recv)
+
+        def mock_dual_trainer_t_recv(*args, **kwargs):
+            if encryption_method == "ckks":
+                return ts.ckks_vector(public_context,
+                                      vlr_t.model.linear.weight.numpy().astype(np.float32).flatten()).serialize()
+            elif encryption_method == "paillier":
+                return Paillier.serialize(Paillier.encrypt(
+                    public_context, vlr_t.model.linear.weight.numpy().astype(np.float32).flatten(),
+                    precision=encryption_config[encryption_method]["precision"],
+                    obfuscation=True, num_cores=num_cores))
+
+        mocker.patch.object(
+            vlr_a.dual_channels["gradients_loss"]["node-2"], "recv",
+            side_effect=mock_dual_trainer_t_recv)
+        mocker.patch.object(
+            vlr_a.dual_channels["gradients_loss"]["node-3"], "recv",
+            side_effect=mock_dual_trainer_t_recv)
+
+        # fit assist_trainer
+        vlr_a.fit()
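+        # The "[STAGE_ID]" placeholder is asserted literally below: in a
+        # unit-test run nothing appears to substitute the stage id into the
+        # configured file names, so the artifacts land under these raw names.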
+        assert os.path.exists("/opt/checkpoints/unit_test/vertical_linear_regression_[STAGE_ID].pt")
+        assert os.path.exists("/opt/checkpoints/unit_test/linear_reg_metric_train_[STAGE_ID].csv")
+        assert os.path.exists("/opt/checkpoints/unit_test/linear_reg_metric_val_[STAGE_ID].csv")
+        assert os.path.exists("/opt/checkpoints/unit_test/linear_reg_prediction_train_[STAGE_ID].csv")
+        assert os.path.exists("/opt/checkpoints/unit_test/linear_reg_prediction_val_[STAGE_ID].csv")
+        assert os.path.exists("/opt/checkpoints/unit_test/linear_reg_feature_importance_[STAGE_ID].csv")
+
+        feature_importance = pd.read_csv("/opt/checkpoints/unit_test/linear_reg_feature_importance_[STAGE_ID].csv")
+        assert len(feature_importance) == 5
+        train_metric = pd.read_csv("/opt/checkpoints/unit_test/linear_reg_metric_train_[STAGE_ID].csv")
+        assert len(train_metric.columns) == 6
+        val_metric = pd.read_csv("/opt/checkpoints/unit_test/linear_reg_metric_val_[STAGE_ID].csv")
+        assert len(val_metric.columns) == 6
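+
+# Each parametrized encryption case (ckks / plain / paillier) replays the full
+# three-party protocol above. Typical standalone invocation (the test-file path
+# is illustrative; adjust to wherever this file lives in the repo):
+#     pytest -q -k test_all_trainers test/algorithm/framework/vertical/test_vertical_linear_regression.py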