Distributed Communication#

Initialize distributed network#

FedLab uses torch.distributed for point-to-point communication. The default communication backend is Gloo, and FedLab processes send and receive data over TCP connections. This section describes how to initialize the distributed network.

You need to assign the correct network interface (ethernet) to DistNetwork so that torch.distributed network initialization works. DistNetwork is a helper for quick network configuration. You can create one as follows:

from fedlab.core.network import DistNetwork
world_size = 10
rank = 0  # 0 for server, other rank for clients
ethernet = None
server_ip = '127.0.0.1'
server_port = 1234
network = DistNetwork(address=(server_ip, server_port), world_size=world_size, rank=rank, ethernet=ethernet)

network.init_network_connection() # call this method to start connection.
network.close_network_connection() # call this method to shutdown connection.
  • (server_ip, server_port) is the address of the server. Note that the rank of the server is 0 by default.

  • Make sure world_size is the same across all processes.

  • Each process must have a different rank, from 0 to world_size-1.

  • world_size = 1 (server) + number of clients.

  • ethernet is None by default, in which case torch.distributed tries to find the right network interface automatically.

  • If you pass an interface name as ethernet, check the name first (using ifconfig). An incorrect name causes network initialization to fail.
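
The rank and world_size rules above can be sketched as a small planning helper. This is only an illustration and not part of FedLab: the function name plan_ranks and the returned dictionary layout are hypothetical.

```python
def plan_ranks(num_clients):
    """Return (world_size, roles) for one server plus `num_clients` clients.

    Hypothetical helper for illustration; not part of FedLab.
    """
    if num_clients < 1:
        raise ValueError("at least one client is required")
    world_size = 1 + num_clients  # 1 server + clients
    # Rank 0 is the server; ranks 1..world_size-1 are clients.
    roles = {rank: ("server" if rank == 0 else "client")
             for rank in range(world_size)}
    return world_size, roles


world_size, roles = plan_ranks(num_clients=9)
print(world_size)          # 10
print(roles[0], roles[9])  # server client
```

Every process would then be started with the same world_size and its own rank taken from this table.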

If the automatically detected interface does not work, you must assign the correct network interface for Gloo, either in code or by setting the environment variable GLOO_SOCKET_IFNAME, for example export GLOO_SOCKET_IFNAME=eth0 or os.environ['GLOO_SOCKET_IFNAME'] = "eth0".

Note

Check the available network interfaces:

$ ifconfig
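
The same check can be done from Python before the network connection is initialized. The sketch below uses only the standard library (socket.if_nameindex and os.environ). The helper names list_interfaces and select_gloo_interface are hypothetical, and picking the first interface is only for demonstration; in practice you would choose the interface that reaches the server.

```python
import os
import socket


def list_interfaces():
    """Return the names of the network interfaces known to the OS."""
    return [name for _index, name in socket.if_nameindex()]


def select_gloo_interface(name):
    """Point Gloo at `name` via GLOO_SOCKET_IFNAME; reject unknown names."""
    available = list_interfaces()
    if name not in available:
        raise ValueError(f"unknown interface {name!r}; available: {available}")
    os.environ["GLOO_SOCKET_IFNAME"] = name
    return name


interfaces = list_interfaces()
print(interfaces)  # for example ['lo', 'eth0']; varies by machine
if interfaces:
    # Demonstration only: take the first interface reported by the OS.
    select_gloo_interface(interfaces[0])
```

Validating the name up front turns a late connection failure into an immediate, readable error.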

Point-to-point communication#

In a recent update, we hid the communication details from the user and now provide simple APIs. DistNetwork provides two basic communication APIs: send() and recv(). These APIs support flexible communication of PyTorch tensors.

Sender process:

network = DistNetwork(address=(server_ip, server_port), world_size=world_size, rank=rank, ethernet=ethernet)
network.init_network_connection()
network.send(content, message_code, dst)
network.close_network_connection()

Receiver process:

network = DistNetwork(address=(server_ip, server_port), world_size=world_size, rank=rank, ethernet=ethernet)
network.init_network_connection()
sender_rank, message_code, content = network.recv(src)
#################################
#                               #
#  local process with content.  #
#                               #
#################################
network.close_network_connection()
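
A receiver typically branches on message_code to decide what to do with content. The sketch below shows that pattern in isolation. The DemoCode enum and the handle function are hypothetical stand-ins defined only for this example; they are not FedLab's own message codes, and the tuple simply mirrors the three values returned by recv().

```python
from enum import Enum


class DemoCode(Enum):
    """Hypothetical message codes, defined only for this sketch."""
    PARAMETER_UPDATE = 0
    EXIT = 1


def handle(sender_rank, message_code, content):
    """Decide what to do with one received (sender_rank, message_code, content)."""
    if message_code == DemoCode.PARAMETER_UPDATE:
        return f"apply {len(content)} tensor(s) from rank {sender_rank}"
    if message_code == DemoCode.EXIT:
        return f"rank {sender_rank} asked to stop"
    raise ValueError(f"unexpected message code: {message_code}")


# Simulated return value of network.recv(src=0); plain lists stand in for tensors.
received = (0, DemoCode.PARAMETER_UPDATE, [[0.1, 0.2], [0.3]])
result = handle(*received)
print(result)  # apply 2 tensor(s) from rank 0
```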

Note

Currently, the following restrictions apply:
  1. Tensor list: send() accepts a Python list of tensors.

  2. Data type: send() does not accept tensors of different data types. In other words, FedLab forces all appended tensors to have the same data type as the first appended tensor. The supported Torch data types are torch.int8, torch.int16, torch.int32, torch.int64, torch.float16, torch.float32, and torch.float64.
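
Because of the data type restriction, it can be safer to make the cast explicit before calling send(), so that the order of the tensors does not silently decide the precision. The helper below is a sketch that uses only standard PyTorch calls; unify_dtype is a hypothetical name, not a FedLab function.

```python
import torch


def unify_dtype(tensors):
    """Cast every tensor to the dtype of the first one (hypothetical helper)."""
    if not tensors:
        return []
    target = tensors[0].dtype
    return [t.to(target) for t in tensors]


weights = torch.zeros(3, dtype=torch.float32)
counts = torch.tensor([5, 7], dtype=torch.int64)

# Put the floating-point tensor first so integer values are widened, not truncated.
content = unify_dtype([weights, counts])
print([t.dtype for t in content])  # [torch.float32, torch.float32]
```

The resulting list can then be passed as content to send().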

Further understanding of FedLab communication#

FedLab packs content into a predefined package data structure. send() and recv() are implemented as follows:

def send(self, content=None, message_code=None, dst=0):
    """Send tensor to process rank=dst"""
    pack = Package(message_code=message_code, content=content)
    PackageProcessor.send_package(pack, dst=dst)

def recv(self, src=None):
    """Receive tensor from process rank=src"""
    sender_rank, message_code, content = PackageProcessor.recv_package(
        src=src)
    return sender_rank, message_code, content

Create package#

The basic communication unit in FedLab is called a package. The communication module of FedLab is located in fedlab/core/communicator. Package defines the basic data structure of a network package, which consists of a header and content.

p = Package()
p.header   # A tensor with size = (5,).
p.content  # A tensor with size = (x,).

Currently, you can create a network package in the following ways:

  1. Initialize with a tensor:

tensor = torch.Tensor(size=(10,))
package = Package(content=tensor)
  2. Initialize with a tensor list:

tensor_sizes = [10, 5, 8]
tensor_list = [torch.rand(size) for size in tensor_sizes]
package = Package(content=tensor_list)
  3. Append a tensor to an existing package:

tensor = torch.Tensor(size=(10,))
package = Package(content=tensor)

new_tensor = torch.Tensor(size=(8,))
package.append_tensor(new_tensor)
  4. Append a tensor list to an existing package:

tensor_sizes = [10, 5, 8]
tensor_list = [torch.rand(size) for size in tensor_sizes]

package = Package()
package.append_tensor_list(tensor_list)

Two static methods are provided by Package to parse header and content:

p = Package()
Package.parse_header(p.header)  # necessary information to describe the package
Package.parse_content(p.slices, p.content)  # list of tensors, in the order they were appended
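
To make the role of slices concrete, here is a simplified, pure-Python model of how several tensors could be flattened into one content buffer and recovered afterwards. It is a sketch under stated assumptions, not FedLab's implementation: plain lists stand in for 1-D tensors, slices is assumed to be a list of element counts, and the function names pack and unpack are hypothetical. The slice layout actually used by Package may differ.

```python
def pack(tensor_list):
    """Flatten a list of 1-D sequences into (slices, content)."""
    slices = [len(t) for t in tensor_list]         # assumed layout: one length per tensor
    content = [x for t in tensor_list for x in t]  # single flat buffer
    return slices, content


def unpack(slices, content):
    """Inverse of pack(): cut the flat buffer back into the original pieces."""
    if sum(slices) != len(content):
        raise ValueError("slices do not match content length")
    pieces, start = [], 0
    for length in slices:
        pieces.append(content[start:start + length])
        start += length
    return pieces


original = [[1.0, 2.0, 3.0], [4.0], [5.0, 6.0]]
slices, content = pack(original)
print(slices)   # [3, 1, 2]
print(content)  # [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
restored = unpack(slices, content)
```

Keeping the lengths next to one flat buffer is what allows a single contiguous tensor to be sent and still be split back into the original list on the receiving side.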

Send package#

The point-to-point communication protocol is implemented in the PackageProcessor module. PackageProcessor is a static class that manages the procedure for sending and receiving packages.

You can send a package to the process with rank=0 (the parameter dst must be assigned):

p = Package()
PackageProcessor.send_package(package=p, dst=0)

Or receive a package from rank=0 (set the parameter src=None to receive a package from any other process):

sender_rank, message_code, content = PackageProcessor.recv_package(src=0)
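
The difference between src=0 and src=None can be illustrated with an in-memory stand-in for a process's incoming messages. This is a conceptual sketch only: the Mailbox class is hypothetical, no network is involved, and it raises an error when nothing matches, whereas a real receive call would wait for a message.

```python
class Mailbox:
    """In-memory stand-in for the messages waiting at one process."""

    def __init__(self):
        self._pending = []

    def deliver(self, sender_rank, message_code, content):
        """Queue one message as (sender_rank, message_code, content)."""
        self._pending.append((sender_rank, message_code, content))

    def recv(self, src=None):
        """Return the oldest message from `src`, or from anyone if src is None."""
        for i, message in enumerate(self._pending):
            if src is None or message[0] == src:
                return self._pending.pop(i)
        raise LookupError(f"no pending message from rank {src}")


box = Mailbox()
box.deliver(2, "update", [1.0])  # arrives first, from a client
box.deliver(0, "request", [])    # arrives second, from the server

from_server = box.recv(src=0)    # skips the client message
from_any = box.recv(src=None)    # takes whatever is oldest
print(from_server[0], from_any[0])  # 0 2
```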