Skip to content
coderplay edited this page Jun 26, 2012 · 15 revisions

#Dragon 设计

背景

目标

事件处理

Dragon采用类似Facebook Puma的思想. 源端有一个类似Facebook Scribe的收集器, 将日志文件, MySQL的binlog等等收集至HDFS文件. 只要源端一有数据就sync到HDFS, 让Dragon的HDFS客户端能够实时地看见文件的最新数据. 与Puma不同的是, 这些数据会经过一组task分析后经过类似MapReduce的Shuffle交给下游task继续处理, 这样Dragon就有能力处理更为复杂的数据分析. 这些tasks之间是一个DAG的拓扑结构.

每道流入Dragon节点的消息都称为一个Event.

Event API

在Dragon系统中传递的每道消息称为一个Event. 以下是Event的接口定义:

/**
 * Event is the basic processing unit of dragon. 
 */
public interface Event<KEY, VALUE> extends Serializable {

  /**
   * Every {@link Event} should provide a <code>offset</code> 
   * for helping checkpointing.
   * @return the offset of this {@link Event}
   */
  public long offset();

  /**
   * The key of this {@link Event} 
   * @return the key of this {@link Event} 
   */
  public KEY key();
  
  /**
   * The value of this {@link Event} 
   * @return the value of this {@link Event} 
   */
  public VALUE value();

}

Dragon不可能像MapReduce那么随意地让用户定义消息数据类型.

作业管理子系统

Hadoop Yarn简介

Hadoop Yarn是由Hadoop社区开发的集群资源管理框架.

主要组件

ResourceManager

ResourceManager负责分配资源单位给DragonJob. 例如某一个DragonJob需要10GB内存, 5颗CPU. 由DragonAppMaster向ResourceManager申请. DragonAppMaster读取客户端DragonClient提交的作业描述文件得知运行作业所需要的资源情况.

NodeManager

ResourceManager负责分配Container, 每个Container代表一组资源, 每个Container代表运行作业的其中一个Task. 同时NodeManager需要配置一个AuxService. DragonJob上游Task发给下游节点的数据由这个service接收, 定期sync到Hadoop HDFS中, 让由这个NodeManager启动的下游Task能够实时地获取到这些传过来的数据.

DragonClient

DragonClient 负责提交作业至ResourceManager并且监控提交是否成功. 它的提交过程为以下几步:

  • 检查作业DAG是否合法
  • 向`ResourceManager获取一个新的Application Id.

job.jar, job.xml以及

DragonAppMaster

DragonAppMaster是Dragon的核心. NodeManager启动将DragonAppMaster当作一个普通Container启动之后, DragonAppMaster会向ResourceManager请求作业所有Task所需资源, 并通过NodeManager在远程启动各个Task的Container, 运行DragonChild类.

DragonChild

DragonChild负责运行作业的某一个Task.

工作流程

一道运行在Yarn框架下的DragonJob, 它的生命周期如下:

  • 用户定义作业DAG拓扑结构, 使用DragonJob在客户端通过DragonJobRunner提交作业至yarn的ResourceManager(ApplicationsMaster).
  • ApplicationsMasterScheduler索要DragonAppMaster所需要的Container资源, 然后为这道作业启动一个DragonAppMaster
  • DragonAppMaster启动后向ApplicationsMaster注册
  • DragonJob客户端会轮询ApplicationsMaster, 获取DragonAppMaster的信息(其中包含DragonAppMaster运行所在的主机及它的RPC端口号), 之后直接和DragonAppMaster通信, 向DragonAppMaster获取各Task的启动进度等信息.
  • DragonAppMaster将作业DAG拓扑结构的描述文件job.desc反序列化, 形成Task的资源请求
  • DragonAppMaster将Task的资源请求发送给ResourceManager, 从后者获取Containers后通过NodeManager将各Container在相应的机器上启动.
  • DragonAppMaster监控该作业所有Task的运行情况, 更新作业的状态信息, 如果其中某个Task失败或没有响应, 则重新申请过Container
  • 由于流式计算作业会一直运行, 所以只有用户杀死作业时, 作业才算完成.

DragonJob API

Configuration conf = getConf();
conf.setInt(INT_PROPERTY, 1);
conf.set(STRING_PROPERTY, "VALUE");
conf.set(DragonJobConfig.PROPERTY, "GRAPHJOB_VALUE");
DragonJob job = DragonJob.getInstance(conf);
job.setJobName("First Graph Job");

DragonVertex source = new DragonVertex.Builder("source")
                                      .producer(EventProducer.class)
                                      .processor(EventProcessor.class)
                                      .tasks(10)
                                      .build();
DragonVertex m1 = new DragonVertex.Builder("intermediate1")
                                  .processor(EventProcessor.class)
                                  .addFile("file.txt")
                                  .addFile("dict.dat")
                                  .addArchive("archive.zip")
                                  .tasks(10)
                                  .build();
DragonVertex m2 = new DragonVertex.Builder("intermediate2")
                                  .processor(EventProcessor.class)
                                  .addFile("aux")
                                  .tasks(10)
                                  .build();
DragonVertex dest = new DragonVertex.Builder("dest")
                                    .processor(EventProcessor.class)
                                    .tasks(10)
                                    .build();
DragonJobGraph g = new DragonJobGraph();
// check if the graph is cyclic when adding edge
g.addEdge(source, m1).parition(HashPartitioner.class);
g.addEdge(source, m2).parition(HashPartitioner.class);
g.addEdge(m1, dest).parition(CustomPartitioner.class);
g.addEdge(m2, dest).parition(CustomPartitioner.class);
job.setJobGraph(g);
// check all source vertex hold event producers when submitting
job.submit();

容错

checkpoint

Task(Container)活着, 但无响应

DragonAppMaster认为这个Task超时, 然后将它杀死.

Task(Container)死亡

  • 方案1: NodeManager发现Container已经退出, 将此通知给ResourceManager(Scheduler). DragonAppMaster会在下次联系Scheduler时获取到Container的状态, 发现它死亡后, 重新向Scheduler索取新的Container.
  • 方案2:

NodeManager故障(活着但无响应, 或死亡)

  • 方案1: NodeManager发生故障时, ResourceManager会因为一直接收不到NodeManager的心跳而确认后者已死. 待下次DragonAppMaster联系ResourceManager时, ResourceManager会将此事件告知给在发生故障节点起过ContainerDragonAppMaster. DragonAppMaster知道之后将那台出故障的NodeManager所有DragonChild 的JVM进程关闭, 再向ResourceManager重新申请其它NodeManager上的资源.
  • 方案2: DragonChild所在的Container向NodeManager(AuxService)发送Event进行Shuffle操作时失败, 则记下来. 待下次DragonChildDragonAppMaster更新状态时告诉DragonAppMaster有失败事件. DragonAppMaster根据此作业所有DragonChild的状态更新消息发现超过50%的Task都向此NodeManager发送消息失败, 认定是因为NodeManager发生故障才出现这种状况. 然后DragonAppMaster将那台出故障的NodeManager所有DragonChild 的JVM进程关闭, 再向ResourceManager重新申请其它NodeManager上的资源.

DragonAppMaster发生故障

Clone this wiki locally