diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..9ccba39 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,31 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Lint with Flake8 + +on: + pull_request: + branches: [ develop ] + +permissions: + contents: read + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.8 + uses: actions/setup-python@v3 + with: + python-version: "3.8" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-linters.txt + pip install . + - name: Lint with flake8 + run: | + flake8 --show-source diff --git a/.github/workflows/pytest_coverage.yml b/.github/workflows/pytest_coverage.yml new file mode 100644 index 0000000..ce2a7f6 --- /dev/null +++ b/.github/workflows/pytest_coverage.yml @@ -0,0 +1,33 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Pytest and code coverage + +on: + pull_request: + branches: [ develop ] + +permissions: + contents: read + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.8 + uses: actions/setup-python@v3 + with: + python-version: "3.8" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pytest coverage + pip install -r requirements-test.txt + pip install . + - name: Test with pytest and report code coverage + run: | + coverage run -m pytest -rA + coverage report diff --git a/.github/workflows/task_runner_custom_weighted_average.yml b/.github/workflows/task_runner_custom_weighted_average.yml new file mode 100644 index 0000000..0deec9d --- /dev/null +++ b/.github/workflows/task_runner_custom_weighted_average.yml @@ -0,0 +1,38 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Task Runner with Custom Weighted Average Aggregation + +on: + pull_request: + branches: [ develop ] + +permissions: + contents: read + +jobs: + build: + strategy: + matrix: + os: ['ubuntu-latest', 'windows-latest'] + python-version: ['3.8','3.9','3.10','3.11'] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v3 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies ubuntu + if: matrix.os == 'ubuntu-latest' + run: | + python -m pip install --upgrade pip + pip install . + - name: Install dependencies windows + if: matrix.os == 'windows-latest' + run: | + python -m pip install --upgrade pip + pip install .
+ - name: Test Task Runner API + run: | + python -m tests.github.test_task_runner --workspace torch_cnn_mnist_custom_weighted_average --col1 col1 --col2 col2 --rounds-to-train 3 diff --git a/.github/workflows/task_runner_skc_compression.yml b/.github/workflows/task_runner_skc_compression.yml new file mode 100644 index 0000000..7653aec --- /dev/null +++ b/.github/workflows/task_runner_skc_compression.yml @@ -0,0 +1,38 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Task Runner with SKC Compression + +on: + pull_request: + branches: [ develop ] + +permissions: + contents: read + +jobs: + build: + strategy: + matrix: + os: ['ubuntu-latest', 'windows-latest'] + python-version: ['3.8','3.9','3.10','3.11'] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v3 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies ubuntu + if: matrix.os == 'ubuntu-latest' + run: | + python -m pip install --upgrade pip + pip install . + - name: Install dependencies windows + if: matrix.os == 'windows-latest' + run: | + python -m pip install --upgrade pip + pip install . + - name: Test Task Runner API + run: | + python -m tests.github.test_task_runner --workspace torch_cnn_mnist_skc_compression --col1 col1 --col2 col2 --rounds-to-train 3 diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml new file mode 100644 index 0000000..54b2088 --- /dev/null +++ b/.github/workflows/ubuntu.yml @@ -0,0 +1,46 @@ +name: Ubuntu (latest) + +on: + schedule: + - cron: '0 0 * * *' + +permissions: + contents: read + +jobs: + lint: # from lint.yml + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.8 + uses: actions/setup-python@v3 + with: + python-version: "3.8" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-linters.txt + pip install . + - name: Lint with flake8 + run: | + flake8 --show-source + + pytest-coverage: # from pytest_coverage.yml + needs: lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.8 + uses: actions/setup-python@v3 + with: + python-version: "3.8" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pytest coverage + pip install -r requirements-test.txt + pip install . + - name: Test with pytest and report code coverage + run: | + coverage run -m pytest -rA + coverage report \ No newline at end of file diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml new file mode 100644 index 0000000..fa2d478 --- /dev/null +++ b/.github/workflows/windows.yml @@ -0,0 +1,28 @@ +name: Windows (latest) + +on: + schedule: + - cron: '0 0 * * *' + +permissions: + contents: read + +jobs: + pytest-coverage: # from pytest_coverage.yml + runs-on: windows-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.8 + uses: actions/setup-python@v3 + with: + python-version: "3.8" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pytest coverage + pip install -r requirements-test.txt + pip install . 
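The Task Runner workflows above drive the same end-to-end test module on every OS/Python combination in the matrix. For local debugging, the matrix entries can be reproduced outside CI; a minimal sketch, assuming an activated virtual environment at the repository root with this package already installed (`pip install .`), with workspace and collaborator names mirroring the workflow steps:

```python
# Hypothetical local runner mirroring the CI "Test Task Runner API" steps.
import subprocess
import sys

for workspace in (
    "torch_cnn_mnist_custom_weighted_average",
    "torch_cnn_mnist_skc_compression",
):
    # Same invocation as the workflows: two collaborators, three rounds.
    subprocess.run(
        [
            sys.executable, "-m", "tests.github.test_task_runner",
            "--workspace", workspace,
            "--col1", "col1",
            "--col2", "col2",
            "--rounds-to-train", "3",
        ],
        check=True,
    )
```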
+ - name: Test with pytest and report code coverage + run: | + coverage run -m pytest -rA + coverage report \ No newline at end of file diff --git a/openfl_contrib/__init__.py b/openfl_contrib/__init__.py new file mode 100644 index 0000000..04d40ef --- /dev/null +++ b/openfl_contrib/__init__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl-contrib base package.""" +from openfl_contrib.__version__ import __version__ diff --git a/openfl_contrib/__version__.py b/openfl_contrib/__version__.py new file mode 100644 index 0000000..a764983 --- /dev/null +++ b/openfl_contrib/__version__.py @@ -0,0 +1,4 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl-contrib version information.""" +__version__ = '0.1.0' diff --git a/openfl_contrib/interface/__init__.py b/openfl_contrib/interface/__init__.py new file mode 100644 index 0000000..b84aca4 --- /dev/null +++ b/openfl_contrib/interface/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl_contrib.interface package.""" diff --git a/openfl_contrib/interface/aggregation_functions/__init__.py b/openfl_contrib/interface/aggregation_functions/__init__.py new file mode 100644 index 0000000..e3049d7 --- /dev/null +++ b/openfl_contrib/interface/aggregation_functions/__init__.py @@ -0,0 +1,6 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Aggregation functions package.""" + +from openfl_contrib.interface.aggregation_functions.custom_weighted_average import CustomWeightedAverage diff --git a/openfl_contrib/interface/aggregation_functions/custom_weighted_average.py b/openfl_contrib/interface/aggregation_functions/custom_weighted_average.py new file mode 100644 index 0000000..deb214e --- /dev/null +++ b/openfl_contrib/interface/aggregation_functions/custom_weighted_average.py @@ -0,0 +1,45 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Custom federated averaging module.""" + +import numpy as np + +from openfl.interface.aggregation_functions.core import AggregationFunction + + +class CustomWeightedAverage(AggregationFunction): + """Weighted average aggregation.""" + + def call(self, local_tensors, *_) -> np.ndarray: + """Aggregate tensors. + + Args: + local_tensors(list[openfl.utilities.LocalTensor]): List of local tensors to aggregate. + db_iterator: iterator over history of all tensors. Columns: + - 'tensor_name': name of the tensor. + Examples for `torch.nn.Module`s: 'conv1.weight', 'fc2.bias'. + - 'round': 0-based number of the round corresponding to this tensor. + - 'tags': tuple of tensor tags. Tags that can appear: + - 'model' indicates that the tensor is a model parameter. + - 'trained' indicates that the tensor is part of a training result. + These tensors are passed to the aggregator node after local learning. + - 'aggregated' indicates that the tensor is a result of aggregation. + These tensors are sent to collaborators for the next round. + - 'delta' indicates that the value is a difference between rounds + for a specific tensor. + Also, one of the tags is a collaborator name + if it corresponds to a result of a local task. + + - 'nparray': value of the tensor.
+ tensor_name: name of the tensor + fl_round: round number + tags: tuple of tags for this tensor + Returns: + np.ndarray: aggregated tensor + """ + tensors, weights = zip(*[(x.tensor, x.weight) for x in local_tensors]) + + total_weight = sum(weights) + weighted_sum = np.sum([tensor * weight for tensor, weight in zip(tensors, weights)], axis=0) + return weighted_sum / total_weight \ No newline at end of file diff --git a/openfl_contrib/pipelines/__init__.py b/openfl_contrib/pipelines/__init__.py new file mode 100644 index 0000000..8668c27 --- /dev/null +++ b/openfl_contrib/pipelines/__init__.py @@ -0,0 +1,5 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl_contrib.pipelines module.""" + +from openfl_contrib.pipelines.skc_pipeline import SKCPipeline diff --git a/openfl_contrib/pipelines/skc_pipeline.py b/openfl_contrib/pipelines/skc_pipeline.py new file mode 100644 index 0000000..b07f29b --- /dev/null +++ b/openfl_contrib/pipelines/skc_pipeline.py @@ -0,0 +1,226 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""SKCPipeline module.""" + +import copy as co +import gzip as gz + +import numpy as np +from sklearn import cluster + +from openfl.pipelines.pipeline import TransformationPipeline +from openfl.pipelines.pipeline import Transformer + + +class SparsityTransformer(Transformer): + """A transformer class to sparsify input data.""" + + def __init__(self, p=0.01): + """Initialize. + + Args: + p (float): sparsity ratio (Default=0.01) + """ + self.lossy = True + self.p = p + + def forward(self, data, **kwargs): + """ + Sparsify data, keeping only the top-k elements by magnitude and zeroing out the rest. + + Args: + data: a numpy array from the model tensor_dict. + + Returns: + sparse_data: a flattened, sparse representation of the input tensor + metadata: dictionary to store a list of meta information. + """ + metadata = {'int_list': list(data.shape)} + # sparsification + data = data.astype(np.float32) + flatten_data = data.flatten() + n_elements = flatten_data.shape[0] + k_op = int(np.ceil(n_elements * self.p)) + topk, topk_indices = self._topk_func(flatten_data, k_op) + sparse_data = np.zeros(flatten_data.shape) + sparse_data[topk_indices] = topk + return sparse_data, metadata + + def backward(self, data, metadata, **kwargs): + """Recover data array with the right shape and numerical type. + + Args: + data: a numpy array with non-zero values. + metadata: dictionary to contain information for recovering back + to original data array. + + Returns: + recovered_data: a numpy array with the original shape. + """ + data = data.astype(np.float32) + data_shape = metadata['int_list'] + recovered_data = data.reshape(data_shape) + return recovered_data + + @staticmethod + def _topk_func(x, k): + """Select top k values. + + Args: + x: a numpy array to search for the top-k components. + k: the number of largest-magnitude values to keep. + + Returns: + topk_mag: components with top-k values. + indices: indices of the top-k components.
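To make the arithmetic of `CustomWeightedAverage.call` above concrete: it is the same reduction numpy performs in `np.average`. A minimal sketch with made-up tensors and weights; `FakeLocalTensor` is a hypothetical stand-in for `openfl.utilities.LocalTensor`, used here only so the snippet is self-contained:

```python
from collections import namedtuple

import numpy as np

# Hypothetical stand-in for openfl.utilities.LocalTensor (illustration only).
FakeLocalTensor = namedtuple("FakeLocalTensor", ["tensor", "weight"])

local_tensors = [
    FakeLocalTensor(tensor=np.array([0.0, 1.0, 2.0]), weight=0.9),
    FakeLocalTensor(tensor=np.array([5.0, 6.0, 7.0]), weight=0.1),
]

# Same computation as CustomWeightedAverage.call above.
tensors, weights = zip(*[(x.tensor, x.weight) for x in local_tensors])
weighted_sum = np.sum([t * w for t, w in zip(tensors, weights)], axis=0)
manual = weighted_sum / sum(weights)

# np.average performs the identical weighted reduction over axis 0.
assert np.allclose(manual, np.average(tensors, weights=weights, axis=0))
```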
+ """ + # quick sort as default on magnitude + idx = np.argsort(np.abs(x)) + # sorted order, the right most is the largest magnitude + length = x.shape[0] + start_idx = length - k + # get the top k magnitude + topk_mag = np.asarray(x[idx[start_idx:]]) + indices = np.asarray(idx[start_idx:]) + if min(topk_mag) - 0 < 10e-8: # avoid zeros + topk_mag = topk_mag + 10e-8 + return topk_mag, indices + + +class KmeansTransformer(Transformer): + """A transformer class to quantize input data.""" + + def __init__(self, n_cluster=6): + """Initialize.""" + self.n_cluster = n_cluster + self.lossy = True + + def forward(self, data, **kwargs): + """Quantize data into n_cluster levels of values. + + Args: + data: an flattened numpy array. + + Returns: + int_data: an numpy array being quantized. + metadata: dictionary to store a list of meta information. + """ + # clustering + data = data.reshape((-1, 1)) + if data.shape[0] >= self.n_cluster: + k_means = cluster.KMeans( + n_clusters=self.n_cluster, n_init=self.n_cluster) + k_means.fit(data) + quantized_values = k_means.cluster_centers_.squeeze() + indices = k_means.labels_ + quant_array = np.choose(indices, quantized_values) + else: + quant_array = data + int_array, int2float_map = self._float_to_int(quant_array) + metadata = {'int_to_float': int2float_map} + int_array = int_array.reshape(-1) + return int_array, metadata + + def backward(self, data, metadata, **kwargs): + """Recover data array back to the original numerical type. + + Args: + data: an numpy array with non-zero values + metadata: dictionary to contain information for recovering back + to original data array + + Returns: + data: an numpy array with original numerical type + """ + # convert back to float + data = co.deepcopy(data) + int2float_map = metadata['int_to_float'] + for key in int2float_map: + indices = data == key + data[indices] = int2float_map[key] + return data + + @staticmethod + def _float_to_int(np_array): + """ + Create look-up table for conversion between floating and integer types. + + Args: + np_array + + Returns: + int_array, int_to_float_map + """ + flatten_array = np_array.reshape(-1) + unique_value_array = np.unique(flatten_array) + int_array = np.zeros(flatten_array.shape, dtype=np.int32) + int_to_float_map = {} + float_to_int_map = {} + # create table + for idx, u_value in enumerate(unique_value_array): + int_to_float_map.update({idx: u_value}) + float_to_int_map.update({u_value: idx}) + # assign to the integer array + indices = np.where(flatten_array == u_value) + int_array[indices] = idx + int_array = int_array.reshape(np_array.shape) + return int_array, int_to_float_map + + +class GZIPTransformer(Transformer): + """A transformer class to losslessly compress data.""" + + def __init__(self): + """Initialize.""" + self.lossy = False + + def forward(self, data, **kwargs): + """Compress data into bytes. + + Args: + data: an numpy array with non-zero values + """ + bytes_ = data.astype(np.float32).tobytes() + compressed_bytes_ = gz.compress(bytes_) + metadata = {} + return compressed_bytes_, metadata + + def backward(self, data, metadata, **kwargs): + """Decompress data into numpy of float32. 
+ + Args: + data: gzip-compressed bytes to decompress + metadata: dictionary to contain information for recovering back + to original data array + + Returns: + data: a numpy array of float32 values + """ + decompressed_bytes_ = gz.decompress(data) + data = np.frombuffer(decompressed_bytes_, dtype=np.float32) + return data + + +class SKCPipeline(TransformationPipeline): + """A pipeline class to compress data lossily using sparsity and k-means methods.""" + + def __init__(self, p_sparsity=0.1, n_clusters=6, **kwargs): + """Initialize a pipeline of transformers. + + Args: + p_sparsity (float): Sparsity factor (Default=0.1) + n_clusters (int): Number of K-Means clusters (Default=6) + + Returns: + Data compression transformer pipeline object + """ + # instantiate each transformer + self.p = p_sparsity + self.n_cluster = n_clusters + transformers = [ + SparsityTransformer(self.p), + KmeansTransformer(self.n_cluster), + GZIPTransformer() + ] + super(SKCPipeline, self).__init__(transformers=transformers, **kwargs) diff --git a/openfl_contrib_workspace/__init__.py b/openfl_contrib_workspace/__init__.py new file mode 100644 index 0000000..f664507 --- /dev/null +++ b/openfl_contrib_workspace/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""openfl_contrib_workspace package.""" diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/.workspace b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/.workspace new file mode 100644 index 0000000..3c2c5d0 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/.workspace @@ -0,0 +1,2 @@ +current_plan_name: default + diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/cols.yaml b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/cols.yaml new file mode 100644 index 0000000..15e0e52 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/cols.yaml @@ -0,0 +1,5 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +collaborators: + \ No newline at end of file diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/data.yaml b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/data.yaml new file mode 100644 index 0000000..4c1fc32 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/data.yaml @@ -0,0 +1,5 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +# collaborator_name,data_directory_path +one,1 \ No newline at end of file diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/defaults b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/defaults new file mode 100644 index 0000000..fb82f9c --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/defaults @@ -0,0 +1,2 @@ +../../workspace/plan/defaults + diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/plan.yaml b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/plan.yaml new file mode 100644 index 0000000..f871634 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/plan/plan.yaml @@ -0,0 +1,53 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +aggregator : + defaults : plan/defaults/aggregator.yaml + template : openfl.component.Aggregator + settings : + init_state_path :
save/torch_cnn_mnist_init.pbuf + best_state_path : save/torch_cnn_mnist_best.pbuf + last_state_path : save/torch_cnn_mnist_last.pbuf + rounds_to_train : 10 + log_metric_callback : + template : src.utils.write_metric + + +collaborator : + defaults : plan/defaults/collaborator.yaml + template : openfl.component.Collaborator + settings : + delta_updates : false + opt_treatment : RESET + +data_loader : + defaults : plan/defaults/data_loader.yaml + template : src.dataloader.PyTorchMNISTInMemory + settings : + collaborator_count : 2 + data_group_name : mnist + batch_size : 256 + +task_runner : + defaults : plan/defaults/task_runner.yaml + template : src.taskrunner.PyTorchCNN + +network : + defaults : plan/defaults/network.yaml + +assigner : + defaults : plan/defaults/assigner.yaml + +tasks : + defaults : plan/defaults/tasks_torch.yaml + train: + function : train_task + kwargs : + metrics : + - loss + epochs : 1 + aggregation_type : + template : openfl_contrib.interface.aggregation_functions.CustomWeightedAverage + +compression_pipeline : + defaults : plan/defaults/compression_pipeline.yaml \ No newline at end of file diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/requirements.txt b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/requirements.txt new file mode 100644 index 0000000..1132223 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/requirements.txt @@ -0,0 +1,4 @@ +torch==2.3.0 +torchvision==0.18 +tensorboard +wheel>=0.38.0 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/__init__.py b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/__init__.py new file mode 100644 index 0000000..d5df5b8 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""You may copy this file as the starting point of your own model.""" diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/dataloader.py b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/dataloader.py new file mode 100644 index 0000000..cb0acc4 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/dataloader.py @@ -0,0 +1,130 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" + +from openfl.federated import PyTorchDataLoader +from torchvision import datasets +from torchvision import transforms +import numpy as np +from logging import getLogger + +logger = getLogger(__name__) + + +class PyTorchMNISTInMemory(PyTorchDataLoader): + """PyTorch data loader for MNIST dataset.""" + + def __init__(self, data_path, batch_size, **kwargs): + """Instantiate the data object. 
+ + Args: + data_path: The file path to the data + batch_size: The batch size of the data loader + **kwargs: Additional arguments, passed to super + init and load_mnist_shard + """ + super().__init__(batch_size, **kwargs) + + num_classes, X_train, y_train, X_valid, y_valid = load_mnist_shard( + shard_num=int(data_path), **kwargs + ) + + self.X_train = X_train + self.y_train = y_train + self.train_loader = self.get_train_loader() + + self.X_valid = X_valid + self.y_valid = y_valid + self.val_loader = self.get_valid_loader() + + self.num_classes = num_classes + + +def load_mnist_shard( + shard_num, collaborator_count, categorical=False, channels_last=True, **kwargs +): + """ + Load the MNIST dataset. + + Args: + shard_num (int): The shard to use from the dataset + collaborator_count (int): The number of collaborators in the + federation + categorical (bool): True = convert the labels to one-hot encoded + vectors (Default = False) + channels_last (bool): True = The input images have the channels + last (Default = True) + **kwargs: Additional parameters to pass to the function + + Returns: + int: The number of classes + numpy.ndarray: The training data + numpy.ndarray: The training labels + numpy.ndarray: The validation data + numpy.ndarray: The validation labels + """ + num_classes = 10 + + (X_train, y_train), (X_valid, y_valid) = _load_raw_datashards( + shard_num, collaborator_count, transform=transforms.ToTensor() + ) + + logger.info(f"MNIST > X_train Shape : {X_train.shape}") + logger.info(f"MNIST > y_train Shape : {y_train.shape}") + logger.info(f"MNIST > Train Samples : {X_train.shape[0]}") + logger.info(f"MNIST > Valid Samples : {X_valid.shape[0]}") + + if categorical: + # convert class vectors to binary class matrices + y_train = one_hot(y_train, num_classes) + y_valid = one_hot(y_valid, num_classes) + + return num_classes, X_train, y_train, X_valid, y_valid + + +def one_hot(labels, classes): + """ + One Hot encode a vector. + + Args: + labels (list): List of labels to onehot encode + classes (int): Total number of categorical classes + + Returns: + np.array: Matrix of one-hot encoded labels + """ + return np.eye(classes)[labels] + + +def _load_raw_datashards(shard_num, collaborator_count, transform=None): + """ + Load the raw data by shard. + + Returns tuples of the dataset shard divided into training and validation.
+ + Args: + shard_num (int): The shard number to use + collaborator_count (int): The number of collaborators in the federation + transform: torchvision.transforms.Transform to apply to images + + Returns: + 2 tuples: (image, label) of the training, validation dataset + """ + train_data, val_data = ( + datasets.MNIST("data", train=train, download=True, transform=transform) + for train in (True, False) + ) + X_train_tot, y_train_tot = train_data.train_data, train_data.train_labels + X_valid_tot, y_valid_tot = val_data.test_data, val_data.test_labels + + # create the shards + shard_num = int(shard_num) + X_train = X_train_tot[shard_num::collaborator_count].unsqueeze(1).float() + y_train = y_train_tot[shard_num::collaborator_count] + + X_valid = X_valid_tot[shard_num::collaborator_count].unsqueeze(1).float() + y_valid = y_valid_tot[shard_num::collaborator_count] + + return (X_train, y_train), (X_valid, y_valid) diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/taskrunner.py b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/taskrunner.py new file mode 100644 index 0000000..c962a81 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/taskrunner.py @@ -0,0 +1,120 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +from typing import Iterator +from typing import Tuple + +from openfl.federated import PyTorchTaskRunner +from openfl.utilities import Metric + + +class PyTorchCNN(PyTorchTaskRunner): + """ + Simple CNN for classification. + + PyTorchTaskRunner inherits from nn.Module, so you can define your model + in the same way that you would for PyTorch + """ + + def __init__(self, device="cpu", **kwargs): + """Initialize. + + Args: + device: The hardware device to use for training (Default = "cpu") + **kwargs: Additional arguments to pass to the function + + """ + super().__init__(device=device, **kwargs) + + # Define the model + self.conv1 = nn.Conv2d(1, 20, 2, 1) + self.conv2 = nn.Conv2d(20, 50, 5, 1) + self.fc1 = nn.Linear(800, 500) + self.fc2 = nn.Linear(500, 10) + self.to(device) + + # `self.optimizer` must be set for optimizer weights to be federated + self.optimizer = optim.Adam(self.parameters(), lr=1e-4) + + # Set the loss function + self.loss_fn = F.cross_entropy + + def forward(self, x): + """ + Forward pass of the model. + + Args: + x: Data input to the model for the forward pass + """ + x = F.relu(self.conv1(x)) + x = F.max_pool2d(x, 2, 2) + x = F.relu(self.conv2(x)) + x = F.max_pool2d(x, 2, 2) + x = x.view(-1, 800) + x = F.relu(self.fc1(x)) + x = self.fc2(x) + return x + + def train_( + self, train_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]] + ) -> Metric: + """ + Train a single epoch. + + Override this function in order to use custom training. + + Args: + train_dataloader: Train dataset batch generator. Yields (samples, targets) tuples of + size = `self.data_loader.batch_size`. + Returns: + Metric: An object containing name and np.ndarray value.
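The shard split in `_load_raw_datashards` above is plain stride indexing: collaborator shards interleave through the dataset and are pairwise disjoint. A minimal sketch of the same pattern on a toy array:

```python
import numpy as np

collaborator_count = 2
samples = np.arange(10)  # stand-in for the 60k MNIST training images

# Same pattern as X_train_tot[shard_num::collaborator_count] above.
shards = [samples[shard::collaborator_count] for shard in range(collaborator_count)]
print(shards[0])  # [0 2 4 6 8]
print(shards[1])  # [1 3 5 7 9]

# Together the shards cover every sample exactly once.
assert sum(len(s) for s in shards) == len(samples)
```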
+ """ + losses = [] + for data, target in train_dataloader: + data, target = data.to(self.device), target.to(self.device) + self.optimizer.zero_grad() + output = self(data) + loss = self.loss_fn(output, target) + loss.backward() + self.optimizer.step() + losses.append(loss.detach().cpu().numpy()) + loss = np.mean(losses) + return Metric(name=self.loss_fn.__name__, value=np.array(loss)) + + def validate_( + self, validation_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]] + ) -> Metric: + """ + Perform validation on PyTorch Model + + Override this function for your own custom validation function + + Args: + validation_dataloader: Validation dataset batch generator. + Yields (samples, targets) tuples + Returns: + Metric: An object containing name and np.ndarray value + """ + + total_samples = 0 + val_score = 0 + with torch.no_grad(): + for data, target in validation_dataloader: + samples = target.shape[0] + total_samples += samples + data, target = data.to(self.device), target.to( + self.device, dtype=torch.int64 + ) + output = self(data) + # get the index of the max log-probability + pred = output.argmax(dim=1) + val_score += pred.eq(target).sum().cpu().numpy() + + accuracy = val_score / total_samples + return Metric(name="accuracy", value=np.array(accuracy)) diff --git a/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/utils.py b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/utils.py new file mode 100644 index 0000000..41023a7 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_custom_weighted_average/src/utils.py @@ -0,0 +1,21 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own utilities.""" + +from torch.utils.tensorboard import SummaryWriter + +writer = None + + +def get_writer(): + """Create global writer object.""" + global writer + if not writer: + writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) + + +def write_metric(node_name, task_name, metric_name, metric, round_number): + """Write metric callback.""" + get_writer() + writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/.workspace b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/.workspace new file mode 100644 index 0000000..3c2c5d0 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/.workspace @@ -0,0 +1,2 @@ +current_plan_name: default + diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/cols.yaml b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/cols.yaml new file mode 100644 index 0000000..15e0e52 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/cols.yaml @@ -0,0 +1,5 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +collaborators: + \ No newline at end of file diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/data.yaml b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/data.yaml new file mode 100644 index 0000000..c3812ae --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/data.yaml @@ -0,0 +1,5 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +# collaborator_name ,data_directory_path +one,1 \ No newline at end of file diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/defaults 
b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/defaults new file mode 100644 index 0000000..fb82f9c --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/defaults @@ -0,0 +1,2 @@ +../../workspace/plan/defaults + diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/plan.yaml b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/plan.yaml new file mode 100644 index 0000000..627b2be --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/plan/plan.yaml @@ -0,0 +1,48 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +aggregator : + defaults : plan/defaults/aggregator.yaml + template : openfl.component.Aggregator + settings : + init_state_path : save/torch_cnn_mnist_init.pbuf + best_state_path : save/torch_cnn_mnist_best.pbuf + last_state_path : save/torch_cnn_mnist_last.pbuf + rounds_to_train : 10 + log_metric_callback : + template : src.utils.write_metric + + +collaborator : + defaults : plan/defaults/collaborator.yaml + template : openfl.component.Collaborator + settings : + delta_updates : false + opt_treatment : RESET + +data_loader : + defaults : plan/defaults/data_loader.yaml + template : src.dataloader.PyTorchMNISTInMemory + settings : + collaborator_count : 2 + data_group_name : mnist + batch_size : 256 + +task_runner : + defaults : plan/defaults/task_runner.yaml + template : src.taskrunner.PyTorchCNN + +network : + defaults : plan/defaults/network.yaml + +assigner : + defaults : plan/defaults/assigner.yaml + +tasks : + defaults : plan/defaults/tasks_torch.yaml + +compression_pipeline : + defaults : plan/defaults/compression_pipeline.yaml + template: openfl_contrib.pipelines.SKCPipeline + settings: + n_clusters : 2 \ No newline at end of file diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/requirements.txt b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/requirements.txt new file mode 100644 index 0000000..1132223 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/requirements.txt @@ -0,0 +1,4 @@ +torch==2.3.0 +torchvision==0.18 +tensorboard +wheel>=0.38.0 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/__init__.py b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/__init__.py new file mode 100644 index 0000000..d5df5b8 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""You may copy this file as the starting point of your own model.""" diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/dataloader.py b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/dataloader.py new file mode 100644 index 0000000..cb0acc4 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/dataloader.py @@ -0,0 +1,130 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" + +from openfl.federated import PyTorchDataLoader +from torchvision import datasets +from torchvision import transforms +import numpy as np +from logging import getLogger + +logger = getLogger(__name__) + + +class PyTorchMNISTInMemory(PyTorchDataLoader): + """PyTorch data loader for MNIST dataset.""" + + def __init__(self, data_path, batch_size, **kwargs): + 
"""Instantiate the data object. + + Args: + data_path: The file path to the data + batch_size: The batch size of the data loader + **kwargs: Additional arguments, passed to super + init and load_mnist_shard + """ + super().__init__(batch_size, **kwargs) + + num_classes, X_train, y_train, X_valid, y_valid = load_mnist_shard( + shard_num=int(data_path), **kwargs + ) + + self.X_train = X_train + self.y_train = y_train + self.train_loader = self.get_train_loader() + + self.X_valid = X_valid + self.y_valid = y_valid + self.val_loader = self.get_valid_loader() + + self.num_classes = num_classes + + +def load_mnist_shard( + shard_num, collaborator_count, categorical=False, channels_last=True, **kwargs +): + """ + Load the MNIST dataset. + + Args: + shard_num (int): The shard to use from the dataset + collaborator_count (int): The number of collaborators in the + federation + categorical (bool): True = convert the labels to one-hot encoded + vectors (Default = True) + channels_last (bool): True = The input images have the channels + last (Default = True) + **kwargs: Additional parameters to pass to the function + + Returns: + list: The input shape + int: The number of classes + numpy.ndarray: The training data + numpy.ndarray: The training labels + numpy.ndarray: The validation data + numpy.ndarray: The validation labels + """ + num_classes = 10 + + (X_train, y_train), (X_valid, y_valid) = _load_raw_datashards( + shard_num, collaborator_count, transform=transforms.ToTensor() + ) + + logger.info(f"MNIST > X_train Shape : {X_train.shape}") + logger.info(f"MNIST > y_train Shape : {y_train.shape}") + logger.info(f"MNIST > Train Samples : {X_train.shape[0]}") + logger.info(f"MNIST > Valid Samples : {X_valid.shape[0]}") + + if categorical: + # convert class vectors to binary class matrices + y_train = one_hot(y_train, num_classes) + y_valid = one_hot(y_valid, num_classes) + + return num_classes, X_train, y_train, X_valid, y_valid + + +def one_hot(labels, classes): + """ + One Hot encode a vector. + + Args: + labels (list): List of labels to onehot encode + classes (int): Total number of categorical classes + + Returns: + np.array: Matrix of one-hot encoded labels + """ + return np.eye(classes)[labels] + + +def _load_raw_datashards(shard_num, collaborator_count, transform=None): + """ + Load the raw data by shard. + + Returns tuples of the dataset shard divided into training and validation. 
+ + Args: + shard_num (int): The shard number to use + collaborator_count (int): The number of collaborators in the federation + transform: torchvision.transforms.Transform to apply to images + + Returns: + 2 tuples: (image, label) of the training, validation dataset + """ + train_data, val_data = ( + datasets.MNIST("data", train=train, download=True, transform=transform) + for train in (True, False) + ) + X_train_tot, y_train_tot = train_data.train_data, train_data.train_labels + X_valid_tot, y_valid_tot = val_data.test_data, val_data.test_labels + + # create the shards + shard_num = int(shard_num) + X_train = X_train_tot[shard_num::collaborator_count].unsqueeze(1).float() + y_train = y_train_tot[shard_num::collaborator_count] + + X_valid = X_valid_tot[shard_num::collaborator_count].unsqueeze(1).float() + y_valid = y_valid_tot[shard_num::collaborator_count] + + return (X_train, y_train), (X_valid, y_valid) diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/taskrunner.py b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/taskrunner.py new file mode 100644 index 0000000..c962a81 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/taskrunner.py @@ -0,0 +1,120 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +from typing import Iterator +from typing import Tuple + +from openfl.federated import PyTorchTaskRunner +from openfl.utilities import Metric + + +class PyTorchCNN(PyTorchTaskRunner): + """ + Simple CNN for classification. + + PyTorchTaskRunner inherits from nn.Module, so you can define your model + in the same way that you would for PyTorch + """ + + def __init__(self, device="cpu", **kwargs): + """Initialize. + + Args: + device: The hardware device to use for training (Default = "cpu") + **kwargs: Additional arguments to pass to the function + + """ + super().__init__(device=device, **kwargs) + + # Define the model + self.conv1 = nn.Conv2d(1, 20, 2, 1) + self.conv2 = nn.Conv2d(20, 50, 5, 1) + self.fc1 = nn.Linear(800, 500) + self.fc2 = nn.Linear(500, 10) + self.to(device) + + # `self.optimizer` must be set for optimizer weights to be federated + self.optimizer = optim.Adam(self.parameters(), lr=1e-4) + + # Set the loss function + self.loss_fn = F.cross_entropy + + def forward(self, x): + """ + Forward pass of the model. + + Args: + x: Data input to the model for the forward pass + """ + x = F.relu(self.conv1(x)) + x = F.max_pool2d(x, 2, 2) + x = F.relu(self.conv2(x)) + x = F.max_pool2d(x, 2, 2) + x = x.view(-1, 800) + x = F.relu(self.fc1(x)) + x = self.fc2(x) + return x + + def train_( + self, train_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]] + ) -> Metric: + """ + Train a single epoch. + + Override this function in order to use custom training. + + Args: + train_dataloader: Train dataset batch generator. Yields (samples, targets) tuples of + size = `self.data_loader.batch_size`. + Returns: + Metric: An object containing name and np.ndarray value.
+ """ + losses = [] + for data, target in train_dataloader: + data, target = data.to(self.device), target.to(self.device) + self.optimizer.zero_grad() + output = self(data) + loss = self.loss_fn(output, target) + loss.backward() + self.optimizer.step() + losses.append(loss.detach().cpu().numpy()) + loss = np.mean(losses) + return Metric(name=self.loss_fn.__name__, value=np.array(loss)) + + def validate_( + self, validation_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]] + ) -> Metric: + """ + Perform validation on PyTorch Model + + Override this function for your own custom validation function + + Args: + validation_dataloader: Validation dataset batch generator. + Yields (samples, targets) tuples + Returns: + Metric: An object containing name and np.ndarray value + """ + + total_samples = 0 + val_score = 0 + with torch.no_grad(): + for data, target in validation_dataloader: + samples = target.shape[0] + total_samples += samples + data, target = data.to(self.device), target.to( + self.device, dtype=torch.int64 + ) + output = self(data) + # get the index of the max log-probability + pred = output.argmax(dim=1) + val_score += pred.eq(target).sum().cpu().numpy() + + accuracy = val_score / total_samples + return Metric(name="accuracy", value=np.array(accuracy)) diff --git a/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/utils.py b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/utils.py new file mode 100644 index 0000000..41023a7 --- /dev/null +++ b/openfl_contrib_workspace/torch_cnn_mnist_skc_compression/src/utils.py @@ -0,0 +1,21 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own utilities.""" + +from torch.utils.tensorboard import SummaryWriter + +writer = None + + +def get_writer(): + """Create global writer object.""" + global writer + if not writer: + writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) + + +def write_metric(node_name, task_name, metric_name, metric, round_number): + """Write metric callback.""" + get_writer() + writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/requirements-linters.txt b/requirements-linters.txt new file mode 100644 index 0000000..9d2599f --- /dev/null +++ b/requirements-linters.txt @@ -0,0 +1,14 @@ +flake8 +flake8-broken-line +flake8-bugbear +flake8-builtins +flake8-comprehensions +flake8-copyright +flake8-docstrings +flake8-eradicate +flake8-import-order +flake8-import-single +flake8-quotes +flake8-use-fstring +pep8-naming +setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..060ed65 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1 @@ +pytest==8.2.0 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..5ea6363 --- /dev/null +++ b/setup.py @@ -0,0 +1,17 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from setuptools import setup, find_packages + +setup( + name='openfl-contrib', + version='0.1.0', + author='The OpenFL Contributors', + description='Repository for academic and research contributions to OpenFL', + packages=find_packages(), + include_package_data=True, + install_requires=[ + 'openfl @ git+https://github.com/securefederatedai/openfl.git@develop', + ], + python_requires='>=3.8, <3.12' +) \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py 
new file mode 100644 index 0000000..28eee91 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""Tests package.""" diff --git a/tests/github/__init__.py b/tests/github/__init__.py new file mode 100644 index 0000000..28eee91 --- /dev/null +++ b/tests/github/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""Tests package.""" diff --git a/tests/github/test_task_runner.py b/tests/github/test_task_runner.py new file mode 100644 index 0000000..a69f065 --- /dev/null +++ b/tests/github/test_task_runner.py @@ -0,0 +1,63 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +import time +import socket +import argparse +from pathlib import Path +from subprocess import check_call +from concurrent.futures import ProcessPoolExecutor + +from openfl.utilities.utils import rmtree +from tests.github.utils import create_collaborator, initialize_and_certify_workspace, certify_aggregator + + +if __name__ == '__main__': + # Test the pipeline + parser = argparse.ArgumentParser() + parser.add_argument('--workspace', default='fed_work12345alpha81671') + parser.add_argument('--col1', default='one123dragons') + parser.add_argument('--col2', default='beta34unicorns') + parser.add_argument('--rounds-to-train') + parser.add_argument('--col1-data-path', default='1') + parser.add_argument('--col2-data-path', default='2') + parser.add_argument('--save-model') + + origin_dir = Path.cwd().resolve() + args = parser.parse_args() + workspace = args.workspace + archive_name = f'{workspace}.zip' + fqdn = socket.getfqdn() + rounds_to_train = args.rounds_to_train + col1, col2 = args.col1, args.col2 + col1_data_path, col2_data_path = args.col1_data_path, args.col2_data_path + save_model = args.save_model + + # START + # ===== + # Make sure you are in a Python virtual environment with the FL package installed. 
+ initialize_and_certify_workspace(workspace, fqdn, rounds_to_train) + certify_aggregator(fqdn) + + workspace_root = Path().resolve() # Get the absolute directory path for the workspace + + # Create collaborator #1 + create_collaborator(col1, workspace_root, col1_data_path, archive_name, workspace) + + # Create collaborator #2 + create_collaborator(col2, workspace_root, col2_data_path, archive_name, workspace) + + # Run the federation + with ProcessPoolExecutor(max_workers=3) as executor: + executor.submit(check_call, ['fx', 'aggregator', 'start'], cwd=workspace_root) + time.sleep(5) + + dir1 = workspace_root / col1 / workspace + executor.submit(check_call, ['fx', 'collaborator', 'start', '-n', col1], cwd=dir1) + + dir2 = workspace_root / col2 / workspace + executor.submit(check_call, ['fx', 'collaborator', 'start', '-n', col2], cwd=dir2) + + os.chdir(origin_dir) + rmtree(workspace_root) diff --git a/tests/github/utils.py b/tests/github/utils.py new file mode 100644 index 0000000..24550b8 --- /dev/null +++ b/tests/github/utils.py @@ -0,0 +1,83 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import shutil +from subprocess import check_call +import os +from pathlib import Path +import re +import tarfile + + +def create_collaborator(col, workspace_root, data_path, archive_name, fed_workspace): + # Copy workspace to collaborator directories (these can be on different machines) + col_path = workspace_root / col + shutil.rmtree(col_path, ignore_errors=True) # Remove any existing directory + col_path.mkdir() # Create a new directory for the collaborator + + # Import the workspace to this collaborator + check_call( + ['fx', 'workspace', 'import', '--archive', workspace_root / archive_name], + cwd=col_path + ) + + # Create collaborator certificate request + check_call( + ['fx', 'collaborator', 'create', '-d', data_path, '-n', col, '--silent'], + cwd=col_path / fed_workspace + ) + # Remove '--silent' if you run this manually + check_call( + ['fx', 'collaborator', 'generate-cert-request', '-n', col, '--silent'], + cwd=col_path / fed_workspace + ) + + # Sign collaborator certificate + # Remove '--silent' if you run this manually + request_pkg = col_path / fed_workspace / f'col_{col}_to_agg_cert_request.zip' + check_call( + ['fx', 'collaborator', 'certify', '--request-pkg', str(request_pkg), '--silent'], + cwd=workspace_root) + + # Import the signed certificate from the aggregator + import_path = workspace_root / f'agg_to_col_{col}_signed_cert.zip' + check_call( + ['fx', 'collaborator', 'certify', '--import', import_path], + cwd=col_path / fed_workspace + ) + + +def initialize_and_certify_workspace(path, fqdn, rounds_to_train): + path = os.path.join('openfl_contrib_workspace', path) + os.chdir(path) + + check_call(['pip', 'install', '-r', 'requirements.txt']) + if not os.path.exists('save'): + os.makedirs('save') + + # Initialize FL plan + check_call(['fx', 'plan', 'initialize', '-a', fqdn]) + plan_path = Path('plan/plan.yaml') + try: + rounds_to_train = int(rounds_to_train) + with open(plan_path, "r", encoding='utf-8') as sources: + lines = sources.readlines() + with open(plan_path, "w", encoding='utf-8') as sources: + for line in lines: + sources.write( + re.sub(r'rounds_to_train.*', f'rounds_to_train: {rounds_to_train}', line) + ) + except (ValueError, TypeError): + pass + # Create certificate authority for workspace + check_call(['fx', 'workspace', 'certify']) + + # Export FL workspace + check_call(['fx', 'workspace', 'export']) + + +def certify_aggregator(fqdn): 
+ # Create aggregator certificate + check_call(['fx', 'aggregator', 'generate-cert-request', '--fqdn', fqdn]) + + # Sign aggregator certificate + check_call(['fx', 'aggregator', 'certify', '--fqdn', fqdn, '--silent']) \ No newline at end of file diff --git a/tests/openfl-contrib/__init__.py b/tests/openfl-contrib/__init__.py new file mode 100644 index 0000000..1a666a0 --- /dev/null +++ b/tests/openfl-contrib/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""tests.openfl-contrib package.""" diff --git a/tests/openfl-contrib/interface/aggregation_functions/test_custom_weighted_average.py b/tests/openfl-contrib/interface/aggregation_functions/test_custom_weighted_average.py new file mode 100644 index 0000000..be1d26c --- /dev/null +++ b/tests/openfl-contrib/interface/aggregation_functions/test_custom_weighted_average.py @@ -0,0 +1,48 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""CustomWeightedAverage tests.""" + +import pytest +import numpy as np + +from openfl.databases.tensor_db import TensorDB +from openfl.utilities.types import TensorKey + +from openfl_contrib.interface.aggregation_functions import CustomWeightedAverage + + +@pytest.fixture +def tensor_db(): + """Prepare tensor db.""" + db = TensorDB() + array_1 = np.array([0, 1, 2, 3, 4]) + tensor_key_1 = TensorKey(tensor_name='tensor', origin= '', round_number=0, report=False, tags=('col1',)) + array_2 = np.array([5, 6, 7, 8, 9]) + tensor_key_2 = TensorKey(tensor_name='tensor', origin= '', round_number=0, report=False, tags=('col2',)) + db.cache_tensor({ + tensor_key_1: array_1, + tensor_key_2: array_2 + }) + return db + + +def test_get_aggregated_tensor_weights(tensor_db): + """Test that get_aggregated_tensor calculates correctly.""" + + collaborator_weight_dict = { + 'col1': 0.9, + 'col2': 0.1 + } + + tensor_key = TensorKey(tensor_name='tensor', origin= '', round_number=0, report=False, tags=()) + + agg_nparray = tensor_db.get_aggregated_tensor( + tensor_key, collaborator_weight_dict, CustomWeightedAverage()) + + control_nparray = np.average( + [np.array([0, 1, 2, 3, 4]), np.array([5, 6, 7, 8, 9])], + weights=np.array(list(collaborator_weight_dict.values())), + axis=0 + ) + + assert np.array_equal(agg_nparray, control_nparray) \ No newline at end of file diff --git a/tests/openfl-contrib/pipelines/test_skc_compression.py b/tests/openfl-contrib/pipelines/test_skc_compression.py new file mode 100644 index 0000000..50e2856 --- /dev/null +++ b/tests/openfl-contrib/pipelines/test_skc_compression.py @@ -0,0 +1,47 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import numpy as np +import pytest + +from openfl.protocols import base_pb2 +from openfl_contrib.pipelines import SKCPipeline + + +@pytest.fixture +def named_tensor(): + """Initialize the named_tensor mock.""" + tensor = base_pb2.NamedTensor( + name='tensor_name', + round_number=0, + lossless=False, + report=False, + data_bytes=32 * b'1' + ) + metadata = tensor.transformer_metadata.add() + metadata.int_to_float[1] = 1. 
+ metadata.int_list.extend([1, 8]) + metadata.bool_list.append(True) + + return tensor + + +def test_skc_compression_pipeline(named_tensor): + """Test that SKCPipeline works correctly.""" + + tp = SKCPipeline() + proto = named_tensor.transformer_metadata.pop() + metadata = {'int_to_float': proto.int_to_float, + 'int_list': proto.int_list, + 'bool_list': proto.bool_list + } + array_shape = tuple(metadata['int_list']) + flat_array = np.frombuffer(named_tensor.data_bytes, dtype=np.float32) + + nparray = np.reshape(flat_array, newshape=array_shape, order='C') + + data_fwd, transformer_metadata = tp.forward(nparray) + assert len(transformer_metadata) == 3 + assert isinstance(data_fwd, bytes) + + data_bwd = tp.backward(data_fwd, transformer_metadata) + assert data_bwd.shape == tuple(metadata['int_list'])
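For completeness, the forward/backward round trip exercised by the test above can also be run standalone. A minimal sketch; the array values are arbitrary, and because the sparsity and k-means stages are lossy the recovered tensor only approximates the input:

```python
import numpy as np

from openfl_contrib.pipelines import SKCPipeline

tp = SKCPipeline(p_sparsity=0.5, n_clusters=2)
nparray = np.arange(8, dtype=np.float32).reshape(2, 4)

data_fwd, metadata = tp.forward(nparray)    # sparsify -> k-means quantize -> gzip
assert isinstance(data_fwd, bytes)          # GZIPTransformer emits bytes
assert len(metadata) == 3                   # one metadata dict per transformer

data_bwd = tp.backward(data_fwd, metadata)  # gunzip -> dequantize -> reshape
assert data_bwd.shape == nparray.shape      # shape restored; values are lossy
```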