diff --git a/federatedscope/core/auxiliaries/data_builder.py b/federatedscope/core/auxiliaries/data_builder.py index 6f41f1452..7a3d7294a 100644 --- a/federatedscope/core/auxiliaries/data_builder.py +++ b/federatedscope/core/auxiliaries/data_builder.py @@ -650,9 +650,12 @@ def get_data(config): ] or config.data.type.startswith('graph_multi_domain'): from federatedscope.gfl.dataloader import load_graphlevel_dataset data, modified_config = load_graphlevel_dataset(config) - elif config.data.type.lower() == 'vertical_fl_data': - from federatedscope.vertical_fl.dataloader import load_vertical_data - data, modified_config = load_vertical_data(config, generate=True) + elif config.data.type.lower() == 'secure_lr_data': + from federatedscope.vertical.dataloader import load_data + data, modified_config = load_data(config, generate=True) + elif config.data.type.lower() == 'caesar_v_fl_data': + from federatedscope.vertical.dataloader import load_data + data, modified_config = load_data(config, generate=True) elif 'movielens' in config.data.type.lower( ) or 'netflix' in config.data.type.lower(): from federatedscope.mf.dataloader import load_mf_dataset diff --git a/federatedscope/core/auxiliaries/worker_builder.py b/federatedscope/core/auxiliaries/worker_builder.py index bff5c6183..ffc9044e0 100644 --- a/federatedscope/core/auxiliaries/worker_builder.py +++ b/federatedscope/core/auxiliaries/worker_builder.py @@ -11,8 +11,12 @@ def get_client_cls(cfg): from federatedscope.autotune.fedex import FedExClient return FedExClient - if cfg.vertical.use: - from federatedscope.vertical_fl.worker import vFLClient + if cfg.secure_vertical.use: + from federatedscope.vertical.secure_LR.worker import vFLClient + return vFLClient + + if cfg.caesar_vertical.use: + from federatedscope.vertical.caesar_v_fl.worker import vFLClient return vFLClient if cfg.federate.method.lower() in constants.CLIENTS_TYPE: @@ -67,8 +71,12 @@ def get_server_cls(cfg): import BackdoorServer return BackdoorServer - if cfg.vertical.use: - from federatedscope.vertical_fl.worker import vFLServer + if cfg.secure_vertical.use: + from federatedscope.vertical.secure_LR.worker import vFLServer + return vFLServer + + if cfg.caesar_vertical.use: + from federatedscope.vertical.caesar_v_fl.worker import vFLServer return vFLServer if cfg.federate.method.lower() in constants.SERVER_TYPE: diff --git a/federatedscope/core/configs/cfg_fl_setting.py b/federatedscope/core/configs/cfg_fl_setting.py index 4f374c26d..2ce7e1a36 100644 --- a/federatedscope/core/configs/cfg_fl_setting.py +++ b/federatedscope/core/configs/cfg_fl_setting.py @@ -65,13 +65,22 @@ def extend_fl_setting_cfg(cfg): cfg.distribute.grpc_enable_http_proxy = False # ---------------------------------------------------------------------- # - # Vertical FL related options (for demo) + # Secure_LR vertical FL related options (for demo) # ---------------------------------------------------------------------- # - cfg.vertical = CN() - cfg.vertical.use = False - cfg.vertical.encryption = 'paillier' - cfg.vertical.dims = [5, 10] - cfg.vertical.key_size = 3072 + cfg.secure_vertical = CN() + cfg.secure_vertical.use = False + cfg.secure_vertical.encryption = 'paillier' + cfg.secure_vertical.dims = [5, 10] + cfg.secure_vertical.key_size = 3072 + + # ---------------------------------------------------------------------- # + # caesar vertical FL related options (for demo) + # ---------------------------------------------------------------------- # + cfg.caesar_vertical = CN() + cfg.caesar_vertical.use = False + 
cfg.caesar_vertical.encryption = 'paillier' + cfg.caesar_vertical.dims = [5, 10] + cfg.caesar_vertical.key_size = 3072 # --------------- register corresponding check function ---------- cfg.register_cfg_check_fun(assert_fl_setting_cfg) diff --git a/federatedscope/core/secret_sharing/secret_sharing.py b/federatedscope/core/secret_sharing/secret_sharing.py index 31fb99b79..9eeaec0dc 100644 --- a/federatedscope/core/secret_sharing/secret_sharing.py +++ b/federatedscope/core/secret_sharing/secret_sharing.py @@ -24,14 +24,14 @@ class AdditiveSecretSharing(SecretSharing): AdditiveSecretSharing class, which can split a number into frames and recover it by summing up """ - def __init__(self, shared_party_num, size=60): + def __init__(self, shared_party_num, size=20): super(SecretSharing, self).__init__() assert shared_party_num > 1, "AdditiveSecretSharing require " \ "shared_party_num > 1" self.shared_party_num = shared_party_num self.maximum = 2**size self.mod_number = 2 * self.maximum + 1 - self.epsilon = 1e8 + self.epsilon = 1e4 self.mod_funs = np.vectorize(lambda x: x % self.mod_number) self.float2fixedpoint = np.vectorize(self._float2fixedpoint) self.fixedpoint2float = np.vectorize(self._fixedpoint2float) @@ -73,16 +73,23 @@ def secret_reconstruct(self, secret_seq): To recover the secret """ assert len(secret_seq) == self.shared_party_num - merge_model = secret_seq[0].copy() + merge_model = None + secret_seq = [np.asarray(x) for x in secret_seq] if isinstance(merge_model, dict): for key in merge_model: for idx in range(len(secret_seq)): if idx == 0: - merge_model[key] = secret_seq[idx][key] + merge_model[key] = secret_seq[idx][key].copy() else: merge_model[key] += secret_seq[idx][key] merge_model[key] = self.fixedpoint2float(merge_model[key]) - + else: + for idx in range(len(secret_seq)): + if idx == 0: + merge_model = secret_seq[idx].copy() + else: + merge_model += secret_seq[idx] + merge_model = self.fixedpoint2float(merge_model) return merge_model def _float2fixedpoint(self, x): diff --git a/federatedscope/vertical_fl/Paillier/__init__.py b/federatedscope/vertical/Paillier/__init__.py similarity index 100% rename from federatedscope/vertical_fl/Paillier/__init__.py rename to federatedscope/vertical/Paillier/__init__.py diff --git a/federatedscope/vertical_fl/Paillier/abstract_paillier.py b/federatedscope/vertical/Paillier/abstract_paillier.py similarity index 100% rename from federatedscope/vertical_fl/Paillier/abstract_paillier.py rename to federatedscope/vertical/Paillier/abstract_paillier.py diff --git a/federatedscope/vertical/caesar_v_fl/README.md b/federatedscope/vertical/caesar_v_fl/README.md new file mode 100644 index 000000000..cff310288 --- /dev/null +++ b/federatedscope/vertical/caesar_v_fl/README.md @@ -0,0 +1,18 @@ +### Caesar Vertical Federated Learning + +We provide an example for seCure lArge-scalE SpArse logistic Regression (caesar) vertical federated learning, you can run with: +```bash +python3 ../../main.py --cfg caesar_v_fl.yaml +``` + +Implementation of caesar vertical FL refer to `When Homomorphic Encryption + Marries Secret Sharing: Secure Large-Scale Sparse Logistic Regression and + Applications in Risk Control` [Chen, et al., 2021] + (https://arxiv.org/abs/2008.08753) + +You can specify customized configurations in `caesar_v_fl.yaml`, such as `data.type` and `federate.total_round_num`. 
+ + +Note that FederatedScope only provide an `abstract_paillier`, user can refer to [pyphe](https://github.com/data61/python-paillier/blob/master/phe/paillier.py) for the detail implementation, or adopt other homomorphic encryption algorithms. + +More support for vertical federated learning is coming soon! We really appreciate any contributions to FederatedScope ! \ No newline at end of file diff --git a/federatedscope/vertical/caesar_v_fl/__init__.py b/federatedscope/vertical/caesar_v_fl/__init__.py new file mode 100644 index 000000000..d8f28ef68 --- /dev/null +++ b/federatedscope/vertical/caesar_v_fl/__init__.py @@ -0,0 +1 @@ +from federatedscope.vertical.Paillier.abstract_paillier import * diff --git a/federatedscope/vertical/caesar_v_fl/caesar_v_fl.yaml b/federatedscope/vertical/caesar_v_fl/caesar_v_fl.yaml new file mode 100644 index 000000000..5d155a9c2 --- /dev/null +++ b/federatedscope/vertical/caesar_v_fl/caesar_v_fl.yaml @@ -0,0 +1,22 @@ +use_gpu: False +federate: + mode: standalone + client_num: 2 + total_round_num: 30 +model: + type: lr + use_bias: False +train: + optimizer: + lr: 0.001 +data: + type: caesar_v_fl_data + batch_size: 50 +caesar_vertical: + use: True + key_size: 256 +trainer: + type: none +eval: + freq: 5 + best_res_update_round_wise_key: test_loss diff --git a/federatedscope/vertical/caesar_v_fl/worker/__init__.py b/federatedscope/vertical/caesar_v_fl/worker/__init__.py new file mode 100644 index 000000000..7dc9807ab --- /dev/null +++ b/federatedscope/vertical/caesar_v_fl/worker/__init__.py @@ -0,0 +1,6 @@ +from federatedscope.vertical.caesar_v_fl.worker.vertical_client \ + import vFLClient +from federatedscope.vertical.caesar_v_fl.worker.vertical_server \ + import vFLServer + +__all__ = ['vFLServer', 'vFLClient'] diff --git a/federatedscope/vertical/caesar_v_fl/worker/vertical_client.py b/federatedscope/vertical/caesar_v_fl/worker/vertical_client.py new file mode 100644 index 000000000..569a998ff --- /dev/null +++ b/federatedscope/vertical/caesar_v_fl/worker/vertical_client.py @@ -0,0 +1,383 @@ +import numpy as np +import logging + +from federatedscope.core.workers import Client +from federatedscope.core.message import Message +from federatedscope.vertical.dataloader.utils import batch_iter +# since we use an abstract Paillier, so we can ss it by our simple ss scheme, +# for the real one, you may also rewrite the ss scheme +from federatedscope.vertical.Paillier import abstract_paillier +# from federatedscope.core.secret_sharing import AdditiveSecretSharing +from federatedscope.vertical.simple_secret_sharing import AdditiveSecretSharing + +logger = logging.getLogger(__name__) + + +class vFLClient(Client): + """ + The client class for CAESAR vertical FL, which customizes the handled + functions. Please refer to the tutorial for more details about the + implementation algorithm + Implementation of CAESAR Vertical FL refer to `When Homomorphic Encryption + Marries Secret Sharing: Secure Large-Scale Sparse Logistic Regression and + Applications in Risk Control` [Chen, et al., 2021] + (https://arxiv.org/abs/2008.08753) + + For convenience, we assume that client B has the label. 
+ """ + def __init__(self, + ID=-1, + server_id=None, + state=-1, + config=None, + data=None, + model=None, + device='cpu', + strategy=None, + *args, + **kwargs): + + super(vFLClient, + self).__init__(ID, server_id, state, config, data, model, device, + strategy, *args, **kwargs) + + cfg_key_size = config.caesar_vertical.key_size + self.my_public_key, self.my_private_key = \ + abstract_paillier.generate_paillier_keypair(n_length=cfg_key_size) + self.data = data + # A has my_para = w_a, B has my_para = w_b + self.my_para = None + # w_a = _1 + _2 and w_b = _1 + _2 + self.my_part_of_others_para = None + self.my_part_of_my_para = None + self.others_public_key = None + self.my_part_of_my_z = None + self.my_part_of_others_part_of_my_z = None + self.my_part_of_my_part_of_others_z = None + self.batch_index = None + self.own_label = ('y' in self.data['train']) + self.y = None + self.lr = config.train.optimizer.lr + self.dataloader = batch_iter(self.data['train'], + self._cfg.data.batch_size, + shuffled=True) + self.total_round_num = None + + self.ss = AdditiveSecretSharing(shared_party_num=2) + + self.register_handlers('model_para', self.callback_func_for_model_para) + self.register_handlers('public_key_and_para', + self.callback_func_for_public_key_and_para) + self.register_handlers('sample_data', + self.callback_func_for_sample_data) + self.register_handlers('decrypt', self.callback_func_for_decrypt) + self.register_handlers('encrypted', self.callback_func_for_encrypted) + self.register_handlers('three_values', + self.callback_func_for_three_values) + self.register_handlers('complicated_comp', + self.callback_func_for_a_to_update_para) + self.register_handlers('para_update', + self.callback_func_for_b_to_update_para) + self.register_handlers('final_step_for_a', + self.callback_func_for_final_step_for_a) + self.register_handlers('para_exchange', + self.callback_func_for_para_exchange) + + def sample_data(self, index=None): + if index is None: + assert self.own_label + return next(self.dataloader) + else: + return self.data['train']['x'][index] + + # A and B receive paras, ss them, and save them respectively, + + def callback_func_for_model_para(self, message: Message): + self.total_round_num, self.my_para = message.content + self.my_part_of_my_para, others_part_of_my_para = \ + self.ss.secret_split(self.my_para) + + self.comm_manager.send( + Message(msg_type='public_key_and_para', + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=(self.my_public_key, others_part_of_my_para))) + + def callback_func_for_public_key_and_para(self, message: Message): + self.others_public_key, self.my_part_of_others_para = message.content + if self.own_label: + self.move_to_the_next_train() + + # start training + # B samples data + # B calls encode() + + def move_to_the_next_train(self): + index, self.input_x, input_y = self.sample_data() + self.y = input_y + self.batch_index = index + self.comm_manager.send( + Message(msg_type='sample_data', + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=self.batch_index)) + self.encode() + + # A samples the same data as B + # A also calls encode() + + def callback_func_for_sample_data(self, message: Message): + index = message.content + self.batch_index = index + self.input_x = self.sample_data(index=self.batch_index) + self.encode() + + # both clients do the following + def encode(self): + en_para = [ 
+ self.my_public_key.encrypt(x) for x in self.my_part_of_others_para + ] + self.comm_manager.send( + Message(msg_type='encrypted', + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=en_para)) + + def callback_func_for_encrypted(self, message: Message): + en_para = message.content + + upgrade_x = self.ss.upgrade(self.input_x) + self.my_part_of_my_z = self.ss.mod_matmul(upgrade_x, + self.my_part_of_my_para) + + tmp1 = self.ss.mod_matmul(upgrade_x, en_para) + + self.my_part_of_others_part_of_my_z, en_z = \ + self.ss.secret_split_for_piece_of_ss(tmp1) + + self.comm_manager.send( + Message(msg_type='decrypt', + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=en_z)) + + def callback_func_for_decrypt(self, message: Message): + en_z = message.content + self.my_part_of_my_part_of_others_z = [ + self.my_private_key.decrypt(x) for x in en_z + ] + if not self.own_label: + self.a_computes() + + # A computes _1 = _1 + <_2>_1 + <_1>_1, + # _1 squared and _1 cubed + # A encrypts the above three vectors and sends them to B + + def a_computes(self): + tmp1 = [ + self.my_part_of_my_z[i] + self.my_part_of_others_part_of_my_z[i] + + self.my_part_of_my_part_of_others_z[i] + for i in range(len(self.my_part_of_my_z)) + ] + tmp1 = self.ss.mod_funs(tmp1) + tmp1 = [self.my_public_key.encrypt(x) for x in tmp1] + + self.comm_manager.send( + Message(msg_type='three_values', + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=tmp1)) + + # B receives [_1]_a, [_1 squared]_a, [_1 cubed]_a + # B computes _2 = _2 + <_2>_2 + <_1>_2 + # [z]_a = [_1]_a + _2 + # B gets _2 + # B sends [_1]_a = [y hat]_a - _2 to A + # B computes [e]_a = [y hat]_a - y + # _2 = _2 - y + # [e]_a = [y hat]_a - y + # [g_b]_a = [e]_a * X_b and ss it + # B keeps _2 + # B sends [_1]_a to A + # B sends [_2]_b to A + def callback_func_for_three_values(self, message: Message): + tmp1 = message.content + z2 = [ + self.my_part_of_my_z[i] + self.my_part_of_my_part_of_others_z[i] + + self.my_part_of_others_part_of_my_z[i] + for i in range(len(self.my_part_of_my_z)) + ] + z2 = self.ss.mod_funs(z2) + + za = self.ss.mod_add(tmp1, z2) + + za = self.ss.fixedpoint2float(za) + za = [round(x) for x in za] + + # Here is the approximation of sigmoid function, + # different data and different forms may lead to diversity, + # here linear is enough + t1 = [self.ss.const_mul_fixedpoint(1 / 4, x) for x in za] + y_hat_a = [self.ss.const_add_fixedpoint(1 / 2, x) for x in t1] + upgrade_y = self.ss.upgrade(self.y) + + e_a = self.ss.mod_add(y_hat_a, -upgrade_y) + + y_hat_2, y_hat_1 = self.ss.secret_split_for_piece_of_ss(y_hat_a) + e_2 = self.ss.mod_add(y_hat_2, -upgrade_y) + + upgrade_x = self.ss.upgrade(self.input_x) + g_b_a = self.ss.err_mul_data(e_a, upgrade_x) + g_b_2, g_b_1 = self.ss.secret_split_for_piece_of_ss(g_b_a) + + # user b update w_b_2 + upgrade_lr = self.ss.upgrade(self.lr) + t1 = [upgrade_lr * x for x in g_b_2] + + self.my_part_of_my_para = self.ss.mod_add(self.my_part_of_my_para, + [-x for x in t1]) + + encrypted_e_2 = [self.my_public_key.encrypt(x) for x in e_2] + self.comm_manager.send( + Message(msg_type="complicated_comp", + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=(y_hat_1, g_b_1, encrypted_e_2))) + 
+ # A receives [_1]_a and [_1]_a and [_2]_b + # A decrypts [_1]_a and gets _1 + # A decrypts [_1]_a and gets _1 + # A computes _1 = _1 * X_a + # A computes [_2]_b = [_2]_b * X_b and ss it + # A gets <_2>_1 + # A sends [<_2>_2]_b = [_2>]_b - <_2>_1 to B + # A updates _1 = _1 - alpha * (_1 + <_2>_1) + # A updates _1 = _1- alpha * (_1) + + def callback_func_for_a_to_update_para(self, message: Message): + y_hat_1, en_g_b_1, encrypted_e_2 = message.content + g_b_1 = [self.my_private_key.decrypt(x) for x in en_g_b_1] + e_1 = [self.my_private_key.decrypt(x) for x in y_hat_1] + + upgrade_x = self.ss.upgrade(self.input_x) + g_a_1 = self.ss.err_mul_data(e_1, upgrade_x) + + g_a_2 = self.ss.err_mul_data(encrypted_e_2, upgrade_x) + g_a_2_1, g_a_2_2 = self.ss.secret_split_for_piece_of_ss(g_a_2) + + # user A updates w_a_1 + upgrade_lr = self.ss.upgrade(self.lr) + t1 = self.ss.mod_funs( + [g_a_1[i] + g_a_2_1[i] for i in range(len(g_a_1))]) + t2 = [upgrade_lr * x for x in t1] + + self.my_part_of_my_para = self.ss.mod_add(self.my_part_of_my_para, + [-x for x in t2]) + + # user A updates w_b_1 + t3 = [upgrade_lr * x for x in g_b_1] + self.my_part_of_others_para = self.ss.mod_add( + self.my_part_of_others_para, [-x for x in t3]) + + self.comm_manager.send( + Message(msg_type="para_update", + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=g_a_2_2)) + + # B receives [<_2>_2]_b and decrypts it and gets <_2>_2 + # B updates _2 = _2 - alpha * <_2>_2 + # B updates _2 = _2 - alpha * _2 + # And goes to the next iteration + # After training, + # B sends _2 to A + + def callback_func_for_b_to_update_para(self, message: Message): + g_a_2_2 = message.content + g_a_2_2 = [self.my_private_key.decrypt(x) for x in g_a_2_2] + + # user B updates w_a_2 + upgrade_lr = self.ss.upgrade(self.lr) + t1 = [upgrade_lr * x for x in g_a_2_2] + + self.my_part_of_others_para = self.ss.mod_add( + self.my_part_of_others_para, [-x for x in t1]) + + self.state += 1 + if self.state < self.total_round_num: + # Move to next round of training + logger.info(f'----------- Starting a new training round (Round ' + f'#{self.state}) -------------') + self.move_to_the_next_train() + else: + self.final_step() + + # call exchange_paras(): two clients send each other their part of paras + def final_step(self): + self.comm_manager.send( + Message(msg_type="final_step_for_a", + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=None)) + self.exchange_paras() + + def callback_func_for_final_step_for_a(self, message: Message): + self.exchange_paras() + + def exchange_paras(self): + self.comm_manager.send( + Message(msg_type="para_exchange", + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=self.my_part_of_others_para)) + + def callback_func_for_para_exchange(self, message: Message): + para = message.content + self.my_para = self.ss.secret_reconstruct_for_ss_pieces( + (self.my_part_of_my_para, para)) + self.my_para = self.ss.fixedpoint2float(self.my_para) + self.my_para = self.my_para / self.ss.epsilon + + # print("my_para", self.my_para) + self.comm_manager.send( + Message(msg_type='para_for_server', + sender=self.ID, + receiver=self.server_id, + state=self.state, + content=self.my_para)) diff --git a/federatedscope/vertical/caesar_v_fl/worker/vertical_server.py 
b/federatedscope/vertical/caesar_v_fl/worker/vertical_server.py new file mode 100644 index 000000000..4714d1964 --- /dev/null +++ b/federatedscope/vertical/caesar_v_fl/worker/vertical_server.py @@ -0,0 +1,96 @@ +import numpy as np +import logging +import math + +from federatedscope.core.workers import Server +from federatedscope.core.message import Message + +logger = logging.getLogger(__name__) + + +class vFLServer(Server): + """ + The server class for vertical FL, which customizes the handled + functions. Please refer to the tutorial for more details about the + implementation algorithm + Implementation of Vertical FL refer to `When Homomorphic Encryption + Marries Secret Sharing: Secure Large-Scale Sparse Logistic Regression and + Applications in Risk Control` [Chen, et al., 2021] + (https://arxiv.org/abs/2008.08753) + """ + def __init__(self, + ID=-1, + state=0, + config=None, + data=None, + model=None, + client_num=5, + total_round_num=10, + device='cpu', + strategy=None, + **kwargs): + super(vFLServer, + self).__init__(ID, state, config, data, model, client_num, + total_round_num, device, strategy, **kwargs) + self.dims = [0] + config.caesar_vertical.dims + self.omega = self.model.state_dict()['fc.weight'].numpy().reshape(-1) + # self.omega = [0 for _ in range(sum(self.dims))] + + self.lr = config.train.optimizer.lr + self.w_a = None + self.w_b = None + self.coll = dict() + + self.register_handlers('para_for_server', + self.callback_func_for_para_for_server) + + def trigger_for_start(self): + if self.check_client_join_in(): + self.broadcast_client_address() + self.broadcast_model_para() + + def broadcast_model_para(self): + client_ids = self.comm_manager.neighbors.keys() + cur_idx = 0 + for client_id in client_ids: + omega_slices = self.omega[cur_idx:cur_idx + + self.dims[int(client_id)]] + self.comm_manager.send( + Message(msg_type='model_para', + sender=self.ID, + receiver=client_id, + state=self.state, + content=(self.total_round_num, omega_slices))) + cur_idx += self.dims[int(client_id)] + + def callback_func_for_para_for_server(self, message: Message): + tmp = message.content + idx = message.sender + self.coll[idx] = tmp + self.output() + + def output(self): + if len(self.coll) == 2: + metrics = self.evaluate() + self._monitor.update_best_result( + self.best_results, + metrics, + results_type='server_global_eval', + round_wise_update_key=self._cfg.eval. 
+ best_res_update_round_wise_key) + formatted_logs = self._monitor.format_eval_res( + metrics, + rnd=self.state, + role='Server #', + forms=self._cfg.eval.report) + logger.info(formatted_logs) + self.coll = dict() + + def evaluate(self): + self.omega = np.concatenate([self.coll[1], self.coll[2]], axis=-1) + test_x = self.data['test']['x'] + test_y = self.data['test']['y'] + loss = np.mean( + np.log(1 + np.exp(-test_y * np.matmul(test_x, self.omega)))) + acc = np.mean((test_y * np.matmul(test_x, self.omega)) > 0) + return {'test_loss': loss, 'test_acc': acc, 'test_total': len(test_y)} diff --git a/federatedscope/vertical/dataloader/__init__.py b/federatedscope/vertical/dataloader/__init__.py new file mode 100644 index 000000000..0cda08fa3 --- /dev/null +++ b/federatedscope/vertical/dataloader/__init__.py @@ -0,0 +1,4 @@ +from federatedscope.vertical.dataloader.dataloader \ + import load_data + +__all__ = ['load_data'] diff --git a/federatedscope/vertical_fl/dataloader/dataloader.py b/federatedscope/vertical/dataloader/dataloader.py similarity index 84% rename from federatedscope/vertical_fl/dataloader/dataloader.py rename to federatedscope/vertical/dataloader/dataloader.py index e58560c12..9d79b73eb 100644 --- a/federatedscope/vertical_fl/dataloader/dataloader.py +++ b/federatedscope/vertical/dataloader/dataloader.py @@ -1,7 +1,7 @@ import numpy as np -def load_vertical_data(config=None, generate=False): +def load_data(config=None, generate=False): """ To generate the synthetic data for vertical FL @@ -17,7 +17,8 @@ def load_vertical_data(config=None, generate=False): INSTANCE_NUM = 1000 TRAIN_SPLIT = 0.9 - total_dims = np.sum(config.vertical.dims) + total_dims = np.sum(config.caesar_vertical.dims) + theta = np.random.uniform(low=-1.0, high=1.0, size=(total_dims, 1)) x = np.random.choice([-1.0, 1.0, -2.0, 2.0, -3.0, 3.0], size=(INSTANCE_NUM, total_dims)) @@ -39,15 +40,17 @@ def load_vertical_data(config=None, generate=False): # For Client #1 data[1] = dict() data[1]['train'] = { - 'x': x[:train_num, :config.vertical.dims[0]], - 'y': y[:train_num] + 'x': x[:train_num, :config.caesar_vertical.dims[0]] } data[1]['val'] = None data[1]['test'] = test_data # For Client #2 data[2] = dict() - data[2]['train'] = {'x': x[:train_num, config.vertical.dims[0]:]} + data[2]['train'] = { + 'x': x[:train_num, config.caesar_vertical.dims[0]:], + 'y': y[:train_num] + } data[2]['val'] = None data[2]['test'] = test_data diff --git a/federatedscope/vertical_fl/dataloader/utils.py b/federatedscope/vertical/dataloader/utils.py similarity index 96% rename from federatedscope/vertical_fl/dataloader/utils.py rename to federatedscope/vertical/dataloader/utils.py index 96788c665..377db79c6 100644 --- a/federatedscope/vertical_fl/dataloader/utils.py +++ b/federatedscope/vertical/dataloader/utils.py @@ -11,7 +11,7 @@ def batch_iter(data, batch_size, shuffled=True): batch_size (int): the batch size shuffled (bool): whether to shuffle the data at the start of each epoch :returns: sample index, batch of x, batch_of y - :rtype: int, ndarray, ndarry + :rtype: int, ndarray, ndarray """ assert 'x' in data and 'y' in data diff --git a/federatedscope/vertical_fl/README.md b/federatedscope/vertical/secure_LR/README.md similarity index 78% rename from federatedscope/vertical_fl/README.md rename to federatedscope/vertical/secure_LR/README.md index 5c0908674..7d06fc4c0 100644 --- a/federatedscope/vertical_fl/README.md +++ b/federatedscope/vertical/secure_LR/README.md @@ -2,10 +2,10 @@ We provide an example for vertical federated 
learning, you run run with: ```bash -python3 ../main.py --cfg vertical_fl.yaml +python3 ../../main.py --cfg secure_LR.yaml ``` -You can specify customized configurations in `vertical_fl.yaml`, such as `data.type` and `federate.total_round_num`. +You can specify customized configurations in `secure_LR.yaml`, such as `data.type` and `federate.total_round_num`. More details of the provided example can be found in [Tutorial](https://federatedscope.io/docs/cross-silo/). Note that FederatedScope only provide an `abstract_paillier`, user can refer to [pyphe](https://github.com/data61/python-paillier/blob/master/phe/paillier.py) for the detail implementation, or adopt other homomorphic encryption algorithms. diff --git a/federatedscope/vertical/secure_LR/__init__.py b/federatedscope/vertical/secure_LR/__init__.py new file mode 100644 index 000000000..01502d694 --- /dev/null +++ b/federatedscope/vertical/secure_LR/__init__.py @@ -0,0 +1 @@ +from federatedscope.vertical.Paillier.abstract_paillier import * diff --git a/federatedscope/vertical_fl/vertical_fl.yaml b/federatedscope/vertical/secure_LR/secure_LR.yaml similarity index 87% rename from federatedscope/vertical_fl/vertical_fl.yaml rename to federatedscope/vertical/secure_LR/secure_LR.yaml index 9c77b640d..a7e20679f 100644 --- a/federatedscope/vertical_fl/vertical_fl.yaml +++ b/federatedscope/vertical/secure_LR/secure_LR.yaml @@ -10,9 +10,9 @@ train: optimizer: lr: 0.05 data: - type: vertical_fl_data + type: secure_LR_data batch_size: 50 -vertical: +secure_vertical: use: True key_size: 256 trainer: diff --git a/federatedscope/vertical/secure_LR/worker/__init__.py b/federatedscope/vertical/secure_LR/worker/__init__.py new file mode 100644 index 000000000..e474877e0 --- /dev/null +++ b/federatedscope/vertical/secure_LR/worker/__init__.py @@ -0,0 +1,4 @@ +from federatedscope.vertical.secure_LR.worker.vertical_client import vFLClient +from federatedscope.vertical.secure_LR.worker.vertical_server import vFLServer + +__all__ = ['vFLServer', 'vFLClient'] diff --git a/federatedscope/vertical_fl/worker/vertical_client.py b/federatedscope/vertical/secure_LR/worker/vertical_client.py similarity index 84% rename from federatedscope/vertical_fl/worker/vertical_client.py rename to federatedscope/vertical/secure_LR/worker/vertical_client.py index b463ea82e..8cc1abcb5 100644 --- a/federatedscope/vertical_fl/worker/vertical_client.py +++ b/federatedscope/vertical/secure_LR/worker/vertical_client.py @@ -3,7 +3,7 @@ from federatedscope.core.workers import Client from federatedscope.core.message import Message -from federatedscope.vertical_fl.dataloader.utils import batch_iter +from federatedscope.vertical.dataloader.utils import batch_iter class vFLClient(Client): @@ -44,10 +44,10 @@ def __init__(self, self.callback_funcs_for_public_keys) self.register_handlers('model_para', self.callback_funcs_for_model_para) - self.register_handlers('encryped_gradient_u', - self.callback_funcs_for_encryped_gradient_u) - self.register_handlers('encryped_gradient_v', - self.callback_funcs_for_encryped_gradient_v) + self.register_handlers('encrypted_gradient_u', + self.callback_funcs_for_encrypted_gradient_u) + self.register_handlers('encrypted_gradient_v', + self.callback_funcs_for_encrypted_gradient_v) def sample_data(self, index=None): if index is None: @@ -69,7 +69,7 @@ def callback_funcs_for_model_para(self, message: Message): en_u_A = [self.public_key.encrypt(x) for x in u_A] self.comm_manager.send( - Message(msg_type='encryped_gradient_u', + 
Message(msg_type='encrypted_gradient_u', sender=self.ID, receiver=[ each for each in self.comm_manager.neighbors @@ -78,7 +78,7 @@ def callback_funcs_for_model_para(self, message: Message): state=self.state, content=(self.batch_index, en_u_A))) - def callback_funcs_for_encryped_gradient_u(self, message: Message): + def callback_funcs_for_encrypted_gradient_u(self, message: Message): index, en_u_A = message.content self.batch_index = index input_x = self.sample_data(index=self.batch_index) @@ -88,7 +88,7 @@ def callback_funcs_for_encryped_gradient_u(self, message: Message): en_v_B = en_u * input_x self.comm_manager.send( - Message(msg_type='encryped_gradient_v', + Message(msg_type='encrypted_gradient_v', sender=self.ID, receiver=[ each for each in self.comm_manager.neighbors @@ -97,14 +97,14 @@ def callback_funcs_for_encryped_gradient_u(self, message: Message): state=self.state, content=(en_u, en_v_B))) - def callback_funcs_for_encryped_gradient_v(self, message: Message): + def callback_funcs_for_encrypted_gradient_v(self, message: Message): en_u, en_v_B = message.content input_x = self.sample_data(index=self.batch_index) en_v_A = en_u * input_x - en_v = np.concatenate([en_v_A, en_v_B], axis=-1) + en_v = np.concatenate([en_v_B, en_v_A], axis=-1) self.comm_manager.send( - Message(msg_type='encryped_gradient', + Message(msg_type='encrypted_gradient', sender=self.ID, receiver=[self.server_id], state=self.state, diff --git a/federatedscope/vertical_fl/worker/vertical_server.py b/federatedscope/vertical/secure_LR/worker/vertical_server.py similarity index 92% rename from federatedscope/vertical_fl/worker/vertical_server.py rename to federatedscope/vertical/secure_LR/worker/vertical_server.py index d1e2da946..1a30dc1ea 100644 --- a/federatedscope/vertical_fl/worker/vertical_server.py +++ b/federatedscope/vertical/secure_LR/worker/vertical_server.py @@ -3,7 +3,7 @@ from federatedscope.core.workers import Server from federatedscope.core.message import Message -from federatedscope.vertical_fl.Paillier import abstract_paillier +from federatedscope.vertical.Paillier import abstract_paillier logger = logging.getLogger(__name__) @@ -32,15 +32,15 @@ def __init__(self, super(vFLServer, self).__init__(ID, state, config, data, model, client_num, total_round_num, device, strategy, **kwargs) - cfg_key_size = config.vertical.key_size + cfg_key_size = config.secure_vertical.key_size self.public_key, self.private_key = \ abstract_paillier.generate_paillier_keypair(n_length=cfg_key_size) - self.dims = [0] + config.vertical.dims + self.dims = [0] + config.secure_vertical.dims self.theta = self.model.state_dict()['fc.weight'].numpy().reshape(-1) self.lr = config.train.optimizer.lr - self.register_handlers('encryped_gradient', - self.callback_funcs_for_encryped_gradient) + self.register_handlers('encrypted_gradient', + self.callback_funcs_for_encrypted_gradient) def trigger_for_start(self): if self.check_client_join_in(): @@ -71,7 +71,7 @@ def broadcast_model_para(self): content=theta_slices)) cur_idx += self.dims[int(client_id)] - def callback_funcs_for_encryped_gradient(self, message: Message): + def callback_funcs_for_encrypted_gradient(self, message: Message): sample_num, en_v = message.content v = np.reshape( diff --git a/federatedscope/vertical/simple_secret_sharing/__init__.py b/federatedscope/vertical/simple_secret_sharing/__init__.py new file mode 100644 index 000000000..0df1d0e3c --- /dev/null +++ b/federatedscope/vertical/simple_secret_sharing/__init__.py @@ -0,0 +1,2 @@ +from 
federatedscope.vertical.simple_secret_sharing.secret_sharing import \ + AdditiveSecretSharing diff --git a/federatedscope/vertical/simple_secret_sharing/secret_sharing.py b/federatedscope/vertical/simple_secret_sharing/secret_sharing.py new file mode 100644 index 000000000..14331be4e --- /dev/null +++ b/federatedscope/vertical/simple_secret_sharing/secret_sharing.py @@ -0,0 +1,264 @@ +from abc import ABC, abstractmethod +import numpy as np +try: + import torch +except ImportError: + torch = None + + +class SecretSharing(ABC): + def __init__(self): + pass + + @abstractmethod + def secret_split(self, secret): + pass + + @abstractmethod + def secret_reconstruct(self, secret_seq): + pass + + +class AdditiveSecretSharing(SecretSharing): + """ + AdditiveSecretSharing class, which can split a number into frames and + recover it by summing up + """ + def __init__(self, shared_party_num, size=30): + super(SecretSharing, self).__init__() + assert shared_party_num > 1, "AdditiveSecretSharing require " \ + "shared_party_num > 1" + self.shared_party_num = shared_party_num + self.maximum = 2**size + self.mod_number = 2**(2 * size) + self.epsilon = 1e8 + self.mod_funs = np.vectorize(lambda x: x % self.mod_number) + self.float2fixedpoint = np.vectorize(self._float2fixedpoint) + self.fixedpoint2float = np.vectorize(self._fixedpoint2float) + self.upgrade = np.vectorize(self._upgrade) + self.downgrade = np.vectorize(self._downgrade) + + def secret_split(self, secret): + """ + To split the secret into frames according to the shared_party_num + """ + if isinstance(secret, dict): + secret_list = [dict() for _ in range(self.shared_party_num)] + for key in secret: + for idx, each in enumerate(self.secret_split(secret[key])): + secret_list[idx][key] = each + return secret_list + + if isinstance(secret, list) or isinstance(secret, np.ndarray): + secret = np.asarray(secret) + shape = [self.shared_party_num - 1] + list(secret.shape) + elif isinstance(secret, torch.Tensor): + secret = secret.numpy() + shape = [self.shared_party_num - 1] + list(secret.shape) + else: + shape = [self.shared_party_num - 1] + + secret = self.float2fixedpoint(secret) + secret_seq = np.random.randint(low=0, high=self.mod_number, size=shape) + last_seq = self.mod_funs(secret - + self.mod_funs(np.sum(secret_seq, axis=0))) + + secret_seq = np.append(secret_seq, + np.expand_dims(last_seq, axis=0), + axis=0) + return secret_seq + + def secret_split_for_piece_of_ss(self, secret): + """ + To split the secret into frames according to the shared_party_num + """ + if isinstance(secret, dict): + secret_list = [dict() for _ in range(self.shared_party_num)] + for key in secret: + for idx, each in enumerate(self.secret_split(secret[key])): + secret_list[idx][key] = each + return secret_list + + if isinstance(secret, list) or isinstance(secret, np.ndarray): + secret = np.asarray(secret) + shape = [self.shared_party_num - 1] + list(secret.shape) + elif isinstance(secret, torch.Tensor): + secret = secret.numpy() + shape = [self.shared_party_num - 1] + list(secret.shape) + else: + shape = [self.shared_party_num - 1] + secret_seq = np.random.randint(low=0, high=self.mod_number, size=shape) + last_seq = self.mod_funs(secret - + self.mod_funs(np.sum(secret_seq, axis=0))) + + secret_seq = np.append(secret_seq, + np.expand_dims(last_seq, axis=0), + axis=0) + return secret_seq + + def secret_reconstruct(self, secret_seq): + """ + To recover the secret + """ + assert len(secret_seq) == self.shared_party_num + merge_model = secret_seq[0].copy() + if 
isinstance(merge_model, dict): + for key in merge_model: + for idx in range(len(secret_seq)): + if idx == 0: + merge_model[key] = secret_seq[idx][key].copy() + else: + merge_model[key] += secret_seq[idx][key] + merge_model[key] = self.fixedpoint2float(merge_model[key]) + else: + secret_seq = [np.asarray(x) for x in secret_seq] + for idx in range(len(secret_seq)): + if idx == 0: + merge_model = secret_seq[idx].copy() + else: + merge_model += secret_seq[idx] + merge_model = self.fixedpoint2float(merge_model) + return merge_model + + def secret_reconstruct_for_ss_pieces(self, secret_seq): + """ + To recover a piece of ss pieces + """ + assert len(secret_seq) == self.shared_party_num + merge_model = secret_seq[0].copy() + if isinstance(merge_model, dict): + for key in merge_model: + for idx in range(len(secret_seq)): + if idx == 0: + merge_model[key] = secret_seq[idx][key].copy() + else: + merge_model[key] += secret_seq[idx][key] + merge_model[key] = self.fixedpoint2float(merge_model[key]) + else: + secret_seq = [np.asarray(x) for x in secret_seq] + for idx in range(len(secret_seq)): + if idx == 0: + merge_model = secret_seq[idx].copy() + else: + merge_model += secret_seq[idx] + merge_model = self.mod_funs(merge_model) + return merge_model + + def const_add_fixedpoint(self, c, x): + up_c = self._upgrade(c) + res = (up_c + x) % self.mod_number + return res + + def const_mul_fixedpoint(self, c, x): + up_c = self.upgrade(c) + res = self.downgrade(up_c * x) + return res + + def mod_add(self, *args): + # sum the values in an array + if len(args) == 1 and isinstance(args[0], np.ndarray): + res = 0 + for x in args[0]: + res += x.item() + res = res % self.mod_number + return res + # sum the values in a list + if len(args) == 1 and isinstance(args[0], list): + res = 0 + for x in args[0]: + res += x + res = res % self.mod_number + return res + # sum an integer with a list or an array + if len(args) == 2: + if (isinstance(args[0], int) or isinstance( + args[0], np.int64)) and not isinstance(args[1], int): + res = [(int(args[0]) + x) % self.mod_number for x in args[1]] + return res + elif (isinstance(args[1], int) or isinstance( + args[1], np.int64)) and not isinstance(args[0], int): + res = [(args[1] + x) % self.mod_number for x in args[0]] + return res + # sum several lists + if isinstance(args[0], np.ndarray) or isinstance(args[0], list): + n = len(args[0]) + num = len(args) + res = [0] * n + for i in range(n): + for j in range(num): + res[i] += args[j][i].item() + res[i] = res[i] % self.mod_number + return np.asarray(res) + # sum all the integers in args + if isinstance(args[0], int) or isinstance(args[0], np.int64): + res = 0 + for x in args: + res += x + res = res % self.mod_number + return res + # sum dict_values + else: + l_tmp = list(args[0]) + num = len(l_tmp) + # a list of integers + if isinstance(l_tmp[0], int): + res = 0 + for x in l_tmp: + res += x + res = res % self.mod_number + return res + # a list of lists + else: + n = len(l_tmp[0]) + res = [0] * n + for i in range(n): + for j in range(num): + res[i] += l_tmp[j][i] + res[i] = res[i] % self.mod_number + return np.asarray(res) + + def mod_matmul(self, data, para): + res = [0 for _ in range(data.shape[0])] + for i in range(data.shape[0]): + tmp = 0 + for j in range(data.shape[1]): + a = data[i, j].item() + b = para[j].item() + tmp += a * b + tmp = round(tmp / self.epsilon) + res[i] = tmp % self.mod_number + return res + + def err_mul_data(self, e, data): + res = [0 for _ in range(data.shape[1])] + for j in range(data.shape[1]): + tmp = 0 + 
for i in range(data.shape[0]): + a = e[i].item() + b = data[i, j].item() + tmp += a * b # % mod_number + res[j] = round(tmp / self.epsilon) + res[j] = res[j] % self.mod_number + return res + + def _float2fixedpoint(self, x): + x = round(x * self.epsilon) + assert abs(x) < self.maximum + return x % self.mod_number + + def _fixedpoint2float(self, x): + x = x % self.mod_number + if x > self.mod_number / 2: # self.maximum: + return -1 * (self.mod_number - x) / self.epsilon + else: + return x / self.epsilon + + def _upgrade(self, x): + x = round(x * self.epsilon) + assert abs(x) < self.maximum + return x + + def _downgrade(self, x): + x = round(x / self.epsilon) + x = x % self.mod_number + return x diff --git a/federatedscope/vertical_fl/__init__.py b/federatedscope/vertical_fl/__init__.py deleted file mode 100644 index f9458d03a..000000000 --- a/federatedscope/vertical_fl/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from federatedscope.vertical_fl.Paillier.abstract_paillier import * diff --git a/federatedscope/vertical_fl/dataloader/__init__.py b/federatedscope/vertical_fl/dataloader/__init__.py deleted file mode 100644 index 420b3bbd7..000000000 --- a/federatedscope/vertical_fl/dataloader/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from federatedscope.vertical_fl.dataloader.dataloader import load_vertical_data - -__all__ = ['load_vertical_data'] diff --git a/federatedscope/vertical_fl/worker/__init__.py b/federatedscope/vertical_fl/worker/__init__.py deleted file mode 100644 index 4fcb46fe2..000000000 --- a/federatedscope/vertical_fl/worker/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from federatedscope.vertical_fl.worker.vertical_client import vFLClient -from federatedscope.vertical_fl.worker.vertical_server import vFLServer - -__all__ = ['vFLServer', 'vFLClient'] diff --git a/tests/test_caesar_v_fl.py b/tests/test_caesar_v_fl.py new file mode 100644 index 000000000..35469edb2 --- /dev/null +++ b/tests/test_caesar_v_fl.py @@ -0,0 +1,65 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. 
+import unittest + +from federatedscope.core.auxiliaries.data_builder import get_data +from federatedscope.core.auxiliaries.worker_builder import get_client_cls, get_server_cls +from federatedscope.core.auxiliaries.utils import setup_seed +from federatedscope.core.auxiliaries.logging import update_logger +from federatedscope.core.configs.config import global_cfg +from federatedscope.core.fed_runner import FedRunner + + +class caesarTest(unittest.TestCase): + def setUp(self): + print(('Testing %s.%s' % (type(self).__name__, self._testMethodName))) + + def set_config(self, cfg): + backup_cfg = cfg.clone() + + import torch + cfg.use_gpu = torch.cuda.is_available() + + cfg.federate.mode = 'standalone' + cfg.federate.total_round_num = 50 + cfg.federate.client_num = 2 + + cfg.model.type = 'lr' + cfg.model.use_bias = False + + cfg.train.optimizer.lr = 0.05 + + cfg.data.type = 'caesar_v_fl_data' + cfg.data.size = 50 + + cfg.caesar_vertical.use = True + cfg.caesar_vertical.key_size = 256 + + cfg.trainer.type = 'none' + cfg.eval.freq = 5 + cfg.eval.best_res_update_round_wise_key = "test_loss" + + return backup_cfg + + def test_caesar(self): + init_cfg = global_cfg.clone() + backup_cfg = self.set_config(init_cfg) + setup_seed(init_cfg.seed) + update_logger(init_cfg, True) + + data, modified_config = get_data(init_cfg.clone()) + init_cfg.merge_from_other_cfg(modified_config) + self.assertIsNotNone(data) + + Fed_runner = FedRunner(data=data, + server_class=get_server_cls(init_cfg), + client_class=get_client_cls(init_cfg), + config=init_cfg.clone()) + self.assertIsNotNone(Fed_runner) + test_results = Fed_runner.run() + init_cfg.merge_from_other_cfg(backup_cfg) + print(test_results) + self.assertGreater(test_results['server_global_eval']['test_acc'], 0.8) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_vertical_fl.py b/tests/test_secure_LR.py similarity index 94% rename from tests/test_vertical_fl.py rename to tests/test_secure_LR.py index dad09038c..ecd65ed6f 100644 --- a/tests/test_vertical_fl.py +++ b/tests/test_secure_LR.py @@ -28,11 +28,11 @@ def set_config(self, cfg): cfg.train.optimizer.lr = 0.05 - cfg.data.type = 'vertical_fl_data' + cfg.data.type = 'secure_LR_data' cfg.data.size = 50 - cfg.vertical.use = True - cfg.vertical.key_size = 256 + cfg.secure_vertical.use = True + cfg.secure_vertical.key_size = 256 cfg.trainer.type = 'none' cfg.eval.freq = 5
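
A note on the CAESAR update implemented in `caesar_v_fl/worker/vertical_client.py` above: the two clients jointly evaluate a mini-batch logistic-regression step in which the sigmoid is replaced by the linear surrogate used in `callback_func_for_three_values` (roughly `sigma(z) ≈ 0.5 + 0.25 * z`), with every intermediate value held only as encrypted or secret-shared pieces. The sketch below shows what a single step computes once the shares are recombined; it is a plaintext reference only, and the function and variable names are illustrative, not FederatedScope APIs.

```python
import numpy as np


def plaintext_caesar_step(w, X, y, lr):
    """Plaintext reference for one CAESAR mini-batch update (a sketch, not the
    secure protocol): z and the error e are the quantities the two parties
    actually hold only as secret shares or ciphertexts."""
    z = X @ w                 # the parties compute additive shares <z>_1, <z>_2 of this product
    y_hat = 0.5 + 0.25 * z    # linear approximation of the sigmoid, as in the patch
    e = y_hat - y             # error term that is encrypted / re-shared between A and B
    grad = e @ X              # per-feature gradient; the feature columns are split across A and B
    return w - lr * grad      # plain gradient step with learning rate lr
```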
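
Relatedly, `simple_secret_sharing/secret_sharing.py` realizes additive secret sharing over a ring of size `2**(2*size)`, with floats mapped to fixed-point integers before being split. The minimal standalone sketch below illustrates the same split/reconstruct round trip; `SCALE` and `RING` are illustrative stand-ins for the `epsilon` and `mod_number` attributes in the patch, and none of these names are part of the FederatedScope API.

```python
import random

SCALE = 10**8   # fixed-point scaling factor (plays the role of `epsilon`)
RING = 2**60    # modulus of the sharing ring (plays the role of `mod_number`)


def to_fixed(x):
    # Encode a float as a non-negative ring element.
    return round(x * SCALE) % RING


def to_float(x):
    # Decode a ring element; the upper half of the ring represents negatives.
    x %= RING
    return (x - RING) / SCALE if x > RING // 2 else x / SCALE


def split(secret, n_parties=2):
    # Additively split a list of floats into n shares that sum to the secret mod RING.
    fixed = [to_fixed(v) for v in secret]
    shares = [[random.randrange(RING) for _ in fixed] for _ in range(n_parties - 1)]
    last = [(f - sum(col)) % RING for f, col in zip(fixed, zip(*shares))]
    return shares + [last]


def reconstruct(shares):
    # Recover the secret by summing the shares element-wise mod RING.
    return [to_float(sum(col) % RING) for col in zip(*shares)]


if __name__ == '__main__':
    w = [0.5, -1.25, 3.0]
    s1, s2 = split(w)              # each share on its own is uniformly random
    print(reconstruct([s1, s2]))   # -> [0.5, -1.25, 3.0]
```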