YARN源码解析(2)-作业提交1

Posted by AlstonWilliams on February 17, 2019

最近在阅读Hadoop的源码,为了加深理解,就将其记录下来。

我们都知道,Hadoop主要由MapReduce实现,YARN,以及HDFS组成。所以,我会依次阅读MapReduce的实现,以及YARN,最后是HDFS的实现。

在这篇文章中,我们先介绍提交一个作业的时候发生了什么。

当我们运行一个作业的时候,我们都是通过:$HADOOP_HOME/bin/hadoop your.jar main_class input output这么一条命令来运行的。

通过查看$HADOOP_HOME/bin/hadoop这个文件的内容,我们能够发现,它是运行了一个org.apache.hadoop.util.RunJar这么一个类。

那么这个类都做了什么呢?

其实没什么,打开它的源码,我们能够看到,只不过就是解压我们指定的jar文件,然后运行主类。

这里有一点需要注意的是,有的jar文件是会包含META-INF/MANIFEST.MF这么一个文件的。而这个文件中可以指定Jar文件的主类。

org.apache.hadoop.util.RunJar这个类的内容中,我们可以看到,如果在META-INF/MANIFEST.MF中包含了一个跟我们通过命令行指定的主类不相同的主类,那么是会优先选择META-INF/MANIFEST.MF中指定的那个主类的。

而我们编写的MapReduce作业中,一般主类中,都是通过Job对象来进行一些配置,并通过job.waitForCompletion()方法来等待其完成,并输出中间的过程信息。

关于“作业提交”这个阶段的具体过程,在《Hadoop: The Definitive Guide》这本书中,有一副非常清晰的过程图,这里我们将它摘取过来:

就算不看源码,只看这幅图片,也能对这个过程有一个清晰的认识。

在这篇文章中,我们主要介绍上图中的第1, 2, 3, 4步。后面的步骤,我们会在后面的文章中介绍。

我们首先看一下JobwaitForCompletion(boolean verbose)方法的实现。

在这个方法中,最重要的调用就是submit()这一行。

我们看一下submit()方法的实现。

其中connect()方法,会根据你的配置文件,实例化一个Cluster类的对象,并将其赋给Job.cluster这个字段。我们可以通过Cluster来跟YARN这种集群资源管理系统交互。

这是因为Cluster中有一个ClientProtocol类型的字段,ClientProtocol就是用于Client和集群资源管理系统交互的工具,ClientProtocol有一个很重要的实现,就是YARNRunner

我们可以看到,在submit方法中,获取到了一个JobSubmitter。然后通过JobSubmittersubmitJobInternal()方法来正式提交任务。

关于submitJobInternal()这个方法,在其注释中,就写的非常清楚了。

我们可以看到,它做了这么几件事情:

  • 检查作业的输入和输出
  • 计算InputSplit
  • 验证
  • 将作业的Jar文件,配置文件,以及一些其他的文件保存到DistributedCache中
  • 将作业提交的JobTracker上,并监控它的状态

我们这里主要介绍这么三点:

  • 计算InputSplit
  • 将作业的Jar文件,配置文件,以及一些其他的文件保存到DistributedCache中
  • 将作业提交的JobTracker上,并监控它的状态

我们先来看第二点-将作业的Jar文件、配置文件、以及一些其他的文件保存到DistributedCache中。

在这一点上,最重要的是一个叫做copyAndConfigureFiles()的方法,这个方法会将我们的作业的Jar文件,以及在命令行中,通过-files,-libjars,-archives指定的文件,加入到DistributedCache中。而DistributedCache实际上,就是DFS中的一个特殊的文件夹。它的命名规则是,**/tmp/hadoop-yarn/staging//.staging//files**

在把它们加到DistributedCache中之后,还会为它们设置权限等。并将路径信息保存到Configuration对象中。

还有很多跟这个作业相关的内容,也会被添加到这个跟此次作业相关的文件夹当中,比如,后面我们将会看到的,关于InputSplit的一些信息。

在这个方法内部,通过调用writeSplits()方法,来进行分片。

我们可以看到,就是调用了InputFormatgetSplits()方法而已,最后再对各个InputSplit进行排序,将最大的排到前面。

在这个方法的最后,我们可以看到,调用了ClientProtocolsubmitJob()方法来提交作业。

这个方法中,最重要的就是调用createApplicationSubmissionContext()方法来构造ApplicationSubmissionContext以及ContainerLaunchContext这两个非常重要的对象了。

这两个对象为什么重要呢?

  • ApplicationSubmissionContext这个对象,包含了YARN ResourceManager启动ApplicationMaster的全部信息。ApplicationMaster也是一个非常重要的组件,它相当于Mapper和Reducer中间的桥梁。
  • ContainerLaunchContext这个对象,包含了YARN NodeManager启动一个Container所需要的全部信息,包括需要运行的命令,以及CLASSPATH等。

在构造好这两个对象之后,将ContainerLaunchContext包含在ApplicationSubmissionContext中,然后通过Protobuf这种RPC调用,将ApplicationSubmissionContext发送给YARN ResourceManager,来启动一个任务。

然后,再通过获取任务状态的RPC调用,来获取这个任务的状态。

一个任务的状态,有这么几种:

  • NEW:表示任务刚被创建
  • NEW_SAVING:表示任务已经被保存
  • SUBMITTED:表示任务已经被提交
  • ACCEPTED:表示任务已经被调度器接受
  • RUNNING:表示任务正在运行
  • FINISHED:表示任务已经正常完成
  • FAILED:表示任务失败
  • KILLED:表示任务已经被用户或者管理员取消

只要任务不是处于NEW或者NEW_SAVING状态,那么就给Client返回。也就是说,不管任务是成功还是失败,都给Client返回。

然后,客户端再通过不断向YARN ResourceManager轮循任务的状态,在控制台输出进度等。

思考

从Hadoop的实现中,可以看到,我们可以很轻松的添加其他的集群资源管理系统,可以替换掉YARN,而采用Mesos,或者Kubernetes等。

但是,目前Hadoop的实现中,只支持YARN。

参考资料

《Hadoop: The Definitive Guide》