
Distributed Training

Siran Yang edited this page Jun 4, 2019 · 1 revision


In this section, we introduce how to use Euler in a distributed environment.

Euler stores all of the graph data in memory at run time. When a single machine's memory cannot hold all the data, or when distributed training is required, the graph engine needs to be deployed in a distributed manner.

In tf_euler, each TensorFlow worker can use the following interface to initialize a distributed graph:

tf_euler.initialize_shared_graph(
    directory,            # graph data directory
    zk_addr,              # ZooKeeper address, ip:port
    zk_path,              # ZooKeeper path
    shard_idx,            # shard id
    shard_num,            # number of shards
    global_sampler_type,  # global sampler type: all / node / edge / none; default: node
    graph_type,           # graph type: compact / fast; default: compact
    server_thread_num     # number of Euler service threads; default: 4
)  # returns a bool indicating whether the graph was initialized successfully

Running the above code starts a graph engine instance in each TensorFlow worker. Each instance holds a part of the graph; when the required graph data is not in the current process, Euler fetches it from other instances via RPC. Specifically, using the distributed graph engine involves the following configuration:

  1. The data needs to be [partitioned](Preparing_data#data-partition) and must be placed on HDFS.
  2. Apache ZooKeeper is used to synchronize meta information between the graph engine instances. zk_addr is the ZooKeeper service address, and zk_path is the ZooKeeper data node used to store that meta information.
  3. The Euler cluster divides the graph into multiple shards, numbered from 0 to shard_num - 1. Each shard can have multiple graph engine instances (replicas).
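
The shard/replica layout above can be sketched with a simple round-robin rule. This is illustrative only; `shard_assignment` and `worker_index` are hypothetical names, not part of the tf_euler API:

```python
def shard_assignment(num_workers, shard_num):
    """Map each worker to a shard round-robin; workers that receive
    the same shard index act as replicas of that shard."""
    return [worker_index % shard_num for worker_index in range(num_workers)]

# 4 workers over 2 shards: workers 0 and 2 replicate shard 0,
# workers 1 and 3 replicate shard 1.
print(shard_assignment(4, 2))  # -> [0, 1, 0, 1]
```

This mapping matches the four-worker, two-shard example below: each shard is served by two replicas, so reads of any part of the graph can be balanced across them.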

For example, when training with 4 TensorFlow workers and splitting the data into two shards, the graph engine can be initialized as follows:

# Worker 0
tf_euler.initialize_shared_graph(
    'hdfs://namenode.example.com:9000/euler_data',
    'zk.example.com:2181', '/path/for/euler',
    0, 2)

# Worker 1
tf_euler.initialize_shared_graph(
    'hdfs://namenode.example.com:9000/euler_data',
    'zk.example.com:2181', '/path/for/euler',
    1, 2)

# Worker 2
tf_euler.initialize_shared_graph(
    'hdfs://namenode.example.com:9000/euler_data',
    'zk.example.com:2181', '/path/for/euler',
    0, 2)

# Worker 3
tf_euler.initialize_shared_graph(
    'hdfs://namenode.example.com:9000/euler_data',
    'zk.example.com:2181', '/path/for/euler',
    1, 2)

Finally, since each TensorFlow worker holds graph data that other workers may access, the workers need to exit synchronously. The following hook can be used to achieve this:

tf_euler.utils.hooks.SyncExitHook(num_workers)
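
As an illustrative sketch (not prescribed by Euler), the hook can be passed to a `tf.train.MonitoredTrainingSession`, the standard TF 1.x mechanism for running session hooks; `num_workers`, `server`, `task_index`, and `train_op` are assumed to come from the surrounding distributed-training setup:

```python
import tensorflow as tf
import tf_euler

# SyncExitHook blocks each worker at session close until all
# num_workers workers have finished, so no worker's graph engine
# shard disappears while others still need it.
hooks = [tf_euler.utils.hooks.SyncExitHook(num_workers)]

with tf.train.MonitoredTrainingSession(master=server.target,
                                       is_chief=(task_index == 0),
                                       hooks=hooks) as sess:
    while not sess.should_stop():
        sess.run(train_op)
```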

Heterogeneous deployment

The deployment scheme above runs the graph engine in the same processes as the TensorFlow workers, so the graph engine competes with TensorFlow for computing resources. To make better use of those resources, the graph engine can instead be deployed in a set of independent processes:

import euler

euler.start_and_wait(directory,            # graph data directory
                     loader_type,          # loader type
                     hdfs_addr,            # HDFS address
                     hdfs_port,            # HDFS port
                     shard_idx,            # shard id
                     shard_num,            # number of shards
                     zk_addr,              # ZooKeeper address, ip:port
                     zk_path,              # ZooKeeper path
                     global_sampler_type,  # global sampler type: all / node / edge / none; default: node
                     graph_type,           # graph type: compact / fast; default: compact
                     server_thread_num     # number of Euler service threads; default: 4
)
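
For example, a standalone graph engine process serving shard 0 of 2 might be started as follows. This is a sketch only: the `'hdfs'` value for loader_type is an assumption (the valid loader types are not listed here), and the HDFS and ZooKeeper addresses are the placeholders used in the earlier examples:

```python
import euler

# Sketch: launch one graph engine process for shard 0 of 2.
# 'hdfs' as the loader_type is an assumed value; addresses are placeholders.
euler.start_and_wait('/euler_data',           # graph data directory on HDFS
                     'hdfs',                  # loader type (assumed value)
                     'namenode.example.com',  # HDFS address
                     9000,                    # HDFS port
                     0,                       # shard id
                     2,                       # number of shards
                     'zk.example.com:2181',   # ZooKeeper address
                     '/path/for/euler')       # ZooKeeper path
```

One such process would be started per shard (and per replica of each shard); the call blocks and serves graph queries until the process is terminated.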

In particular, when the graph engine is deployed independently, the graph API in each TensorFlow worker can be initialized as follows:

tf_euler.initialize_graph({'mode': 'Remote',
                           'zk_server': 'zk.example.com:2181',
                           'zk_path': '/path/for/euler'})