-
Notifications
You must be signed in to change notification settings - Fork 5
DragonDesignCN
#Dragon 设计
Dragon采用类似Facebook Puma的思想. 源端有一个类似Facebook Scribe的收集器, 将日志文件, MySQL的binlog等等收集至HDFS文件. 只要源端一有数据就sync到HDFS, 让Dragon的HDFS客户端能够实时地看见文件的最新数据. 与Puma不同的是, 这些数据会经过一组task分析后经过类似MapReduce的Shuffle交给下游task继续处理, 这样Dragon就有能力处理更为复杂的数据分析. 这些tasks之间是一个DAG的拓扑结构.
每道流入Dragon节点的消息都称为一个Event.
在Dragon系统中传递的每道消息称为一个Event. 以下是Event的接口定义:
/**
* Event is the basic processing unit of dragon.
*/
public interface Event<KEY, VALUE> extends Serializable {
/**
* Every {@link Event} should provide a <code>offset</code>
* for helping checkpointing.
* @return the offset of this {@link Event}
*/
public long offset();
/**
* The key of this {@link Event}
* @return the key of this {@link Event}
*/
public KEY key();
/**
* The value of this {@link Event}
* @return the value of this {@link Event}
*/
public VALUE value();
}
Dragon不可能像MapReduce那么随意地让用户定义消息数据类型.
Hadoop Yarn是由Hadoop社区开发的集群资源管理框架.
ResourceManager
负责分配资源单位给DragonJob
. 例如某一个DragonJob
需要10GB内存, 5颗CPU. 由DragonAppMaster
向ResourceManager申请. DragonAppMaster
读取客户端DragonClient
提交的作业描述文件得知运行作业所需要的资源情况.
ResourceManager
负责分配Container
, 每个Container
代表一组资源, 每个Container
代表运行作业的其中一个Task. 同时NodeManager需要配置一个AuxService
. DragonJob
上游Task发给下游节点的数据由这个service接收, 定期sync
到Hadoop HDFS中, 让由这个NodeManager启动的下游Task能够实时地获取到这些传过来的数据.
DragonClient
负责提交作业至ResourceManager
并且监控提交是否成功. 它的提交过程为以下几步:
- 检查作业DAG是否合法
- 向`ResourceManager获取一个新的Application Id.
job.jar, job.xml以及
DragonAppMaster
是Dragon的核心. NodeManager
启动将DragonAppMaster
当作一个普通Container
启动之后, DragonAppMaster
会向ResourceManager
请求作业所有Task所需资源, 并通过NodeManager
在远程启动各个Task的Container
, 运行DragonChild
类.
DragonChild
负责运行作业的某一个Task
.
一道运行在Yarn框架下的DragonJob
, 它的生命周期如下:
- 用户定义作业DAG拓扑结构, 使用
DragonJob
在客户端通过DragonJobRunner
提交作业至yarn的ResourceManager
(ApplicationsMaster
). -
ApplicationsMaster
向Scheduler
索要DragonAppMaster
所需要的Container
资源, 然后为这道作业启动一个DragonAppMaster
-
DragonAppMaster
启动后向ApplicationsMaster
注册 -
DragonJob
客户端会轮询ApplicationsMaster
, 获取DragonAppMaster
的信息(其中包含DragonAppMaster
运行所在的主机及它的RPC端口号), 之后直接和DragonAppMaster
通信, 向DragonAppMaster
获取各Task的启动进度等信息. -
DragonAppMaster
将作业DAG拓扑结构的描述文件job.desc反序列化, 形成Task的资源请求
-
DragonAppMaster
将Task的资源请求
发送给ResourceManager
, 从后者获取Container
s后通过NodeManager
将各Container
在相应的机器上启动. -
DragonAppMaster
监控该作业所有Task的运行情况, 更新作业的状态信息, 如果其中某个Task失败或没有响应, 则重新申请过Container
- 由于流式计算作业会一直运行, 所以只有用户杀死作业时, 作业才算完成.
Configuration conf = getConf();
conf.setInt(INT_PROPERTY, 1);
conf.set(STRING_PROPERTY, "VALUE");
conf.set(DragonJobConfig.PROPERTY, "GRAPHJOB_VALUE");
DragonJob job = DragonJob.getInstance(conf);
job.setJobName("First Graph Job");
DragonVertex source = new DragonVertex.Builder("source")
.producer(EventProducer.class)
.processor(EventProcessor.class)
.tasks(10)
.build();
DragonVertex m1 = new DragonVertex.Builder("intermediate1")
.processor(EventProcessor.class)
.addFile("file.txt")
.addFile("dict.dat")
.addArchive("archive.zip")
.tasks(10)
.build();
DragonVertex m2 = new DragonVertex.Builder("intermediate2")
.processor(EventProcessor.class)
.addFile("aux")
.tasks(10)
.build();
DragonVertex dest = new DragonVertex.Builder("dest")
.processor(EventProcessor.class)
.tasks(10)
.build();
DragonJobGraph g = new DragonJobGraph();
// check if the graph is cyclic when adding edge
g.addEdge(source, m1).parition(HashPartitioner.class);
g.addEdge(source, m2).parition(HashPartitioner.class);
g.addEdge(m1, dest).parition(CustomPartitioner.class);
g.addEdge(m2, dest).parition(CustomPartitioner.class);
job.setJobGraph(g);
// check all source vertex hold event producers when submitting
job.submit();
DragonAppMaster
认为这个Task超时, 然后将它杀死.
- 方案1:
NodeManager
发现Container已经退出, 将此通知给ResourceManager
(Scheduler
).DragonAppMaster
会在下次联系Scheduler
时获取到Container
的状态, 发现它死亡后, 重新向Scheduler
索取新的Container
. - 方案2:
- 方案1:
NodeManager
发生故障时,ResourceManager
会因为一直接收不到NodeManager
的心跳而确认后者已死. 待下次DragonAppMaster
联系ResourceManager
时,ResourceManager
会将此事件告知给在发生故障节点起过Container
的DragonAppMaster
.DragonAppMaster
知道之后将那台出故障的NodeManager
所有DragonChild
的JVM进程关闭, 再向ResourceManager
重新申请其它NodeManager
上的资源. - 方案2:
DragonChild
所在的Container向NodeManager
(AuxService
)发送Event
进行Shuffle操作时失败, 则记下来. 待下次DragonChild
与DragonAppMaster
更新状态时告诉DragonAppMaster
有失败事件.DragonAppMaster
根据此作业所有DragonChild
的状态更新消息发现超过50%的Task都向此NodeManager
发送消息失败, 认定是因为NodeManager
发生故障才出现这种状况. 然后DragonAppMaster
将那台出故障的NodeManager
所有DragonChild
的JVM进程关闭, 再向ResourceManager
重新申请其它NodeManager
上的资源.