server#

Package Contents#

  • SyncParameterServerHandler – Synchronous parameter server handler.

  • AsyncParameterServerHandler – Asynchronous parameter server handler.

  • ServerSynchronousManager – Synchronous communication network manager for the server.

  • ServerAsynchronousManager – Asynchronous communication network manager for the server.

class SyncParameterServerHandler(model, global_round=5, cuda=False, sample_ratio=1.0, logger=Logger())#

Bases: ParameterServerBackendHandler

Synchronous Parameter Server Handler.

Backend of the synchronous parameter server: this class is responsible for the backend computation on the synchronous server.

The synchronous parameter server waits for every client to finish its local training process before starting the next FL round.

Details in paper: http://proceedings.mlr.press/v54/mcmahan17a.html

Parameters
  • model (torch.nn.Module) – Model used in this federation.

  • global_round (int) – stop condition. The FL system shuts down when this number of global rounds is reached.

  • cuda (bool) – Use GPUs or not. Default: False

  • sample_ratio (float) – sample_ratio * client_num is the number of clients to join in every FL round. Default: 1.0.

  • logger (Logger, optional) – object of Logger.
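
A minimal construction sketch (the import path and the toy model are illustrative assumptions, not part of this page):

    import torch.nn as nn

    # Assumed import path; adjust to where SyncParameterServerHandler lives in your installation.
    from fedlab.core.server.handler import SyncParameterServerHandler

    # Any torch.nn.Module can serve as the global model; a linear layer keeps the sketch small.
    model = nn.Linear(10, 2)

    handler = SyncParameterServerHandler(
        model,
        global_round=3,    # shut the FL system down after 3 global rounds
        cuda=False,
        sample_ratio=0.5,  # each round samples 50% of the registered clients
    )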

stop_condition(self) bool#

NetworkManager keeps monitoring the return value of this method, and it stops all related processes and threads when True is returned.

sample_clients(self)#

Return a list of randomly selected client ranks. Client ranks range from 1 to self.client_num_in_total (inclusive).
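
For illustration, a standalone equivalent of this sampling behavior might look like the following (a sketch, not the actual implementation):

    import random

    # Hypothetical standalone equivalent of the default sampling, for illustration only.
    def sample_clients(client_num_in_total, client_num_per_round):
        return random.sample(range(1, client_num_in_total + 1), client_num_per_round)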

add_model(self, sender_rank, model_parameters)#

Deal with incoming model parameters from one client.

Note

Returns True when self._update_model is called, i.e. once the client buffer cache is full.

Parameters
  • sender_rank (int) – Rank of sender client in torch.distributed group.

  • model_parameters (torch.Tensor) – Serialized model parameters from one client.

_update_model(self, model_parameters_list)#

Update global model with collected parameters from clients.

Note

Server handler will call this method when its client_buffer_cache is full. User can overwrite the strategy of aggregation to apply on model_parameters_list, and use SerializationTool.deserialize_model() to load serialized parameters after aggregation into self._model.

Parameters

model_parameters_list (list[torch.Tensor]) – A list of serialized model parameters, one per client.
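
Since the note above presents the aggregation strategy as a user-overridable hook, a minimal FedAvg-style override could look as follows; the import paths are assumptions, while SerializationTool.deserialize_model is the helper named in the note:

    import torch

    from fedlab.core.server.handler import SyncParameterServerHandler  # assumed import path
    from fedlab.utils.serialization import SerializationTool           # assumed import path


    class MeanAggregationHandler(SyncParameterServerHandler):
        """Sketch: replace the default aggregation with an unweighted mean."""

        def _update_model(self, model_parameters_list):
            # Each entry is a flat tensor of serialized parameters from one client.
            aggregated = torch.mean(torch.stack(model_parameters_list), dim=0)
            # Load the aggregated parameters back into the global model.
            SerializationTool.deserialize_model(self._model, aggregated)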

property client_num_per_round(self)#

class AsyncParameterServerHandler(model, alpha=0.5, total_time=5, strategy='constant', cuda=False, logger=Logger())#

Bases: ParameterServerBackendHandler

Asynchronous Parameter Server Handler.

The server updates the global model immediately after receiving a ParameterUpdate message. Paper: https://arxiv.org/abs/1903.03934

Parameters
  • model (torch.nn.Module) – Global model in server

  • alpha (float) – weight used in async aggregation.

  • total_time (int) – stop condition. The FL system shuts down when total_time is reached.

  • strategy (str) – adaptive strategy for alpha; one of constant, hinge, or polynomial. Default: constant.

  • cuda (bool) – Use GPUs or not.

  • logger (Logger, optional) – object of Logger.
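
A minimal construction sketch (import path and toy model are illustrative assumptions):

    import torch.nn as nn

    from fedlab.core.server.handler import AsyncParameterServerHandler  # assumed import path

    handler = AsyncParameterServerHandler(
        nn.Linear(10, 2),       # any torch.nn.Module works as the global model
        alpha=0.5,              # base mixing weight for asynchronous aggregation
        total_time=5,           # stop condition
        strategy="polynomial",  # staleness-adaptive alpha, see _adapt_alpha below
    )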

property server_time(self)#

stop_condition(self) bool#

NetworkManager keeps monitoring the return value of this method, and it stops all related processes and threads when True is returned.

_update_model(self, client_model_parameters, model_time)#

Update the global model with parameters taken from the client model queue.

_adapt_alpha(self, receive_model_time)#

Update alpha according to the staleness of the received model.
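
The referenced paper (https://arxiv.org/abs/1903.03934) defines the mixing rule and the staleness functions behind the constant, hinge, and polynomial strategies; the sketch below illustrates them and is not this library's implementation (the hyperparameters a and b are assumptions):

    def adapt_alpha(alpha, server_time, model_time, strategy="constant", a=0.5, b=4):
        # Scale alpha by a staleness function s(t) with t = server_time - model_time,
        # following the strategies named above; a and b are illustrative hyperparameters,
        # not arguments of this API.
        staleness = server_time - model_time
        if strategy == "constant":
            s = 1.0
        elif strategy == "polynomial":
            s = float((staleness + 1) ** (-a))
        elif strategy == "hinge":
            s = 1.0 if staleness <= b else 1.0 / (a * (staleness - b) + 1)
        else:
            raise ValueError("unknown strategy: {}".format(strategy))
        return alpha * s


    def update_global(global_parameters, client_parameters, alpha_t):
        # Mixing rule from the referenced paper: x_t = (1 - alpha_t) * x_{t-1} + alpha_t * x_client.
        return (1 - alpha_t) * global_parameters + alpha_t * client_parameters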

class ServerSynchronousManager(network, handler, logger=Logger())#

Bases: ServerManager

Synchronous communication

This is the top-level class of the framework on the server side, mainly responsible for the server's network communication. It communicates with clients synchronously, following the agreements defined in main_loop().

Parameters
  • network – network manager object used for communication with clients.

  • handler (ParameterServerBackendHandler) – backend computation handler for the server.

  • logger (Logger, optional) – object of Logger.

setup(self)#

Initialization Stage.

  • The server accepts the local client number report from each client manager.

  • Initialize a coordinator for client_id mapping.

main_loop(self)#

Actions the server performs when receiving a package from a client.

The server transmits each received package to the backend computation handler for aggregation or other manipulations.

Loop:
  1. activate clients for the current training round.

  2. listen for messages from clients -> transmit the received parameters to the server backend.

Note

Communication agreements related: users can overwrite this function to customize the communication agreements. This method is the key component connecting the behaviors of ParameterServerBackendHandler and NetworkManager; a sketch of the loop follows the Raises entry below.

Raises

Exception – Unexpected MessageCode.
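
Roughly, the loop described above can be pictured as the following sketch; attribute and helper names (self._handler, self.receive_package, MessageCode) are assumptions, not this page's API:

    def main_loop(self):
        # Illustrative sketch only, not the library's implementation.
        while not self._handler.stop_condition():
            # 1. activate the sampled clients for this round
            self.activate_clients()
            # 2. collect client updates until the handler reports the round is complete
            round_finished = False
            while not round_finished:
                sender_rank, message_code, payload = self.receive_package()
                if message_code == MessageCode.ParameterUpdate:
                    round_finished = self._handler.add_model(sender_rank, payload)
                else:
                    raise Exception("Unexpected MessageCode {}".format(message_code))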

shutdown(self)#

Shutdown stage.

activate_clients(self)#

Activate a subset of clients to join in one FL round.

The manager starts a new thread to send the activation package to each chosen client's process rank. The client ranks are obtained from handler.sample_clients().

shutdown_clients(self)#

Shutdown all clients.

Send a package with MessageCode.Exit to each client to ask it to exit.

Note

Communication agreements related: users can overwrite this function to define the package carrying the exit information.
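
A typical way to wire the pieces together might look like the sketch below; DistNetwork, the import paths, and the address arguments are assumptions based on common usage, and handler is a backend handler such as the SyncParameterServerHandler constructed earlier:

    from fedlab.core.network import DistNetwork                      # assumed import path
    from fedlab.core.server.manager import ServerSynchronousManager  # assumed import path

    # Rank 0 is assumed to be the server; world_size counts the server plus all client processes.
    network = DistNetwork(address=("127.0.0.1", "3002"), world_size=3, rank=0)

    manager = ServerSynchronousManager(network=network, handler=handler)
    manager.run()  # setup() -> main_loop() -> shutdown()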

class ServerAsynchronousManager(network, handler, logger=Logger())#

Bases: ServerManager

Asynchronous communication network manager for server

This is the top-level class of the framework on the server side, mainly responsible for the server's network communication. It communicates with clients asynchronously, following the agreements defined in run().

Parameters
  • network – network manager object used for communication with clients.

  • handler (ParameterServerBackendHandler) – backend computation handler for the server.

  • logger (Logger, optional) – object of Logger.

setup(self)#

Initialization Stage.

  • The server accepts the local client number report from each client manager.

  • Initialize a coordinator for client_id mapping.

main_loop(self)#

Communication agreements of asynchronous FL (a sketch follows the Raises entry below).

  • The server receives a ParameterRequest from a client and sends the current model parameters to that client.

  • The server receives a ParameterUpdate from a client and transmits the parameters to the queue waiting for aggregation.

Raises

ValueError – invalid message code.
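
A hedged sketch of the dispatch described above; helper and attribute names are assumptions:

    def main_loop(self):
        # Illustrative sketch only, not the library's implementation.
        while not self._handler.stop_condition():
            sender_rank, message_code, payload = self.receive_package()
            if message_code == MessageCode.ParameterRequest:
                self.send_model(to_rank=sender_rank)  # reply with the current global model
            elif message_code == MessageCode.ParameterUpdate:
                # payload is assumed to carry the client's parameters and its model_time;
                # watching_queue() consumes the queue and aggregates.
                self.message_queue.put(payload)
            else:
                raise ValueError("Invalid message code {}".format(message_code))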

shutdown(self)#

Shutdown stage.

Close the network connection at the end.

watching_queue(self)#

Asynchronous communication maintains a message queue. A new thread is started to run this function.
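
Conceptually, the queue watcher hands each queued update to the backend handler, as in this sketch (queue and attribute names are assumptions matching the main_loop sketch above):

    def watching_queue(self):
        # Runs in its own thread: drain queued client updates and aggregate them one by one.
        while not self._handler.stop_condition():
            model_parameters, model_time = self.message_queue.get()  # blocks until an update arrives
            self._handler._update_model(model_parameters, model_time)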

shutdown_clients(self)#

Shutdown all clients.

Send a package with MessageCode.Exit to each client.