YARN源码解析(8)-TaskAttempt容错

Posted by AlstonWilliams on February 17, 2019

我们前面已经介绍了,一个Application,一个Job正常执行的工作流程。

但是,我们都知道,在一个分布式系统中,容错是很重要的。容错性,也是在设计一个分布式系统的时候,必须要仔细考虑的问题。

了解MapReduce的朋友都知道,在MapReduce执行的时候,每个Mapper或者Reducer都是一个Task。而这些Task又有对应的TaskAttempt,来做到对每个Task的容错性。TaskAttempt指的是,每次Task尝试执行。每个TaskAttempt都有自己的ID。我们也可以指定一个Task最多attempt几次,才会放弃。

那么,在这篇文章中,我们会介绍,ApplicationMaster是如何检测以及处理TaskAttempt出现故障的。

TaskAttempt故障检测

ApplicationMaster判断一个TaskAttempt是否出现的标准很简单,有这么三点:

  • TaskAttempt在正常的过期时间段内,并没有读数据
  • TaskAttempt在正常的过期时间段内,并没有写数据
  • TaskAttempt在正常的过期时间段内,并没有告诉ApplicationMaster它进入了其他的Phase。

这里的Phase指的是什么呢?我们都知道,在Mapper端,会有一个Sort的过程,在Reducer端,会有一个Shuffle的过程,以及对接收到的数据进行Sort的过程。这么,这些过程,不管是Sort也好,Shuffle也好,就是一个个的Phase。

那么,ApplicationMaster怎么知道有没有上面的三个条件发生呢?

很简单,在ApplicationMaster上,有一个专门的TaskHeartbeatHandler,专门用于检测TaskAttempt是否过期。

从上面的代码中,我们可以看到,就是循环检测超时时间段内,TaskAttemptId是否超时。

还有另一个类,TaskAttemptListenerImpl,用于接收NodeManager上的TaskAttempt发送来的状态信息。每当TaskAttemptListenerImpl收到这些信息,就会更新TaskHeartbeatHandler中对应的TaskAttemptReportTime

TaskAttempt会在切换Phase的时候,直接告诉TaskAttemptListenerImpl,”“我的状态更新啦。”

其中statusUpdate(umbilical)方法内部,调用的就是上面我们看到的TaskAttemptListenerImplstatusUpdate()方法。

那剩下的两个条件呢?即TaskAttempt会告诉ApplicationMaster它在过期时间内,读过或者写过数据呢?

为了实现这个功能,在MapTaskReduceTask的父类Task中,有一个TaskReporter.这是一个线程,它的作用就是,当各种Counter更新时,就告诉TaskAttemptListenerImpl,”我的状态更新啦”。

各种Counter,都是啥呢?都是MapReduce内部使用的Counter啦,就是那些报告给你,Mapper多了多少数据,Shuffle了多少数据等,那些Counter。只要你写过MapReduce程序,对它一定不陌生。

而这些Counter都是什么时候更新呢?

当然是在TaskAttempt读写数据时更新啦。

这里拿MapTask举例,从MapTask中内部的RecordReader的wrapper-NewTrackingRecordReader来看,当它调用nextKeyValue()方法的时候,就会更新这些Counter。

而这个方法,会在我们执行Mapper的时候被调用到。

至于输出的话,记得之前看过是在OutputCommitter中更新的。

ApplicationMaster处理TaskAttempt故障

顺着TaskHeartbeatHandler的处理逻辑往下走,最终,你会发现,ApplicationMaster会做这样的处理:

  • 创建一个TaskAttempt
  • 跟ResourceManager请求资源
  • 重新在NodeManager上调度

这里我们只介绍第一步,第二三步跟正常调度一个Container相同。

我们可以看到,如果没有失败的TaskAttempt,那么就直接发出TaskAttemptEventType.TA_SCHEDULE事件进行调度,而如果有,那么就发出TaskAttemptEventType.TA_RESCHEDULED事件重新进行调度。