我们前面已经介绍了,一个Application,一个Job正常执行的工作流程。
但是,我们都知道,在一个分布式系统中,容错是很重要的。容错性,也是在设计一个分布式系统的时候,必须要仔细考虑的问题。
了解MapReduce的朋友都知道,在MapReduce执行的时候,每个Mapper或者Reducer都是一个Task。而这些Task又有对应的TaskAttempt,来做到对每个Task的容错性。TaskAttempt指的是,每次Task尝试执行。每个TaskAttempt都有自己的ID。我们也可以指定一个Task最多attempt几次,才会放弃。
那么,在这篇文章中,我们会介绍,ApplicationMaster是如何检测以及处理TaskAttempt出现故障的。
TaskAttempt故障检测
ApplicationMaster判断一个TaskAttempt是否出现的标准很简单,有这么三点:
- TaskAttempt在正常的过期时间段内,并没有读数据
- TaskAttempt在正常的过期时间段内,并没有写数据
- TaskAttempt在正常的过期时间段内,并没有告诉ApplicationMaster它进入了其他的Phase。
这里的Phase指的是什么呢?我们都知道,在Mapper端,会有一个Sort的过程,在Reducer端,会有一个Shuffle的过程,以及对接收到的数据进行Sort的过程。这么,这些过程,不管是Sort也好,Shuffle也好,就是一个个的Phase。
那么,ApplicationMaster怎么知道有没有上面的三个条件发生呢?
很简单,在ApplicationMaster上,有一个专门的TaskHeartbeatHandler,专门用于检测TaskAttempt是否过期。
从上面的代码中,我们可以看到,就是循环检测超时时间段内,TaskAttemptId是否超时。
还有另一个类,TaskAttemptListenerImpl,用于接收NodeManager上的TaskAttempt发送来的状态信息。每当TaskAttemptListenerImpl收到这些信息,就会更新TaskHeartbeatHandler中对应的TaskAttempt的ReportTime。
TaskAttempt会在切换Phase的时候,直接告诉TaskAttemptListenerImpl,”“我的状态更新啦。”
其中statusUpdate(umbilical)方法内部,调用的就是上面我们看到的TaskAttemptListenerImpl的statusUpdate()方法。
那剩下的两个条件呢?即TaskAttempt会告诉ApplicationMaster它在过期时间内,读过或者写过数据呢?
为了实现这个功能,在MapTask和ReduceTask的父类Task中,有一个TaskReporter.这是一个线程,它的作用就是,当各种Counter更新时,就告诉TaskAttemptListenerImpl,”我的状态更新啦”。
这各种Counter,都是啥呢?都是MapReduce内部使用的Counter啦,就是那些报告给你,Mapper多了多少数据,Shuffle了多少数据等,那些Counter。只要你写过MapReduce程序,对它一定不陌生。
而这些Counter都是什么时候更新呢?
当然是在TaskAttempt读写数据时更新啦。
这里拿MapTask举例,从MapTask中内部的RecordReader的wrapper-NewTrackingRecordReader来看,当它调用nextKeyValue()方法的时候,就会更新这些Counter。
而这个方法,会在我们执行Mapper的时候被调用到。
至于输出的话,记得之前看过是在OutputCommitter中更新的。
ApplicationMaster处理TaskAttempt故障
顺着TaskHeartbeatHandler的处理逻辑往下走,最终,你会发现,ApplicationMaster会做这样的处理:
- 创建一个TaskAttempt
- 跟ResourceManager请求资源
- 重新在NodeManager上调度
这里我们只介绍第一步,第二三步跟正常调度一个Container相同。
我们可以看到,如果没有失败的TaskAttempt,那么就直接发出TaskAttemptEventType.TA_SCHEDULE事件进行调度,而如果有,那么就发出TaskAttemptEventType.TA_RESCHEDULED事件重新进行调度。