# Distributed Training
In this section, we introduce how to use Euler in a distributed environment. Euler stores all of the graph data in memory at run time, so when a single machine's memory cannot hold all the data, or when distributed training is required, the graph engine needs to run in a distributed manner.
In `tf_euler`, a TensorFlow worker can use the following interface to initialize a distributed graph:
```python
tf_euler.initialize_shared_graph(
    directory,            # graph data directory
    zk_addr,              # ZooKeeper address, ip:port
    zk_path,              # ZooKeeper path
    shard_idx,            # shard id
    shard_num,            # shard number
    global_sampler_type,  # global sampler type: all / node / edge / none; the default is node
    graph_type,           # graph type: compact / fast; the default is compact
    server_thread_num     # euler service thread number; the default is 4
)  # returns a bool indicating whether the graph was initialized successfully
```
Running the above code starts a graph engine instance in each TensorFlow worker, and each instance holds a part of the graph. When the required graph data is not in the current process, Euler uses RPC to fetch it from the other instances. In particular, the distributed graph engine involves the following configuration:
- The data needs to be [partitioned](Preparing_data#Data-partition) and must be placed on HDFS.
- Apache ZooKeeper is used to synchronize meta information between the graph engine instances. `zk_addr` is the ZooKeeper service address, and `zk_path` is the data node used to store the meta information on ZooKeeper.
- The Euler cluster divides the graph into multiple shards, numbered from 0 to `shard_num - 1`. Each shard can have multiple graph engine instances (replicas).
For example, when training with 4 TensorFlow workers and splitting the data into two shards, the graph engine can be initialized as follows:
```python
# Worker 0
tf_euler.initialize_shared_graph(
    'hdfs://namenode.example.com:9000/euler_data',
    'zk.example.com:2181', '/path/for/euler',
    0, 2)

# Worker 1
tf_euler.initialize_shared_graph(
    'hdfs://namenode.example.com:9000/euler_data',
    'zk.example.com:2181', '/path/for/euler',
    1, 2)

# Worker 2
tf_euler.initialize_shared_graph(
    'hdfs://namenode.example.com:9000/euler_data',
    'zk.example.com:2181', '/path/for/euler',
    0, 2)

# Worker 3
tf_euler.initialize_shared_graph(
    'hdfs://namenode.example.com:9000/euler_data',
    'zk.example.com:2181', '/path/for/euler',
    1, 2)
```
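The worker-to-shard layout above generalizes to a simple round-robin rule: with `shard_num` shards, worker `i` serves shard `i % shard_num`, so each shard ends up with the same number of replicas. A minimal sketch of that rule (the helper function below is ours, not part of `tf_euler`):

```python
def shard_for_worker(worker_index, shard_num):
    """Assign a graph-engine shard to a TensorFlow worker.

    Workers are spread round-robin over shards: with 4 workers and
    2 shards, workers 0 and 2 replicate shard 0, while workers 1
    and 3 replicate shard 1, matching the example above.
    """
    return worker_index % shard_num

# Reproduce the 4-worker / 2-shard layout from the example above.
assignment = [shard_for_worker(i, 2) for i in range(4)]
print(assignment)  # [0, 1, 0, 1]
```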
Finally, since each TensorFlow worker holds graph data that other workers may access, the workers need to exit synchronously. The following hook can be used to achieve this:
```python
tf_euler.utils.hooks.SyncExitHook(num_workers)
```
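The hook is needed because a worker that finishes early would take its graph shard offline while other workers are still issuing RPCs to it. Conceptually it is an exit barrier; the sketch below illustrates the idea with Python threads and `threading.Barrier`, and is not Euler's actual implementation:

```python
import threading

NUM_WORKERS = 4
# Every worker must reach the barrier before any of them may exit.
exit_barrier = threading.Barrier(NUM_WORKERS)

def worker(idx, results):
    # ... training steps would run here ...
    # Block until all workers are done training, so no worker tears
    # down its graph shard while others may still need it over RPC.
    exit_barrier.wait()
    results[idx] = 'exited'

results = {}
threads = [threading.Thread(target=worker, args=(i, results))
           for i in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results.values()))  # ['exited', 'exited', 'exited', 'exited']
```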
The above deployment scheme runs the graph engine in the same processes as the TensorFlow workers, so the graph engine shares computing resources with TensorFlow. To make better use of computing resources, users can also deploy the graph engine in a set of independent processes:
```python
import euler

euler.start_and_wait(
    directory,            # graph data directory
    loader_type,          # loader type
    hdfs_addr,            # HDFS address
    hdfs_port,            # HDFS port
    shard_idx,            # shard id
    shard_num,            # shard number
    zk_addr,              # ZooKeeper address, ip:port
    zk_path,              # ZooKeeper path
    global_sampler_type,  # global sampler type: all / node / edge / none; the default is node
    graph_type,           # graph type: compact / fast; the default is compact
    server_thread_num     # euler service thread number; the default is 4
)
```
In particular, when the graph engine is deployed independently, the graph API can be initialized in the TensorFlow worker as follows:
```python
tf_euler.initialize_graph({'mode': 'Remote',
                           'zk_server': 'zk.example.com:2181',
                           'zk_path': '/path/for/euler'})
```
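In `Remote` mode the worker only needs to discover the standalone graph engines through ZooKeeper, which is why the configuration carries no data directory or shard information: the engine processes already own the data and shard layout. A small helper for building this configuration (the helper function and its structure are our sketch, not part of `tf_euler`):

```python
def remote_graph_config(zk_server, zk_path):
    """Build the config dict passed to tf_euler.initialize_graph in Remote mode.

    Only service-discovery fields are needed; the standalone engine
    processes started with euler.start_and_wait own the graph data.
    """
    return {'mode': 'Remote',
            'zk_server': zk_server,
            'zk_path': zk_path}

config = remote_graph_config('zk.example.com:2181', '/path/for/euler')
print(config['mode'])  # Remote
```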