|
3.4 服务库与事件库7 O% X8 G# y5 k3 p
本节介绍服务库和事件库。
4 L: @6 u) h& H9 A" L4 ]. U3.4.1 服务库- I6 s2 E: R. z' D
对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
& N+ s/ Y7 y" R m: K: y❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、
6 X9 p- |0 X& K6 ?% {STOPPED( 已停止) 。; k0 Z5 j/ t1 q9 ~
❑任何服务状态变化都可以触发另外一些动作。
; K* P* D8 S5 W1 p8 f, L❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。8 z' Z( W g, ~( r, Q- \7 K6 p! H
YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
) F+ W( a: p6 [$ n+ @8 Z9 C对象最终均实现了接口Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
; |3 S5 p+ \9 P. a; kService实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于6 [" k4 M) ?, x
ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMService、 ApplicationMasterLauncher、1 d2 I- X7 C. p! f- q! h
ApplicationMasterService等。
0 `2 ]- G& z" u% [" `# x图3-13 YARN中服务模型的类图
?; R+ n% s9 z8 f: k9 W' r在YARN中, ResourceManager和NodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服2 {0 Z9 N4 H+ q- D; D
务的统一管理。
2 \9 S+ F3 h9 P3.4.2 事件库
% r8 z# o6 M; n1 \, kYARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN将: t' K! a, u1 Q0 v( a
各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。 YARN中的事件处& D7 t/ E& o7 k5 E; S* f3 Q& `
理模型可概括为图3-14所示。
' X$ u d; }( [% L7 u6 h图3-14 YARN的事件处理模型% F+ Z* v$ @- a. @7 _; S
整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器( Async-Dispatcher) 负责传递给相应事件调度器
6 P' ]) a( J8 A, r% b4 D( Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其3 C9 b T+ Q N: N6 C6 x
处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成% j& n; j0 `% J, L/ Y
( 达到终止条件) 。; ^! m e4 X0 ?
在YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManager、 NodeManager、6 H" y, H2 t1 D) P4 m
MRAppMaster( MapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型4 e% `4 y( M# O
驱动服务的运行。4 k3 O$ x9 P+ x5 g8 u4 p5 w) |
YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先
6 B: m' \: [2 B0 v! Y4 i要定义一个中央异步调度器AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器# A% e: x6 B5 I& ~' u- z q+ u
EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器: u% K& w7 ^! Y/ A5 ^" j2 E
AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImpl、 TaskEvent/TaskImpl、 JobEvent/JobImpl等一系列事件/事件处理器, 由
, @$ R# T8 Z/ b; Y1 y$ j中央异步调度器统一管理和调度。
' d7 P. z; B0 s8 M4 P* ~9 J8 F5 {- b服务化和事件驱动软件设计思想的引入, 使得YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间& E3 @! F4 ]" C# h, L% Y
则采用事件联系起来, 系统设计简单且维护方便。& r3 {5 Q1 \1 J5 \% L# M+ C
图3-15 事件与事件处理器/ c4 A- i# p$ C+ A( O
3.4.3 YARN服务库和事件库的使用方法
3 g$ E2 Z( R& U! m- B为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce7 n; V+ S: j$ N s- B+ C6 \
ApplicationMaster( MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
) a, X0 v- B. n9 I) q: Q* Q; ~( @1) 定义Task事件。
' T* B6 i) _; npublic class TaskEvent extends AbstractEvent<TaskEventType> {, Y- C9 c- V: N4 s* W% H, n {" ?0 ]
private String taskID; //Task ID! a A+ ^" G) @8 G5 C' D2 q# R
public TaskEvent(String taskID, TaskEventType type) {
0 Z0 Y w! l, t6 `super(type);8 F0 m( w% E. H" [
this.taskID = taskID;
' [4 \' w: a* }( h) o! l1 P} p# R: f2 _5 {! d( c5 @
ublic String getTaskID() {
; R0 H/ g* {9 o) o, kreturn taskID;% \( b& O2 N; N4 A
} 其
1 Z- x4 V3 i" g2 Q中, Task事件类型定义如下:
. H$ c1 U1 U. p& apublic enum TaskEventType {6 f6 _9 v0 p; T0 q$ u/ i% A
T_KILL,
; C- K; {0 X1 } G5 z# J6 X+ f9 X% n, cT_SCHEDULE6 ]4 t! s" Q: a# r* @1 M
}
( [, q( G+ J+ } _9 m' l k2) 定义Job事件。
! ]5 s( p" h' ^, B9 s- Zpublic class JobEvent extends AbstractEvent<JobEventType> {3 e [# X8 B: X; n
private String jobID;, V6 K8 `! H2 C: V1 A; N8 i: R
public JobEvent(String jobID, JobEventType type) {
6 h/ q. Y9 V. {7 t: ^5 ]* Wsuper(type);
3 q/ y% H; Q9 e& _4 H! Gthis.jobID = jobID;
# N0 B; t/ n# _. N. a' m8 E} p
- h# @* v$ ?& Z& p3 a% E, f2 Aublic String getJobId() {
4 D: @' x8 U/ o {( ]2 D% ^5 f* ^return jobID;
4 J5 w( G/ F2 K+ j) z}& J% K+ t" P# D( U
}
: m/ o0 k H$ y- ?8 `其中, Job事件类型定义如下:: d7 F) A5 {. O" N( {
public enum JobEventType {
+ S* W9 z2 N6 gJOB_KILL,
" d& B% P3 l7 L7 a3 e, b6 G' ~5 ]" iJOB_INIT,4 n/ b" q z! y5 c
JOB_START! U1 ~& D: N4 C" d# U; W
}* q0 S p4 [; o$ H5 L2 B) o8 G
3) 事件调度器。
w( {3 F2 ~' k) n* g接下来定义一个中央异步调度器, 它接收Job和Task两种类型事件, 并交给对应的事件处理器处理, 代码如下:
2 k- J& ?+ ~/ o@SuppressWarnings("unchecked")3 ^; ~, V! w7 |- s' O
public class SimpleMRAppMaster extends CompositeService {2 S; p; p: U' |1 l* B1 u" {/ w+ H
private Dispatcher dispatcher; //中央异步调度器
# O) ^, H" R, y- {2 x/ Iprivate String jobID;: \! p v3 x ^8 k6 l' S8 _
private int taskNumber; //该作业包含的任务数目
" V# T, g1 }* V% Gprivate String[] taskIDs; //该作业内部包含的所有任务
& T) G1 J/ ]8 S2 W/ ipublic SimpleMRAppMaster(String name, String jobID, int taskNumber) {" A3 n' }# d8 R
super(name);
: g# d$ V! C' i3 tthis.jobID = jobID;* _4 }) k7 e( d: e
this.taskNumber = taskNumber;0 Q, y, M7 }+ e
taskIDs = new String[taskNumber];1 }; ~8 G% Y' z% k+ U/ u
for(int i = 0; i < taskNumber; i++) {* l3 D! Z/ z9 T2 n! h# D, n
taskIDs = new String(jobID + "_task_" + i);9 m5 O) d& e# G' ?6 J+ z d# D
}9 {) l) {, z5 i6 Z' Z( w
} p P! A- t9 G; Y& o, X: H+ e3 y3 z$ Q# u
ublic void serviceInit(final Configuration conf) throws Exception {
x' }& e" l7 q! @dispatcher = new AsyncDispatcher();//定义一个中央异步调度器0 Z1 [( e! I# s; ]7 g
//分别注册Job和Task事件调度器
1 `5 _* E7 F. F! P7 k$ ]dispatcher.register(JobEventType.class, new JobEventDispatcher());
P& U3 s* ]' u0 |. Ddispatcher.register(TaskEventType.class, new TaskEventDispatcher());1 X( `% x9 P1 J2 {, H7 Y" Z" P+ a. t
addService((Service) dispatcher); V4 Z" p$ h% S1 ^; d$ M
super.serviceInit(conf);
$ {" k+ N7 P/ ]( Y7 i8 g# M} p
0 V# x" M0 ?' t) B. }1 fublic Dispatcher getDispatcher() {
4 |, T8 j! T! t! D. freturn dispatcher;3 V1 z9 K8 c* W9 X# l
} p
( z1 x9 s7 T& Arivate class JobEventDispatcher implements EventHandler<JobEvent> {8 S0 w8 Y9 d$ L+ T" m& @
@Override
9 M6 d! ~" ~/ V+ zpublic void handle(JobEvent event) {
0 [( X6 x4 W6 k8 kif(event.getType() == JobEventType.JOB_KILL) {3 u: I( ~- y" \
System.out.println("Receive JOB_KILL event, killing all the tasks");
8 O$ _% H* s% P; c# D8 F: p$ hfor(int i = 0; i < taskNumber; i++) {
! }% E, i3 O1 K, Rdispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
# A0 o: T7 o& S: I ^TaskEventType.T_KILL));
4 o! g% ?( z/ [}
; Z% D4 w, A7 H6 `2 k" W3 c1 K} else if(event.getType() == JobEventType.JOB_INIT) {
2 e* Z8 |+ F8 O& Y/ nSystem.out.println("Receive JOB_INIT event, scheduling tasks");
! n* _( q! u( L. a: Q Jfor(int i = 0; i < taskNumber; i++) {
4 `' y$ ~, R* X8 Idispatcher.getEventHandler().handle(new TaskEvent(taskIDs,4 H8 ?6 T) C% i6 q _4 Q
TaskEventType.T_SCHEDULE));
. p$ M% c1 I1 O, _}
8 b. F& |. A; O5 M6 [1 S/ {}; s, M' T+ h ~, a
}
: e% @0 l6 e9 ^}p
, _' x/ I R# k& Q: D' grivate class TaskEventDispatcher implements EventHandler<TaskEvent> {
/ ?5 b1 R( Z% Z@Override( P6 v8 h$ z* u6 Y4 K( |
public void handle(TaskEvent event) {
5 }& H$ s5 A* D, D: T( X% Y+ nif(event.getType() == TaskEventType.T_KILL) {
! n P% @' A5 r# ^3 TSystem.out.println("Receive T_KILL event of task " + event.getTaskID());
0 c$ L( e8 Q3 r} else if(event.getType() == TaskEventType.T_SCHEDULE) {' f/ t* P2 X! M, G3 d4 J( N
System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());6 f: n! _; D2 ^1 S
}
2 Z, S; t3 w9 p}" m0 @$ k" @+ h- T( e: ?& e; j2 P
}
% i1 I7 `; p8 `+ ~! ~0 ^- }5 n0 k E}" h% F7 ?) Z n
4) 测试程序。( O+ g! F' Y- ?# q0 ?( h& j3 ~
@SuppressWarnings("unchecked")
2 i# }& ^) p9 `( g% N5 u' Jpublic class SimpleMRAppMasterTest {2 ]& }) G; [% F6 r" G3 t3 S
public static void main(String[] args) throws Exception {
0 A( m+ \' o0 Q. P1 a7 V) C) uString jobID = "job_20131215_12";
. j6 d! A3 I( g6 zSimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);/ L0 d3 d8 t5 S1 B% ? `
YarnConfiguration conf = new YarnConfiguration(new Configuration());
2 G* s* m4 i ?$ B. Y2 n: h8 CappMaster.serviceInit(conf);
* i5 |, d! A7 f; IappMaster.serviceStart();" o! v/ W: m5 k! m$ P
appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, g# F' w9 g. g4 p' J/ E
JobEventType.JOB_KILL));
& m9 v. [9 ~- \0 n) C3 u* b- AappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
% T6 N7 _( B: W) ^2 m# y& tJobEventType.JOB_INIT));
v# ?* {$ P: g5 ^/ U% P}
[0 ~9 z; W( I; i3.4.4 事件驱动带来的变化# E. C; {( @* j/ N( X" H0 @) @
在MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的
5 P+ h2 o( t$ o4 z2 r8 s方式, 且整个过程是串行的。 比如, 当TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
5 ^6 y; @0 y! t3 y字典文件等) 、 然后执行Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
7 b" p4 k7 O. s. a0 `2 @是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一$ V0 `. I1 Y/ {; ]- G) z
个独立进程运行该任务。 尽管后来MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决; |; V3 `/ ?6 u# c- Y9 X
问题之道, 必须引入新的编程模型。2 G; y$ u- e! j3 U& x! x5 m
图3-16 基于函数调用的工作流程" ~' ^# t: t9 V- I7 l d
基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下, MRv2引入的事件驱动编程模型则+ [3 y. J. i! C; b
是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关5 a. \1 q* Q6 i( k9 N! X
联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图3-17所示, 当A需要下载文件时, 只需0 Q8 R0 Q7 p' R! h" E% P
向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理9 ]% S$ q" D- G2 x
器B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A。& @' j9 s. u/ D0 [
图3-17 基于事件驱动的工作流程* _, v0 X$ Q& O: z o8 q2 W: Q+ i @
相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
2 C4 G, A, W; m4 u; l5 h* d5 }[15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705。 1 j- e1 |4 w* L# _; b
* e% l: ^4 s/ k0 [! K9 i" M
7 ^. x* T2 V2 h0 f% U
|
|