YARN中事件驱动编程的设计与实现

YARN中事件驱动编程的实现


YARN的事件驱动编程模型是YARN的3大核心之一(其他两个分别是RPC和状态机),在YARN的架构和设计中有很重要的地位,本篇将浅析YARN中事件驱动的实现原理和相应的代码解读。


什么是事件驱动编程:

事件驱动编程是一种编程范式,程序的执行流程是由动作(actions,例如用户交互,其他线程发送的+消息等等)触发的事件(event)决定的。

事件驱动编程的优势

  • 事件驱动原理上相比于多线程编程还是简单很多,容易开发
  • 模块间的解耦,消息通过事件来传递,不会造成模块间耦合度过高
  • 易于开发,易于调试

事件驱动编程的劣势

  • 难以利用多核和多线程的优势(YARN的AysncDispather的代码注释中虽标明后期会用多线程和线程池来优化队列还一直没有实现)
  • 如果采用同步事件驱动的方式,可能会出现阻塞或部分任务处理事件超出期望

YARN中事件驱动编程的架构设计

YARN事件驱动原理图

YARN中事件驱动的工作流程(略,后期补)

YARN中事件驱动的类设计

YARN中事件驱动编程的具体实现

YARN中事件库核心包为hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event。
event下的中最重要的是AsyncDispatcher类。
AsyncDispatcher类是一个实现类,他继承自AbstractService并且实现了Dispatcher的接口。

其中和事件处理相关的属性有:

  • eventHandlingThread 事件处理线程,通过createThread实现的事件处理的方法,后续详解
  • eventDispatchers 事件处理的字典,通过字典的方式实现代码的解耦合
  • handlerInstance handler处理的接口,根据实际情况进行实现,实现了代码的解耦合
  • eventQueue 事件队列,里面放的是待处理的事件

和事件处理相关的方法有:

  • createThread() 事件处理的核心线程
  • dispatch() 事件处理和分发的方法
  • register() 用于注册事件和处理函数的关联类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//createThread的核心代码如下
Runnable createThread() {
return new Runnable() {
@Override
public void run() {

//while循环不停的从eventQueuez中获取事件
while (!stopped && !Thread.currentThread().isInterrupted()) {

Event event;
try {
//take在无数据的时候会阻塞直到有数据供使用
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
//获取到可用的事件后,调用dispath()方法进行任务分发
dispatch(event);
}
}
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//dispatch核心代码
protected void dispatch(Event event) {
//all events go thru this loop
// getDeclaringClass 和getClass的区别是什么?
Class<? extends Enum> type = event.getType().getDeclaringClass();

try{
//eventDispatchers的声明是Map<Class<? extends Enum>, EventHandler> eventDispatchers类型的字典,其key为枚举类型,值为handler处理类,这块应该是策略模式
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
//实际任务的处理交给EventHandler对应的实现类的handler方法
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//注册事件和事件处理器关联
//eventDispatchers是一个Map类型的变量,用于记录事件类型和事件处理的关系
//
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}

事件处理器服务的启动和停止

YARN中的事件处理器的启动相对简单,值得一说是YARN中关于事件处理器停止的处理,个人觉得是相当优雅并值得借鉴的

事件处理器服务的启动和停止相关的属性有

  • volatile boolean stopped 当服务停止的时候将stopped参数置为false
  • volatile boolean drainEventsOnStop 用于设置是否开启停止服务时将队列中的消息消费完
  • volatile boolean drained 当服务停止时通过该参数判断队列中消息是否已消费完
  • Object waitForDrained 用于停止任务时消费任务的等待服务所创建的对象
  • volatile boolean blockNewEvents 当服务在停止中时,用该变量阻塞新消息的接收

事件处理器服务的启动和停止相关的方法有

  • serviceStart 启动事件处理服务
  • serviceStop 停止事件处理服务
  • setDrainEventsOnStop 设定drainEventsOnStop参数用于
1
2
3
4
5
6
7
8
protected void serviceStart() throws Exception {
//start all the components
super.serviceStart();
//通过该处创建并启动线程
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName("AsyncDispatcher event handler");
eventHandlingThread.start();
}
1
2
3
public void setDrainEventsOnStop() {
drainEventsOnStop = true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
synchronized (waitForDrained) {
while (!drained && eventHandlingThread.isAlive()) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain.");
}
}
}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
try {
eventHandlingThread.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
}

// stop all the components
super.serviceStop();
}

值得一提是的在方法createThread中关于消息的处理上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
//判断事件队列是否为空的FLAG
drained = eventQueue.isEmpty();

if (blockNewEvents) {
synchronized (waitForDrained) {
//在事件被处理完的时候唤醒serviceStop方法中处于wait状态的waitForDrained
if (drained) {
waitForDrained.notify();
}
}
}

//此处关于消息处理的代码省略

}
}
};
}


达成成就,此博客的第一篇文章,有很多写的不如意和匆忙的地方,我会继续改进,优化

按照个人计划,后续会继续补充如下章节:

  • YARN状态机的设计与实现
  • HADOOP与设计模式系列
  • HADOOP与数据结构系列
  • HADOOP Launch Container的设计与实现
  • HADOOP Container Monitor的设计与实现

坚持,共勉。