Distributed Communication#
Initialize distributed network#
FedLab uses torch.distributed as its point-to-point communication tool. The default communication backend is Gloo. FedLab processes send and receive data through TCP network connections. Here are the details of how to initialize the distributed network.
You need to assign the correct network interface (ethernet) to DistNetwork to make sure the torch.distributed network initialization works. DistNetwork allows quick network configuration; you can create one as follows:
from fedlab.core.network import DistNetwork
world_size = 10
rank = 0 # 0 for server, other rank for clients
ethernet = None
server_ip = '127.0.0.1'
server_port = 1234
network = DistNetwork(address=(server_ip, server_port), world_size=world_size, rank=rank, ethernet=ethernet)
network.init_network_connection() # call this method to start connection.
network.close_network_connection() # call this method to shutdown connection.
- The (server_ip, server_port) pair is the address of the server. Please be aware that the rank of the server is 0 by default.
- Make sure world_size is the same across all processes.
- Ranks should be different (from 0 to world_size-1).
- world_size = 1 (server) + number of clients.
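To make the rank/world_size bookkeeping concrete, here is a minimal sketch in plain Python (no FedLab imports; `make_dist_configs` is a hypothetical helper, not part of the FedLab API) that derives the configuration each process in a one-server, N-client deployment should use:

```python
def make_dist_configs(num_clients, server_ip="127.0.0.1", server_port=1234):
    """Derive (address, world_size, rank) for one server and num_clients clients.

    world_size = 1 (server) + number of clients; the server takes rank 0,
    and clients take ranks 1 .. world_size - 1.
    """
    world_size = 1 + num_clients
    address = (server_ip, server_port)
    return [{"address": address, "world_size": world_size, "rank": rank}
            for rank in range(world_size)]

configs = make_dist_configs(num_clients=3)
# configs[0] is the server (rank 0); configs[1:] are the clients (ranks 1-3).
```

Each dictionary maps directly onto the DistNetwork constructor arguments shown above.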
The ethernet is None by default, in which case torch.distributed will try to find the right network interface automatically. The interface name must be a valid one (check with ifconfig); otherwise, network initialization will fail. If the automatically detected interface does not work, you need to assign the right network interface for Gloo, either in code or by setting the environment variable GLOO_SOCKET_IFNAME, for example export GLOO_SOCKET_IFNAME=eth0 or os.environ['GLOO_SOCKET_IFNAME'] = "eth0".
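If you set the variable in code, do so before the connection is initialized, since Gloo reads it when the process group is created. A minimal sketch (eth0 is only an example name; substitute the interface reported by ifconfig):

```python
import os

# Must be set before network.init_network_connection() is called,
# because Gloo reads the variable when the process group is created.
os.environ["GLOO_SOCKET_IFNAME"] = "eth0"  # example name; check `ifconfig`
```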
Note
Check the available network interfaces:
$ ifconfig
Point-to-point communication#
In a recent update, we hid the communication details from users and provided simple APIs. DistNetwork now provides two basic communication APIs: send() and recv(). These APIs support flexible PyTorch tensor communication.
Sender process:
network = DistNetwork(address=(server_ip, server_port), world_size=world_size, rank=rank, ethernet=ethernet)
network.init_network_connection()
network.send(content, message_code, dst)
network.close_network_connection()
Receiver process:
network = DistNetwork(address=(server_ip, server_port), world_size=world_size, rank=rank, ethernet=ethernet)
network.init_network_connection()
sender_rank, message_code, content = network.recv(src)
#################################
# #
# local process with content. #
# #
#################################
network.close_network_connection()
Note
Currently, the following restrictions need to be noted:
- Tensor list: send() accepts a Python list of tensors.
- Data type: send() doesn't accept tensors of different data types. In other words, FedLab forces all appended tensors to have the same data type as the first appended tensor. Torch data types [torch.int8, torch.int16, torch.int32, torch.int64, torch.float16, torch.float32, torch.float64] are supported.
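The data-type rule above can be emulated with a small stand-alone check. This is a sketch in plain Python, not FedLab code: `check_same_dtype` is a hypothetical helper, and `FakeTensor` stands in for a torch.Tensor carrying only its `dtype` attribute.

```python
def check_same_dtype(tensors):
    """Return True if every tensor shares the dtype of the first one,
    mirroring the restriction FedLab places on send()."""
    if not tensors:
        return True
    first = tensors[0].dtype
    return all(t.dtype == first for t in tensors)

class FakeTensor:
    """Minimal stand-in for torch.Tensor, carrying only a dtype tag."""
    def __init__(self, dtype):
        self.dtype = dtype

# A mixed-dtype list would be rejected; a uniform one is accepted.
ok = check_same_dtype([FakeTensor("float32"), FakeTensor("float32")])
bad = check_same_dtype([FakeTensor("float32"), FakeTensor("int64")])
```

In FedLab itself this check happens implicitly: appended tensors are coerced to the dtype of the first appended tensor.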
Further understanding of FedLab communication#
FedLab packs content into a pre-defined package data structure. send() and recv() are implemented as follows:
def send(self, content=None, message_code=None, dst=0):
"""Send tensor to process rank=dst"""
pack = Package(message_code=message_code, content=content)
PackageProcessor.send_package(pack, dst=dst)
def recv(self, src=None):
"""Receive tensor from process rank=src"""
sender_rank, message_code, content = PackageProcessor.recv_package(
src=src)
return sender_rank, message_code, content
Create package#
The basic communication unit in FedLab is called a package. The communication module of FedLab lives in fedlab/core/communicator. Package defines the basic data structure of a network package: it contains a header and content.
p = Package()
p.header # A tensor with size = (5,).
p.content # A tensor with size = (x,).
Currently, you can create a network package using the following methods:
initialize with tensor
tensor = torch.zeros(size=(10,))
package = Package(content=tensor)
initialize with tensor list
tensor_sizes = [10, 5, 8]
tensor_list = [torch.rand(size) for size in tensor_sizes]
package = Package(content=tensor_list)
append a tensor to an existing package
tensor = torch.zeros(size=(10,))
package = Package(content=tensor)
new_tensor = torch.zeros(size=(8,))
package.append_tensor(new_tensor)
append a tensor list to an existing package
tensor_sizes = [10, 5, 8]
tensor_list = [torch.rand(size) for size in tensor_sizes]
package = Package()
package.append_tensor_list(tensor_list)
Package provides two static methods to parse the header and content:
p = Package()
Package.parse_header(p.header) # necessary information to describe the package
Package.parse_content(p.slices, p.content) # recover the tensor list in the order it was appended
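The slices/content mechanics can be illustrated with a simplified pure-Python model. `PackSketch` below is hypothetical, not the actual fedlab.core.communicator implementation: lists of numbers stand in for 1-D tensors, every appended "tensor" is flattened into one content buffer, and a slices list records each piece's length so parse_content can cut the buffer back apart.

```python
class PackSketch:
    """Toy model of Package: lists of numbers stand in for 1-D tensors."""

    def __init__(self):
        self.slices = []   # length of each appended "tensor"
        self.content = []  # all values concatenated into one flat buffer

    def append_tensor(self, values):
        self.slices.append(len(values))
        self.content.extend(values)

    @staticmethod
    def parse_content(slices, content):
        """Cut the flat buffer back into the originally appended pieces."""
        out, start = [], 0
        for n in slices:
            out.append(content[start:start + n])
            start += n
        return out

p = PackSketch()
p.append_tensor([1.0, 2.0])
p.append_tensor([3.0, 4.0, 5.0])
# PackSketch.parse_content(p.slices, p.content) → [[1.0, 2.0], [3.0, 4.0, 5.0]]
```

Packing everything into one flat buffer is what lets FedLab send a whole tensor list in a single point-to-point transfer.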
Send package#
The point-to-point communication protocol is implemented in the PackageProcessor module. PackageProcessor is a static class that manages the package sending/receiving procedure.
A user can send a package to the process with rank=0 (the parameter dst must be assigned):
p = Package()
PackageProcessor.send_package(package=p, dst=0)
or receive a package from rank=0 (set the parameter src=None to receive packages from any other process):
sender_rank, message_code, content = PackageProcessor.recv_package(src=0)
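The message_code argument that appears throughout these APIs is an application-level tag telling the receiver how to interpret the content. A hedged sketch of such a tag set and a dispatch routine (the names `MsgCode` and `dispatch` are hypothetical, not FedLab's actual MessageCode values):

```python
from enum import Enum

class MsgCode(Enum):
    """Hypothetical message tags; FedLab ships its own MessageCode enum."""
    PARAMETER_REQUEST = 0
    PARAMETER_UPDATE = 1
    EXIT = 2

def dispatch(message_code):
    """Route a received package by its message code (illustrative only)."""
    if message_code is MsgCode.PARAMETER_UPDATE:
        return "aggregate"
    if message_code is MsgCode.EXIT:
        return "shutdown"
    return "ignore"
```

A receiver loop would typically call recv(), then branch on the returned message_code in this manner.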