-
Notifications
You must be signed in to change notification settings - Fork 5
DragonDesignCN
#Dragon 设计
Dragon采用类似Facebook Puma的思想. 源端有一个类似Facebook Scribe的收集器, 将日志文件, MySQL的binlog等等收集至HDFS文件. 只要源端一有数据就sync到HDFS, 让Dragon的HDFS客户端能够实时地看见文件的最新数据. 与Puma不同的是, 这些数据会经过一组task分析后经过类似MapReduce的Shuffle交给下游task继续处理, 这样Dragon就有能力处理更为复杂的数据分析. 这些tasks之间是一个DAG的拓扑结构.
用户在提交Dragon
作业时, 需要给源端DragonVertex
定义输入路径, 然后再由DragonJobService
提交作业. 这个输入路径代表一个HDFS目录. 输入目录下有一组子目录, 每个子目录包含同一项数据(例如, 日志数据)的所有轮转. 作业分为很多Task, Task将在集群内不同的机器上运行. 每个Task将监视一个子目录的变化, 不停tail子目录文件的数据. 子目录里的每一个文件都以类似000000000000.drg为名, drg是文件的后缀, 前面那串数字代表此文件首个Event对于全局Event的偏移量.
Task读取子目录的数据时, 有一个WatchService
后台线程不断地监视子目录是否有新的文件产生. 一旦有, 便将此文件加入到队列之中, 以便实时地tail新文件的数据.
每道流入Dragon节点的消息都称为一个Event.
在Dragon系统中传递的每道消息称为一个Event. 以下是Event的接口定义:
/**
* Event is the basic processing unit of dragon.
*/
public interface Event<KEY, VALUE> extends Serializable {
/**
* Every {@link Event} should provide a <code>offset</code>
* for helping checkpointing.
* @return the offset of this {@link Event}
*/
public long offset();
/**
* The key of this {@link Event}
* @return the key of this {@link Event}
*/
public KEY key();
/**
* The value of this {@link Event}
* @return the value of this {@link Event}
*/
public VALUE value();
}
Dragon不可能像MapReduce那么随意地让用户定义消息数据类型.
Child进程向下游NodeManager
发送数据前,需要先注册. 注册命令的格式如下
- version, 1 byte, 代表协议的版本号
- command,1 byte, 代表命令类型, 这儿就是REGISTER
- serializer type, 1 byte, 代表序列化类型
- attributes, 1 byte, 属性字段, 保留
- upstream job id, 4 bytes, 代表此Child所运行任务的作业Id
- upstream task topology index, 2 bytes, 代表此Child所运行任务在作业DAG中的拓扑顺序
- upstream task id 2 bytes, 代表此Child所运行任务的Id
- downstream task topology index, 2 bytes, 代表下游任务在作业DAG中的拓扑顺序 以下是注册命令网络协议示意图:
+------------+------------+------------+------------+ 0 bytes
| version | register | serializer | attributes |
+------------+------------+------------+------------+ 4 bytes
| upstream job id |
+-------------------------+-------------------------+ 8 bytes
| upstream task index | upstream task id |
+-------------------------+-------------------------+ 12 bytes
| downstream task index |
+-------------------------+
注册完毕后, Child进程就可以向下游节点发送数据了, 发送命令的格式如下:
- version, 1 byte, 代表协议的版本号
- command, 1 byte, 代表命令类型, 这儿就是PUT
- attributes, 1 byte, 属性字段, 保留
- partition id, 2 bytes, 分区Id, 即下游任务Id
- checksum,4 bytes, 校验和, 防止I/O传输错误
- payload length, n bytes, 变长编码, 表示元数据(metadata)和数据内容的长度
- metadata, 1 bytes, 元数据
- payload, 数据内容
+------------+------------+------------+------------+ 0 bytes
| version | put | attributes | partition- |
+------------+------------+------------+------------+ 4 bytes
| -id | checksum |
+------------+--------------------------------------+ 8 bytes
| | payload length |
+------------+--------------------------------------+
| metadata | payload |
+------------+--------------------------------------+
| ... |
+---------------------------------------------------+ ...
数据内容可以由一条或多条Event
数据组成. Event
的网络传递的格式如下:
+---------------------------------------------------+ 0 bytes
| source event offset |
+---------------------------------------------------+ 4 byte
| |
+---------------------------------------------------+ 8 bytes
| event length |
+---------------------------------------------------+
| event data |
| ... |
+---------------------------------------------------+ ...
Event
被处理后, 根据其分区发送到对应的下游节点. 下游节点的NodeManager
会接受上游发送过来的结果数据, 然后将它们写入到HDFS文件供下游的Task使用. 发送数据时, 可以采用同步和异步两种策略.
Hadoop Yarn是由Hadoop社区开发的集群资源管理框架.
ResourceManager
负责分配资源单位给DragonJob
. 例如某一个DragonJob
需要10GB内存, 5颗CPU. 由DragonAppMaster
向ResourceManager申请. DragonAppMaster
读取客户端DragonClient
提交的作业描述文件得知运行作业所需要的资源情况.
ResourceManager
负责分配Container
, 每个Container
代表一组资源, 每个Container
代表运行作业的其中一个Task. 同时NodeManager需要配置一个AuxService
. DragonJob
上游Task发给下游节点的数据由这个service接收, 定期sync
到Hadoop HDFS中, 让由这个NodeManager启动的下游Task能够实时地获取到这些传过来的数据.
DragonClient
负责提交作业至ResourceManager
并且监控提交是否成功. 它的提交过程为以下几步:
- 检查作业DAG是否合法
- 向`ResourceManager获取一个新的Application Id.
job.jar, job.xml以及
DragonAppMaster
是Dragon的核心. NodeManager
启动将DragonAppMaster
当作一个普通Container
启动之后, DragonAppMaster
会向ResourceManager
请求作业所有Task所需资源, 并通过NodeManager
在远程启动各个Task的Container
, 运行DragonChild
类.
DragonChild
负责运行作业的某一个Task
.
一道运行在Yarn框架下的DragonJob
, 它的生命周期如下:
- 用户定义作业DAG拓扑结构, 使用
DragonJob
在客户端通过DragonJobRunner
提交作业至yarn的ResourceManager
(ApplicationsMaster
). -
ApplicationsMaster
向Scheduler
索要DragonAppMaster
所需要的Container
资源, 然后为这道作业启动一个DragonAppMaster
-
DragonAppMaster
启动后向ApplicationsMaster
注册 -
DragonJob
客户端会轮询ApplicationsMaster
, 获取DragonAppMaster
的信息(其中包含DragonAppMaster
运行所在的主机及它的RPC端口号), 之后直接和DragonAppMaster
通信, 向DragonAppMaster
获取各Task的启动进度等信息. -
DragonAppMaster
将作业DAG拓扑结构的描述文件job.desc反序列化, 形成Task的资源请求
-
DragonAppMaster
将Task的资源请求
发送给ResourceManager
, 从后者获取Container
s后通过NodeManager
将各Container
在相应的机器上启动. -
DragonAppMaster
监控该作业所有Task的运行情况, 更新作业的状态信息, 如果其中某个Task失败或没有响应, 则重新申请过Container
- 由于流式计算作业会一直运行, 所以只有用户杀死作业时, 作业才算完成.
Configuration conf = getConf();
conf.setInt(INT_PROPERTY, 1);
conf.set(STRING_PROPERTY, "VALUE");
conf.set(DragonJobConfig.PROPERTY, "GRAPHJOB_VALUE");
DragonJob job = DragonJob.getInstance(conf);
job.setJobName("First Graph Job");
DragonVertex source = new DragonVertex.Builder("source")
.producer(EventProducer.class)
.processor(EventProcessor.class)
.tasks(10)
.build();
DragonVertex m1 = new DragonVertex.Builder("intermediate1")
.processor(EventProcessor.class)
.addFile("file.txt")
.addFile("dict.dat")
.addArchive("archive.zip")
.tasks(10)
.build();
DragonVertex m2 = new DragonVertex.Builder("intermediate2")
.processor(EventProcessor.class)
.addFile("aux")
.tasks(10)
.build();
DragonVertex dest = new DragonVertex.Builder("dest")
.processor(EventProcessor.class)
.tasks(10)
.build();
DragonJobGraph g = new DragonJobGraph();
// check if the graph is cyclic when adding edge
g.addEdge(source, m1).parition(HashPartitioner.class);
g.addEdge(source, m2).parition(HashPartitioner.class);
g.addEdge(m1, dest).parition(CustomPartitioner.class);
g.addEdge(m2, dest).parition(CustomPartitioner.class);
job.setJobGraph(g);
// check all source vertex hold event producers when submitting
job.submit();
检查点由一个后台线程执行, 每隔指定的时间内作一次. 一个Task对应一个检查点文件. 这些文件存储在HDFS上的job staging/jobid/checkpoint/task_[0-9].cp. 例如task_0.cp代码Task Id为0的Task的检查点. 每次做检查点时记下当前已经处理完毕的最近的Event的偏移量.
DragonAppMaster
认为这个Task超时, 然后将它杀死.
- 方案1:
NodeManager
发现Container已经退出, 将此通知给ResourceManager
(Scheduler
).DragonAppMaster
会在下次联系Scheduler
时获取到Container
的状态, 发现它死亡后, 重新向Scheduler
索取新的Container
. - 方案2:
- 方案1:
NodeManager
发生故障时,ResourceManager
会因为一直接收不到NodeManager
的心跳而确认后者已死. 待下次DragonAppMaster
联系ResourceManager
时,ResourceManager
会将此事件告知给在发生故障节点起过Container
的DragonAppMaster
.DragonAppMaster
知道之后将那台出故障的NodeManager
所有DragonChild
的JVM进程关闭, 再向ResourceManager
重新申请其它NodeManager
上的资源. - 方案2:
DragonChild
所在的Container向NodeManager
(AuxService
)发送Event
进行Shuffle操作时失败, 则记下来. 待下次DragonChild
与DragonAppMaster
更新状态时告诉DragonAppMaster
有失败事件.DragonAppMaster
根据此作业所有DragonChild
的状态更新消息发现超过50%的Task都向此NodeManager
发送消息失败, 认定是因为NodeManager
发生故障才出现这种状况. 然后DragonAppMaster
将那台出故障的NodeManager
所有DragonChild
的JVM进程关闭, 再向ResourceManager
重新申请其它NodeManager
上的资源.