Skip to content
coderplay edited this page Jul 11, 2012 · 15 revisions

#Dragon 设计

背景

目标

术语

Job

Task

Task Attempt

事件(Event)

事件处理

Dragon采用类似Facebook Puma的思想. 源端有一个类似Facebook Scribe的收集器, 将日志文件, MySQL的binlog等等收集至HDFS文件. 只要源端一有数据就sync到HDFS, 让Dragon的HDFS客户端能够实时地看见文件的最新数据. 与Puma不同的是, 这些数据会经过一组task分析后经过类似MapReduce的Shuffle交给下游task继续处理, 这样Dragon就有能力处理更为复杂的数据分析. 这些tasks之间是一个DAG的拓扑结构.

用户在提交Dragon作业时, 需要给源端DragonVertex定义输入路径, 然后再由DragonJobService提交作业. 这个输入路径代表一个HDFS目录. 输入目录下有一组子目录, 每个子目录包含同一项数据(例如, 日志数据)的所有轮转. 作业分为很多Task, Task将在集群内不同的机器上运行. 每个Task将监视一个子目录的变化, 不停tail子目录文件的数据. 子目录里的每一个文件都以类似000000000000.drg为名, drg是文件的后缀, 前面那串数字代表此文件首个Event对于全局Event的偏移量.

Task读取子目录的数据时, 有一个WatchService后台线程不断地监视子目录是否有新的文件产生. 一旦有, 便将此文件加入到队列之中, 以便实时地tail新文件的数据.

每道流入Dragon节点的消息都称为一个Event.

Event API

在Dragon系统中传递的每道消息称为一个Event. 以下是Event的接口定义:

/**
 * Event is the basic processing unit of dragon. 
 */
public interface Event<KEY, VALUE> extends Serializable {

  /**
   * Every {@link Event} should provide a <code>offset</code> 
   * for helping checkpointing.
   * @return the offset of this {@link Event}
   */
  public long offset();

  /**
   * The key of this {@link Event} 
   * @return the key of this {@link Event} 
   */
  public KEY key();
  
  /**
   * The value of this {@link Event} 
   * @return the value of this {@link Event} 
   */
  public VALUE value();

}

Dragon不可能像MapReduce那么随意地让用户定义消息数据类型.

网络协议

Child进程向下游NodeManager发送数据前,需要先注册. 注册命令的格式如下

  • version, 1 byte, 代表协议的版本号
  • command,1 byte, 代表命令类型, 这儿就是REGISTER
  • serializer type, 1 byte, 代表序列化类型
  • attributes, 1 byte, 属性字段, 保留
  • upstream job id, 4 bytes, 代表此Child所运行任务的作业Id
  • upstream task topology index, 2 bytes, 代表此Child所运行任务在作业DAG中的拓扑顺序
  • upstream task id 2 bytes, 代表此Child所运行任务的Id
  • downstream task topology index, 2 bytes, 代表下游任务在作业DAG中的拓扑顺序 以下是注册命令网络协议示意图:
   +------------+------------+------------+------------+ 0  bytes
   |   version  |  register  | serializer | attributes |
   +------------+------------+------------+------------+ 4  bytes
   |                  upstream job id                  |
   +-------------------------+-------------------------+ 8  bytes
   |   upstream task index   |   upstream task id      |
   +-------------------------+-------------------------+ 12  bytes
   |  downstream task index  |
   +-------------------------+

注册完毕后, Child进程就可以向下游节点发送数据了, 发送命令的格式如下:

  • version, 1 byte, 代表协议的版本号
  • command, 1 byte, 代表命令类型, 这儿就是PUT
  • attributes, 1 byte, 属性字段, 保留
  • partition id, 2 bytes, 分区Id, 即下游任务Id
  • checksum,4 bytes, 校验和, 防止I/O传输错误
  • payload length, n bytes, 变长编码, 表示元数据(metadata)和数据内容的长度
  • metadata, 1 bytes, 元数据
  • payload, 数据内容
   +------------+------------+------------+------------+ 0  bytes
   |   version  |     put    | attributes | partition- |
   +------------+------------+------------+------------+ 4  bytes
   |    -id     |               checksum               |
   +------------+--------------------------------------+ 8  bytes
   |            |            payload length            |
   +------------+--------------------------------------+ 
   |  metadata  |              payload                 |
   +------------+--------------------------------------+
   |                        ...                        |
   +---------------------------------------------------+ ...

数据内容可以由一条或多条Event数据组成. Event的网络传递的格式如下:

   +---------------------------------------------------+ 0  bytes
   |                  source event offset              |
   +---------------------------------------------------+ 4  byte
   |                                                   |
   +---------------------------------------------------+ 8  bytes
   |                    event length                   |
   +---------------------------------------------------+
   |                     event data                    |
   |                        ...                        |
   +---------------------------------------------------+ ...

长连接

Event被处理后, 根据其分区发送到对应的下游节点. 下游节点的NodeManager会接受上游发送过来的结果数据, 然后将它们写入到HDFS文件供下游的Task使用. 发送数据时, 可以采用同步和异步两种策略.

作业管理子系统

Hadoop Yarn简介

Hadoop Yarn是由Hadoop社区开发的集群资源管理框架.

主要组件

ResourceManager

ResourceManager负责分配资源单位给DragonJob. 例如某一个DragonJob需要10GB内存, 5颗CPU. 由DragonAppMaster向ResourceManager申请. DragonAppMaster读取客户端DragonClient提交的作业描述文件得知运行作业所需要的资源情况.

NodeManager

ResourceManager负责分配Container, 每个Container代表一组资源, 每个Container代表运行作业的其中一个Task. 同时NodeManager需要配置一个AuxService. DragonJob上游Task发给下游节点的数据由这个service接收, 定期sync到Hadoop HDFS中, 让由这个NodeManager启动的下游Task能够实时地获取到这些传过来的数据.

DragonClient

DragonClient 负责提交作业至ResourceManager并且监控提交是否成功. 它的提交过程为以下几步:

  • 检查作业DAG是否合法
  • 向`ResourceManager获取一个新的Application Id.

job.jar, job.xml以及

DragonAppMaster

DragonAppMaster是Dragon的核心. NodeManager启动将DragonAppMaster当作一个普通Container启动之后, DragonAppMaster会向ResourceManager请求作业所有Task所需资源, 并通过NodeManager在远程启动各个Task的Container, 运行DragonChild类.

DragonChild

DragonChild负责运行作业的某一个Task.

工作流程

一道运行在Yarn框架下的DragonJob, 它的生命周期如下:

  • 用户定义作业DAG拓扑结构, 使用DragonJob在客户端通过DragonJobRunner提交作业至yarn的ResourceManager(ApplicationsMaster).
  • ApplicationsMasterScheduler索要DragonAppMaster所需要的Container资源, 然后为这道作业启动一个DragonAppMaster
  • DragonAppMaster启动后向ApplicationsMaster注册
  • DragonJob客户端会轮询ApplicationsMaster, 获取DragonAppMaster的信息(其中包含DragonAppMaster运行所在的主机及它的RPC端口号), 之后直接和DragonAppMaster通信, 向DragonAppMaster获取各Task的启动进度等信息.
  • DragonAppMaster将作业DAG拓扑结构的描述文件job.desc反序列化, 形成Task的资源请求
  • DragonAppMaster将Task的资源请求发送给ResourceManager, 从后者获取Containers后通过NodeManager将各Container在相应的机器上启动.
  • DragonAppMaster监控该作业所有Task的运行情况, 更新作业的状态信息, 如果其中某个Task失败或没有响应, 则重新申请过Container
  • 由于流式计算作业会一直运行, 所以只有用户杀死作业时, 作业才算完成.

DragonJob API

Configuration conf = getConf();
conf.setInt(INT_PROPERTY, 1);
conf.set(STRING_PROPERTY, "VALUE");
conf.set(DragonJobConfig.PROPERTY, "GRAPHJOB_VALUE");
DragonJob job = DragonJob.getInstance(conf);
job.setJobName("First Graph Job");

DragonVertex source = new DragonVertex.Builder("source")
                                      .producer(EventProducer.class)
                                      .processor(EventProcessor.class)
                                      .tasks(10)
                                      .build();
DragonVertex m1 = new DragonVertex.Builder("intermediate1")
                                  .processor(EventProcessor.class)
                                  .addFile("file.txt")
                                  .addFile("dict.dat")
                                  .addArchive("archive.zip")
                                  .tasks(10)
                                  .build();
DragonVertex m2 = new DragonVertex.Builder("intermediate2")
                                  .processor(EventProcessor.class)
                                  .addFile("aux")
                                  .tasks(10)
                                  .build();
DragonVertex dest = new DragonVertex.Builder("dest")
                                    .processor(EventProcessor.class)
                                    .tasks(10)
                                    .build();
DragonJobGraph g = new DragonJobGraph();
// check if the graph is cyclic when adding edge
g.addEdge(source, m1).parition(HashPartitioner.class);
g.addEdge(source, m2).parition(HashPartitioner.class);
g.addEdge(m1, dest).parition(CustomPartitioner.class);
g.addEdge(m2, dest).parition(CustomPartitioner.class);
job.setJobGraph(g);
// check all source vertex hold event producers when submitting
job.submit();

容错

检查点

检查点由一个后台线程执行, 每隔指定的时间内作一次. 一个Task对应一个检查点文件. 这些文件存储在HDFS上的job staging/jobid/checkpoint/task_[0-9].cp. 例如task_0.cp代码Task Id为0的Task的检查点. 每次做检查点时记下当前已经处理完毕的最近的Event的偏移量.

Task(Container)活着, 但无响应

DragonAppMaster认为这个Task超时, 然后将它杀死.

Task(Container)死亡

  • 方案1: NodeManager发现Container已经退出, 将此通知给ResourceManager(Scheduler). DragonAppMaster会在下次联系Scheduler时获取到Container的状态, 发现它死亡后, 重新向Scheduler索取新的Container.
  • 方案2:

NodeManager故障(活着但无响应, 或死亡)

  • 方案1: NodeManager发生故障时, ResourceManager会因为一直接收不到NodeManager的心跳而确认后者已死. 待下次DragonAppMaster联系ResourceManager时, ResourceManager会将此事件告知给在发生故障节点起过ContainerDragonAppMaster. DragonAppMaster知道之后将那台出故障的NodeManager所有DragonChild 的JVM进程关闭, 再向ResourceManager重新申请其它NodeManager上的资源.
  • 方案2: DragonChild所在的Container向NodeManager(AuxService)发送Event进行Shuffle操作时失败, 则记下来. 待下次DragonChildDragonAppMaster更新状态时告诉DragonAppMaster有失败事件. DragonAppMaster根据此作业所有DragonChild的状态更新消息发现超过50%的Task都向此NodeManager发送消息失败, 认定是因为NodeManager发生故障才出现这种状况. 然后DragonAppMaster将那台出故障的NodeManager所有DragonChild 的JVM进程关闭, 再向ResourceManager重新申请其它NodeManager上的资源.

DragonAppMaster发生故障