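This post is not really a source-code walkthrough; it is a translation of an existing article.
I came across that article after reading this part of the code myself. It explains things well, so I translated it directly.
AuxiliaryService is a service that runs inside the NodeManager. It is configured through yarn.nodemanager.aux-services. For MapReduce the usual value is mapreduce_shuffle, which in MR2 is implemented by ShuffleHandler. That is why the NodeManager log always contains output like the following: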
2014-06-22 05:29:59,115 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Registering class org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType for class org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices
2014-06-22 05:29:59,409 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Adding auxiliary service httpshuffle, "mapreduce_shuffle"
2014-06-22 05:29:59,612 INFO org.apache.hadoop.mapred.ShuffleHandler: httpshuffle listening on port 13562
2014-06-22 05:31:24,846 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got event CONTAINER_INIT for appId application_1403414881997_0003
2014-06-22 05:31:24,848 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got event APPLICATION_INIT for appId application_1403414881997_0003
2014-06-22 05:31:24,848 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got APPLICATION_INIT for service
2014-06-22 05:33:04,413 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got event CONTAINER_STOP for appId application_1403414881997_0003
2014-06-22 05:37:27,017 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got event APPLICATION_STOP for appId application_1403414881997_0002
An AuxiliaryService is notified when an application or a container is initialized or stopped, and it handles each of these events accordingly.
public abstract class AuxiliaryService extends AbstractService {
/**
* A new application is started on this NodeManager. This is a signal to
* this {@link AuxiliaryService} about the application initialization.
*
* @param initAppContext context for the application's initialization
*/
public abstract void initializeApplication(
ApplicationInitializationContext initAppContext);
/**
* An application is finishing on this NodeManager. This is a signal to this
* {@link AuxiliaryService} about the same.
*
* @param stopAppContext context for the application termination
*/
public abstract void stopApplication(
ApplicationTerminationContext stopAppContext);
/**
* Retrieve meta-data for this {@link AuxiliaryService}. Applications using
* this {@link AuxiliaryService} SHOULD know the format of the meta-data -
* ideally each service should provide a method to parse out the information
* to the applications. One example of meta-data is contact information so
* that applications can access the service remotely. This will only be called
* after the service's {@link #start()} method has finished. the result may be
* cached.
*
* * The information is passed along to applications via
* {@link StartContainersResponse#getAllServicesMetaData()} that is returned by
* {@link ContainerManagementProtocol#startContainers(StartContainersRequest)}
*
*
* @return meta-data for this service that should be made available to
* applications.
*/
public abstract ByteBuffer getMetaData();
/**
* A new container is started on this NodeManager. This is a signal to
* this {@link AuxiliaryService} about the container initialization.
* This method is called when the NodeManager receives the container launch
* command from the ApplicationMaster and before the container process is
* launched.
*
* @param initContainerContext context for the container's initialization
*/
public void initializeContainer(ContainerInitializationContext
initContainerContext) {
}
/**
* A container is finishing on this NodeManager. This is a signal to this
* {@link AuxiliaryService} about the same.
*
* @param stopContainerContext context for the container termination
*/
public void stopContainer(ContainerTerminationContext stopContainerContext) {
}
}
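The split between mandatory and optional callbacks in the class above can be illustrated with a small self-contained sketch. `MiniAuxService` and `TrackingService` are hypothetical stand-ins that only mirror the callback shape; they are not the Hadoop classes, their parameters are simplified to plain strings, and the string-encoded port used as metadata is an assumption made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeSet;

// Self-contained sketch; none of these types come from Hadoop.
final class AuxLifecycleSketch {

    // Hypothetical stand-in mirroring the callback shape of AuxiliaryService.
    abstract static class MiniAuxService {
        // Mandatory callbacks (abstract in the real class as well).
        abstract void initializeApplication(String appId, ByteBuffer serviceData);
        abstract void stopApplication(String appId);
        abstract ByteBuffer getMetaData();

        // Optional callbacks: no-op by default, as in the real class.
        void initializeContainer(String containerId) { }
        void stopContainer(String containerId) { }
    }

    // Hypothetical service that tracks which applications are active.
    static final class TrackingService extends MiniAuxService {
        final Set<String> activeApps = new TreeSet<>();
        private final int port;

        TrackingService(int port) {
            this.port = port;
        }

        @Override
        void initializeApplication(String appId, ByteBuffer serviceData) {
            activeApps.add(appId);
        }

        @Override
        void stopApplication(String appId) {
            activeApps.remove(appId);
        }

        @Override
        ByteBuffer getMetaData() {
            // Contact information; applications must know how to parse it.
            return ByteBuffer.wrap(
                Integer.toString(port).getBytes(StandardCharsets.UTF_8));
        }
    }

    // Helper that applications could use to parse the metadata.
    static String decode(ByteBuffer meta) {
        return StandardCharsets.UTF_8.decode(meta.duplicate()).toString();
    }

    public static void main(String[] args) {
        TrackingService s = new TrackingService(13562);
        s.initializeApplication("application_1403414881997_0003", ByteBuffer.allocate(0));
        s.initializeContainer("container_1"); // inherited no-op
        System.out.println(s.activeApps);
        System.out.println(decode(s.getMetaData()));
        s.stopApplication("application_1403414881997_0003");
        System.out.println(s.activeApps.isEmpty());
    }
}
```

The `decode` helper reflects the advice in the Javadoc that a service should ideally provide a way to parse its own metadata.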
Hadoop 2 ships with one built-in AuxiliaryService (in fact, the only one I found in the source): ShuffleHandler, which transfers Mapper output to the Reducers.
A NodeManager can host several AuxiliaryServices. A dedicated class, AuxServices, manages all of them.
public class AuxServices extends AbstractService
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
protected final Map<String,AuxiliaryService> serviceMap;
protected final Map<String,ByteBuffer> serviceMetaData;
protected final synchronized void addService(String name,
AuxiliaryService service) {
LOG.info("Adding auxiliary service " +
service.getName() + ", \"" + name + "\"");
serviceMap.put(name, service);
}
}
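When AuxServices is initialized, it reads the list of AuxiliaryService names from YarnConfiguration.NM_AUX_SERVICES (yarn.nodemanager.aux-services). For each name, the implementing class is looked up under the key yarn.nodemanager.aux-services.%s.class (YarnConfiguration.NM_AUX_SERVICE_FMT).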
public class AuxServices extends AbstractService
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
public void serviceInit(Configuration conf) throws Exception {
Collection<String> auxNames = conf.getStringCollection(
YarnConfiguration.NM_AUX_SERVICES);
for (final String sName : auxNames) {
try {
Preconditions
.checkArgument(
validateAuxServiceName(sName),
"The ServiceName: " + sName + " set in " +
YarnConfiguration.NM_AUX_SERVICES +" is invalid." +
"The valid service name should only contain a-zA-Z0-9_ " +
"and can not start with numbers");
Class<? extends AuxiliaryService> sClass = conf.getClass(
String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, sName), null,
AuxiliaryService.class);
if (null == sClass) {
throw new RuntimeException("No class defined for " + sName);
}
AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf);
if(!sName.equals(s.getName())) {
LOG.warn("The Auxilurary Service named '"+sName+"' in the "
+"configuration is for class "+sClass+" which has "
+"a name of '"+s.getName()+"'. Because these are "
+"not the same tools trying to send ServiceData and read "
+"Service Meta Data may have issues unless the refer to "
+"the name in the config.");
}
addService(sName, s);
s.init(conf);
} catch (RuntimeException e) {
LOG.fatal("Failed to initialize " + sName, e);
throw e;
}
}
super.serviceInit(conf);
}
}
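The name check and key lookup performed in serviceInit can be tried in isolation. The sketch below is self-contained and does not use Hadoop classes: the format string is assumed to match YarnConfiguration.NM_AUX_SERVICE_FMT, and the regular expression is an assumption derived from the error message (only a-zA-Z0-9_, no leading digit), not a copy of validateAuxServiceName.

```java
import java.util.regex.Pattern;

// Self-contained sketch of the service-name check and config-key derivation.
final class AuxServiceConfigSketch {

    // Assumed to mirror YarnConfiguration.NM_AUX_SERVICE_FMT.
    static final String AUX_SERVICE_FMT = "yarn.nodemanager.aux-services.%s.class";

    // Assumed pattern: letters, digits and underscore, not starting with a digit.
    private static final Pattern VALID_NAME =
        Pattern.compile("^[A-Za-z_][A-Za-z0-9_]*$");

    static boolean isValidName(String name) {
        return name != null && VALID_NAME.matcher(name).matches();
    }

    // Returns the configuration key that names the implementing class.
    static String classKeyFor(String name) {
        if (!isValidName(name)) {
            throw new IllegalArgumentException("Invalid service name: " + name);
        }
        return String.format(AUX_SERVICE_FMT, name);
    }

    public static void main(String[] args) {
        System.out.println(classKeyFor("mapreduce_shuffle"));
        System.out.println(isValidName("1shuffle"));
        System.out.println(isValidName("mapreduce.shuffle"));
    }
}
```

Under these assumptions a dotted name such as mapreduce.shuffle is rejected, because the dot is not in the allowed character set.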
AuxServices implements both ServiceStateChangeListener and EventHandler, so it can handle events of type AuxServicesEventType.
public enum AuxServicesEventType {
APPLICATION_INIT,
APPLICATION_STOP,
CONTAINER_INIT,
CONTAINER_STOP
}
AuxServicesEvent carries the following fields:
public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {
private final String user;
private final String serviceId;
private final ByteBuffer serviceData;
private final ApplicationId appId;
private final Container container;
}
public abstract class AbstractEvent<TYPE extends Enum<TYPE>>
implements Event<TYPE> {
private final TYPE type;
private final long timestamp;
}
The handle() method of AuxServices processes these events:
public class AuxServices extends AbstractService
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
public void handle(AuxServicesEvent event) {
LOG.info("Got event " + event.getType() + " for appId "
+ event.getApplicationID());
switch (event.getType()) {
case APPLICATION_INIT:
LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
AuxiliaryService service = null;
try {
service = serviceMap.get(event.getServiceID());
service
.initializeApplication(new ApplicationInitializationContext(event
.getUser(), event.getApplicationID(), event.getServiceData()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(service,
AuxServicesEventType.APPLICATION_INIT, th);
}
break;
case APPLICATION_STOP:
for (AuxiliaryService serv : serviceMap.values()) {
try {
serv.stopApplication(new ApplicationTerminationContext(event
.getApplicationID()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.APPLICATION_STOP, th);
}
}
break;
case CONTAINER_INIT:
for (AuxiliaryService serv : serviceMap.values()) {
try {
serv.initializeContainer(new ContainerInitializationContext(
event.getUser(), event.getContainer().getContainerId(),
event.getContainer().getResource()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.CONTAINER_INIT, th);
}
}
break;
case CONTAINER_STOP:
for (AuxiliaryService serv : serviceMap.values()) {
try {
serv.stopContainer(new ContainerTerminationContext(
event.getUser(), event.getContainer().getContainerId(),
event.getContainer().getResource()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.CONTAINER_STOP, th);
}
}
break;
default:
throw new RuntimeException("Unknown type: " + event.getType());
}
}
}
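One detail of handle() is easy to miss: APPLICATION_INIT is delivered only to the service named by the event's service ID, while the other three event types are broadcast to every registered service. The following self-contained sketch models just that routing. `RecordingService` is a hypothetical stand-in, and the sketch silently skips an unknown service ID, whereas the real method catches the resulting exception and logs a warning.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Self-contained model of the routing in AuxServices.handle(); not Hadoop code.
final class AuxDispatchSketch {

    enum EventType { APPLICATION_INIT, APPLICATION_STOP, CONTAINER_INIT, CONTAINER_STOP }

    // Hypothetical stand-in for a service; it records the callbacks it receives.
    static final class RecordingService {
        final List<String> calls = new ArrayList<>();
    }

    final Map<String, RecordingService> serviceMap = new LinkedHashMap<>();

    // serviceId is only consulted for APPLICATION_INIT.
    void handle(EventType type, String serviceId, String appId) {
        switch (type) {
            case APPLICATION_INIT:
                // Targeted delivery: only the named service is notified.
                RecordingService target = serviceMap.get(serviceId);
                if (target != null) {
                    target.calls.add(type + ":" + appId);
                }
                break;
            case APPLICATION_STOP:
            case CONTAINER_INIT:
            case CONTAINER_STOP:
                // Broadcast: every registered service is notified.
                for (RecordingService s : serviceMap.values()) {
                    s.calls.add(type + ":" + appId);
                }
                break;
            default:
                throw new IllegalStateException("Unknown type: " + type);
        }
    }

    public static void main(String[] args) {
        AuxDispatchSketch aux = new AuxDispatchSketch();
        aux.serviceMap.put("mapreduce_shuffle", new RecordingService());
        aux.serviceMap.put("other_service", new RecordingService());
        aux.handle(EventType.APPLICATION_INIT, "mapreduce_shuffle", "app_1");
        aux.handle(EventType.APPLICATION_STOP, null, "app_1");
        System.out.println(aux.serviceMap.get("mapreduce_shuffle").calls);
        System.out.println(aux.serviceMap.get("other_service").calls);
    }
}
```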
So how do events reach AuxServices?
This is where ContainerManagerImpl comes in:
public class NodeManager extends CompositeService
implements EventHandler<NodeManagerEvent> {
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
protected void serviceInit(Configuration conf) throws Exception {
// NodeManager level dispatcher
this.dispatcher = new AsyncDispatcher();
containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater,
this.aclsManager, dirsHandler);
addService(containerManager);
((NMContext) context).setContainerManager(containerManager);
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher);
}
}
ContainerManagerImpl has its own AsyncDispatcher, on which AuxServices is registered as the handler for all AuxServicesEventType events:
public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol,
EventHandler<ContainerManagerEvent> {
private final AuxServices auxiliaryServices;
protected final AsyncDispatcher dispatcher;
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
super(ContainerManagerImpl.class.getName());
...
// ContainerManager level dispatcher.
dispatcher = new AsyncDispatcher();
// Start configurable services
auxiliaryServices = new AuxServices();
auxiliaryServices.registerServiceListener(this);
addService(auxiliaryServices);
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
addService(dispatcher);
}
}
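The effect of dispatcher.register(AuxServicesEventType.class, auxiliaryServices) can be pictured as a lookup table keyed by the event-type enum class. The sketch below is a deliberately simplified, synchronous model with hypothetical names. The real AsyncDispatcher queues events and delivers them on a separate thread, which this sketch does not attempt to reproduce.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified synchronous model of routing by event-type class; not Hadoop code.
final class MiniDispatcherSketch {

    enum AuxEventType { APPLICATION_INIT, APPLICATION_STOP, CONTAINER_INIT, CONTAINER_STOP }

    // Hypothetical second family of events, used to show that routing is per enum class.
    enum OtherEventType { SOMETHING_ELSE }

    private final Map<Class<?>, Consumer<Enum<?>>> handlers = new HashMap<>();

    // One handler per enum class, as with register(AuxServicesEventType.class, ...).
    void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Enum<?>> handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Looks up the handler by the enum class that declares the event type.
    void dispatch(Enum<?> eventType) {
        Consumer<Enum<?>> handler = handlers.get(eventType.getDeclaringClass());
        if (handler == null) {
            throw new IllegalStateException(
                "No handler registered for " + eventType.getDeclaringClass());
        }
        handler.accept(eventType);
    }

    public static void main(String[] args) {
        MiniDispatcherSketch dispatcher = new MiniDispatcherSketch();
        List<String> seen = new ArrayList<>();
        dispatcher.register(AuxEventType.class, e -> seen.add("aux:" + e));
        dispatcher.dispatch(AuxEventType.CONTAINER_INIT);
        dispatcher.dispatch(AuxEventType.APPLICATION_STOP);
        System.out.println(seen);
    }
}
```

Because the lookup key is the enum class rather than the individual constant, a single registration covers all four AuxServicesEventType values.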
ApplicationImpl creates the AuxServicesEventType.APPLICATION_STOP event:
public class ApplicationImpl implements Application {
static class AppFinishTransition implements
MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@Override
public ApplicationState transition(ApplicationImpl app,
ApplicationEvent event) {
ApplicationContainerFinishedEvent containerFinishEvent =
(ApplicationContainerFinishedEvent) event;
LOG.info("Removing " + containerFinishEvent.getContainerID()
+ " from application " + app.toString());
app.containers.remove(containerFinishEvent.getContainerID());
if (app.containers.isEmpty()) {
// All containers are cleanedup.
app.handleAppFinishWithContainersCleanedup();
return ApplicationState.APPLICATION_RESOURCES_CLEANINGUP;
}
return ApplicationState.FINISHING_CONTAINERS_WAIT;
}
}
void handleAppFinishWithContainersCleanedup() {
// Delete Application level resources
this.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
// tell any auxiliary services that the app is done
this.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));
}
}
The other three AuxServicesEventType values, APPLICATION_INIT, CONTAINER_INIT and CONTAINER_STOP, are emitted at different stages of ContainerImpl:
public class ContainerImpl implements Container {
/**
* State transition when a NEW container receives the INIT_CONTAINER
* message.
*/
static class RequestResourcesTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@Override
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
final ContainerLaunchContext ctxt = container.launchContext;
container.metrics.initingContainer();
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
(AuxServicesEventType.CONTAINER_INIT, container));
// Inform the AuxServices about the opaque serviceData
Map<String,ByteBuffer> csd = ctxt.getServiceData();
if (csd != null) {
// This can happen more than once per Application as each container may
// have distinct service data
for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
container.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
container.user, container.containerId
.getApplicationAttemptId().getApplicationId(),
service.getKey().toString(), service.getValue()));
}
}
...
}
}
/**
* Handle the following transitions:
* - NEW -> DONE upon KILL_CONTAINER
* - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE,
* KILLING, CONTAINER_CLEANEDUP_AFTER_KILL}
* -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@SuppressWarnings("unchecked")
public void transition(ContainerImpl container, ContainerEvent event) {
container.finished();
//if the current state is NEW it means the CONTAINER_INIT was never
// sent for the event, thus no need to send the CONTAINER_STOP
if (container.getCurrentState()
!= org.apache.hadoop.yarn.api.records.ContainerState.NEW) {
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
(AuxServicesEventType.CONTAINER_STOP, container));
}
}
}
}
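The two transitions above imply a fixed pattern of events per container: one CONTAINER_INIT, then one APPLICATION_INIT for each service-data entry, and a CONTAINER_STOP only if the container had left the NEW state. The self-contained sketch below models that pattern with plain strings. The method names and the string encoding of events are hypothetical and chosen only for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Self-contained model of which aux-service events a container emits; not Hadoop code.
final class ContainerAuxEventsSketch {

    // Events emitted when a NEW container receives INIT_CONTAINER.
    static List<String> onInit(Map<String, ByteBuffer> serviceData) {
        List<String> events = new ArrayList<>();
        events.add("CONTAINER_INIT");
        if (serviceData != null) {
            // One APPLICATION_INIT per service named in the launch context.
            for (String serviceId : serviceData.keySet()) {
                events.add("APPLICATION_INIT:" + serviceId);
            }
        }
        return events;
    }

    // Events emitted when the container reaches DONE.
    static List<String> onDone(boolean stillNew) {
        List<String> events = new ArrayList<>();
        if (!stillNew) {
            // CONTAINER_INIT was sent earlier, so CONTAINER_STOP must follow.
            events.add("CONTAINER_STOP");
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, ByteBuffer> serviceData = new LinkedHashMap<>();
        serviceData.put("mapreduce_shuffle", ByteBuffer.allocate(0));
        System.out.println(onInit(serviceData));
        System.out.println(onDone(false));
        System.out.println(onDone(true));
    }
}
```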